22 May 2023

Denormalizing Distributed Datasets in Real-Time

While a distributed data mesh empowers teams in a company to securely build modern applications while reducing data dependencies, it also poses challenges for non-product teams. Certain teams within a company may require access to anonymized, denormalized data to further grow the business. In this post, we will look at how such teams can use DeltaStream to capture the data they need to do their work, while the data owners retain control over the security of their data.

Training Machine Learning Models

For the purpose of this exercise, let's assume a Machine Learning team at a financial institution needs access to anonymized user data to build fraud-reduction models based on the frequency and location of payments made to an account. This team stores its data in a topic in an Apache Kafka cluster that is declared as a Store in DeltaStream:

```
mldb.product/mlai_msk# LIST STORES;
    Name    |  Kind | Access Region | Metadata |  Owner   |      Created at      |      Updated at
------------+-------+---------------+----------+----------+----------------------+-----------------------
  mlai_msk  | Kafka | AWS us-east-1 | {}       | sysadmin | 2023-01-12T20:38:16Z | 2023-01-12T20:38:16Z
```

and we already have access to the payments made by `payerid` over time:

```sql
CREATE STREAM payments_log (
  paymenttime BIGINT,
  payerid VARCHAR,
  accountid VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topic_name', 'value.format'='JSON');
```

DDL 1: `payments_log` definition for the `payments_log` Kafka topic

`DDL 1` defines the running log of each payment through the product, created using the `CREATE STREAM` statement. The `payments_log` Stream references `accountid`, the recipient of each payment, and `paymentid`, which points to extra payment information.

In addition to the frequency of payments made to a specific `accountid`, we also need to take into account the locations that payments are made from, so the training model can better detect anomalies over time. We will expand on this in the next section.

Sharing Anonymous User Data

As the stream of payments is provided in the `payments_log` Stream above, we need to securely denormalize the `payerid` field to also include where payments are coming from, without exposing users' sensitive information. This can be done by the team that owns the additional payer information, which is identified by a `userid` and described by the following Changelog in the `userdb.product` Schema:

```sql
CREATE CHANGELOG userdb.product.users_log (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  contactinfo STRUCT<email VARCHAR, phone VARCHAR, city VARCHAR, country VARCHAR>,
  PRIMARY KEY(userid)
) WITH ('topic'='users', 'value.format'='json');
```

For simplicity, let's assume all payers are registered as users with the product. At this point, only the users team has access to the `userdb` Database, so for data security reasons the `users_log` Changelog is not accessible to the Machine Learning team. The users team is also the owner of the `payments_log` data, giving it usage permissions to read from and write to that Stream.

Using the following query, we can provide the anonymous user information to the Machine Learning team in real time:

```sql
CREATE STREAM payments_location
AS SELECT
  p.paymenttime AS paytime,
  u.registertime AS payer_register_time,
  u.regionid AS region,
  u.contactinfo->city AS payment_city,
  u.contactinfo->country AS payment_country,
  p.accountid AS payee,
  p.paymentid AS paymentid
FROM payments_log p
JOIN users_log u ON u.userid = p.payerid;
```

Query 1: Enrich payments with anonymous payer location info via a temporal join on `users_log.userid`

In `Query 1`, we look up the payer, represented by `payerid`, in the `users_log` Changelog, which is keyed by `userid`. In doing so, we omit `userid`, `contactinfo->email`, and `contactinfo->phone`, since they were identified as Personally Identifiable Information (PII) by the users team, preventing this data from leaking outside of the `userdb` Database.

As a result of `Query 1`, a new `payments_location` Stream is created that provides the location information for each payment made to an account in addition to the existing payment information:

```sql
CREATE STREAM payments_location (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topicname', 'value.format'='json');
```

DDL 2: Underlying DDL for the denormalized `payments_location` in `Query 1`

The `DDL 2` statement reveals how the `payments_location` Stream was created when `Query 1` was launched.

Model Training with Real-Time Data

Now, let's assume additional payment information is available through the `paymentid` field, and that by inspecting the `payments` Stream, we find a `chargeinfo` structure that can be very useful to our fraud detection model:

```sql
CREATE STREAM payments (
  id VARCHAR,
  chargeinfo STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>,
  payer VARCHAR,
  payee VARCHAR,
  paymenttime BIGINT
) WITH ('topic'='topicname', 'value.format'='json');
```

Using the `payments` DDL, the following query can be created to continuously provide the additional charge information to the ML team:

```sql
CREATE STREAM payments_full
AS SELECT
  pl.paytime AS paytime,
  pl.payer_register_time AS payer_register_time,
  pl.region AS region,
  pl.payment_city AS payment_city,
  pl.payment_country AS payment_country,
  pl.payee AS payee,
  p.chargeinfo AS charge
FROM payments_location pl
JOIN payments p ON p.id = pl.paymentid;
```

Query 2: Denormalize payment ID into charge information

In `Query 2`, we directly replaced the `paymentid` reference with the charge information, allowing the model training pipeline to get the full picture when looking for payment anomalies within our product. As a result, the `payments_full` Stream is created as such:

```sql
CREATE STREAM payments_full (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  charge STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>
) WITH ('topic'='topicname', 'value.format'='json');
```

In addition to receiving the right information, the model training pipeline now receives it in real time, allowing the models to evolve faster over time and positively impact the business.
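As an illustration of how the pipeline might consume `payments_full`, here is a minimal sketch of a windowed aggregation that computes the payment frequency per payee. The `TUMBLE` windowing syntax here is an assumption modeled on Flink-style streaming SQL; check the DeltaStream documentation for the exact supported form:

```sql
-- Hypothetical downstream query: count payments per payee over
-- 10-minute tumbling windows to feed a frequency feature.
-- TUMBLE(...) syntax is an assumption, not confirmed by this post.
CREATE CHANGELOG payee_payment_frequency AS
SELECT
  window_start,
  window_end,
  payee,
  COUNT(*) AS payments_in_window
FROM TUMBLE(payments_full, SIZE 10 MINUTES)
GROUP BY window_start, window_end, payee;
```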

What’s next?

In this post, we looked at some of the techniques that can be used alongside modern products to securely denormalize data so that it is useful to other teams within the company, without giving those teams access to the original data. While this is an oversimplification of the scenario, DeltaStream has extensive support for different data types and data processing operations that fit countless production use cases. Please refer to our developer documentation to learn more about how your scenario can be simplified using DeltaStream.

If you are using streaming storage systems such as Apache Kafka (Confluent Cloud, AWS MSK, Redpanda, or any other Apache Kafka distribution) or AWS Kinesis, you should check out DeltaStream as the platform for processing, organizing, and securing your streaming data. You can schedule a demo to see all these capabilities in the context of a real-world streaming application.

27 Apr 2023

On-Time Delivery Shipments Using an Interval Left Join

On-time deliveries can make or break a business. When orders are dropped, it can be extremely frustrating for customers and can tarnish the supplier's brand. However, the reality is that on-time shipping is challenging, and mistakes are bound to happen. With DeltaStream, you can easily build an application in minutes to detect unshipped orders in real time. This way, missing orders can be caught as soon as possible, instead of customers calling to complain that they haven't received their goods.

In DeltaStream 101, we demonstrated how to add DeltaStream Stores, create Streams, and write queries. In this blog post, we will use these core concepts to demonstrate how a DeltaStream user can easily join two Streams, `Orders` and `Shipments`, to ensure that delivery orders are shipped on time. If a specific order is not shipped within 30 minutes, that order will be flagged to indicate that it needs a closer look.

Kafka Store Topics for E-commerce

Assume we have created a Kafka Store called `KafkaStore` with two topics – `Orders` and `Shipments`. We can observe the contents of each of these topics by printing them:

A record in the `Orders` topic
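For example, one such record (taken from the sample inputs shown later in this post):

```json
{
  "order_timestamp": "2023-03-02T13:53:22",
  "orderID": "Order_1",
  "customerID": "customer_32",
  "price": "12.31",
  "itemID": "7740485A-836A-447F-96F9-DBBD00E92387",
  "destination_address": {
    "number": 2555,
    "street": "University",
    "city": "Palo Alto",
    "state": "CA",
    "zipcode": "94500"
  }
}
```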

A record in the `Shipments` topic
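And the corresponding shipment record for the same `orderID`:

```json
{
  "shipment_timestamp": "2023-03-02T14:08:43",
  "orderID": "Order_1",
  "shipment_center_ID": 7
}
```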

When an item is ordered, an event is logged in the `Orders` topic with an `orderID` that is unique for that order. When that item is shipped, a corresponding event is logged in the `Shipments` topic that has the same `orderID` value in its payload. The `orderID` field is used to associate an order with its shipment and will be used later on for the join condition in our join query. Suppose for this use case that we expect every order to be shipped within 30 minutes. If an order is not shipped within that time frame, it indicates that something is wrong and someone should be notified.

Defining Relations: Creating Data Streams

As a first step, let's create Streams for our `Orders` and `Shipments` topics that we will later use as sources for our queries. We will give each Stream the same name as the topic that backs it.

DDL Statement to create the `Orders` Stream
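The statement itself is not reproduced here; below is a sketch of what it might look like, with column names and types inferred from the sample records later in this post (the exact types, and the `TIMESTAMP` handling of the ISO-8601 strings, are assumptions):

```sql
CREATE STREAM "Orders" (
  "order_timestamp" TIMESTAMP,  -- event-time column, see the 'timestamp' property below
  "orderID" VARCHAR,
  "customerID" VARCHAR,
  "price" VARCHAR,              -- encoded as a string in the sample input records
  "itemID" VARCHAR,
  "destination_address" STRUCT<"number" INTEGER, "street" VARCHAR, "city" VARCHAR, "state" VARCHAR, "zipcode" VARCHAR>
) WITH (
  'topic'='Orders',             -- redundant (matches the Stream name), kept for clarity
  'value.format'='json',
  'timestamp'='order_timestamp' -- use order_timestamp as the event time
);
```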

DDL Statement to create the `Shipments` Stream
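Likewise, a sketch for the `Shipments` Stream (the same assumptions apply):

```sql
CREATE STREAM "Shipments" (
  "shipment_timestamp" TIMESTAMP,  -- event-time column
  "orderID" VARCHAR,
  "shipment_center_ID" INTEGER
) WITH (
  'topic'='Shipments',
  'value.format'='json',
  'timestamp'='shipment_timestamp'
);
```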

The Data Definition Language (DDL) statements above describe the columns and column types of the data we are creating a Stream on. This metadata, along with the `value.format` provided in the statement's `WITH` clause, allows us to properly deserialize the data coming from the physical Kafka topics. Note that in the `WITH` clause we also specify a column for the `timestamp` property. The `timestamp` property signals which value should be used as the event time in our queries; if it's left unspecified, the record's timestamp is used by default (i.e. the Kafka record's timestamp in this case). We also specify the `topic` property, which associates each Stream with the physical Kafka topic matching the property's value. If `topic` isn't provided, the topic matching the name of the Stream is used. So in our first DDL, for example, specifying the `topic` property value of 'Orders' is redundant, but it's included for extra clarity.

Joining Data Streams with an Interval Left Join

Now that we have created our source Streams, the next step is to create an output Stream that joins the orders with the shipments, so we can see which orders were shipped on time and which weren't. We can achieve this with a `CREATE STREAM AS SELECT` query (CSAS), where we create a new Stream, `OrdersShipmentsJoin`, by writing a query that selects fields from our sources. In our query, we will use an interval `LEFT JOIN` to join `Orders` with `Shipments` so that we can access fields from both Streams. The interval in this case is 30 minutes, as defined in the problem statement. Note that we are using a `LEFT JOIN` so that the output includes all `Orders` records, regardless of whether there is a matching `Shipments` record to join with.

  1. CREATE STREAM "OrdersShipmentsJoin" AS
  2. SELECT
  3. *
  4. FROM "Orders" o
  5. LEFT JOIN "Shipments" s
  6. WITHIN 30 minutes
  7. ON o."orderID" = s."orderID";

CSAS 1: query that joins orders with their corresponding shipments within 30 minutes of the `order_timestamp`

Filter out Orders with Late or Missing Shipments using SQL

For this use case, we are specifically interested in `Orders` records that didn't have a matching `Shipments` record within 30 minutes of their event times. We can filter for this condition with a `WHERE` clause over the `OrdersShipmentsJoin` Stream we've just created, and output the results to another Stream called `OrdersNotShipped`.
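The query itself isn't reproduced here; below is a minimal sketch of what it might look like, using the column names from the join output shown later in this post (the exact projection and aliases are assumptions):

```sql
-- Keep only join results with no matching shipment: in a left join,
-- unmatched rows carry NULL values for all Shipments-side columns.
CREATE STREAM "OrdersNotShipped" AS
SELECT
  "order_timestamp",
  "o_orderID" AS "orderID",
  "customerID",
  "itemID",
  "destination_address"->"zipcode" AS "Destination_zipcode"
FROM "OrdersShipmentsJoin"
WHERE "s_orderID" IS NULL;
```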

CSAS 2: query that filters out any orders that were properly shipped so we are left with a Stream of orders that didn’t ship or didn’t ship on time

The new `OrdersNotShipped` Stream that we've created is essentially a real-time log of all orders that did not ship on time. Other fields such as `customerID`, `itemID`, and `Destination_zipcode` are provided in the payload to make it easier to track what went wrong with the shipment and to inform the customer if needed. Let's observe how this query behaves with some example inputs and outputs.

Sample Input Records in `Orders` Topic:

```json
{
  "order_timestamp": "2023-03-02T13:53:22",
  "orderID": "Order_1",
  "customerID": "customer_32",
  "price": "12.31",
  "itemID": "7740485A-836A-447F-96F9-DBBD00E92387",
  "destination_address": {
    "number": 2555,
    "street": "University",
    "city": "Palo Alto",
    "state": "CA",
    "zipcode": "94500"
  }
}
{
  "order_timestamp": "2023-03-02T14:00:17",
  "orderID": "Order_2",
  "customerID": "customer_851",
  "price": "189.12",
  "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C",
  "destination_address": {
    "number": 1660,
    "street": "Johnson RD NW",
    "city": "Atlanta",
    "state": "GA",
    "zipcode": "30318"
  }
}
{
  "order_timestamp": "2023-03-02T14:12:44",
  "orderID": "Order_3",
  "customerID": "customer_192",
  "price": "562.11",
  "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
  "destination_address": {
    "number": 400,
    "street": "Whittier St",
    "city": "Columbus",
    "state": "OH",
    "zipcode": "43215"
  }
}
{
  "order_timestamp": "2023-03-02T15:13:29",
  "orderID": "Order_4",
  "customerID": "customer_621",
  "price": "298.37",
  "itemID": "7F38AA51-C018-41E5-8EBD-57B77DEFB2D1",
  "destination_address": {
    "number": 2050,
    "street": "River Rd",
    "city": "Louisville",
    "state": "KY",
    "zipcode": "40206"
  }
}
```

Sample Input Records in `Shipments` Topic:

```json
{
  "shipment_timestamp": "2023-03-02T14:08:43",
  "orderID": "Order_1",
  "shipment_center_ID": 7
}
{
  "shipment_timestamp": "2023-03-02T14:45:17",
  "orderID": "Order_2",
  "shipment_center_ID": 3
}
{
  "shipment_timestamp": "2023-03-02T15:20:10",
  "orderID": "Order_4",
  "shipment_center_ID": 3
}
```

Sample Output Records in `OrdersShipmentsJoin` Topic:

```json
{
  "order_timestamp": "2023-03-02T13:53:22",
  "o_orderID": "Order_1",
  "customerID": "customer_32",
  "price": 12.31,
  "itemID": "7740485A-836A-447F-96F9-DBBD00E92387",
  "destination_address": {
    "number": 2555,
    "street": "University",
    "city": "Palo Alto",
    "state": "CA",
    "zipcode": "94500"
  },
  "shipment_timestamp": "2023-03-02T14:08:43",
  "s_orderID": "Order_1",
  "shipment_center_ID": 7
}
{
  "order_timestamp": "2023-03-02T15:13:29",
  "o_orderID": "Order_4",
  "customerID": "customer_621",
  "price": 298.37,
  "itemID": "7F38AA51-C018-41E5-8EBD-57B77DEFB2D1",
  "destination_address": {
    "number": 2050,
    "street": "River Rd",
    "city": "Louisville",
    "state": "KY",
    "zipcode": "40206"
  },
  "shipment_timestamp": "2023-03-02T15:20:10",
  "s_orderID": "Order_4",
  "shipment_center_ID": 3
}
{
  "order_timestamp": "2023-03-02T14:00:17",
  "o_orderID": "Order_2",
  "customerID": "customer_851",
  "price": 189.12,
  "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C",
  "destination_address": {
    "number": 1660,
    "street": "Johnson RD NW",
    "city": "Atlanta",
    "state": "GA",
    "zipcode": "30318"
  },
  "shipment_timestamp": null,
  "s_orderID": null,
  "shipment_center_ID": null
}
{
  "order_timestamp": "2023-03-02T14:12:44",
  "o_orderID": "Order_3",
  "customerID": "customer_192",
  "price": 562.11,
  "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
  "destination_address": {
    "number": 400,
    "street": "Whittier St",
    "city": "Columbus",
    "state": "OH",
    "zipcode": "43215"
  },
  "shipment_timestamp": null,
  "s_orderID": null,
  "shipment_center_ID": null
}
```

Sample Output Records in `OrdersNotShipped` Topic:

```json
{
  "order_timestamp": "2023-03-02T14:00:17",
  "orderID": "Order_2",
  "customerID": "customer_851",
  "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C",
  "Destination_zipcode": "30318"
}
{
  "order_timestamp": "2023-03-02T14:12:44",
  "orderID": "Order_3",
  "customerID": "customer_192",
  "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
  "Destination_zipcode": "43215"
}
```

Conclusion

In this blog post, we've demonstrated how, using DeltaStream, you can build a real-time application in minutes with easy-to-understand SQL queries. Specifically, we solved the use case of determining whether an order has been shipped on time by using a continuous interval join. Further, by using a left outer join, we were able to filter for unmatched results – orders that didn't have a matching shipment. This kind of application is invaluable for setting up downstream alerting, so the right team can be notified right away when an order isn't shipped or isn't shipped on time, indicating a problem with the order or with the shipping facility.

Expect more blog posts in the coming weeks as we showcase more of DeltaStream’s capabilities for a variety of use cases. Meanwhile, if you want to try this yourself, you can request a demo.
