13 Feb 2024
Stream Processing for IoT Data
The Internet of Things (IoT) refers to sensors and other devices that share and exchange data over a network. IoT has been on the rise for years and continues to grow in popularity alongside other technological advances, such as 5G cellular networks and the proliferation of “smart” devices. From tracking patient health to monitoring agriculture, the applications for IoT are plentiful and diverse. Other sectors where IoT is used include security, transportation, home automation, and manufacturing.
Oracle defines Big Data as “data that contains greater variety, arriving in increasing volumes and with more velocity.” This definition is often summarized by the 3 Vs – volume, velocity, and variety. IoT data fits this description well, as a deployment’s many sensors and devices can emit large volumes of data in a variety of formats.
A platform capable of processing IoT data needs to be scalable in order to keep up with the volume of Big Data. It’s common for IoT applications to have new sensors and devices added over time. Consider a drone fleet for package deliveries as an example – you may start off with 10 or 20 drones, but as demand for deliveries increases, the size of your drone fleet can grow by orders of magnitude. The underlying systems processing these data need to be able to scale horizontally to match the increase in data volume.
Many IoT use cases, such as tracking patient health and monitoring security feeds, require low latency insights. Data arriving in real time from sensors and devices often needs to be acted on in real time as well. For this reason, streaming and stream processing technologies have become increasingly popular, and perhaps essential, for solving these use cases. Streaming storage technologies such as Apache Kafka, Amazon Kinesis, and Redpanda can meet the low latency data transportation requirements of IoT. On the stream processing side, technologies such as Apache Flink and managed solutions such as DeltaStream can provide low latency streaming analytics.
IoT data can also come in various types and structures. Different sensors can have different data formats. Take a smart home, for example: the cameras in a smart home will likely send very different data from a light or a thermometer, yet these sensors are all related to the same smart home. It’s important for a data platform handling IoT use cases to be able to join across different data sets and handle any variations in data structure, format, or type.
DeltaStream as a Streaming Analytics Platform and a Streaming Database
DeltaStream is a platform to unify, process, and govern streaming data. DeltaStream sits as the compute and governance layer on top of streaming storage systems such as Kafka. Powered by Apache Flink, DeltaStream is a fully managed solution that can process streaming data with very low latencies.
In this blog post we’ll cover two examples to show how DeltaStream can solve real-time IoT use cases. In the first use case, we’ll use DeltaStream’s Materialized Views to build a real-time, request-driven application. For the second use case, we’ll use DeltaStream to power real-time event-driven pipelines.
Use Case Setup: Transportation Sensor Data
For simplicity, both use cases will use the same source data. Let’s assume that our data is available in Apache Kafka and represents updates and sensor information for a truck fleet. We’ll first define Relations for the data in two Kafka topics.
The first Relation represents truck information. This includes an identifier for the truck, the speed of the truck, which thermometer is in the truck, and a timestamp for this update event represented as epoch milliseconds. Later on, we will use this event timestamp field to perform a join with data from other sensors. Since we expect regular truck information updates, we’ll define a Stream for this data.
Create truck_info Stream:

CREATE STREAM truck_info (
  event_ts BIGINT,
  truck_id INT,
  speed_kmph INT,
  thermometer VARCHAR
) WITH (
  'topic' = 'truck_info',
  'value.format' = 'json',
  'timestamp' = 'event_ts'
);
The second Relation represents a thermometer sensor’s readings. The fields include an identifier for the thermometer, the temperature reading, and a timestamp for when the temperature was taken, represented as epoch milliseconds. Later on, the event timestamp will be used when joining with the truck_info Stream. We will define a Changelog for this data using sensor_id as the primary key.
Create temperature_sensor Changelog:

CREATE CHANGELOG temperature_sensor (
  "time" BIGINT,
  temperature_c INTEGER,
  sensor_id VARCHAR,
  PRIMARY KEY (sensor_id)
) WITH (
  'topic' = 'temperature_sensor',
  'value.format' = 'json',
  'timestamp' = 'time'
);
Using the Relations we have just defined, we want to find out what the latest temperature readings are in each truck. We can achieve this by using a temporal join to enrich our truck_info updates with the latest temperature readings from the temperature_sensor Changelog. The result of this join will be a Stream of enriched truck information updates with the latest temperature reading for each truck. The following SQL statement will launch a long-lived continuous query that continually joins these two Relations and writes the results to a new Stream backed by a new Kafka topic.
Create truck_info_enriched Stream using CSAS:

CREATE STREAM truck_info_enriched AS
SELECT
  truck_info.event_ts,
  truck_info.truck_id,
  truck_info.speed_kmph,
  temp.sensor_id AS thermometer,
  temp.temperature_c
FROM truck_info
JOIN temperature_sensor temp
  ON truck_info.thermometer = temp.sensor_id;
While a truck fleet in a real-world environment will likely have many more sensors, such as cameras, humidity sensors, and others, we’ll keep this use case simple and just use a thermometer as the additional sensor. However, users could continue to enrich their truck information events with joins for each additional sensor data feed.
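For instance, adding a hypothetical humidity sensor feed could follow the same pattern as the temperature join above. The humidity_sensor Changelog, its fields, and the truck_info_enriched2 Stream below are assumptions for illustration only, not part of the running example:

-- Hypothetical humidity sensor readings, keyed by the truck they are installed in (illustrative only)
CREATE CHANGELOG humidity_sensor (
  "time" BIGINT,
  humidity_pct INTEGER,
  truck_id INT,
  PRIMARY KEY (truck_id)
) WITH (
  'topic' = 'humidity_sensor',
  'value.format' = 'json',
  'timestamp' = 'time'
);

-- Further enrich the truck updates with the latest humidity reading per truck
CREATE STREAM truck_info_enriched2 AS
SELECT
  t.event_ts,
  t.truck_id,
  t.speed_kmph,
  t.thermometer,
  t.temperature_c,
  hum.humidity_pct
FROM truck_info_enriched t
JOIN humidity_sensor hum
  ON t.truck_id = hum.truck_id;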
Use Case Part 1: Powering a real-time dashboard
Monitoring and health metrics are essential for managing a truck fleet. Being able to check on the status of particular trucks and generally see that trucks are doing fine can provide peace of mind for the truck fleet manager. This is where a real-time dashboard can be helpful – to have the latest metrics readily available on the status of the truck fleet.
So for our first use case, we’ll use Materialized Views to power a real-time dashboard. By materializing our truck_info_enriched Stream into a queryable view, we can build charts that query the view and get the latest truck information. We’ll build the Materialized View in two steps. First, we’ll define a new Changelog that mirrors the truck_info_enriched Stream, then we’ll create a Materialized View from this Changelog.
Create truck_info_enriched_changelog Changelog:

CREATE CHANGELOG truck_info_enriched_changelog (
  event_ts BIGINT,
  truck_id INT,
  speed_kmph INT,
  thermometer VARCHAR,
  temperature_c INTEGER,
  PRIMARY KEY (truck_id)
) WITH (
  'topic' = 'truck_info_enriched',
  'value.format' = 'json'
);
Create truck_info_mview Materialized View using CVAS:

CREATE MATERIALIZED VIEW truck_info_mview AS
SELECT * FROM truck_info_enriched_changelog;
Note that we could have created this Materialized View sourcing from the truck_info_enriched Stream, but if we had created the Materialized View from the Stream, then each event would be a new row in the Materialized View (append mode). Instead, we are building the Materialized View from a Changelog so that each event will add a new row or update an existing one based on the Changelog’s primary key (upsert mode). For our example, we only need to know the current status of each truck, so building the Materialized View with upsert mode better suits our use case.
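For comparison, the append-mode variant would source directly from the Stream, as in the sketch below; the truck_info_append_mview name is hypothetical and this statement is not part of our pipeline:

-- Append mode (illustrative only): every event in the Stream becomes a new row in the view
CREATE MATERIALIZED VIEW truck_info_append_mview AS
SELECT * FROM truck_info_enriched;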
A continuous query will power this Materialized View, constantly ingesting records from the truck_info_enriched Stream and sinking the results to truck_info_mview. Then, we can write queries to SELECT from the Materialized View. A dashboard can easily be built that simply queries this Materialized View to get the latest statuses for the trucks. Here are some example queries that might be helpful when building a dashboard for the truck fleet.
Query to get the truck IDs with the 10 highest temperatures:
SELECT truck_id, temperature_c FROM truck_info_mview ORDER BY temperature_c DESC LIMIT 10;
Query to get all information about a truck:
SELECT * FROM truck_info_mview WHERE truck_id = 3;
Query to count the number of trucks that are speeding:
SELECT COUNT(truck_id) AS num_speeding_trucks FROM truck_info_mview WHERE speed_kmph > 90;
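A dashboard might also surface fleet-wide summary numbers. The aggregate query below is a sketch that assumes standard SQL aggregate functions are available when querying the Materialized View:

SELECT
  COUNT(truck_id) AS num_trucks,
  AVG(temperature_c) AS avg_temperature_c,
  MAX(speed_kmph) AS max_speed_kmph
FROM truck_info_mview;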
Use Case Part 2: Building a real-time alerting pipeline
While it’s great to be able to pull real-time metrics for our truck fleet (Use Case Part 1), there are also situations where we may want the truck update events themselves to trigger actions. In our example, we’ve included thermometers as one of the sensors in each of our delivery trucks. Groceries, medicines, and some chemicals need to be delivered in refrigerated trucks. If the trucks aren’t able to stay within a desired temperature range, the items inside could spoil or degrade. This can be quite serious, especially for medicines and hazardous materials that can have a direct impact on people’s health.
For our second use case, we want to build out a streaming analytics pipeline to power an alerting service. We can use a CSAS to perform real-time stateful transformations on our data set, then sink the results into a new Stream backed by a Kafka topic. Then the sink topic will contain alertable events that the truck fleet company can feed into their alerting system or other backend systems. Let’s stick to our refrigeration example and write a query that detects if a truck’s temperature exceeds a certain threshold.
Create overheated_trucks Stream using CSAS:

CREATE STREAM overheated_trucks AS
SELECT * FROM truck_info_enriched
WHERE temperature_c > 10;
Submitting this CSAS will launch a long-lived continuous query that ingests from the truck_info_enriched Stream, filters for only events where the truck’s temperature is greater than 10 degrees Celsius, and sinks the results to a new Stream called overheated_trucks. Downstream, the truck fleet company can ingest these records and send alerts to the correct teams or use these records to trigger actions in other backend systems.
Processing IoT Data with DeltaStream
IoT data can be challenging to process due to the high volume of data, the inherent real-time requirements of many IoT applications, and the distributed nature of collecting data from many different sources. While we often treat IoT use cases as their own category, they really span many sectors and use cases. That’s why using a real-time streaming platform, such as DeltaStream, that is able to keep up with the processing demands of IoT and can serve as both a streaming database and streaming analytics platform is essential.
If you want to learn more about how DeltaStream can help your business, schedule a demo with us. We also have a free trial available if you want to try out DeltaStream yourself.