11 Mar 2025
Real-time Anomaly Detection with Sensor Data
Real-time sensor data monitoring is critical for industries ranging from manufacturing to IoT-enabled vehicle sensors in transport. Identifying anomalies (unexpected deviations that could signal equipment failure, environmental hazards, or system inefficiencies) becomes a key challenge when dealing with a stream of sensor readings flowing through a Kafka topic. This blog dives into the process of performing anomaly detection on a set of sensors publishing to a Kafka topic, guiding you through the steps to harness streaming data, apply detection techniques, and ensure timely insights. Whether you’re a data engineer or a curious developer, you’ll discover practical methods to spot the unusual in the constant hum of sensor activity.
Use Case: Missing Data as an Anomaly
For our scenario, we will work with 100 sensors that send a heartbeat every 5 minutes. A sensor is considered down if a heartbeat isn’t detected for 15 minutes. This leads to two scenarios: either the sensor is glitching and resumes sending heartbeats after the 15-minute window, or it stops sending heartbeats entirely. To simulate this data, I wrote a Python program with the following parameters:
- 100 sensors, each producing records that contain:
- UUID as a sensor ID
- Unix epoch timestamp
- Generate sensor records for 100 days starting from November 1, 2024
- Each sensor will start at random times over 48 hours from the start date
- 10% of the sensors will fail to generate a timestamp for a random amount of time between 15 and 120 minutes on a random number of days
- 2% of the sensors will stop sending a timestamp between 15 and 60 days from the first timestamp.
Here is the Python code if you want to play with it yourself in a free DeltaStream trial; you just need a Kafka cluster of your own available:
import json
import random
import uuid
import datetime

from kafka import KafkaProducer

# Kafka Configuration
KAFKA_BROKER = "your bootstrap server"
TOPIC = "sensor_data"

# Kafka Producer Setup
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='your username',
    sasl_plain_password='your password',
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    key_serializer=lambda k: k.encode("utf-8") if k is not None else None,
)

# Simulation Constants
SENSOR_COUNT = 100
TEMP_FAILURE_PERCENTAGE = 0.10  # 10% of sensors fail randomly
PERM_FAILURE_PERCENTAGE = 0.02  # 2% of sensors fail permanently
INTERVAL_SECONDS = 5 * 60       # 5 minutes
TEMP_FAILURE_MIN = 15 * 60      # 15 minutes
TEMP_FAILURE_MAX = 120 * 60     # 120 minutes
PERM_FAILURE_MIN_DAYS = 15      # 15 days
PERM_FAILURE_MAX_DAYS = 60      # 60 days
START_DATE = datetime.datetime(2024, 11, 1, 0, 0, 0)
SIMULATION_DAYS = 100
SIMULATION_DURATION = SIMULATION_DAYS * 24 * 60 * 60  # 100 days in seconds
RANDOM_START_WINDOW = 48 * 60 * 60                    # 48 hours in seconds

# Generate 100 Unique Sensor IDs
sensors = [str(uuid.uuid4()) for _ in range(SENSOR_COUNT)]

# Select 10% of sensors for temporary failures
temp_fail_sensors = set(random.sample(sensors, int(SENSOR_COUNT * TEMP_FAILURE_PERCENTAGE)))

# Select 2% of sensors for permanent failures
perm_fail_sensors = set(random.sample(sensors, int(SENSOR_COUNT * PERM_FAILURE_PERCENTAGE)))

# Assign each sensor a random start time within the first 48 hours
sensor_start_times = {
    sensor: START_DATE.timestamp() + random.randint(0, RANDOM_START_WINDOW)
    for sensor in sensors
}

# Generate failure windows (15-120 min) for temporary failures
temp_fail_windows = {}
for sensor in temp_fail_sensors:
    num_failures = random.randint(3, 10)  # Number of random failure periods
    failure_days = random.sample(range(1, SIMULATION_DAYS + 1), num_failures)
    windows = []
    for day in failure_days:
        # Pick a random moment on the failure day, then extend it by 15-120 minutes
        fail_start = START_DATE.timestamp() + (day * 24 * 60 * 60) + random.randint(0, 24 * 60 * 60)
        fail_end = fail_start + random.randint(TEMP_FAILURE_MIN, TEMP_FAILURE_MAX)
        windows.append((fail_start, fail_end))
    temp_fail_windows[sensor] = windows

# Assign permanent failure times (between 15 and 60 days from start)
perm_fail_times = {
    sensor: sensor_start_times[sensor] + random.randint(
        PERM_FAILURE_MIN_DAYS * 24 * 60 * 60,
        PERM_FAILURE_MAX_DAYS * 24 * 60 * 60,
    )
    for sensor in perm_fail_sensors
}

# Track permanently failed sensors
perm_failed_sensors = set()

# Track last failure timestamp for each sensor (prevents duplicate logs)
last_fail_timestamp = {sensor: None for sensor in temp_fail_sensors}

# Start simulation at the given start date
current_time = START_DATE.timestamp()

while current_time <= START_DATE.timestamp() + SIMULATION_DURATION:
    for sensor in sensors:
        start_time = sensor_start_times[sensor]
        if current_time >= start_time:
            # Check if the sensor has permanently failed
            if sensor in perm_fail_sensors and current_time >= perm_fail_times[sensor]:
                if sensor not in perm_failed_sensors:
                    print(f"❌ Sensor {sensor} permanently stopped at {datetime.datetime.utcfromtimestamp(perm_fail_times[sensor])}")
                    perm_failed_sensors.add(sensor)
                continue  # Sensor is permanently down

            # Check if the sensor is temporarily failing
            if sensor in temp_fail_sensors:
                in_failure_window = False
                for fail_start, fail_end in temp_fail_windows[sensor]:
                    if fail_start <= current_time <= fail_end:
                        if last_fail_timestamp[sensor] != fail_start:
                            print(f"⚠️ Sensor {sensor} temporarily down from {datetime.datetime.utcfromtimestamp(fail_start)} to {datetime.datetime.utcfromtimestamp(fail_end)}")
                            last_fail_timestamp[sensor] = fail_start  # Prevent duplicate logs
                        in_failure_window = True
                        break
                if in_failure_window:
                    continue  # Skip sending data

            # Normal sensor data (only sensor_id and timestamp)
            data = {
                "sensor_id": sensor,
                "time_stamp": int(current_time)  # Use integer UNIX timestamp for clarity
            }

            # Send data to Kafka
            producer.send(TOPIC, value=data, key=sensor)

    # Move simulation time forward by 5 minutes
    current_time += INTERVAL_SECONDS

# Flush and close producer after all data is sent
producer.flush()
producer.close()
Each resulting record on the topic looks like this, with the key and value shown as JSON:
{ "key": { "key": "268aded0-0cd4-49ce-ab49-cfc123fdabf9" }, "value": { "sensor_id": "268aded0-0cd4-49ce-ab49-cfc123fdabf9", "time_stamp": 1730447400 } }
Solution: Detecting Failed Sensors
All of the statements are displayed in the following screenshot, along with some of the results: the glitching sensors and how long they failed to send their heartbeat.

Let’s break down each of the statements and explain what we’re doing:
This first statement creates a DeltaStream object of type STREAM that defines the layout of the sensor_data topic in our Kafka cluster. This gives us an entity, backed by the topic, that we can run queries against.
CREATE STREAM sensor_data (
  sensor_id VARCHAR,
  time_stamp BIGINT
) WITH ('topic' = 'sensor_data', 'value.format' = 'json');
This part has the secret sauce to making this work: the ds_lag_bigint function. It is a built-in function that accesses the value of a column from a previous row within the same relation, which is particularly useful when you need to compare or compute differences between rows. In our case, we use it to determine the time interval between consecutive sensor events. The function takes the column to retrieve values from and how many rows to look back. For example, ds_lag_bigint(time_stamp, 1) returns the time_stamp from the previous row, allowing us to calculate the time difference between consecutive sensor readings and thus determine if the difference is outside our acceptable bound of 15 minutes.
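To make that behavior concrete, here is a small plain-Python sketch of the same per-sensor lag-and-diff logic. This is only an illustration of what the function computes, not how DeltaStream executes it:

# Keep the previous heartbeat per sensor and compare it to the current one,
# which is what ds_lag_bigint(time_stamp, 1) provides per partition.
HEARTBEAT_GAP_LIMIT = 15 * 60  # 15 minutes in seconds

last_seen = {}  # sensor_id -> previous time_stamp

def heartbeat_gap(sensor_id, time_stamp):
    """Return the gap to the previous heartbeat in seconds, or None for the first event."""
    prev_ts = last_seen.get(sensor_id)
    last_seen[sensor_id] = time_stamp
    if prev_ts is None:
        return None
    diff = time_stamp - prev_ts
    if diff > HEARTBEAT_GAP_LIMIT:
        print(f"anomaly: sensor {sensor_id} went {diff} seconds without a heartbeat")
    return diff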
Other than that, we’re creating the topic backing the stream with a retention of 48 hours (172800000 milliseconds) to ensure the data lasts long enough to run my tests. Finally, the WITH clause on the FROM relation tells the SELECT to read from the beginning of the topic and which field to use as the event timestamp.
-- This calculates the diff between current time_stamp and previous time_stamp per sensor
CREATE STREAM sensor_history WITH ('kafka.topic.retention.ms' = '172800000') AS
SELECT
  sensor_id,
  time_stamp,
  ds_lag_bigint(time_stamp, 1) OVER (
    PARTITION BY sensor_id
    ORDER BY time_stamp
    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
  ) AS prev_ts
FROM sensor_data WITH ('starting.position' = 'earliest', 'timestamp' = 'time_stamp');
Next, we create a new Kafka topic, bad_sensors, for any record in sensor_history where the gap between the current and previous timestamps is greater than 15 minutes. This becomes our list of sensors with anomalies that need to be investigated: if a sensor drops offline and comes back outside our acceptable range, it is likely failing, or something else is going on that warrants a closer look.
-- This computes the diff between current and prev time_stamp values and picks the bad ones
CREATE STREAM bad_sensors WITH ('kafka.topic.retention.ms' = '172800000') AS
SELECT
  sensor_id,
  time_stamp,
  prev_ts,
  (time_stamp - prev_ts) AS diff
FROM sensor_history WITH ('starting.position' = 'earliest')
WHERE prev_ts IS NOT NULL
  AND (time_stamp - prev_ts) > (15 * 60);
Finally, let’s take a look at the data in bad_sensors. This formats the current and previous heartbeat timestamps and shows the difference between them in seconds.
SELECT
  sensor_id,
  FROM_UNIXTIME(prev_ts) AS fmt_prev,
  FROM_UNIXTIME(time_stamp) AS fmt_time,
  (time_stamp - prev_ts) AS diff
FROM bad_sensors WITH ('starting.position' = 'earliest');

In a production environment, it would make sense for the bad_sensors data to be fed into a location that populates a dashboard where decisions can be made in near real time. This could be something like Iceberg, ClickHouse, Databricks, Snowflake, or whatever else you use.
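As a rough sketch of that last hop (with hypothetical values and a placeholder sink, not a specific connector), each bad_sensors record could be flattened into a row for whatever reporting table backs the dashboard:

import datetime

def to_dashboard_row(rec):
    """Flatten a bad_sensors record into a row for a reporting table."""
    return {
        "sensor_id": rec["sensor_id"],
        "last_heartbeat": datetime.datetime.fromtimestamp(rec["prev_ts"], tz=datetime.timezone.utc),
        "next_heartbeat": datetime.datetime.fromtimestamp(rec["time_stamp"], tz=datetime.timezone.utc),
        "gap_minutes": rec["diff"] / 60,
    }

# Hypothetical bad_sensors record, matching the fields selected above
example = {
    "sensor_id": "268aded0-0cd4-49ce-ab49-cfc123fdabf9",
    "prev_ts": 1730447400,
    "time_stamp": 1730453400,
    "diff": 6000,
}
print(to_dashboard_row(example))  # gap_minutes: 100.0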
Summary
This blog explores anomaly detection for real-time sensor data on a Kafka topic, using a 100-sensor setup with heartbeats every 5 minutes. We define an anomaly as a gap of more than 15 minutes between heartbeats, simulate 100 days of data with Python, and apply DeltaStream to process it: tracking history, calculating gaps with ds_lag_bigint, and pinpointing failures via SQL. This approach is streamlined for engineers and easily adaptable to production dashboards.