WarpStream is an Apache Kafka® compatible data streaming platform built directly on top of object storage. It’s delivered as a single, stateless Go binary, so there are no local disks to manage, no brokers to rebalance, and no ZooKeeper to operate. WarpStream clusters can scale out and in instantaneously, without rebalancing.

DeltaStream is a serverless stream processing platform powered by Flink. With features including Role-Based Access Control (RBAC), Streaming Catalog and Materialized Views, DeltaStream offers all the capabilities needed to process, share and govern ALL your streaming data across your streaming storage systems.

In this blog post, we will walk you through a simple but powerful use case that demonstrates how to process a stream of data produced by a WarpStream cluster using DeltaStream. 

Before getting started, sign up for DeltaStream and WarpStream. Also, make sure to install the WarpStream CLI by running the following command in your terminal:

  curl https://console.warpstream.com/install.sh | sh

We’re also going to use a producer client that’s written in Go, so your environment needs to be set up to run Go. The easiest way to do this is using Homebrew:

  brew install go

WarpStream Setup

After signing up for WarpStream, we’re going to log into the WarpStream Console and create a cluster. Then, we’ll use the ClickHouse ClickPipes data loader to produce data to WarpStream.

Step 1: Create a WarpStream cluster

Log into the WarpStream Console, and click New Cluster.

In the dropdown on the cluster creation screen, select Serverless.

Then, navigate to the Connect tab, and click Create Credentials. 

Store these values, as well as the Bootstrap URL, as environment variables for easy reference:

  export BOOTSTRAP_URL=<YOUR_BOOTSTRAP_URL> \
    SASL_USERNAME=<YOUR_SASL_USERNAME> \
    SASL_PASSWORD=<YOUR_SASL_PASSWORD>

Note: Do not copy the port (“:9092”) for your Bootstrap URL. Just copy the URL.

Also, download the .txt file containing your credentials (you’ll need these later). 

Step 2: Produce some Data

After your cluster is set up, use the ClickHouse ClickPipes data loader to produce some data to your WarpStream cluster.

Auto-topic creation is enabled by default in WarpStream, so you won’t need to create a topic manually.

The data loader is a producer client written in Go that reads data from a CSV file and writes it to a topic using the Kafka protocol. Download the contents of the repo locally, along with one of the compressed datasets linked in the README. Then make sure that the file variable in main.go contains the path to your compressed CSV. By default, it reads logs-534.csv.gz, but if you downloaded a different dataset, just replace the file name when you edit the path. There’s no need to uncompress the file.
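
If you’d like to see roughly what that producer does (or write your own), below is a minimal sketch of an equivalent loader. It is an illustration, not the actual ClickPipes code: it assumes the franz-go Kafka client (github.com/twmb/franz-go), the environment variables exported earlier, the default logs-534.csv.gz file, and the dns_logs_topic_test topic, and it produces synchronously instead of pacing itself.

  // A hedged, minimal stand-in for the data loader: gzipped CSV in, JSON records out.
  package main

  import (
      "compress/gzip"
      "context"
      "crypto/tls"
      "encoding/csv"
      "encoding/json"
      "io"
      "log"
      "os"

      "github.com/twmb/franz-go/pkg/kgo"
      "github.com/twmb/franz-go/pkg/sasl/plain"
  )

  func main() {
      // Connect to WarpStream with the credentials exported earlier (SASL PLAIN over TLS).
      client, err := kgo.NewClient(
          kgo.SeedBrokers(os.Getenv("BOOTSTRAP_URL")+":9092"),
          kgo.DialTLSConfig(&tls.Config{}),
          kgo.SASL(plain.Auth{
              User: os.Getenv("SASL_USERNAME"),
              Pass: os.Getenv("SASL_PASSWORD"),
          }.AsMechanism()),
          kgo.DefaultProduceTopic("dns_logs_topic_test"),
      )
      if err != nil {
          log.Fatal(err)
      }
      defer client.Close()

      // Read the compressed CSV directly; no need to uncompress it first.
      f, err := os.Open("logs-534.csv.gz")
      if err != nil {
          log.Fatal(err)
      }
      defer f.Close()
      gz, err := gzip.NewReader(f)
      if err != nil {
          log.Fatal(err)
      }
      r := csv.NewReader(gz)

      // The first row holds the column names, which become the JSON keys.
      header, err := r.Read()
      if err != nil {
          log.Fatal(err)
      }

      ctx := context.Background()
      for {
          row, err := r.Read()
          if err == io.EOF {
              break
          }
          if err != nil {
              log.Fatal(err)
          }
          // Turn each CSV row into a JSON object and produce it to the topic.
          msg := map[string]string{}
          for i, col := range header {
              msg[col] = row[i]
          }
          value, _ := json.Marshal(msg)
          if err := client.ProduceSync(ctx, &kgo.Record{Value: value}).FirstErr(); err != nil {
              log.Fatal(err)
          }
      }
  }

The real loader in the repo handles details such as batching and throughput pacing; this sketch only shows the shape of the CSV-to-Kafka path.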

Once your producer is running, navigate to the Topics screen in the WarpStream Console and click View Details next to the newly created dns_logs_topic_test topic to confirm that the producer is producing data.

Congrats! You’re producing a stream of 10,000 records per second to WarpStream. Now it’s time to configure DeltaStream to process this stream of data.

DeltaStream Setup

Once you sign up and log in to DeltaStream, you’ll need to complete two steps to start processing data from WarpStream.

Step 1: Connect WarpStream to DeltaStream

From the Stores tab, click New Store and provide the WarpStream Bootstrap URL (you can get this from the Connect tab in the WarpStream Console; it ends in :9092).

On the next step, provide the Username, Password, and SASL Hash Function. Choose PLAIN in the SASL Hash Function dropdown, and for Username and Password use the credentials you obtained from the WarpStream Console when you created your cluster.

If you forgot to save your SASL password, that’s OK; just go back to the WarpStream Console and create another set of credentials.

Click Save, and now your WarpStream cluster is connected to DeltaStream.

Step 2: Create a Database

Create a Database from the Catalog tab. Give it a name and click Save.

Once this is done, you can start writing your SQL queries in the SQL Editor tab.

Process your Data Stream

Now, let’s do some stream processing. In this example, we are going to inspect a stream of DNS logs to capture errors based on the “status” codes. Here’s a sample message from the dns_logs_topic_test topic on WarpStream:

  {
    "key": {},
    "value": {
      "referer": "https://www.zanbil.ir/m/filter/b8%2Ct31",
      "remote_addr": "5.214.135.81",
      "remote_user": "-",
      "request_path": "/image/59053/productModel/200x200",
      "request_protocol": "HTTP/1.1",
      "request_type": "GET",
      "runtime": "9540",
      "size": "6788",
      "status": "200",
      "time_local": "2019-02-01 09:56:13",
      "user_agent": "Mozilla/5.0 (Linux; Android 6.0.1; SAMSUNG SM-A800I Build/MMB29K) AppleWebKit/537.36 (KHTML, like Gecko) SamsungBrowser/8.2 Chrome/63.0.3239.111 Mobile Safari/537.36"
    }
  }

To start processing the incoming messages, we are going to CREATE a STREAM called dns_logs using SQL:

  CREATE STREAM dns_logs
    (referer VARCHAR, request_type VARCHAR, remote_addr VARCHAR, status VARCHAR, user_agent VARCHAR)
    WITH ('topic'='dns_logs_topic_test', 'value.format'='json');

From here, we can filter for the error messages as they arrive using a simple SQL statement:

  SELECT * FROM dns_logs WHERE status != '200';

You can take this further and store all the “filtered” messages in a new topic in WarpStream, so that you can use them for more downstream processing. This can be done using the following statement:

  CREATE STREAM dns_logs_error AS SELECT * FROM dns_logs WHERE status != '200';

This statement runs continuously, filtering the incoming messages as they arrive, and creates a new dns_logs_error topic on your WarpStream cluster to store the filtered messages.
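
Since dns_logs_error is just another topic on your WarpStream cluster, any Kafka client can read it for that downstream processing. As an illustration only (again assuming the franz-go client and the credentials from earlier; the consumer group name here is made up), a minimal consumer might look like this:

  // A minimal sketch of a downstream consumer for the filtered topic.
  package main

  import (
      "context"
      "crypto/tls"
      "fmt"
      "log"
      "os"

      "github.com/twmb/franz-go/pkg/kgo"
      "github.com/twmb/franz-go/pkg/sasl/plain"
  )

  func main() {
      client, err := kgo.NewClient(
          kgo.SeedBrokers(os.Getenv("BOOTSTRAP_URL")+":9092"),
          kgo.DialTLSConfig(&tls.Config{}),
          kgo.SASL(plain.Auth{
              User: os.Getenv("SASL_USERNAME"),
              Pass: os.Getenv("SASL_PASSWORD"),
          }.AsMechanism()),
          kgo.ConsumeTopics("dns_logs_error"),
          kgo.ConsumerGroup("dns-error-reader"), // hypothetical group name
      )
      if err != nil {
          log.Fatal(err)
      }
      defer client.Close()

      ctx := context.Background()
      for {
          fetches := client.PollFetches(ctx)
          if errs := fetches.Errors(); len(errs) > 0 {
              log.Fatal(errs)
          }
          // Each record is one non-200 DNS log event written by the continuous query.
          fetches.EachRecord(func(rec *kgo.Record) {
              fmt.Println(string(rec.Value))
          })
      }
  }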

You can also perform aggregations and create Materialized Views. These materialized views are accessible over APIs, so you can quickly build real-time dashboards and apps powered by them.

  CREATE MATERIALIZED VIEW log_request_count AS
    SELECT remote_addr, count(request_type) AS request_count
    FROM dns_logs WHERE status != '200' GROUP BY remote_addr;

This statement creates a Materialized View, which in DeltaStream is an always up-to-date snapshot of your data; here it holds a count of the error messages aggregated by IP address (represented by remote_addr in the JSON). With this materialized view, you can:

  • Geolocate where the errors are happening to help with diagnosis
  • Stop malicious attacks
  • Build dashboards to monitor errors and alert the right personnel

Conclusion

By now you should have a pretty solid understanding of how to create a streaming data pipeline between WarpStream and DeltaStream.

Next, learn how the WarpStream architecture is a better fit than Apache Kafka for streaming workloads running in the cloud, or learn more about how DeltaStream makes it easy to process streams of data.

With WarpStream and DeltaStream, you have everything you need to build a real-time streaming platform. Get started and take the first steps toward becoming an event-driven organization.

Thank you to the WarpStream team for co-authoring this blog.