18 Jul 2023


Create Stream Processing Pipelines with Superblocks and DeltaStream

In the previous installments of the DeltaStream 101 blog series, we’ve covered the basics of creating stores and defining different types of relations, such as Streams, Changelogs, and Materialized Views, on top of the data records in streaming stores. In this blog post, we look into how we can quickly create an application to explore and visualize real-time changes in our data by integrating DeltaStream with a third-party data visualization platform such as Superblocks.

Let’s assume we are creating a live dashboard for a delivery company to let them track their vehicles and drivers in real time. Each driver is identified by a unique identifier, and likewise each vehicle can be referred to by its unique ID. The company’s vehicles are equipped with sensors and GPS units that continuously send information about the vehicles to a configured streaming store, in this case an Apache Kafka cluster. The company wants to track its drivers and vehicles at any given time via a live feed on a map, using either a vehicle’s or a driver’s ID. The map shows the latest position of a vehicle based on its most recently reported speed and geographical location.

First, let’s take a look at an example of a “navigation record”, in JSON format, which captures information about a driver and their vehicle, along with the vehicle’s latest reported locations in our system. A stream of such records, collected from different vehicles, is continuously ingested into a topic called “navigation” in a Kafka store.

{
  "driver": {
    "id": "38200",
    "name": "James Smith",
    "license_id": "TX42191S",
    "car": {
      "id": "21820700",
      "model": "Ford",
      "plate": "2HTY910",
      "is_electric": false
    }
  },
  "route": [
    {
      "location": {
        "latitude": 38.083128,
        "longitude": -121.472887
      },
      "speed": 64
    },
    {
      "location": {
        "latitude": 38.339525,
        "longitude": -123.253794
      },
      "speed": 72
    }
  ]
}

Sample navigation record in JSON

DSQL statements to access and query the data

As the first step, we need to create a “store” in DeltaStream to access the data in the Kafka topic that collects navigation records. We also assume we have already defined our database and schema. Our previous blog post covers details on how one can define them.

In order to query the data, we create a stream called “navigation”. In DeltaStream, the “STRUCT” data type defines records with nesting, and the “ARRAY” data type defines an ordered collection of data items of the same type. As you can see in the DDL statement shown in Statement 1 below, the navigation stream has two columns: ‘driver’ and ‘route’. The driver column’s data type is a struct whose fields capture a driver’s ID, full name, license ID, and car. The car field is defined as a nested struct inside the driver struct and holds various information about the driver’s car, such as its ID, model, etc. The route column lists a sequence of data items that report the latest location and speed of the driver’s car. Therefore, the data type of the route column is defined as an array of structs, where each struct has two fields: location and speed. The location field is a nested struct containing the latitude and longitude values collected by the vehicle’s GPS, and the speed field is defined as an integer.

CREATE STREAM navigation (
  driver STRUCT<id VARCHAR, fullname VARCHAR, license_id VARCHAR, car STRUCT<id VARCHAR, model VARCHAR, plate VARCHAR, is_electric BOOLEAN>>,
  route ARRAY<STRUCT<location STRUCT<latitude DOUBLE, longitude DOUBLE>, speed INTEGER>>
) WITH ('topic'='navigation', 'value.format'='json');

Statement 1. DDL to define navigation stream.

Now that we can access the data in Kafka using the navigation stream we just created, we run a CREATE STREAM AS SELECT (CSAS) statement to extract the columns and nested fields that are relevant to our application. We are going to query the latest navigation information about a driver or car using their IDs. Hence, we need to select the driver’s ID and the car’s ID from the driver column. We also select the driver’s name to show on the dashboard. We pick the first element of the route column to show the latest location and speed of the car on the map, unnesting the location coordinates into separate columns alongside the speed in the new stream. As you can see in Statement 2, in DeltaStream the `->` operator is used to access the fields of a struct. Moreover, given that arrays are one-indexed (i.e., the first element of an array is at index 1), route[1] fetches the very first element of a given route array.

CREATE STREAM flat_data AS
SELECT driver->id AS driver_id,
  driver->fullname AS driver_name,
  driver->car->id AS car_id,
  route[1]->location->latitude AS latitude,
  route[1]->location->longitude AS longitude,
  route[1]->speed AS speed
FROM navigation;

Statement 2. CSAS to define flat_data stream

For a given driver or car ID, we can run a query on the flat_data stream and get the latest relevant data we need to show on the map. We are going to use Superblocks to visualize the location of the queried drivers or cars. Currently, Superblocks’ available APIs do not let us directly send the result of our query to update the map; instead, we can create a materialized view on top of the flat_data stream. Moreover, given that we are only interested in the most recent location of a car or driver when showing it on the map, we need to make sure our materialized view ingests the data in “upsert” mode. This way, if a new record arrives for an existing driver or car ID, it overwrites the current record and updates the materialized view. We can use a changelog in DeltaStream to interpret the records in a given topic in upsert mode. You can use the DDL statement in Statement 3 to define such a changelog. We define the primary key for the changelog as a composite key using the driver_id and car_id columns.

CREATE CHANGELOG navigation_logs (
  driver_id VARCHAR,
  car_id VARCHAR,
  driver_name VARCHAR,
  latitude DOUBLE,
  longitude DOUBLE,
  speed INTEGER,
  PRIMARY KEY (driver_id, car_id)
) WITH ('topic'='flat_data', 'value.format'='json');

Statement 3. DDL to define navigation_logs Changelog

Our final step to prepare the data for Superblocks is creating a materialized view, called “navigation_view”, by selecting all records from the navigation_logs changelog defined above. Now, for a given driver or car ID, we can run a simple filter query on navigation_view to fetch the latest location coordinates and speed of the queried driver or car. This query’s result is directly usable by Superblocks to update the map on our dashboard.

CREATE MATERIALIZED VIEW navigation_view AS
SELECT * FROM navigation_logs;

Statement 4. Statement to define navigation_view Materialized View
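For example, such a filter query might look like the minimal sketch below, using the driver and car IDs from the sample navigation record above; since the changelog’s primary key keeps only the latest record per driver/car pair, a query like this returns at most one row per pair:

SELECT * FROM navigation_view
WHERE driver_id = '38200' AND car_id = '21820700';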

Visualize the data using Superblocks

Now, let’s use Superblocks to visualize the location of drivers and cars on a map in real time. We can achieve this by creating an application in Superblocks which fetches the latest location of a driver or car from the navigation_view Materialized View we defined above.

Generate API Token in DeltaStream

DeltaStream uses API Tokens to authenticate third-party applications and let them run queries and access the results securely. In order to generate an API Token, go to your DeltaStream Home page, click on your avatar icon on the main navigation bar, and under your profile select the “Api Token” tab. You need to pick a name for the new token, and DeltaStream will generate it for you. Let’s call our new API Token “SuperblocksToken”. You won’t be able to access the content of a generated token once you exit this page; therefore, make sure you download the new token and save it in a safe place for future reference.

Create a Superblocks Application

The next step is creating a Superblocks application and connecting it to DeltaStream. Our new application receives the driver and car IDs as inputs from the user, generates a DSQL query, and submits it to DeltaStream to fetch the latest location of the car from the Materialized View. It then shows this location on a map. Log in to your Superblocks account and select the “new application” option to create one.

The first step is defining the input fields. Use the “Components” panel on the left to create two input boxes, named “Driver” and “Car”, and give them proper labels and placeholders. Make sure both fields are set as required.

The next step is creating the “Submit” button for the application. The Submit button calls DeltaStream’s REST API to run new queries and get their results, putting the DeltaStream API token generated earlier in the request header for authentication and secure access. For this purpose, add a new button to the application and set its click handler to be a new REST API. The API should be configured as below to connect to DeltaStream:

  • Method: POST
  • URL: https://api.deltastream.io/run-statement
  • Headers: Authorization: Bearer <YOUR-API-TOKEN>
  • Body Content Type: Form
  • Set Form Data as follows:
    • orgID: <YOUR-ORGANIZATION-ID>
    • roleName: sysadmin
    • storeName: <YOUR-STORE-NAME>
    • databaseName: <YOUR-DATABASE-NAME>
    • schemaName: <YOUR-SCHEMA-NAME>
    • statement: select * from navigation_view where driver_id = '{{Input1.value}}' and car_id = '{{Input2.value}}';

You can check your organization ID in your DeltaStream account’s Home page. Click on your avatar icon and find it under the “organizations” tab.

As you can see, the “statement” defined in the above REST API configuration is the DSQL query that is generated using the input values for the Driver and Car IDs. Superblocks generates this query each time the “Submit” button is clicked and sends it to the configured DeltaStream endpoint. Go ahead and set valid input values for the Driver and Car IDs and submit a request. Once the query runs successfully, DeltaStream returns its results wrapped in a list. In order to show the latest location of the driver, we only need the very first element in that list, so we define a function to pick that element from the returned results. Click on the plus button on the left panel and select the “Javascript Function” option. This opens a new panel with a placeholder to add the code defining the new function. Set the function code to: “return Step1.output[0].data;”.

The very last step is creating a map to show the fetched location. For this purpose, select the “Map” option from the Components panel and configure it as:

  • Initial location: {{API1.response[0]}}
  • Default markers: {{API1.response}}

Now, once you put valid values for the driver and car IDs in the input boxes and submit a request, the latest location of the car is marked on the map.

22 May 2023


Denormalizing Distributed Datasets in Real-Time

While a distributed data mesh empowers teams in a company to securely build modern applications while reducing data dependencies, it also poses challenges for non-product teams. Certain teams within a company may require access to anonymous, denormalized data to further grow the business. In this post, we will take a look at how such teams can use DeltaStream to capture the data they need to do their work, while the data owners retain control over the security of their data.

Training Machine Learning Models

For the purpose of this exercise, let’s assume a Machine Learning team needs access to anonymous user data to build models that reduce fraud in a financial institution, based on the frequency and location of payments made to an account. This team stores its data in a topic in an Apache Kafka cluster that is declared as a Store in DeltaStream:

mldb.product/mlai_msk# LIST STORES;
     Name      |  Kind   | Access Region | Metadata |  Owner   |      Created at      |      Updated at
---------------+---------+---------------+----------+----------+----------------------+-----------------------
  mlai_msk     | Kafka   | AWS us-east-1 | {}       | sysadmin | 2023-01-12T20:38:16Z | 2023-01-12T20:38:16Z

and we already have access to the payments made by `payerid` over time:

CREATE STREAM payments_log (
  paymenttime BIGINT,
  payerid VARCHAR,
  accountid VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topic_name', 'value.format'='JSON');

DDL 1: `payments_log` definition for the `payments_log` Kafka topic

`DDL 1` defines the running log of each payment made through the product, created using the `CREATE STREAM` statement. The `payments_log` references the `accountid` that receives each payment, and the `paymentid` that carries extra payment information.

In addition to the frequency of payments made to a specific `accountid`, we also need to take into account the location the payments are being made from, so the training model can better detect anomalies over time. We will expand on this in the next section.

Sharing Anonymous User Data

As the stream of payments is provided in the `payments_log` Stream above, we need to securely denormalize the `payerid` field to also include where the payments are coming from, without exposing users’ sensitive information. This can be done by the team that owns the additional payer information, identified by a `userid` and described by the following Changelog in the `userdb.product` Schema:

CREATE CHANGELOG userdb.product.users_log (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  contactinfo STRUCT<email VARCHAR, phone VARCHAR, city VARCHAR, country VARCHAR>,
  PRIMARY KEY(userid)
) WITH ('topic'='users', 'value.format'='json');

For simplicity, let’s assume all payers are registered as users with the product. At this point, only the users team has access to the `userdb` Database, hence the `users_log` is not accessible by the Machine Learning team, for data security reasons. The users team is the owner of the `payments_log` data, so they have usage permissions and can read from and write to the Stream.

Using the following query, we can provide the anonymous user information to the Machine Learning team in real time:

CREATE STREAM payments_location
AS SELECT
  p.paymenttime AS paytime,
  u.registertime AS payer_register_time,
  u.regionid AS region,
  contactinfo->city AS payment_city,
  contactinfo->country AS payment_country,
  p.accountid AS payee,
  p.paymentid AS paymentid
FROM payments_log p
JOIN users_log u ON u.userid = p.payerid;

Query 1: Enrich payments with anonymous payer location info using a temporal join on `users_log.userid`

In `Query 1`, we look up the payer represented by `payerid` in the `users_log` Changelog, identified by `userid`, and while doing so we omit `userid`, `contactinfo.email`, and `contactinfo.phone`, as they were identified as Personally Identifiable Information (PII) by the users team, preventing this data from leaking outside of the `userdb` Database.

As a result of `Query 1`, a new `payments_location` Stream is created that provides the location information for each payment made to an account in addition to the existing payment information:

CREATE STREAM payments_location (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  paymentid VARCHAR
) WITH ('topic'='topicname', 'value.format'='json');

DDL 2: Underlying DDL for the denormalized `payments_location` in `Query 1`

The `DDL 2` statement reveals how the `payments_location` Stream was created when `Query 1` was launched.

Model Training with Real-Time Data

Now, let’s assume that additional payment information can be provided through the `paymentid` field, and that by inspecting the `payments` Stream, the `chargeinfo` structure turns out to be very useful to our fraud detection model:

CREATE STREAM payments (
  id VARCHAR,
  chargeinfo STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>,
  payer VARCHAR,
  payee VARCHAR,
  paymenttime BIGINT
) WITH ('topic'='topicname', 'value.format'='json');

Using the `payments` DDL, the following query can be created to continuously provide the additional charge information to the ML team:

CREATE STREAM payments_full
AS SELECT
  pl.paytime AS paytime,
  pl.payer_register_time AS payer_register_time,
  pl.region AS region,
  pl.payment_city AS payment_city,
  pl.payment_country AS payment_country,
  pl.payee AS payee,
  p.chargeinfo AS charge
FROM payments_location pl
JOIN payments p ON p.id = pl.paymentid;

Query 2: Denormalize payment ID into charge information

In `Query 2`, we directly replaced the `paymentid` reference with the charge information to allow the model training pipeline to get the full picture for finding payment anomalies that may be occurring within our product. As a result, the `payments_full` Stream is created as such:

CREATE STREAM payments_full (
  paytime BIGINT,
  payer_register_time BIGINT,
  region VARCHAR,
  payment_city VARCHAR,
  payment_country VARCHAR,
  payee VARCHAR,
  charge STRUCT<vcc VARCHAR, amount FLOAT, type VARCHAR>
) WITH ('topic'='topicname', 'value.format'='json');

In addition to providing the right information to the model training pipeline, this information now arrives in real time, so the pipeline can evolve faster over time, positively impacting the business.

What’s next?

In this post, we looked at some of the techniques that can be used alongside modern products today to securely denormalize data that may be useful to other teams within the company that don’t have access to the original data. While this may be an oversimplification of the scenario, we have extensive support for different data types and data processing operations that fit countless production use cases. Please refer to our developer documentation to learn more about how your scenario can be simplified using DeltaStream.

If you are using streaming storage systems such as Apache Kafka (Confluent Cloud, AWS MSK, Redpanda or any other Apache Kafka) or AWS Kinesis, you should check out DeltaStream as the platform for processing, organizing and securing your streaming data. You can schedule a demo to see all of these capabilities in the context of a real-world streaming application.

27 Apr 2023


On-time Delivery Shipments Using an Interval Left Join

On-time deliveries can make or break a business. When orders are dropped, it can be extremely frustrating for customers and can tarnish the brand of the supplier. However, the reality is that on-time shipping is challenging and mistakes are bound to happen. With DeltaStream, you can easily build an application in minutes to detect unshipped orders in real time. This way, missing orders can be caught ASAP instead of customers calling and complaining that they haven’t received their goods.

In DeltaStream 101, we demonstrated how to add DeltaStream Stores, create Streams, and write queries. In this blog post, we will use these core concepts to demonstrate how a DeltaStream user can easily join two Streams, `Orders` and `Shipments`, to ensure that delivery orders are shipped on time. If a specific order is not shipped within 30 minutes, that order will be flagged to indicate that it needs a closer look.

Kafka Store Topics for E-commerce

Assume we have created a Kafka Store called `KafkaStore` with two topics, `Orders` and `Shipments`. We can observe the contents of each of these topics by printing them:

A record in the `Orders` topic

A record in the `Shipments` topic

When an item is ordered, an event is logged in the `Orders` topic with an `orderID` that is unique for that order. When that item is shipped, a corresponding event is logged in the `Shipments` topic that has the same `orderID` value in its payload. The `orderID` field is used to associate an order with its shipment and will be used later on for the join condition in our join query. Suppose for this use case that we expect every order to be shipped within 30 minutes. If an order is not shipped within that time frame, it indicates that something is wrong and someone should be notified.

Defining Relations: Creating Data Streams

As a first step, let’s create Streams for our `Orders` and `Shipments` topics that we will later use as sources for our queries. We will give the Streams the same names as the topics that back them.

DDL Statement to create the `Orders` Stream

DDL Statement to create the `Shipments` Stream

The Data Definition Language (DDL) statements above describe the columns and column types of the data we are creating a Stream on. This metadata, along with the `value.format` provided in the query’s `WITH` clause, allows us to properly deserialize the data coming from the physical Kafka topics. Note that in the query’s `WITH` clause we also specify a column for the `timestamp` property. The `timestamp` property signals which value should be used as the event time in our queries; if it’s left unspecified, the record’s timestamp is used by default (i.e. the Kafka record’s timestamp in this case). We also specify the `topic` property, which associates each Stream with the physical Kafka topic matching the name of the property’s value. If `topic` isn’t provided, the topic matching the name of the Stream is used. So in our first DDL, for example, specifying the `topic` property value of ‘Orders’ is redundant, but we include it for extra clarity.
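The DDL statements themselves are not reproduced here; as a rough sketch only, assuming the field names implied by the sample records later in this post (the column types and identifier quoting are assumptions, not the exact statements referenced above), they might look something like:

CREATE STREAM "Orders" (
  order_timestamp TIMESTAMP,  -- assumed type; referenced by the 'timestamp' property below
  "orderID" VARCHAR,
  "customerID" VARCHAR,
  price VARCHAR,  -- the sample input shows price as a string
  "itemID" VARCHAR,
  destination_address STRUCT<"number" INTEGER, street VARCHAR, city VARCHAR, state VARCHAR, zipcode VARCHAR>
) WITH ('topic'='Orders', 'value.format'='json', 'timestamp'='order_timestamp');

CREATE STREAM "Shipments" (
  shipment_timestamp TIMESTAMP,  -- assumed type
  "orderID" VARCHAR,
  "shipment_center_ID" INTEGER
) WITH ('topic'='Shipments', 'value.format'='json', 'timestamp'='shipment_timestamp');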

Joining Data Streams with the interval Left Join

Now that we have created our source Streams, the next step is to create an output Stream that joins the orders with the shipments so we can see which orders were shipped on time and which weren’t. We can achieve this with a `CREATE STREAM AS SELECT` query (CSAS), where we create a new Stream `OrdersShipmentsJoin` by writing a query that selects fields from our sources. In our query, we will use an interval `LEFT JOIN` to join `Orders` with `Shipments` so that we can access fields from both Streams. The interval in this case is 30 minutes, as defined in the problem statement. Note that we are using a `LEFT JOIN` so that the output includes all `Orders` records, regardless of whether there is a matching `Shipments` record to join with.

  1. CREATE STREAM "OrdersShipmentsJoin" AS
  2. SELECT
  3. *
  4. FROM "Orders" o
  5. LEFT JOIN "Shipments" s
  6. WITHIN 30 minutes
  7. ON o."orderID" = s."orderID";

CSAS 1: query that joins orders with their corresponding shipments within 30 minutes of the order_time

Filter out Orders with Late or Missing Shipments using SQL

For this use case, we are specifically interested in `Orders` records that didn’t have a matching `Shipments` record within 30 minutes of their event times. We can filter for this condition using a `WHERE` clause in a query over the `OrdersShipmentsJoin` Stream that we’ve just created, and output the results to another Stream called `OrdersNotShipped`.

CSAS 2: query that filters out any orders that were properly shipped so we are left with a Stream of orders that didn’t ship or didn’t ship on time
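The query itself is not reproduced here; a minimal sketch of such a CSAS, assuming the joined column names shown in the sample `OrdersShipmentsJoin` output below (where unmatched shipment fields arrive as null), could look like:

CREATE STREAM "OrdersNotShipped" AS
SELECT
  order_timestamp,
  "o_orderID" AS "orderID",
  "customerID",
  "itemID",
  destination_address->zipcode AS "Destination_zipcode"
FROM "OrdersShipmentsJoin"
WHERE "s_orderID" IS NULL;  -- no shipment joined within the 30-minute interval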

The new `OrdersNotShipped` Stream that we’ve created is essentially a real-time log of all orders that did not ship on time. Other fields such as `customerID`, `itemID`, and `destination_zipcode` are provided in the payload to make it easier to track what went wrong with the shipment and inform the customer if needed. Let’s observe how this query behaves with some example inputs and outputs.

Sample Input Records in `Orders` Topic:

{
  "order_timestamp": "2023-03-02T13:53:22",
  "orderID": "Order_1",
  "customerID": "customer_32",
  "price": "12.31",
  "itemID": "7740485A-836A-447F-96F9-DBBD00E92387",
  "destination_address": {
    "number": 2555,
    "street": "University",
    "city": "Palo Alto",
    "state": "CA",
    "zipcode": "94500"
  }
}
{
  "order_timestamp": "2023-03-02T14:00:17",
  "orderID": "Order_2",
  "customerID": "customer_851",
  "price": "189.12",
  "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C",
  "destination_address": {
    "number": 1660,
    "street": "Johnson RD NW",
    "city": "Atlanta",
    "state": "GA",
    "zipcode": "30318"
  }
}
{
  "order_timestamp": "2023-03-02T14:12:44",
  "orderID": "Order_3",
  "customerID": "customer_192",
  "price": "562.11",
  "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
  "destination_address": {
    "number": 400,
    "street": "Whittier St",
    "city": "Columbus",
    "state": "OH",
    "zipcode": "43215"
  }
}
{
  "order_timestamp": "2023-03-02T15:13:29",
  "orderID": "Order_4",
  "customerID": "customer_621",
  "price": "298.37",
  "itemID": "7F38AA51-C018-41E5-8EBD-57B77DEFB2D1",
  "destination_address": {
    "number": 2050,
    "street": "River Rd",
    "city": "Louisville",
    "state": "KY",
    "zipcode": "40206"
  }
}

Sample Input Records in `Shipments` Topic:

{
  "shipment_timestamp": "2023-03-02T14:08:43",
  "orderID": "Order_1",
  "shipment_center_ID": 7
}
{
  "shipment_timestamp": "2023-03-02T14:45:17",
  "orderID": "Order_2",
  "shipment_center_ID": 3
}
{
  "shipment_timestamp": "2023-03-02T15:20:10",
  "orderID": "Order_4",
  "shipment_center_ID": 3
}

Sample Output Records in `OrdersShipmentsJoin` Topic:

{
  "order_timestamp": "2023-03-02T13:53:22",
  "o_orderID": "Order_1",
  "customerID": "customer_32",
  "price": 12.31,
  "itemID": "7740485A-836A-447F-96F9-DBBD00E92387",
  "destination_address": {
    "number": 2555,
    "street": "University",
    "city": "Palo Alto",
    "state": "CA",
    "zipcode": "94500"
  },
  "shipment_timestamp": "2023-03-02T14:08:43",
  "s_orderID": "Order_1",
  "shipment_center_ID": 7
}
{
  "order_timestamp": "2023-03-02T15:33:29",
  "o_orderID": "Order_4",
  "customerID": "customer_621",
  "price": 298.37,
  "itemID": "7F38AA51-C018-41E5-8EBD-57B77DEFB2D1",
  "destination_address": {
    "number": 2050,
    "street": "River Rd",
    "city": "Louisville",
    "state": "KY",
    "zipcode": "40206"
  },
  "shipment_timestamp": "2023-03-02T15:40:10",
  "s_orderID": "Order_4",
  "shipment_center_ID": 3
}
{
  "order_timestamp": "2023-03-02T14:00:17",
  "o_orderID": "Order_2",
  "customerID": "customer_851",
  "price": 189.12,
  "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C",
  "destination_address": {
    "number": 1660,
    "street": "Johnson RD NW",
    "city": "Atlanta",
    "state": "GA",
    "zipcode": "30318"
  },
  "shipment_timestamp": null,
  "s_orderID": null,
  "shipment_center_ID": null
}
{
  "order_timestamp": "2023-03-02T14:12:44",
  "o_orderID": "Order_3",
  "customerID": "customer_192",
  "price": 562.11,
  "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
  "destination_address": {
    "number": 400,
    "street": "Whittier St",
    "city": "Columbus",
    "state": "OH",
    "zipcode": "43215"
  },
  "shipment_timestamp": null,
  "s_orderID": null,
  "shipment_center_ID": null
}

Sample Output Records in `OrdersNotShipped` Topic:

{
  "order_timestamp": "2023-03-02T14:00:17",
  "orderID": "Order_2",
  "customerID": "customer_851",
  "itemID": "80D2C068-CDFA-431C-9A33-079792A8D95C",
  "Destination_zipcode": "30318"
}
{
  "order_timestamp": "2023-03-02T14:12:44",
  "orderID": "Order_3",
  "customerID": "customer_192",
  "itemID": "98E090E3-756B-4A06-8E4E-15DF9026295E",
  "Destination_zipcode": "43215"
}

Conclusion

In this blog post, we’ve demonstrated how, using DeltaStream, you can build a real-time application in minutes with easy-to-understand SQL queries. Specifically, we solved the use case of determining whether an order has been shipped on time by using a continuous interval join. Further, by using a left outer join, we were able to filter for unmatched results – orders that didn’t have a matching shipment. This kind of application is invaluable for setting up downstream alerting, so the right team can be notified right away when an order isn’t shipped or isn’t shipped on time, indicating a problem with the order or with the shipping facility.

Expect more blog posts in the coming weeks as we showcase more of DeltaStream’s capabilities for a variety of use cases. Meanwhile, if you want to try this yourself, you can request a demo.
