Most of us who have lived in a big city have had some experience taking public transportation. While it’s extremely satisfying when everything works out, I know I’m not alone in my frustration when subway time estimates aren’t accurate or buses simply never show up. Standing there waiting for a bus or train can be very stressful as you begin to wonder if the train has already left, if this is going to impact your transfer to another bus, or if you’ll make it to your destination on time (especially stressful if you’re trying to catch a plane). One way Google is helping address some of these challenges is with the General Transit Feed Specification (GTFS) and its real-time counterpart, GTFS Realtime. GTFS standardizes transit feeds by providing a framework for transportation companies to publish feeds and for developers to write applications that process them.

In this blog post, we’ll showcase how you can use DeltaStream to process New York City’s real-time bus feed, which follows the GTFS Realtime specification, to identify buses that are becoming increasingly delayed in real time.

Setting up DeltaStream 

To set up our use case, first we need to load the bus data into Kafka:

  1. Sign up for an MTA BusTime Developer Key
  2. Have a Kafka cluster or a Kafka-compatible service such as Confluent Cloud or Redpanda available (both offer free trials)
  3. Clone this GitHub repository, then follow the instructions to build and run a Java program that polls the bus feed and forwards the events into your Kafka cluster

DeltaStream for real-time processing

Now that we have our bus feed data in Kafka, we can use DeltaStream to process the data. If you are new to DeltaStream and don’t have an account, you can sign up for a free trial here.

Connect to and inspect source data

First, add your Kafka cluster as a Store in DeltaStream. Adding the Store defines the connectivity between DeltaStream and your storage layer, in this case a Kafka cluster. You can choose any name you want for your Store, but for this use case let’s assume the Store is called “kafka_store”.
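A Store is created with a single CREATE STORE statement. The sketch below is illustrative only: the broker URI is a placeholder, and the exact connection and authentication properties depend on your cluster and your DeltaStream environment, so treat the property names here as assumptions and consult the DeltaStream documentation for the full list.

-- Illustrative sketch only: property names and values are placeholders/assumptions.
CREATE STORE kafka_store WITH (
  'type' = KAFKA,
  'access_region' = 'AWS us-east-1',
  'uris' = 'pkc-xxxxx.us-east-1.aws.confluent.cloud:9092'
  -- plus whatever authentication properties (e.g. SASL username/password) your cluster requires
);

With the Store in place, we can inspect the topics by printing them. The two topics we’ll be using for our example are the “nyc_bus_trip_updates” and “nyc_bus_vehicle_positions” topics.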

Print the nyc_bus_trip_updates topic:

db.public/kafka_store# PRINT TOPIC nyc_bus_trip_updates;
{
  "trip": {
    "tripId": "FP_D3-Weekday-SDon-103300_B13_6",
    "startDate": "20231030",
    "routeId": "Q55",
    "directionId": 1
  },
  "stopTimeUpdate": [{
    "stopSequence": 0,
    "arrival": {
      "time": "1698701100"
    },
    "departure": {
      "time": "1698701100"
    },
    "stopId": "504354"
  }, {
    "stopSequence": 1,
    "arrival": {
      "time": "1698701144"
    },
    "departure": {
      "time": "1698701144"
    },
    "stopId": "504434"
  }, …],
  "vehicle": {
    "id": "MTA NYCT_1234"
  },
  "timestamp": "1698699097",
  "delay": 385
}

Print the nyc_bus_vehicle_positions topic:

db.public/kafka_store# PRINT TOPIC nyc_bus_vehicle_positions;
{
  "trip": {
    "tripId": "FP_D3-Weekday-SDon-103300_B13_6",
    "startDate": "20231030",
    "routeId": "Q55",
    "directionId": 1
  },
  "position": {
    "latitude": 40.69352,
    "longitude": -73.990486,
    "bearing": 63.434948
  },
  "timestamp": "1698700533",
  "stopId": "504434",
  "vehicle": {
    "id": "MTA NYCT_1234"
  }
}

We can define Streams for nyc_bus_trip_updates and nyc_bus_vehicle_positions with the following DDL statements.

DDL to create nyc_bus_trip_updates Stream:

CREATE STREAM nyc_bus_trip_updates (
  trip STRUCT <
    "tripId" VARCHAR,
    "startDate" VARCHAR,
    "routeId" VARCHAR,
    "directionId" TINYINT >,
  "stopTimeUpdate" ARRAY <
    STRUCT <
      "stopSequence" INTEGER,
      "arrival" STRUCT <
        "time" BIGINT >,
      "departure" STRUCT <
        "time" BIGINT >,
      "stopId" INTEGER >>,
  vehicle STRUCT <
    id VARCHAR >,
  "timestamp" BIGINT,
  delay INTEGER
) WITH ('topic' = 'nyc_bus_trip_updates', 'value.format'='JSON');

DDL to create nyc_bus_vehicle_positions Stream:

CREATE STREAM nyc_bus_vehicle_positions (
  trip STRUCT <
    "tripId" VARCHAR,
    "startDate" VARCHAR,
    "routeId" VARCHAR,
    "directionId" TINYINT >,
  "position" STRUCT <
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "bearing" DOUBLE >,
  vehicle STRUCT <
    id VARCHAR >,
  "timestamp" BIGINT,
  "stopId" INTEGER
) WITH ('topic' = 'nyc_bus_vehicle_positions', 'value.format'='JSON');

Notice that both feeds have a field called trip, which represents a particular trip a bus is taking. We’ll be using this field to join these Streams later on.

Also, since the timestamp field is given in epoch seconds, let’s make our data easier to read by defining new Streams that convert it to a proper timestamp. We can do this with the following CREATE STREAM AS SELECT (CSAS) queries:

CSAS to create trip_updates:

CREATE STREAM trip_updates AS
SELECT
  trip,
  "stopTimeUpdate",
  vehicle,
  CAST(FROM_UNIXTIME("timestamp") AS TIMESTAMP) AS ts,
  "timestamp" AS epoch_secs,
  delay
FROM
  nyc_bus_trip_updates;

CSAS to create vehicle_positions:

CREATE STREAM vehicle_positions AS
SELECT
  trip,
  "position",
  vehicle,
  CAST(FROM_UNIXTIME("timestamp") AS TIMESTAMP) AS ts,
  "stopId"
FROM
  nyc_bus_vehicle_positions;

Detecting transportation delays

You may have noticed in the nyc_bus_trip_updates topic that there is a field called delay. This field represents the number of seconds a bus is currently running behind its schedule. Reporting this data is really helpful, as it gives transit-goers transparency into how late they’re going to be or how long they may need to wait for the bus. However, what’s not always clear is whether delays are increasing. For our use case, this is exactly what we want to detect. Once we detect which bus trips are becoming increasingly delayed, we also want to provide additional information about where the bus is and where it has recently been, so that city officials and bus planners can see where delays may be occurring.
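For example, a simple filter over the trip_updates Stream can surface buses that are already running late, but it says nothing about whether a delay is getting worse. The query below is just a sketch to illustrate that gap; the 10-minute threshold is a hypothetical value and not part of the pipeline we build next.

-- Buses currently more than 10 minutes behind schedule (hypothetical threshold)
SELECT trip, vehicle, delay, ts
FROM trip_updates
WHERE delay > 600;

To catch worsening delays rather than a single snapshot, we need to compare consecutive updates for the same trip, which is exactly what the pattern recognition query in the next section does.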

Processing real-time bus data

For our use case, we’ll be splitting up the processing into two queries.

In the first query, we will analyze the trip_updates Stream to find trips whose delays are increasing significantly. We’ll consider a delay increase significant when three consecutive trip updates each grow by more than 30 seconds, so we can write a pattern recognition query to detect trips that match this requirement. Those trips will then be written to a Stream that serves as the input for our second query.

Query 1:

CREATE STREAM trips_delay_increasing AS
SELECT
  trip,
  vehicle,
  start_delay,
  end_delay,
  start_ts,
  end_ts,
  CAST(FROM_UNIXTIME((start_epoch_secs + end_epoch_secs) / 2) AS TIMESTAMP) AS avg_ts
FROM trip_updates
MATCH_RECOGNIZE(
  PARTITION BY trip
  ORDER BY "ts"
  MEASURES
    C.row_timestamp AS row_timestamp,
    C.row_key AS row_key,
    C.row_metadata AS row_metadata,
    C.vehicle AS vehicle,
    A.delay AS start_delay,
    C.delay AS end_delay,
    A.ts AS start_ts,
    C.ts AS end_ts,
    A.epoch_secs AS start_epoch_secs,
    C.epoch_secs AS end_epoch_secs
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO LAST C
  PATTERN (A B C)
  DEFINE
    A AS delay > 0,
    B AS delay > A.delay + 30,
    C AS delay > B.delay + 30
) AS MR WITH ('timestamp'='ts')
QUERY WITH ('state.ttl.millis'='3600000');

Pattern recognition query to find bus trips that are increasing in delay

In the second query, we will join the output of our first query with the vehicle_positions Stream on the trip field. When joining two Streams, we need to specify a WITHIN interval as part of the join condition (these kinds of joins are called interval joins). For our query, we’ll use avg_ts, the midpoint of the increasing-delay interval identified by the first query, as the timestamp field, and a WITHIN interval of 3 minutes, meaning position records for a trip with timestamps up to 3 minutes before or after avg_ts will satisfy the join condition. The resulting records represent the positions of buses that are part of delayed trips.

Query 2:

CREATE STREAM delayed_trip_positions AS
SELECT
  t.trip,
  t.vehicle,
  t.start_delay,
  t.end_delay,
  t.start_ts,
  t.end_ts,
  p."position",
  p.ts AS position_ts
FROM
  trips_delay_increasing t WITH ('timestamp'='avg_ts')
  JOIN vehicle_positions p WITH ('timestamp'='ts')
  WITHIN 3 MINUTES
  ON t.trip = p.trip;

Interval join query to join the bus trips that are growing in delay with bus locations

When we submit these queries, DeltaStream launches long-lived jobs that continually read from their sources, transform the data, and write to their sinks. So, as bus data arrives in our Kafka topics, we can expect processing to happen immediately and results to arrive almost instantly.
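If you want to check on these jobs, DeltaStream tracks each one as a query that you can list from the same CLI session. The command shown below is an assumption on our part and may differ in your DeltaStream version, so verify it against the documentation.

-- List running queries and their current state (command name assumed; see the DeltaStream docs)
LIST QUERIES;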

Inspecting real-time results

Let’s inspect the contents of these Streams to see how our queries behaved.

Data in our source trip_updates Stream:

SELECT * FROM trip_updates WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:39:01","epoch_secs":1698802741,"delay":1689}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:41:31","epoch_secs":1698802891,"delay":1839}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:45:01","epoch_secs":1698803101,"delay":2049}

Data in our source vehicle_positions Stream:

SELECT * FROM vehicle_positions WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76075,"longitude":-73.8282,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:39:31","stopId":505121}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76072,"longitude":-73.82832,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:42:31","stopId":505121}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76073,"longitude":-73.828285,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:45:01","stopId":505121}

The results of Query 1 in our trips_delay_increasing Stream:

SELECT * FROM trips_delay_increasing WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","avg_ts":"2023-11-01 01:42:01"}

The results of Query 2 in our delayed_trip_positions Stream:

SELECT * FROM delayed_trip_positions WITH ('starting.position'='earliest');
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76075,"longitude":-73.8282,"bearing":13.835851},"position_ts":"2023-11-01 01:39:31"}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76072,"longitude":-73.82832,"bearing":13.835851},"position_ts":"2023-11-01 01:42:31"}
| {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76073,"longitude":-73.828285,"bearing":13.835851},"position_ts":"2023-11-01 01:45:01"}

In the results above, we can see that the trip with tripId CS_D3-Weekday-SDon-124200_Q28_655 becomes increasingly delayed over a short period of time. Our first query identifies that this trip’s delay is growing and outputs a record for it. Our second query ingests that record and finds the vehicle positions around the time of the delay.

By plotting the positions seen in our results for delayed_trip_positions above, we get the following map:

There must be some traffic on 39th Avenue!

In this example, we’ve highlighted two ways real-time processing can help provide a better experience to public transit users:

  1. Having real-time data about growing delays can help provide more accurate time estimates on bus arrival times
  2. Insights into locations where bus delays grow can help city planners better understand and improve traffic patterns in their cities

Conclusion

The GTFS Realtime data feeds are great for building real-time transit applications. However, real-time computations that are complex or require stateful operations can be difficult to build and operate. In this blog post, we showcased how you can easily build stateful real-time jobs on top of this data feed in minutes using DeltaStream. Further, as a fully managed, serverless product, DeltaStream handles all of the operational overhead of running long-lived stream processing jobs.

If you want to learn more about DeltaStream or try it for yourself, you can request a demo or join our free trial.