05 Dec 2023
Exploring Pattern Recognition using MATCH_RECOGNIZE
Pattern recognition is a common use case in data processing. Detecting trend reversals, identifying anomalies, and finding sequences in data are all examples of pattern recognition problems. Row Pattern Recognition (RPR) became part of the SQL standard in 2016 (ISO/IEC 9075:2016), which introduced the MATCH_RECOGNIZE syntax. With this syntax, users can write concise SQL queries to solve pattern recognition problems.
While pattern recognition is an important challenge in the batch world, there are also many pattern recognition use cases in a real-time context. That’s why it is necessary for DeltaStream, as a leading streaming platform, to support MATCH_RECOGNIZE in its SQL syntax.
In our previous blog post, Analyzing Real-Time NYC Bus Data with DeltaStream, we showed how to write a SQL query in DeltaStream that solves the pattern recognition use case of detecting bus trips whose delays are significantly increasing. In this blog post, we take a deep dive into that query and explain the purpose and meaning behind each line.
DSQL MATCH_RECOGNIZE Query
Below is the same pattern recognition SQL query from our Analyzing Real-Time NYC Bus Data with DeltaStream post:
CREATE STREAM trips_delay_increasing AS
SELECT
  trip,
  vehicle,
  start_delay,
  end_delay,
  start_ts,
  end_ts,
  CAST(FROM_UNIXTIME((start_epoch_secs + end_epoch_secs) / 2) AS TIMESTAMP) AS avg_ts
FROM trip_updates
MATCH_RECOGNIZE(
  PARTITION BY trip
  ORDER BY "ts"
  MEASURES
    C.row_timestamp AS row_timestamp,
    C.row_key AS row_key,
    C.row_metadata AS row_metadata,
    C.vehicle AS vehicle,
    A.delay AS start_delay,
    C.delay AS end_delay,
    A.ts AS start_ts,
    C.ts AS end_ts,
    A.epoch_secs AS start_epoch_secs,
    C.epoch_secs AS end_epoch_secs
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO LAST C
  PATTERN (A B C)
  DEFINE
    A AS delay > 0,
    B AS delay > A.delay + 30,
    C AS delay > B.delay + 30
) AS MR WITH ('timestamp'='ts')
QUERY WITH ('state.ttl.millis'='3600000');
Breakdown of MATCH_RECOGNIZE Query
First of all, all of our documentation on MATCH_RECOGNIZE is available in the DeltaStream documentation. In this section, we break down the query above and discuss the thought process behind each line.
In this query, the source Stream contains bus trip updates with a field called delay, which represents the number of seconds a bus is delayed on its current route. The goal of the query is to output bus trips where the delay is significantly increasing, so that we can better understand when buses will actually arrive at their upcoming stops.
CSAS (lines 1-2)
This query is what we call a CREATE STREAM AS SELECT (CSAS) query, where we are creating a new Stream that will be the output of running the continuous SELECT statement. Running a continuous query will launch a long-lived stream processing job behind the scenes and write the results to the physical storage that is backing the new Stream. As explained in the original blog post Analyzing Real-Time NYC Bus Data with DeltaStream, a Kafka topic is the physical storage layer that is backing this Stream.
Projection (lines 2-9)
The first few lines of the SELECT query are the fields being projected for the resulting Stream, trips_delay_increasing. These fields are made available by the MEASURES and PARTITION BY clauses in the MATCH_RECOGNIZE statement. The following fields are being projected:
trip and vehicle represent the particular bus trip that is experiencing increasing delays.
start_delay, start_ts, end_delay, and end_ts give insight into the pattern that was matched and how fast delays are increasing.
avg_ts is the midpoint between start_epoch_secs and end_epoch_secs, which also represents the midpoint of the matched pattern. To evaluate this field, we use the built-in FROM_UNIXTIME function and a CAST to convert epoch seconds, an INTEGER, into a TIMESTAMP. In the original blog post, this field was used in a follow-up query to find the positions of the buses during the time that the delays were increasing.
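To make the avg_ts arithmetic concrete, here is a minimal Python sketch of the same calculation. It is not DeltaStream code: it assumes the division behaves as integer division on INTEGER epoch seconds and renders the result in UTC, whereas the SQL function may apply a session time zone. The avg_ts helper and the sample epoch values are hypothetical.

```python
from datetime import datetime, timezone


def avg_ts(start_epoch_secs: int, end_epoch_secs: int) -> datetime:
    """Midpoint of a matched pattern, mirroring
    CAST(FROM_UNIXTIME((start_epoch_secs + end_epoch_secs) / 2) AS TIMESTAMP).
    Hypothetical helper: assumes integer division and UTC output."""
    midpoint_secs = (start_epoch_secs + end_epoch_secs) // 2
    return datetime.fromtimestamp(midpoint_secs, tz=timezone.utc)


# Hypothetical A and C rows observed 10 minutes apart.
start_epoch_secs = 1701763200  # 2023-12-05 08:00:00 UTC
end_epoch_secs = start_epoch_secs + 600  # 2023-12-05 08:10:00 UTC
print(avg_ts(start_epoch_secs, end_epoch_secs))  # 2023-12-05 08:05:00+00:00
```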
Source (lines 10-11, 32)
The FROM clause defines the source of data for the query. In this query, we are sourcing from the result of the MATCH_RECOGNIZE clause on the trip_updates Stream. This source also has a WITH clause to specify a timestamp field, ts, which is a field in the trip_updates Stream. By specifying the timestamp field, we set the event time of incoming records to the value of that field, so that the ORDER BY clause can later use the correct timestamp for ordering events in our patterns.
MATCH_RECOGNIZE – PARTITION BY (line 12)
The first line in the MATCH_RECOGNIZE query is the optional PARTITION BY clause. This subclause groups the underlying data based on the partitioning expression and optimizes DeltaStream’s compute resources for parallel processing of the source data. In this query, we partition the events by the trip field, so any detected patterns are unique to a particular bus trip. In other words, we want to know, for each bus trip, whether its delays are increasing. The PARTITION BY clause is necessary for our use case because the delays of one bus trip are not relevant to other bus trips. Note that the fields in the PARTITION BY clause are available for projection, as shown in this query by selecting the trip field in the SELECT statement.
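Conceptually, PARTITION BY trip splits the stream into one independent sequence of updates per trip. The plain-Python sketch below illustrates only that grouping step; it does not reflect how DeltaStream distributes work across compute resources, and the partition_by_trip helper and sample events are hypothetical.

```python
from collections import defaultdict


def partition_by_trip(events):
    """Group events by their trip field, as PARTITION BY trip does conceptually."""
    partitions = defaultdict(list)
    for event in events:
        partitions[event["trip"]].append(event)
    return dict(partitions)


# Hypothetical updates from two bus trips, interleaved in the stream.
events = [
    {"trip": "T1", "ts": 1000, "delay": 10},
    {"trip": "T2", "ts": 1005, "delay": 200},
    {"trip": "T1", "ts": 2000, "delay": 50},
]
by_trip = partition_by_trip(events)
print({trip: [e["delay"] for e in rows] for trip, rows in by_trip.items()})
# {'T1': [10, 50], 'T2': [200]}
```

Because T2’s large delay lives in its own partition, it can never be combined with T1’s updates into a single match.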
MATCH_RECOGNIZE – ORDER BY (line 13)
The ORDER BY clause is required for MATCH_RECOGNIZE. This clause defines the order in which the data in each partition is sorted before it is evaluated for pattern matches. For our query, the ordering field is ts, so bus trip updates are ordered in ascending order of the ts field. One requirement for the ORDER BY subclause is that the ordering field must be the same as the timestamp field of the source Relation, which is set on line 32 (also mentioned in the Source section above).
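Within each partition, rows are evaluated in ts order rather than in arrival order. The sketch below shows that effect in plain Python on a finite list; a streaming engine has to achieve this incrementally as events arrive (typically with watermarks), which this batch sort does not model. The order_partition helper and sample rows are hypothetical.

```python
def order_partition(rows):
    """Sort one partition's rows by ts ascending, as ORDER BY "ts" does."""
    return sorted(rows, key=lambda row: row["ts"])


# Hypothetical updates for a single trip that arrived out of order.
arrived = [
    {"trip": "T1", "ts": 3000, "delay": 90},
    {"trip": "T1", "ts": 1000, "delay": 10},
    {"trip": "T1", "ts": 2000, "delay": 50},
]
print([row["delay"] for row in order_partition(arrived)])  # [10, 50, 90]
```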
MATCH_RECOGNIZE – MEASURES (lines 14-24)
The MEASURES clause defines the output schema of the MATCH_RECOGNIZE statement and plays a role similar to a SELECT clause. The fields specified in the MEASURES subclause are made available to the SELECT clause. The MEASURES subclause and the DEFINE subclause (lines 28-31) are closely related, because MEASURES projects fields from the rows that matched the pattern variables defined in DEFINE. For example, on line 19 we define start_delay as A.delay: the delay of the row matching A’s definition is projected as start_delay, whereas on line 20 the delay of the row matching C’s definition is projected as end_delay. There are 3 fields in the MEASURES subclause that aren’t used in our query’s SELECT clause: the row metadata columns row_timestamp, row_key, and row_metadata. Since the MATCH_RECOGNIZE operator alters the projection columns of its input Relation, a PATTERN variable must be chosen for these special fields as part of the MEASURES subclause, which we do on lines 15-17. See the MATCH_RECOGNIZE – PATTERN section below for more information on PATTERN variables.
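One way to picture MEASURES is as a function from the rows bound to A, B, and C to a single output record. The Python sketch below is only an illustration: the measures helper, the dict representation of a match, and the sample values are hypothetical, and the row metadata columns are left out.

```python
def measures(match):
    """Build one output record from a match, mirroring the MEASURES subclause.

    `match` maps each pattern variable ("A", "B", "C") to the row it matched.
    Row metadata columns (row_timestamp, row_key, row_metadata) are omitted.
    """
    a, c = match["A"], match["C"]
    return {
        "vehicle": c["vehicle"],
        "start_delay": a["delay"],
        "end_delay": c["delay"],
        "start_ts": a["ts"],
        "end_ts": c["ts"],
        "start_epoch_secs": a["epoch_secs"],
        "end_epoch_secs": c["epoch_secs"],
    }


# Hypothetical match for one trip.
match = {
    "A": {"vehicle": "V1", "delay": 10, "ts": "08:00", "epoch_secs": 1701763200},
    "B": {"vehicle": "V1", "delay": 50, "ts": "08:05", "epoch_secs": 1701763500},
    "C": {"vehicle": "V1", "delay": 90, "ts": "08:10", "epoch_secs": 1701763800},
}
row = measures(match)
print(row["start_delay"], row["end_delay"])  # 10 90
```

As in the query, B is used only for matching, so none of its fields reach the output.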
MATCH_RECOGNIZE – Output Mode (line 25)
ONE ROW PER MATCH is the only supported output mode. It means that for each sequence of events that matches our pattern, the query outputs one result. So in our query, each time a bus trip shows significantly increasing delays, we output one event for that trip.
MATCH_RECOGNIZE – AFTER MATCH strategy (line 26)
The after match strategy defines where to begin looking for the next pattern match. In this query, we specify AFTER MATCH SKIP TO LAST C, which means that after finding a match, the query looks for the next match starting with the last event of the current match (the row bound to C). Other strategies could instruct the query to start looking for the next match from a different event, such as the event after the current match. In our case, however, we want to make sure we capture continuous patterns. Specifically, the pattern we are looking for is 3 trip updates in a row with increasing delays (see the MATCH_RECOGNIZE – PATTERN section below). So, if a bus trip had 5 consecutive trip updates in which each delay exceeded the previous one by more than 30 seconds (starting from a positive delay), our after match strategy would produce 2 results from the MATCH_RECOGNIZE statement: the first for updates 1-3 and the second for updates 3-5. In this case, the first match’s C is also the second match’s A. See the other after match strategies in our documentation.
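The effect of the after match strategy can be checked with a small simulation. The Python sketch below hard-codes PATTERN (A B C) and the query’s DEFINE conditions, and compares SKIP TO LAST C with SKIP PAST LAST ROW (the SQL-standard strategy that resumes after the current match). It works on a finite, already-ordered list of delays for one trip, which simplifies how a streaming engine evaluates patterns; find_matches is a hypothetical helper.

```python
def find_matches(delays, skip_to_last_c=True):
    """Return 1-based (first, last) positions of each (A B C) match.

    `delays` holds one trip's delay values, already ordered by ts.
    DEFINE: A AS delay > 0, B AS delay > A.delay + 30, C AS delay > B.delay + 30
    """
    matches = []
    i = 0
    while i + 2 < len(delays):
        a, b, c = delays[i], delays[i + 1], delays[i + 2]
        if a > 0 and b > a + 30 and c > b + 30:
            matches.append((i + 1, i + 3))
            # SKIP TO LAST C resumes at the row bound to C;
            # SKIP PAST LAST ROW resumes at the row after it.
            i = i + 2 if skip_to_last_c else i + 3
        else:
            i += 1  # no match starting at this row: try the next one
    return matches


# Hypothetical trip with 5 updates, each delay 40 seconds above the previous one.
delays = [10, 50, 90, 130, 170]
print(find_matches(delays))                        # [(1, 3), (3, 5)]
print(find_matches(delays, skip_to_last_c=False))  # [(1, 3)]
```

Skipping only to the last C lets the row that ended one match start the next, which is why the continuous run of 5 updates yields two overlapping results instead of one.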
MATCH_RECOGNIZE – PATTERN (line 27)
The PATTERN clause specifies the pattern of events that should be considered a match. The PATTERN subclause contains pattern variables, each of which can be associated with a quantifier that specifies how many rows of that variable to allow in the pattern (see the documentation for more details). Our query uses the simple pattern (A B C) without any quantifiers, meaning that for a series of events to be considered a match, there must be 3 consecutive events: the first matching A’s definition, the second matching B’s definition, and the third matching C’s definition.
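Because (A B C) has no quantifiers, a match consists of exactly three consecutive rows. The Python sketch below assumes the query’s DEFINE conditions and uses a hypothetical has_abc_window helper to show that rows which would satisfy the conditions do not count if another update sits between them.

```python
def has_abc_window(delays):
    """True if some three consecutive delays satisfy A, B, C in order
    (A: delay > 0, B: delay > A + 30, C: delay > B + 30)."""
    for a, b, c in zip(delays, delays[1:], delays[2:]):
        if a > 0 and b > a + 30 and c > b + 30:
            return True
    return False


print(has_abc_window([10, 50, 90]))      # True
# 10, 50 and 90 would qualify, but the update with delay 55 breaks the sequence.
print(has_abc_window([10, 50, 55, 90]))  # False
```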
MATCH_RECOGNIZE – DEFINE (lines 28-31)
The DEFINE subclause defines each of the pattern variables from the PATTERN subclause. If a variable is not defined, it evaluates to true for any event contributing to a pattern match. This clause is similar to the WHERE clause in SQL, in that it specifies the conditions an event must meet in order to be considered as one of the pattern variables. When defining these pattern variables, we can access fields from the original events of the source Relation, in our case the trip_updates Stream, and define expressions for evaluation. Aggregation and offset functions can also be applied here. In our query, we define our pattern variables based on their delay. For the first event in our pattern, A, we want bus trips that are already delayed (delay > 0). B is defined as an event whose delay is more than 30 seconds greater than A’s, and similarly C is defined as an event whose delay is more than 30 seconds greater than B’s. Combined with our PATTERN of (A B C), our query essentially finds patterns where the delay increases by more than 30 seconds with each trip update, for 3 trip updates in a row.
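Written as plain predicates, the DEFINE conditions make the strict inequalities easy to see: an increase of exactly 30 seconds is not enough. The Python functions below are hypothetical stand-ins for the SQL expressions, each taking the current row’s delay and, for B and C, the delay bound to the previous variable.

```python
def define_a(delay):
    """A AS delay > 0: the bus is already behind schedule."""
    return delay > 0


def define_b(delay, a_delay):
    """B AS delay > A.delay + 30."""
    return delay > a_delay + 30


def define_c(delay, b_delay):
    """C AS delay > B.delay + 30."""
    return delay > b_delay + 30


print(define_a(0), define_a(5))            # False True
print(define_b(40, 10), define_b(41, 10))  # False True
print(define_c(71, 41), define_c(72, 41))  # False True
```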
QUERY WITH (line 33)
The last line of our query is the QUERY WITH clause. This optional clause is used to set query properties. For our query, we set state.ttl.millis to 3600000 (one hour), which tells the query when it is safe to purge its state. Another way to limit the state size of MATCH_RECOGNIZE queries is the optional WITHIN subclause, which specifies a duration for patterns. Without some way for the query to purge state, the amount of information the query needs to keep in memory will grow indefinitely and the query will eventually fail.
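To see why a TTL bounds memory, here is a toy model of per-trip state in Python. It is a conceptual sketch only: the PartitionState class is hypothetical, it treats a trip’s state as expired once state.ttl.millis has elapsed since that trip’s last update, and it does not describe how DeltaStream actually stores or expires state. The 3600000 value is taken from the query.

```python
STATE_TTL_MILLIS = 3_600_000  # 'state.ttl.millis' from the query: 1 hour


class PartitionState:
    """Toy per-trip state: buffered rows plus the time of the last update."""

    def __init__(self, ttl_millis):
        self.ttl_millis = ttl_millis
        self.rows = {}
        self.last_update = {}

    def add(self, trip, row, now_millis):
        self.rows.setdefault(trip, []).append(row)
        self.last_update[trip] = now_millis

    def purge_expired(self, now_millis):
        """Drop every trip whose last update is at least ttl_millis old."""
        expired = [trip for trip, updated in self.last_update.items()
                   if now_millis - updated >= self.ttl_millis]
        for trip in expired:
            del self.rows[trip]
            del self.last_update[trip]
        return expired


state = PartitionState(STATE_TTL_MILLIS)
state.add("T1", {"delay": 10}, now_millis=0)
state.add("T2", {"delay": 20}, now_millis=3_000_000)
print(state.purge_expired(now_millis=3_600_000))  # ['T1']
print(sorted(state.rows))                         # ['T2']
```

Without the purge step, every trip ever seen would keep an entry in rows forever, which is the unbounded growth described above.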
Conclusion
Queries using the MATCH_RECOGNIZE syntax can seem very complex at first glance. This blog post aims to bring clarity to the different parts of a MATCH_RECOGNIZE query by walking through a specific, easy-to-follow example. Hopefully this post makes pattern recognition queries easier to understand, as MATCH_RECOGNIZE is a powerful operator that can solve many use cases.
If you’re interested in trying out MATCH_RECOGNIZE queries in DeltaStream yourself, sign up for a free trial or schedule a demo with us.