17 Dec 2024

Enhancing Fraud Detection with PuppyGraph and DeltaStream

The banking and finance industry has been one of the biggest beneficiaries of digital advancements. Many technological innovations find practical applications in finance, providing convenience and efficiency that can set institutions apart in a competitive market. However, this ease and accessibility have also led to increased fraud, particularly in credit card transactions, which remain a growing concern for consumers and financial institutions.

Traditional fraud detection systems rely on rule-based methods that struggle in real-time scenarios. These outdated approaches are often reactive, identifying fraud only after it occurs. Without real-time capabilities or advanced reasoning, they fail to match fraudsters’ rapidly evolving tactics. A more proactive and sophisticated solution is essential to combat this threat effectively.

This is where graph analytics and real-time stream processing come into play. Combining PuppyGraph, the first and only graph query engine, with DeltaStream, a stream processing engine powered by Apache Flink, enables institutions to improve fraud detection accuracy and efficiency while gaining real-time capabilities. In this blog post, we’ll explore the challenges of modern fraud detection and the advantages of using graph analytics and real-time processing. We will also provide a step-by-step guide to building a fraud detection system with PuppyGraph and DeltaStream.

Let’s start by examining the challenges of modern fraud detection.

Common Fraud Detection Challenges

Credit card fraud has always been a game of cat and mouse. Even before the rise of digital processing and online transactions, fraudsters found ways to exploit vulnerabilities. With the widespread adoption of technology, fraud has only intensified, creating a constantly evolving fraud landscape that is increasingly difficult to navigate. Key challenges in modern fraud detection include:

  • Volume: The daily volume of credit card transactions is far too large to review manually for suspicious activity. Automation is critical for sorting through all that data and identifying anomalies.
  • Complexity: Fraudulent activity often involves complex patterns and relationships that traditional rule-based systems can’t detect. For example, fraudsters may use stolen credit card information to make a series of small transactions before a large one or use multiple cards in different locations within a short period.
  • Real-time: The sooner fraud is detected, the less financial loss there will be. Real-time analysis is crucial in detecting and preventing transactions as they happen, especially when fraud can be committed at scale in seconds.
  • Agility: Fraudsters will adapt to new security measures. Fraud detection systems must be agile, even learning as they go, to keep up with the evolving threats and tactics.
  • False positives: While catching fraudulent transactions is essential, it’s equally important to avoid flagging legitimate transactions as fraud. False positives can frustrate customers, especially when a card is automatically locked out due to legitimate purchases. As a consequence, they can adversely affect revenue.

To tackle these challenges, businesses require a solution that processes large volumes of data in real-time, identifies complex patterns, and evolves with new fraud tactics. Graph analytics and real-time stream processing are essential components of such a system. By mapping and analyzing transaction networks, businesses can more effectively detect anomalies in customer behavior and identify potentially fraudulent transactions.

Leveraging Graph Analytics for Fraud Detection

Traditional fraud detection methods analyze individual transactions in isolation. This can miss connections and patterns that emerge when we examine the bigger picture. Graph analytics allows us to visualize and analyze transactions as a network of connected entities.

Think of it like a social network. Each customer, credit card, merchant, and device becomes a node in the graph, and each transaction connects those nodes. We can find hidden patterns and anomalies that indicate fraud by looking at the relationships between nodes.

Figure: an example schema for a fraud detection use case

Here’s how graph analytics can be applied to fraud detection:

  • Finding suspicious connections: Graph algorithms can discover unusual patterns of connections between entities. For example, if the same person uses multiple credit cards in different locations in a short period or a single card is used to buy from a group of merchants known for fraud, those connections will appear in the graph and be flagged as suspicious; a sketch of one such query appears after this list.
  • Uncovering fraud rings: Fraudsters often work within the same circles, using multiple identities and accounts to carry out scams. Graph analytics can find those complex networks of people and their connections, helping to identify and potentially break up entire fraud rings.
  • Surfacing identity theft: When a stolen credit card is used, the spending patterns will generally be quite different from the cardholder’s normal behavior. By looking at the historical and current transactions within a graph, you can see sudden changes in spending habits, locations, and types of purchases that may indicate identity theft.
  • Predicting future fraud: Graph analytics can predict future fraud by looking at historical data and the patterns that precede a fraudulent transaction. By predicting fraud before it happens, businesses can take action to prevent it.
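
To make the first point above concrete, here is a minimal Gremlin sketch of a “suspicious connections” query. It assumes the account-and-transfer schema used later in this post (Account vertices with an isBlocked property, linked by AccountTransferAccount edges); the labels and property names are illustrative rather than a fixed API.

  // Sketch: accounts that have sent money to accounts already flagged as blocked.
  g.V().hasLabel('Account').has('isBlocked', true)
    .inE('AccountTransferAccount').outV()
    .dedup()
    .values('accountId')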

Of course, all of these benefits are extremely helpful. However, the biggest hurdle to realizing them is the complexity of implementing a graph database. Let’s look at some of those challenges and how PuppyGraph can help users avoid them entirely.

Challenges of Implementing and Running Graph Databases

As shown, graph databases can be an excellent tool for fraud detection. So why aren’t they used more frequently? This usually boils down to implementing and managing them, which can be complex for those unfamiliar with the technology. The hurdles that come with implementing a graph database can far outweigh the benefits for some businesses, even stopping them from adopting this technology altogether. Here are some of the issues generally faced by companies implementing graph databases:

  • Cost: Traditional relational databases have been the norm for decades, and many organizations have invested heavily in their infrastructure. Switching to a graph database or even running a proof of concept requires a significant upfront investment in new software, hardware, and training. 
  • Implementing ETL: Extracting, transforming, and loading (ETL) data into a graph database can be tricky and time-consuming. Data needs to be restructured to fit into a graph model, which requires knowledge of the underlying data to be moved over and how to represent these entities and relationships within a graph model. This requires specific skills and adds to the implementation time and cost, meaning the benefits may be delayed.
  • Bridging the skills gap: Graph databases require a different data modeling and querying approach from traditional databases. In addition to the previous point regarding ETL, finding people with the skills to manage, maintain, and query the data within a graph database can also be challenging. Without these skills, graph technology adoption is mostly dead in the water.
  • Integration challenges: Integrating a graph database with existing systems and applications is complex. This usually involves taking the output from graph queries and mapping it into downstream systems, which requires careful planning and execution. Getting data to flow smoothly and remain compatible across different systems is a significant undertaking.

These challenges highlight the need for solutions that make graph database adoption and management more accessible. A graph query engine like PuppyGraph addresses these issues by enabling teams to integrate their data and query it as a graph in minutes without the complexity of ETL processes or the need to set up a traditional graph database. Let’s look at how PuppyGraph helps teams become graph-enabled without ETL or the need for a graph database.

How PuppyGraph Solves Graph Database Challenges

PuppyGraph is built to tackle the challenges that often hinder graph database adoption. By rethinking graph analytics, PuppyGraph removes many entry barriers, opening up graph capabilities to more teams than otherwise possible. Here’s how PuppyGraph addresses many of the hurdles mentioned above:

  • Zero-ETL: One of PuppyGraph’s most significant advantages is connecting directly to your existing data warehouses and data lakes—no more complex and time-consuming ETL. There is no need to restructure data or create separate graph databases. Simply connect the graph query engine directly to your SQL data store and start querying your data as a graph in minutes.
  • Cost: PuppyGraph reduces the expenses of graph analytics by using your existing data infrastructure. There is no need to invest in new database infrastructure or software and no ongoing maintenance costs of traditional graph databases. Eliminating the ETL process significantly reduces the engineering effort required to build and maintain fragile data pipelines, saving time and resources.
  • Reduced learning curve: Traditional graph databases often require users to master complex graph query languages for every operation, including basic data manipulation. PuppyGraph simplifies this by functioning as a graph query engine that operates alongside your existing SQL query engine using the same data. You can continue using familiar SQL tools for data preparation, aggregation, and management. When more complex queries suited to graph analytics arise, PuppyGraph handles them seamlessly. This approach saves time and allows teams to reserve graph query languages specifically for graph traversal tasks, reducing the learning curve and broadening access to graph analytics.
  • Multi-query language support: Engineers can continue to use their existing SQL skills and platform, allowing them to leverage graph querying when needed. The platform offers many ways to build graph queries, including Gremlin and Cypher support, so your existing team can quickly adopt and use graph technology.
  • Effortless scaling: PuppyGraph’s architecture separates compute and storage so it can easily handle petabytes of data. By leveraging the underlying SQL storage, teams can effortlessly scale compute as required. You can focus on extracting value from your data without scaling headaches.
  • Fast deployment: With PuppyGraph, you can deploy and start querying your data as a graph in 10 minutes. There are no long setup processes or complex configurations. Fast deployment means you can start seeing the benefits of graph analytics and speed up your fraud detection.

In short, PuppyGraph removes the traditional barriers to graph adoption so more institutions can use graph analytics for fraud detection use cases. By simplifying deployment, reducing costs, and empowering existing teams, PuppyGraph makes graph technology accessible to teams and organizations of all sizes.

Real-Time Fraud Prevention with DeltaStream

Speed is key in the fight against fraud, and responsiveness is crucial to preventing or minimizing the impact of an attack. Systems and processes that act on events with minimal latency can mean the difference between successful and unsuccessful cyber attacks. DeltaStream empowers businesses to analyze and respond to suspicious transactions in real-time, minimizing losses and preventing further damage.

Why Real-Time Matters:

  • Immediate Response: Rapid incident response means security and data teams can detect, isolate, and trigger mitigation protocols faster than ever, minimizing the vulnerability window. With real-time data and sub-second latency, the Mean Time to Detect (MTTD) and Mean Time to Respond (MTTR) can be significantly reduced.
  • Proactive Prevention: Data and security teams can identify behavior patterns as they emerge and implement mitigation tactics. Real-time data allows for continuous monitoring of system health and security with predictive models.
  • Improved Accuracy: Real-time data provides a more accurate view of customer behavior for precise detection. Threats are more complex than ever and often involve multi-stage attack patterns; streaming data aids in identifying these complex and ever-evolving threat tactics.

DeltaStream’s Key Features:

  • Speed: Increase the speed of your data processing and your team’s ability to create data applications. Reduce latency and cost by shifting your data transformations out of your warehouse and into DeltaStream. Data teams can also quickly write queries in SQL to create analytics pipelines with no other complex languages to learn.
  • Team Focus: Eliminate maintenance tasks with our continually optimizing Flink operator. Your team isn’t tied up with infrastructure, meaning they can focus on building and strengthening pipelines.
  • Unified View: An organization’s data rarely comes from just one source. Process streaming data from multiple sources in real-time to get a complete picture of activities. This means transaction data, user behavior, and other relevant signals can be analyzed together as they occur.

By combining PuppyGraph’s graph analytics with DeltaStream’s real-time processing, businesses can create a dynamic fraud detection system that stays ahead of evolving threats.

Step-by-Step Tutorial: DeltaStream and PuppyGraph

In this tutorial, we go through the high-level steps of integrating DeltaStream and PuppyGraph. 

The detailed steps are available at:

Starting a Kafka Cluster

We start a Kafka Server as the data input. (Later in the tutorial, we’ll send financial data through Kafka.)

We create topics for financial data like this:

  bin/kafka-topics.sh --create --topic kafka-Account --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
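
The other topics used later in this tutorial can be created the same way, for example:

  bin/kafka-topics.sh --create --topic kafka-AccountRepayLoan --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  bin/kafka-topics.sh --create --topic kafka-AccountTransferAccount --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1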

Setting up DeltaStream

Connecting to Kafka

Log in to the DeltaStream console. Then, navigate to Resources and add a Kafka Store – for example, kafka_demo – with the parameters of the Kafka cluster we started in the previous step.

Next, in the Workspace, create a DeltaStream database – for example, kafka_db.
After that, we use DeltaStream SQL to create streams for the Kafka topics we created in the previous step. A stream describes the topic’s physical layout so it can be easily referenced with SQL. Once we declare the streams, we can build streaming data pipelines to transform, enrich, aggregate, and prepare streaming data for analysis in PuppyGraph. First, we’ll define the account_stream from the kafka-Account topic.

  CREATE STREAM account_stream (
    "label" STRING,
    "accountId" BIGINT,
    "createTime" STRING,
    "isBlocked" BOOLEAN,
    "accoutType" STRING,
    "nickname" STRING,
    "phonenum" STRING,
    "email" STRING,
    "freqLoginType" STRING,
    "lastLoginTime" STRING,
    "accountLevel" STRING
  ) WITH (
    'topic' = 'kafka-Account',
    'value.format' = 'JSON'
  );

Next, we’ll define the accountrepayloan_stream from the kafka-AccountRepayLoan topic:

  CREATE STREAM accountrepayloan_stream (
    "label" STRING,
    "accountrepayloandid" BIGINT,
    "loanId" BIGINT,
    "amount" DOUBLE,
    "createTime" STRING
  ) WITH (
    'topic' = 'kafka-AccountRepayLoan',
    'value.format' = 'JSON'
  );

And finally, we’ll define the accounttransferaccount_stream from the kafka-AccountTransferAccount topic. You’ll note there are both fromid and toid fields, which link the sending and receiving accounts by their accountId. This allows us to enrich data in the account payment stream with account information from the account_stream and combine it with the account transfer stream.

With DeltaStream, this can then easily be written out as a more succinct, enriched stream of data to our destination, such as Snowflake or Databricks. We combine data from the three streams with just the information we want, preparing the data in real time from multiple streaming sources, which we then query as a graph using PuppyGraph.

  CREATE STREAM accounttransferaccount_stream (
    "label" VARCHAR,
    "accounttransferaccountid" BIGINT,
    "fromid" BIGINT,
    "toid" BIGINT,
    "amount" DOUBLE,
    "createTime" STRING,
    "ordernum" BIGINT,
    "comment" VARCHAR,
    "paytype" VARCHAR,
    "goodstype" VARCHAR
  ) WITH (
    'topic' = 'kafka-AccountTransferAccount',
    'value.format' = 'JSON'
  );

Adding a Store for Integration

PuppyGraph will connect to the stores and allow querying as a graph.

Once our data is ready in the desired format, we can write streaming SQL queries in DeltaStream to write data continuously into the desired storage. In this case, we can use DeltaStream’s native integration with Snowflake or Databricks, which is where PuppyGraph will query the data. Here is an example of writing data continuously into a table in Snowflake or Databricks from DeltaStream:

  CREATE TABLE ds_account
  WITH (
    'store' = '<store_name>',
    <Storage parameters>
  ) AS
  SELECT * FROM account_stream;

Starting data processing

Now, you can start a Kafka Producer to send the financial JSON data to Kafka. For example, to send account data, run:

  kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-Account < json_data/Account.json

DeltaStream will process the data, and then we will query it as a graph.

Query your data as a graph

You can start PuppyGraph using Docker. Then upload the Graph schema, and that’s it! You can now query the financial data as a graph as DeltaStream processes it.

Start PuppyGraph using the following command:

  docker run -p 8081:8081 -p 8182:8182 -p 7687:7687 \
    -e DATAACCESS_DATA_CACHE_STRATEGY=adaptive \
    -e <STORAGE PARAMETERS> \
    --name puppy --rm -itd puppygraph/puppygraph:stable

Log into the PuppyGraph Web UI at http://localhost:8081 with the following credentials:

Username: puppygraph

Password: puppygraph123

Upload the schema: Select the file schema_<storage>.json in the Upload Graph Schema JSON section and click Upload.

Navigate to the Query panel on the left side. The Gremlin Query tab offers an interactive environment for querying the graph using Gremlin. For example, to query the accounts owned by a specific company and the transaction records of these accounts, you can run:

  g.V("Company[237]")
    .outE('CompanyOwnAccount').inV()
    .outE('AccountTransferAccount').inV()
    .path()

Conclusion

As this blog post explores, traditional fraud detection methods simply can’t keep pace with today’s sophisticated criminals. Real-time analysis and the ability to identify complex patterns are critical. By combining the power of graph analytics with real-time stream processing, businesses can gain a significant advantage against fraudsters.

PuppyGraph and DeltaStream offer robust and accessible solutions for building real-time dynamic fraud detection systems. We’ve seen how PuppyGraph unlocks hidden relationships and how DeltaStream analyzes real-time data to quickly and accurately identify and prevent fraudulent activity. Ready to take control and build a future-proof, graph-enabled fraud detection system? Try PuppyGraph and DeltaStream today. Visit PuppyGraph and DeltaStream to get started!

06 Jun 2024

Real-time Airline Data Pipeline for on time Flight Status with SQL

Air travel is one of the most popular modes of transportation, but it doesn’t come without the risk of flight delays from weather, technical malfunctions, or other causes. These delays, while often out of the airline’s control, can be frustrating for passengers who only wish for stress-free travel. Accurate and timely communication of flight delays, gate changes, and other status updates is essential for maintaining customer satisfaction. To achieve this, airlines need a robust data platform that can handle flight status updates in real-time.

In this post, we will show how DeltaStream can be used to easily set up streaming pipelines that process real-time airline data. As a fully managed service powered by Apache Flink, DeltaStream gives users the capabilities of Flink without the complexities of running and scaling Flink jobs. In fact, Flink is just an implementation detail, as users can simply write SQL queries to set up their stream processing pipelines. For our example, we will transform flight status updates to create an always up-to-date view of current flight statuses.

Raw Source Data for Airline Flight Status

For our example, we have 2 raw source streams:

  • flights: flight information, including scheduled departure and arrival times.
  • flight_updates: updates to flight status, including a new departure time.

In DeltaStream, we can create Relations to represent this data:

  CREATE CHANGELOG flights (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    origin VARCHAR,
    destination VARCHAR,
    scheduled_dep TIMESTAMP,
    scheduled_arr TIMESTAMP,
    PRIMARY KEY (flight_id)
  ) WITH (
    'topic' = 'flights',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

  CREATE STREAM flight_updates (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    updated_departure TIMESTAMP,
    "description" VARCHAR
  ) WITH (
    'topic' = 'flight_updates',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

Use Case: Create an Always Up-to-Date View of Current Flight Status

In order to get the latest complete flight status information, we need to enrich our flight_updates Stream by joining it with our flights Changelog. The result of this join will be a stream of flight updates that includes the latest departure and arrival times. We can achieve this with the following query:

  CREATE STREAM enriched_flight_updates AS
  SELECT
    u.flight_id,
    u.event_ts,
    f.origin,
    f.destination,
    (DS_TOEPOCH(u.updated_departure) - DS_TOEPOCH(f.scheduled_dep)) / 60000 AS mins_delayed,
    u.updated_departure AS current_departure,
    CAST(TO_TIMESTAMP_LTZ(
      (
        DS_TOEPOCH(f.scheduled_arr) +
        (DS_TOEPOCH(u.updated_departure) - DS_TOEPOCH(f.scheduled_dep))
      ), 3) AS TIMESTAMP
    ) AS current_arrival,
    u."description"
  FROM
    flight_updates u
    JOIN flights f ON u.flight_id = f.flight_id;

We want to eventually materialize these update statuses into a view, using flight_id as the primary key. So, we can define a Changelog that is backed by the same data as the enriched_flight_updates Stream. Notice in the WITH clause of the following statement that the topic parameter is set to enriched_flight_updates.

  CREATE CHANGELOG enriched_flight_updates_log (
    flight_id VARCHAR,
    event_ts TIMESTAMP,
    origin VARCHAR,
    destination VARCHAR,
    mins_delayed BIGINT,
    current_departure TIMESTAMP,
    current_arrival TIMESTAMP,
    "description" VARCHAR,
    PRIMARY KEY (flight_id)
  ) WITH (
    'topic' = 'enriched_flight_updates',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

Now, the enriched_flight_updates_log Changelog will contain all flight updates with the flight information that the original flight_updates Stream was missing. However, it currently includes only updates; there are no events for flights that have no delays or updates. To fix this, we can write an INSERT INTO query to generate status events from the flights Changelog. This ensures that our enriched_flight_updates_log Changelog captures the status of all flights.

  INSERT INTO enriched_flight_updates_log
  SELECT
    flight_id,
    event_ts,
    origin,
    destination,
    CAST(0 AS BIGINT) AS mins_delayed,
    scheduled_dep AS current_departure,
    scheduled_arr AS current_arrival,
    CAST(NULL AS VARCHAR) AS "description"
  FROM
    flights;

Finally, we can materialize our enriched_flight_updates_log into a materialized view that users can query at any time and get the most up-to-date information. Since our enriched_flight_updates_log Changelog has the primary key of flight_id, the view will be updated with UPSERT mode on the flight_id key. If we had instead created our materialized view from the enriched_flight_updates Stream, then the view would be created in append mode where each event is a new row in the view. Using upsert mode, we can update the existing row, if any, based on the Changelog’s primary key.

  CREATE MATERIALIZED VIEW flight_status_view AS
  SELECT * FROM enriched_flight_updates_log;

After creating our materialized view, we can query it at any moment and get the latest flight statuses. The view can be queried directly from the DeltaStream console or CLI, as well as through the DeltaStream REST API for other applications to access programmatically. Let’s look at some sample input and output data below.

Input for flights:

  1. {"flight_id": "Flight_1", "event_ts": "2024-03-28 10:12:13.489", "origin": "LAX", "destination": "YVR", "scheduled_dep": "2024-05-29 10:33:00", "scheduled_arr": "2024-05-29 13:37:00"}
  2. {"flight_id": "Flight_2", "event_ts": "2024-04-11 11:58:56.489", "origin": "JFK", "destination": "SFO", "scheduled_dep": "2024-05-29 12:30:00", "scheduled_arr": "2024-05-29 19:10:00"}
  3. {"flight_id": "Flight_3", "event_ts": "2024-04-23 10:12:13.489", "origin": "SIN", "destination": "NRT", "scheduled_dep": "2024-05-30 09:25:00", "scheduled_arr": "2024-05-30 17:30:00"}
  4. {"flight_id": "Flight_4", "event_ts": "2024-05-30 15:52:13.837", "origin": "AUS", "destination": "ORD", "scheduled_dep": "2024-07-20 09:15:00", "scheduled_arr": "2024-07-20 11:30:00"}

Input for flight_updates:

  1. {"flight_id": "Flight_1", "event_ts": "2024-05-28 15:52:13.837", "updated_departure": "2024-05-29 12:30:00", "description": "Thunderstorms" }
  2. {"flight_id": "Flight_2", "event_ts": "2024-05-29 12:30:13.837", "updated_departure": "2024-05-29 13:30:00", "description": "Waiting for connecting passengers" }
  3. {"flight_id": "Flight_1", "event_ts": "2024-05-29 12:52:13.837", "updated_departure": "2024-05-29 13:30:00", "description": "More Thunderstorms" }
  4. {"flight_id": "Flight_3", "event_ts": "2024-05-30 06:52:13.837", "updated_departure": "2024-05-30 12:30:00", "description": "Mechanical delays" }

Query flight_status_view:

  SELECT * FROM flight_status_view ORDER BY flight_id;

Output results:

  flight_id | event_ts                 | origin | destination | mins_delayed | current_departure    | current_arrival      | description
  ----------+--------------------------+--------+-------------+--------------+----------------------+----------------------+-----------------------------------
  Flight_1  | 2024-05-29T12:52:13.837Z | LAX    | YVR         | 177          | 2024-05-29T13:30:00Z | 2024-05-29T16:34:00Z | More Thunderstorms
  Flight_2  | 2024-05-29T12:30:13.837Z | JFK    | SFO         | 60           | 2024-05-29T13:30:00Z | 2024-05-29T20:10:00Z | Waiting FOR connecting passengers
  Flight_3  | 2024-05-30T06:52:13.837Z | SIN    | NRT         | 185          | 2024-05-30T12:30:00Z | 2024-05-30T20:35:00Z | Mechanical delays
  Flight_4  | 2024-05-30T15:52:13.837Z | AUS    | ORD         | 0            | 2024-07-20T09:15:00Z | 2024-07-20T11:30:00Z | <nil>

Conclusion

When it comes to air travel, travelers have an expectation that airlines will communicate flight delays and flight status effectively. In this post, we demonstrated a simple use case of how airlines can set up real-time data pipelines to process and join flight and flight updates data, with just a few simple SQL queries using DeltaStream. As a fully managed and serverless service, DeltaStream enables its users to easily create powerful real-time applications without any of the overhead.

If you want to learn more about DeltaStream, sign up for our free trial or reach out to us.

27 Mar 2024

Maximizing Performance: Processing Real-time Online Gaming Data

The gaming industry has seen immense growth in the past decade. According to an analysis by EY, the global market for gaming in 2021 was $193.4b, up from $93.6b in 2016, and the market is only expected to continue growing, with an estimated $210.7b in 2025. With billions of people playing video games, gaming companies need to ensure that their data platforms can handle the demands and requirements of this massive amount of data. Online gaming, which makes up a large portion of the gaming market, often has to handle and act on millions of real-time events per second. Consider all the real-time player interactions, chatrooms, leaderboards, and telemetry data that are part of modern online games. For this reason, game developers need a real-time streaming and stream processing platform that can seamlessly scale, process, and govern this data. This is where DeltaStream comes in – to manage and process all of the streaming data in your organization. In this blog post, we’ll cover how DeltaStream can help game developers with two use cases:

  • Keeping leaderboards up to date
  • Temporarily banning players for leaving games early

Connecting a Streaming Store

Although DeltaStream can source real-time data from many different data stores such as Kafka, Kinesis, PostgreSQL (as CDC data), and others, for our use cases we’ll be using RedPanda. As mentioned in an article on gaming from RedPanda, RedPanda is a cost-effective, easily scalable, and very performant alternative to using Kafka. These attributes make it a great streaming storage option for real-time gaming data.

Since RedPanda is compatible with Kafka’s APIs, users can add RedPanda as a Store in DeltaStream just as they would a Kafka cluster.
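
As a rough sketch of what that store definition might look like, assuming a Kafka-compatible connection (the store name, broker address, and property names below are illustrative assumptions rather than exact DeltaStream syntax):

  CREATE STORE redpanda_store WITH (
    'type' = KAFKA,                      -- RedPanda speaks the Kafka API
    'uris' = 'my-redpanda-broker:9092'   -- assumed broker address; add authentication properties as needed
  );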

Keeping Leaderboards Up to Date with SQL and Materialized Views

Let’s assume we have a topic called “game_results” in our RedPanda Store. We can think of the events in this topic as the results of playing some game. So, every time a player finishes a game, a new record is logged into the topic, which includes the timestamp of the end of the game, the player ID, and whether or not they won the game. This topic contains records that look like the following:
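
The field names here (event_ts, player_id, won) are assumptions based on that description rather than a fixed schema:

  {"event_ts": "2024-03-27 18:04:11.200", "player_id": "Player_17", "won": true}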

We can define a DeltaStream Stream that is backed by this topic with the following query:
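
A minimal sketch of that definition, following the CREATE STREAM pattern used elsewhere in this post (the store name, field names, and timestamp format are assumed):

  CREATE STREAM game_results (
    event_ts TIMESTAMP,
    player_id VARCHAR,
    won BOOLEAN
  ) WITH (
    'store' = 'redpanda_store',          -- assumed store name
    'topic' = 'game_results',
    'value.format' = 'json',
    'timestamp' = 'event_ts',
    'timestamp.format' = 'iso8601'
  );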

Next, let’s create a Materialized View to keep track of how many games each player has completed and how many games they have won:
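
One possible sketch, assuming the game_results Stream defined above (the view name and the conditional aggregation are illustrative):

  CREATE MATERIALIZED VIEW player_game_stats AS
  SELECT
    player_id,
    COUNT(*) AS total_games,                            -- games completed
    SUM(CASE WHEN won THEN 1 ELSE 0 END) AS total_wins  -- games won
  FROM game_results
  GROUP BY player_id;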

When we create this Materialized View, DeltaStream launches a Flink job behind the scenes which continuously ingests from the “game_results” topic and updates the view with the latest data. In the world of online gaming, thousands of games could be finished every minute, and as these games are completed, the view will stay up to date with the latest high scores.

Next, a leaderboard can be generated by querying the Materialized View. For example, the following query will return the 10 players with the most wins:
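
Assuming the player_game_stats view sketched above, such a query might look like this:

  SELECT player_id, total_wins
  FROM player_game_stats
  ORDER BY total_wins DESC
  LIMIT 10;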

If we want to find out which 10 players have the highest win/loss ratio, we can run the following query:
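
Again assuming the sketched view, one way to express this follows; NULLIF guards against division by zero for players with no losses:

  SELECT
    player_id,
    CAST(total_wins AS DOUBLE) / NULLIF(total_games - total_wins, 0) AS win_loss_ratio
  FROM player_game_stats
  ORDER BY win_loss_ratio DESC
  LIMIT 10;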

Temporarily Ban Players for Leaving Games Early with Applications

Although most people enjoy online gaming, sometimes our competitive nature can bring out the worst in us. It’s not uncommon for players to leave games early, a phenomenon commonly known as “rage quitting.” For team-based competitive online games, however, rage quitters can be detrimental to maintaining a fun and balanced competitive game, as the teammates of the rage quitter have to deal with the consequences of being a player down. To deal with this, game developers often add a timeout for players who repeatedly quit games early to dissuade this behavior.

For this use case, we want to detect when a player has quit 2 of their last 4 games. Let’s assume that there is a topic called “player_game_actions” in our RedPanda Store. Below is an example of a record in this topic:
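
The field names here (event_ts, player_id, game_id, action) are assumptions based on the surrounding description rather than a fixed schema:

  {"event_ts": "2024-03-27 19:45:02.731", "player_id": "Player_17", "game_id": "Game_2043", "action": "QUIT"}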

The action field here describes the interaction between the player and the game lobby. Possible values include JOIN, QUIT, and COMPLETE. We can define a Stream backed by this topic:
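
A minimal sketch, mirroring the pattern above (store name, field names, and timestamp format assumed):

  CREATE STREAM player_game_actions (
    event_ts TIMESTAMP,
    player_id VARCHAR,
    game_id VARCHAR,
    action VARCHAR
  ) WITH (
    'store' = 'redpanda_store',          -- assumed store name
    'topic' = 'player_game_actions',
    'value.format' = 'json',
    'timestamp' = 'event_ts',
    'timestamp.format' = 'iso8601'
  );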

Now, let’s process this Stream of data to find out which players have left 2 of their last 4 games. While we could solve this problem by writing a query with deeply nested subqueries, we’ll use DeltaStream’s latest Application feature for this example:

Use DeltaStream to Unlock Real-time Gaming Analytics

Real-time interactions are one of the core components of online gaming, and as the industry continues to grow, it becomes increasingly necessary for these gaming companies to find cost-effective, scalable, and low-latency data solutions. As a fully-managed solution powered by Apache Flink, DeltaStream is an easy-to-use, scalable, and resilient system for all real-time stream processing workloads.

In the examples in this post, we built solutions using DeltaStream to process real-time gaming data, ingesting from storage systems such as RedPanda or Confluent Cloud. In the first use case, we used Materialized Views to build a real-time user-driven solution that keeps track of each player’s wins and losses. In the second use case, we built a real-time event-driven solution to detect when a player is being unsportsmanlike, so that downstream backend services can decide how to act on these players with minimal latency. As a system that can do Streaming Analytics and act as a Streaming Database, DeltaStream is a system built for all stream processing workloads. If you want to learn more about how DeltaStream can help unlock real-time insights for your gaming data, reach out to us or get a trial.

28 Feb 2024

Up-to-date Data Pipelines for Rideshare App Driver Ratings

Markets like e-commerce, the gig economy, and local businesses all rely on ratings to give an accurate representation of how good a product or service is. Ratings are not only useful for consumers, but they’re also useful for the companies managing these goods and services. While ratings don’t typically require low latencies, there are still cases where having an up-to-date rating can unlock opportunities. For example, in a rideshare app, a company may want to block a driver whose rating has fallen below a certain threshold from taking more trips. This prevents riders from taking rides with unsafe or unpleasant drivers. Without an up-to-date rating, the systems that validate whether a driver is eligible to drive more passengers are working with outdated data and cannot accurately determine if a driver should be allowed to drive.

In this post, we’ll showcase how DeltaStream can be used to keep ratings up to date to solve this rideshare use case.

Connect a Store and Create a Stream

Let’s assume we have a Kafka cluster with a topic called “driver_ratings” which contains the data that we’ll be sourcing from, and we’ve already defined a DeltaStream Store for this Kafka cluster (see the tutorial for how to create a Store in DeltaStream) called “kafka_store.” We can create a Stream to represent this data with the following statement in DeltaStream:

  CREATE STREAM driver_ratings (
    event_ts TIMESTAMP,
    driver_id VARCHAR,
    rider_id VARCHAR,
    rating DOUBLE
  ) WITH (
    'store' = 'kafka_store',
    'topic' = 'driver_ratings',
    'value.format' = 'json',
    'timestamp' = 'event_ts',
    'timestamp.format' = 'iso8601'
  );

Real-Time Driver Ratings

For our use case, we want to keep an up-to-date rating for drivers, as well as an up-to-date count of how many reviews a driver has. These values will help downstream applications determine if a driver should be suspended. If a driver has a certain number of rides and their rating is below some threshold, then they should be suspended.

Notice in the schema of the data, from the Stream definition in the setup, that there are driver_id and rating fields. The driver_id field specifies which driver is being rated, and the rating field specifies the rating that the driver has received for a trip. To determine a driver’s rating, we need to keep an up-to-date average across all of that driver’s rides. We can do this in SQL by grouping by the driver_id field, then using the AVG function to calculate the average. Similarly, for finding the number of reviews, we can use the COUNT function. These results will be persisted to a materialized view so that data consumers can easily query the view to find the latest driver ratings.

CREATE MATERIALIZED VIEW AS SELECT Query:

  CREATE MATERIALIZED VIEW avg_driver_ratings AS
  SELECT
    driver_id,
    AVG(rating) AS avg_driver_rating,
    COUNT(*) AS num_reviews
  FROM
    driver_ratings
  GROUP BY
    driver_id;

Since the above query performs an aggregation grouping by the driver_id field, the result has a primary key of driver_id. This creates a materialized view in UPSERT mode, such that there is always one row per driver_id that reflects that driver’s current ratings.

By submitting the query, DeltaStream launches a long-lived continuous job in the background which constantly reads from the driver_ratings topic, computes the latest averages and counts, then updates the materialized view. This way, as new ratings arrive in the source topic, the materialized view is updated immediately.

Users can use DeltaStream’s Web App, CLI, or REST API to query the materialized view. Using one of these methods, downstream data consumers, such as the team responsible for driver suspensions in the rideshare company, can query the materialized view for the latest results. For example, we can query the materialized view for driver IDs with a rating below 4 and at least 15 rides.

Query 1 against avg_driver_ratings materialized view:

  SELECT *
  FROM avg_driver_ratings
  WHERE avg_driver_rating < 4
    AND num_reviews >= 15;

  driver_id | avg_driver_rating  | num_reviews
  ----------+--------------------+-------------
  Driver_5  | 3.75               | 16
  Driver_1  | 3.8823529411764706 | 17

Let’s also run a query to select all of our rows in our materialized view to see what the full result set looks like.

Query 2 against avg_driver_ratings materialized view:

  SELECT * FROM avg_driver_ratings ORDER BY avg_driver_rating;

  driver_id | avg_driver_rating  | num_reviews
  ----------+--------------------+-------------
  Driver_6  | 3.5714285714285716 | 14
  Driver_5  | 3.75               | 16
  Driver_1  | 3.8823529411764706 | 17
  Driver_3  | 4.111111111111111  | 18
  Driver_7  | 4.166666666666667  | 18
  Driver_2  | 4.166666666666667  | 18
  Driver_9  | 4.208333333333333  | 24
  Driver_4  | 4.222222222222222  | 9
  Driver_8  | 4.25               | 16

Wrapping Up

In this post, we showcased how DeltaStream can keep average ratings up to date in real time. While calculating average reviews is typically a job done with batch processing, recreating this use case with stream processing creates opportunities for features and data products to be built on the new real-time assumptions. Although we focused on ridesharing in this example, ratings are used in plenty of different contexts and the same pipeline can be used to keep those ratings up to date in DeltaStream.

DeltaStream is the platform to unify, process, and govern all of your streaming data. If you want to learn more about DeltaStream’s materialized views or other capabilities, come schedule a demo with us or sign up for a free trial.

21 Feb 2024

Stream Processing for Blockchain Data

Cryptocurrencies, smart contracts, NFTs, and Web3 have infiltrated mainstream media as the newest hot tech (other than generative AI of course). These technologies are backed by blockchains, which are distributed ledgers that rely on cryptography and decentralization to make secure transactions. In this blog post, we’ll be building a stream processing application using DeltaStream to inspect the gas fees associated with Ethereum transactions.

Ethereum is one of the most popular cryptocurrencies. Using Ethereum, users can set up wallets, transfer ether between accounts, interact with smart contracts, and more. With over a million transactions occurring on the Ethereum network every day (the latest Ethereum usage statistics), there is a lot of real-time data getting processed by the network. However, off-chain analytics can also play a role – to extract insights from the blockchain or the metadata associated with the blockchain.

For this use case, we are going to be doing real-time analysis from the transactions data persisted to Ethereum’s blockchain. Any time an Ethereum user wants to perform an action in the network, whether it’s running a smart contract or simply transferring ether from their wallet to another wallet, a transaction needs to be created. The user will send this transaction to what are called “validators” who will persist this transaction to the blockchain. Once a transaction is part of a block on the blockchain, that transaction is completed as blocks are generally irreversible. However, each block on the blockchain has a gas limit, which essentially caps how many transactions can be in a particular block. Each transaction requires a certain amount of gas – a simple transfer of ether from one wallet to another costs 21,000 gas for example. Running complex smart contracts will require more gas and will fill up a block’s gas limit more quickly. This means that not every transaction automatically gets persisted to the blockchain as validators pick which set of transactions they want to include in the next block (read more about gas fees in Ethereum).

After the Ethereum Improvement Proposal (EIP) 1559 upgrade was added to Ethereum, the gas fee structure changed to include priority fees. In order to make your own transaction more attractive for validators to pick, a priority fee can be attached to the transaction. This priority fee is a tip to the validator if they write your transaction to a block. So, the larger the priority fee, the more likely a validator will include the transaction. The question we want to help solve in this post is what should we set the priority fee to be?

Using real Ethereum transactions data from Infura.io, we want to look at what transactions are being persisted to Ethereum’s blockchain in real-time and get a sense of how big the priority fees are for these transactions. So, in the following sections you’ll see how we can create an application to analyze gas fees in real time using DeltaStream. As an Ethereum user, these insights will be valuable for setting reasonable priority fees on transactions without overpaying.

Setup and Assumptions for Analyzing Real-Time Data

Using Infura.io’s APIs, we are able to get Ethereum’s entire blockchain block data. We wrote a small program to wait for new blocks, get the transactions in each block, and write these transactions as JSON-formatted payloads to Kafka. We’ll be using these transactions as the source data for our use case. However, there are some assumptions that we’ll be making to simplify our use case. These assumptions are listed out below for completeness:

  • We are ignoring legacy transactions that are not type 2 (read more about Ethereum’s transaction types). Type 2 transactions are those that adhere to the post EIP-1559 specifications and use priority fees. Since the EIP-1559 upgrade is backwards compatible, users can still send transactions using the old specifications, but we are ignoring these transactions to simplify our use case.
  • For our transactions data, we are enriching each transaction payload with the respective block’s timestamp and the transaction’s hash. These fields are not part of the transaction message itself.
  • Users set the maxPriorityFeePerGas and the maxFeePerGas when making a transaction. While it’s not always the case, for simplicity we will assume that (maxPriorityFeePerGas + baseFeePerGas) > maxFeePerGas.
  • Occasionally, transactions with maxPriorityFeePerGas set to 0 or very low values will make it onto the blockchain. The reason validators choose these transactions is likely because users have bribed the validators (learn more about bribes in Ethereum in this research paper). As you’ll see later in the setup, we are going to filter out any transactions with maxPriorityFeePerGas <= 100.

Let’s get on with the setup of our use case. In DeltaStream, after setting up Kafka as a Store, we can create a Stream that is backed by the transactions data in our Kafka topic. The Stream that we create is metadata that informs DeltaStream how to deserialize the records in the Kafka topic. The following CREATE STREAM statement creates a new Stream called eth_txns that is backed by the ethereum_transactions topic.

  CREATE STREAM eth_txns (
    "txn_hash" VARCHAR,
    "block_ts" BIGINT,
    "blockNumber" BIGINT,
    "gas" BIGINT,
    "maxFeePerGas" BIGINT,
    "maxPriorityFeePerGas" BIGINT,
    "value" BIGINT
  ) WITH (
    'topic' = 'ethereum_transactions',
    'value.format' = 'json',
    'timestamp' = 'block_ts'
  );

Now that we have our eth_txns source Stream defined, we first need to filter out transactions that don’t fit our assumptions. We can write a CREATE STREAM AS SELECT (CSAS) query that will continuously ingest data from the eth_txns Stream, filter out records that don’t meet our criteria, and sink the resulting records to a new Stream backed by a new Kafka topic. In the following query, note in the WHERE clause that we only accept transactions with maxPriorityFeePerGas > 100 (filtering out transactions likely chosen due to bribes) and maxPriorityFeePerGas < maxFeePerGas (filtering out transactions whose priority fee is not accurately represented by maxPriorityFeePerGas).

CSAS query to create eth_txns_filtered Stream:

  CREATE STREAM eth_txns_filtered AS
  SELECT
    "txn_hash",
    "block_ts",
    "blockNumber",
    "maxPriorityFeePerGas",
    "gas"
  FROM eth_txns WITH ('source.deserialization.error.handling'='IGNORE')
  WHERE "maxPriorityFeePerGas" > 100 AND "maxPriorityFeePerGas" < "maxFeePerGas";

Analyzing Ethereum’s Blockchain Transaction Gas Fee Data

When forming the next block, Ethereum’s validators basically get to select whichever transactions they want from a pool of pending transactions. Since new blocks are only added every several seconds and blocks have a gas limit, we can expect validators to choose transactions that are in their best financial interest and choose the transactions with the highest priority fees per gas. So, we’ll analyze the maxPriorityFeePerGas field over time as transactions flow in to get a sense of what priority fees are currently being accepted.

The following query is a CREATE CHANGELOG AS SELECT (CCAS) query that calculates the moving average, min, max, and standard deviation of priority fees over a 2-minute window that advances every 15 seconds.

CCAS query to create eth_txns_priority_fee_analysis Changelog:

  CREATE CHANGELOG eth_txns_priority_fee_analysis AS
  SELECT
    window_start,
    window_end,
    COUNT(*) AS txns_cnt,
    MIN("maxPriorityFeePerGas") AS min_priority_fee,
    MAX("maxPriorityFeePerGas") AS max_priority_fee,
    AVG("maxPriorityFeePerGas") AS avg_priority_fee,
    STDDEV_SAMP("maxPriorityFeePerGas") AS priority_fee_stddev
  FROM HOP(eth_txns_filtered, SIZE 2 MINUTES, ADVANCE BY 15 SECONDS)
  GROUP BY window_start, window_end;

Let’s see some result records in the eth_txns_priority_fee_analysis topic:

  {
    "window_start": "2023-12-18 21:14:45",
    "window_end": "2023-12-18 21:16:45",
    "txns_cnt": 368,
    "min_priority_fee": 50000000,
    "max_priority_fee": 32250000000,
    "avg_priority_fee": 859790456,
    "priority_fee_stddev": 97003259
  }
  {
    "window_start": "2023-12-18 21:15:00",
    "window_end": "2023-12-18 21:17:00",
    "txns_cnt": 514,
    "min_priority_fee": 50000000,
    "max_priority_fee": 219087884691,
    "avg_priority_fee": 1951491416,
    "priority_fee_stddev": 79308531
  }

Using these results, users can be better informed when setting priority fees for their own transactions. For example, if a transaction is more urgent, they can choose to set the priority fee to a value greater than the average. Similarly, if users want to save money and don’t mind waiting for some time for their transactions to make it onto the blockchain, they can choose a priority fee that is less than the average. These results are also useful for follow-on use cases, such as tracking priority fees over a period of time. DeltaStream’s pattern recognition capabilities also allow users to track patterns in the priority fees. For example, users could set up a pattern recognition query to detect when priority fees stop trending upwards or when priority fees experience a sudden drop-off.

Intersecting Web3 and Stream Processing

In this blog post, we put together a real-time streaming analytics pipeline to analyze Ethereum’s gas fees. With DeltaStream’s easy-to-use platform, we were able to solve the use case and deploy our pipeline within minutes, using only a few simple SQL queries. Although this is an entry level example, we illustrate a use case at the intersection of these two emerging technologies.

If you are interested in learning more about DeltaStream, schedule a demo with us or sign up for a free trial.

13 Feb 2024

Stream Processing for IoT Data

The Internet of Things (IoT) refers to sensors and other devices that share and exchange data over a network. IoT has been on the rise for years and only seems to continue in its growing popularity with other technological advances, such as 5G cellular networks and more “smart” devices. From tracking patient health to monitoring agriculture, the applications for IoT are plentiful and diverse. Other sectors where IoT is used include security, transportation, home automation, and manufacturing.

Oracle defines Big Data as “data that contains greater variety, arriving in increasing volumes and with more velocity.” This definition is simply described with the 3 Vs – volume, velocity, and variety. IoT definitely matches this description, as deployments can emit large volumes of data from numerous sensors and devices.

A platform capable of processing IoT data needs to be scalable in order to keep up with the volume of Big Data. It’s very common for IoT applications to have new sensors added. Consider a drone fleet for package deliveries as an example – you may start off with 10 or 20 drones, but as demand for deliveries increases, the size of your drone fleet can grow by orders of magnitude. The underlying systems processing this data need to be able to scale horizontally to match the increase in data volume.

Many IoT use cases such as tracking patient health and monitoring security feeds require low latency insights. Sensors and devices providing real-time data often need to be acted on in real-time as well. For this reason, streaming and stream processing technologies have become increasingly popular and perhaps essential for solving these use cases. Streaming storage technologies such as Apache Kafka, Amazon Kinesis, and RedPanda can meet the low latency data transportation requirements of IoT. On the stream processing side, technologies such as Apache Flink and managed solutions such as DeltaStream can provide low latency streaming analytics.

IoT data can also come in various types and structures. Different sensors can have different data formats. Take a smart home, for example: the cameras will likely send very different data than a light or a thermometer. However, these sensors are all related to the same smart home. It’s important for a data platform handling IoT use cases to be able to join across different data sets and handle any variations in data structure, format, or type.

DeltaStream as a Streaming Analytics Platform and a Streaming Database

DeltaStream is a platform to unify, process, and govern streaming data. DeltaStream sits as the compute and governance layer on top of streaming storage systems such as Kafka. Powered by Apache Flink, DeltaStream is a fully managed solution that can process streaming data with very low latencies.

In this blog post we’ll cover 2 examples to show how DeltaStream can solve real-time IoT use cases. In the first use case, we’ll use DeltaStream’s Materialized Views to build a real-time request driven application. For the second use case, we’ll use DeltaStream to power real-time event-driven pipelines.

Use Case Setup: Transportation Sensor Data

For simplicity, both use cases will use the same source data. Let’s assume that our data is available in Apache Kafka and represents updates and sensor information for a truck fleet. We’ll first define Relations for the data in 2 Kafka topics.

The first Relation represents truck information. This includes an identifier for the truck, the speed of the truck, which thermometer is in the truck, and a timestamp for this update event represented as epoch milliseconds. Later on, we will use this event timestamp field to perform a join with data from other sensors. Since we expect regular truck information updates, we’ll define a Stream for this data.

Create truck_info Stream:

  CREATE STREAM truck_info (
    event_ts BIGINT,
    truck_id INT,
    speed_kmph INT,
    thermometer VARCHAR
  ) WITH (
    'topic' = 'truck_info',
    'value.format' = 'json',
    'timestamp' = 'event_ts'
  );

The second Relation represents a thermometer sensor’s readings. The fields include an identifier for the thermometer, the temperature reading, and a timestamp for when the temperature was taken that is represented as epoch milliseconds. Later on, the event timestamp will be used when joining with the truck_info Stream. We will define a Changelog for this data using sensor_id as the primary key.

Create temperature_sensor Changelog:

  CREATE CHANGELOG temperature_sensor (
    "time" BIGINT,
    temperature_c INTEGER,
    sensor_id VARCHAR,
    PRIMARY KEY (sensor_id)
  ) WITH (
    'topic' = 'temperature_sensor',
    'value.format' = 'json',
    'timestamp' = 'time'
  );

Using the Relations we have just defined, we want to find out what the latest temperature readings are in each truck. We can achieve this by using a temporal join to enrich our truck_info updates with the latest temperature readings from the temperature_sensor Changelog. The result of this join will be a Stream of enriched truck information updates with the latest temperature readings in the truck. The following SQL statement will launch a long-lived continuous query that will continually join these two Relations and write the results to a new Stream that is backed by a new Kafka topic.

Create truck_info_enriched Stream using CSAS:

  CREATE STREAM truck_info_enriched AS
  SELECT
    truck_info.event_ts,
    truck_info.truck_id,
    truck_info.speed_kmph,
    temp.sensor_id AS thermometer,
    temp.temperature_c
  FROM truck_info
  JOIN temperature_sensor temp
    ON truck_info.thermometer = temp.sensor_id;

While a truck fleet in a real-world environment will likely have many more sensors, such as cameras, humidity sensors, and others, we’ll keep this use case simple and just use a thermometer as the additional sensor. However, users could continue to enrich their truck information events with joins for each additional sensor data feed.
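
As a purely illustrative sketch, adding one more sensor feed would follow the same join pattern; here we assume a hypothetical humidity_sensor Changelog keyed by truck_id with a humidity_pct field:

  CREATE STREAM truck_info_enriched_v2 AS
  SELECT
    t.event_ts,
    t.truck_id,
    t.speed_kmph,
    t.thermometer,
    t.temperature_c,
    h.humidity_pct            -- hypothetical humidity reading
  FROM truck_info_enriched t
  JOIN humidity_sensor h      -- hypothetical Changelog keyed by truck_id
    ON t.truck_id = h.truck_id;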

Use Case Part 1: Powering a real-time dashboard

Monitoring and health metrics are essential for managing a truck fleet. Being able to check on the status of particular trucks and generally see that trucks are doing fine can provide peace of mind for the truck fleet manager. This is where a real-time dashboard can be helpful – to have the latest metrics readily available on the status of the truck fleet.

So for our first use case, we’ll use Materialized Views to power a real-time dashboard. By materializing our truck_info_enriched Stream into a queryable view, we can build charts that can query the view and get the latest truck information. We’ll build the Materialized View in two steps. First we’ll define a new Changelog that mirrors the truck_info_enriched Stream, then we’ll create a Materialized View from this Changelog.

Create truck_info_enriched_changelog Changelog:

  CREATE CHANGELOG truck_info_enriched_changelog (
    event_ts BIGINT,
    truck_id INT,
    speed_kmph INT,
    thermometer VARCHAR,
    temperature_c INTEGER,
    PRIMARY KEY (truck_id)
  ) WITH (
    'topic' = 'truck_info_enriched',
    'value.format' = 'json'
  );

Create truck_info_mview Materialized View using CVAS:

  1. CREATE MATERIALIZED VIEW truck_info_mview AS
  2. SELECT * FROM truck_info_enriched_changelog;

Note that we could have created this Materialized View sourcing from the truck_info_enriched Stream, but if we created the Materialized View from the Stream, then each event would be a new row in the Materialized View (append mode). Instead we are building the Materialized View from a Changelog so that each event will add a new row or update an existing one based on the Changelog’s primary key (upsert mode). For our example, we only need to know the current status of each truck, so building the Materialized View with upsert mode better suits our use case.
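
For comparison, the append-mode alternative mentioned above would be created directly from the Stream. A minimal sketch (truck_info_mview_append is a hypothetical name, not used elsewhere in this example):

  1. -- Append mode: every event becomes a new row in the view
  2. CREATE MATERIALIZED VIEW truck_info_mview_append AS
  3. SELECT * FROM truck_info_enriched;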

A continuous query will power this Materialized View, constantly ingesting records from the truck_info_enriched Stream and sinking the results to truck_info_mview. Then, we can write queries to SELECT from the Materialized View. A dashboard can easily be built that simply queries this Materialized View to get the latest statuses for trucks. Here are some example queries that might be helpful when building a dashboard for the truck fleet.

Query to get the truck IDs with the 10 highest temperatures:

  1. SELECT truck_id, temperature_c
  2. FROM truck_info_mview
  3. ORDER BY temperature_c DESC
  4. LIMIT 10;

Query to get all information about a truck:

  1. SELECT *
  2. FROM truck_info_mview
  3. WHERE truck_id = 3;

Query to count the number of trucks that are speeding:

  1. SELECT COUNT(truck_id) AS num_speeding_trucks
  2. FROM truck_info_mview
  3. WHERE speed_kmph > 90;
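
A dashboard might also surface fleet-wide aggregates. For instance, assuming the same columns, a query along these lines could report the average temperature across all trucks:

  1. SELECT AVG(temperature_c) AS avg_fleet_temperature_c
  2. FROM truck_info_mview;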

Use Case Part 2: Building a real-time alerting pipeline

While it’s great to be able to pull real-time metrics for our truck fleet on demand (Use Case Part 1), there are also situations where we may want the truck update events themselves to trigger actions. In our example, we’ve included thermometers as one of the sensors in each of our delivery trucks. Groceries, medicines, and some chemicals need to be delivered in refrigerated trucks. If the trucks aren’t able to stay within the desired temperature range, the items inside could spoil or degrade. This can be quite serious, especially for medicines and hazardous materials that can have a direct impact on people’s health.

For our second use case, we want to build out a streaming analytics pipeline to power an alerting service. We can use a CSAS to perform real-time stateful transformations on our data set, then sink the results into a new Stream backed by a Kafka topic. Then the sink topic will contain alertable events that the truck fleet company can feed into their alerting system or other backend systems. Let’s stick to our refrigeration example and write a query that detects if a truck’s temperature exceeds a certain threshold.

Create overheated_trucks Stream using CSAS:

  1. CREATE STREAM overheated_trucks AS
  2. SELECT * FROM truck_info_enriched WHERE temperature_c > 10;

Submitting this CSAS will launch a long-lived continuous query that ingests from the truck_info_enriched Stream, filters for only events where the truck’s temperature is greater than 10 degrees Celsius, and sinks the results to a new Stream called overheated_trucks. Downstream, the truck fleet company can ingest these records and send alerts to the correct teams or use these records to trigger actions in other backend systems.

Processing IoT Data with DeltaStream

IoT data can be challenging to process due to the high volume of data, the inherent real-time requirements of many IoT applications, and the distributed nature of collecting data from many different sources. While we often treat IoT use cases as their own category, they really span many sectors and use cases. That’s why using a real-time streaming platform, such as DeltaStream, that is able to keep up with the processing demands of IoT and can serve as both a streaming database and streaming analytics platform is essential.

If you want to learn more about how DeltaStream can help your business, schedule a demo with us. We also have a free trial available if you want to try out DeltaStream yourself.

06 Feb 2024

Min Read

Data Governance for Teams with RBAC

In our previous blog post, Streaming Data Governance and DeltaStream, we discussed the importance of Data Unification and Data Governance in a stream processing data platform. In particular, we highlighted the Streaming Catalog and Role-Based Access Control (RBAC) on the Streaming Catalog, the tools DeltaStream exposes to users to help them govern their streaming data. In this post, we’ll go over an example use case to show how the Streaming Catalog and RBAC work in DeltaStream.

For our use case, let’s assume we are a company that needs to do real-time analytics for ads and marketing reports. In our setup, we have the following:

  • 3 streaming Stores – “CC_kafka” (Confluent Cloud), “kafka_store”, “kinesis_store”
  • 2 Teams – “Reports Team” and “Ads Team”
  • 1 Organization administrator

Notice in our setup that there are 3 different Stores: 2 Kafka Stores and 1 Kinesis Store. The data in these Stores does not belong to a single team; in fact, each team may be responsible for data in multiple Stores. For instance, our “Ads Team” needs read and write access to one topic from each of the Stores (when we say “topic” we are referring to the topics in Kafka and the streams in Kinesis).

The goal of this use case is twofold. First, to unify and organize the streaming data from the 3 Stores so that the organization of the data aligns with the team structure. Second, to set up roles for each of the teams so that users belonging to those teams can easily be granted the appropriate access to the resources that pertain to their team.

Below is a visualization of our use case.

The Administrator Role

The administrator will have access to the sysadmin and useradmin built-in roles. These, along with the securityadmin and orgadmin roles, are special roles in DeltaStream with powerful privileges that should only be given to a handful of people in an organization. To solve our use case, our administrator will first assume the useradmin role to create the appropriate roles that specific team members will be granted access to. Then, the administrator needs to use the sysadmin role to set up the Streaming Catalog and define Stores for our data sources, as well as grant the appropriate permissions for the roles created by the useradmin.

useradmin

The useradmin role has privileges to manage users and roles within the Organization. The administrator will assume the useradmin role to create new custom roles for our “Reports Team” and “Ads Team.”

We can switch to the useradmin role using the USE ROLE command before we start creating custom roles.

  1. USE ROLE useradmin;

Following the best practices for creating custom roles, we will build out a role hierarchy where sysadmin is the top-most role. The below diagram illustrates the hierarchy of roles.

The following statements create roles to match the diagram in Figure 2:

  1. CREATE ROLE "MarketingRole" WITH (IN ROLE (sysadmin));
  2.  
  3. CREATE ROLE "ContentRole" WITH (IN ROLE ("MarketingRole"));
  4.  
  5. CREATE ROLE "ReportsRole" WITH (IN ROLE ("MarketingRole"));
  6.  
  7. CREATE ROLE "AdsRole" WITH (IN ROLE (sysadmin));
  8.  
  9. CREATE ROLE "TrafficRole" WITH (IN ROLE ("AdsRole"));

Although we’ve created the roles, we haven’t actually assigned them any permissions. We’ll do this using the sysadmin role in the next section.

To invite our team members to our Organization, we can use the INVITE USER command. The following statement invites a new user on the “Ads” team and grants them the new “AdsRole” role.

  1. INVITE USER 'ads_[email protected]' WITH ('roles'=("AdsRole"), 'default'="AdsRole");

Similarly, we can invite a new user on the “Reports” team and assign the “ReportsRole” role to them.

  1. INVITE USER 'reports_[email protected]' WITH ('roles'=("ReportsRole"), 'default'="ReportsRole");

sysadmin

The sysadmin role has privileges to create, manage, and drop objects. As the administrator, we’ll be using this role to do the following:

  1. Add connectivity to our data storage systems (i.e., Kafka and Kinesis) by creating Stores
  2. Set up the Databases and Schemas in the Streaming Catalog to provide the organizational framework for step 3
  3. Define Relations for the topics in our Stores and assign them to the correct Database and Schema
  4. Grant access to these Databases and Schemas to the appropriate roles

Before we begin, let’s ensure that we are using the sysadmin role.

  1. USE ROLE sysadmin;

First, we’ll define the Stores for our data. Since we can’t share our real Kafka or Kinesis connection configurations, the below SQL statement is a template for the CREATE STORE statement (CREATE STORE documentation).

  1. CREATE STORE kafka_store WITH (
  2. 'type' = KAFKA, 'access_region' = "AWS us-east-1",
  3. 'kafka.sasl.hash_function' = PLAIN,
  4. 'kafka.sasl.password' = '',
  5. 'kafka.sasl.username' = '',
  6. 'uris' = ''
  7. );

The next step is to create Databases and Schemas for our Streaming Catalog. As you can see in Figure 1 above, there will be two Databases – “Marketing” and “Ads”. Within the “Marketing” Database, there exists a “Content” Schema and a “Reports” Schema. Within the “Ads” Database, there exists a single “Traffic” Schema.

  1. CREATE DATABASE "Marketing";
  2.  
  3. CREATE SCHEMA "Content" IN DATABASE "Marketing";
  4.  
  5. CREATE SCHEMA "Reports" IN DATABASE "Marketing";
  6.  
  7. CREATE DATABASE "Ads";
  8.  
  9. CREATE SCHEMA "Traffic" IN DATABASE "Ads";

Now that we have the namespaces in our Streaming Catalog set up, we can move on to our third task of defining Relations backed by the topics in our Stores to populate the Streaming Catalog. As you can see in Figure 1 above, there are many topics that exist in our Stores, and thus many Relations that need to be written. For the sake of brevity, we’ll just provide one example statement for CREATE STREAM (tutorial on creating Relations).

  1. CREATE STREAM "Marketing"."Reports".reports_data (
  2. col0 BIGINT, col1 VARCHAR, col2 VARCHAR
  3. ) WITH (
  4. 'store' = 'cc_kafka', 'topic' = 'reporting',
  5. 'value.format' = 'json'
  6. );

This CREATE STREAM statement is creating a Stream called “reports_data” in the “Reports” Schema, which is in the “Marketing” Database. This Stream has three fields, simply called “col0”, “col1”, and “col2”, and is backed by the topic “reporting” in the “cc_kafka” Store. Similar CREATE STREAM or CREATE CHANGELOG statements can be created for the other topics in the same Store or other Stores.
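
For instance, a CREATE CHANGELOG for one of the Ads team’s topics could follow the same pattern. The sketch below uses illustrative topic, Store, and column names rather than the actual ones from Figure 1:

  1. CREATE CHANGELOG "Ads"."Traffic".ad_clicks (
  2. ad_id VARCHAR,
  3. clicks BIGINT,
  4. updated_at BIGINT,
  5. PRIMARY KEY (ad_id)
  6. ) WITH (
  7. 'store' = 'kafka_store', 'topic' = 'ad_clicks',
  8. 'value.format' = 'json'
  9. );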

For our fourth task, we must now grant the custom roles, which were created by the useradmin in the previous section, access to the Databases and Schemas. Based on the diagram in Figure 1, the following statements will grant privileges to the correct data objects corresponding to the appropriate roles. The USAGE privilege is similar to read, and the CREATE privilege is similar to write.

  1. GRANT USAGE, CREATE ON DATABASE "Marketing" TO ROLE "MarketingRole";
  2. GRANT USAGE, CREATE ON SCHEMA "Marketing"."Content" TO ROLE "ContentRole";
  3. GRANT USAGE, CREATE ON SCHEMA "Marketing"."Reports" TO ROLE "ReportsRole";
  4. GRANT USAGE, CREATE ON DATABASE "Ads" TO ROLE "AdsRole";
  5. GRANT USAGE, CREATE ON SCHEMA "Ads"."Traffic" TO ROLE "TrafficRole";

Member of the Reports Team Role

As a new user on the “Reports” team, after accepting the invitation that the useradmin sent, I should expect the following:

  1. Access to the “ReportsRole” only
  2. Access to the “Marketing” Database and the “Reports” Schema

By listing our roles in the DeltaStream CLI, we can see which role is currently being used:

  1. /# LIST ROLES;
  2.  
  3. Name | Current | Created at
  4. ----------------+---------+-----------------------
  5. ContentRole | | 2024-01-05T04:57:30Z
  6. ReportsRole | ✓ | 2024-01-05T04:57:30Z
  7. MarketingRole | | 2024-01-05T04:57:30Z

We can also describe the “ReportsRole” role to see which roles and privileges it has been granted:

  1. /# DESCRIBE ROLE "ReportsRole";
  2.  
  3. Name | Created at
  4. ------------+-----------------------
  5. ReportsRole | 2024-01-05T04:57:30Z
  6.  
  7. Granted Roles
  8.  
  9. Name
  10. ----------
  11. public
  12.  
  13.  
  14. Granted Privileges
  15.  
  16. Type | Target | ID/Name | Grant option
  17. -------+----------+-----------+---------------
  18. Usage | Database | Marketing |
  19. Usage | Schema | Reports |
  20. Create | Schema | Reports |

Finally, we can list the Databases and Schemas to see that we indeed have access to the “Marketing” Database and the “Reports” Schema. Note that the “Ads” Database is not visible, because only the “AdsRole” role and any roles that inherit from the “AdsRole” have access to that Database.

  1. /# LIST DATABASES;
  2.  
  3. Name | Default | Owner | Created at | Updated at
  4. ----------+---------+----------+----------------------+-----------------------
  5. Marketing | | sysadmin | 2024-01-04T23:12:15Z | 2024-01-04T23:12:15Z
  6.  
  7.  
  8. /# LIST SCHEMAS IN DATABASE "Marketing";
  9.  
  10. Name | Default | Owner | Created at | Updated at
  11. --------+---------+----------+----------------------+-----------------------
  12. Reports | | sysadmin | 2024-01-04T23:12:15Z | 2024-01-04T23:12:15Z

Conclusion

RBAC is one of DeltaStream’s core features, managing access to the different data objects in DeltaStream. In this example, we showed how different roles can be created to match an organization’s team structure. This way, granting permissions to specific roles inherently grants permissions to entire teams. While we focused on RBAC in the context of DeltaStream’s Streaming Catalog, granting access to Databases and Schemas in particular, RBAC can also be applied to other data assets such as Stores, Descriptors, and Queries.

If you want to learn more about DeltaStream’s RBAC, or try it for yourself, get a free trial.

13 Dec 2023

Min Read

Detecting Suspicious Login Activity with Stream Processing

Cybersecurity is challenging, but it’s one of the most important components of any digital business. Cyberattacks can cause disruptions to your application and put your users into harm’s way. Successful cyberattacks can result in things like identity and credit card theft, which can have a very tangible effect on people’s lives and reputations. With regulations such as the General Data Protection Regulation (GDPR), businesses can in fact be fined for lackluster cybersecurity (e.g. Cathay Pacific fined £500k by UK’s ICO over data breach disclosed in 2018). One of the most popular tools for cybersecurity is stream processing. For most cyber threats, responsiveness is crucial to prevent or minimize the impact of an attack. In this post, we’ll show how DeltaStream can be used to quickly identify suspicious login activity from a stream of login events. By identifying suspicious activity quickly, follow-up actions such as freezing accounts, sending notifications to account owners, and involving security teams can happen right away.

Setting up your Data Stream

We’ll assume that a Kafka Store has already been set up with a topic called login_attempts. The records in this topic contain failed login events. Before we get to our use case, we need to set up a Stream that is backed by this topic. We’ll use this Stream later on as the source data for our use case.

CREATE STREAM DDL to create the login_attempts Stream:
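
The exact DDL depends on the topic’s schema; the sketch below assumes JSON-formatted values and the columns referenced in the next section, with event_ts as a hypothetical event-time column:

  1. CREATE STREAM login_attempts (
  2. event_ts BIGINT,
  3. ip_address VARCHAR,
  4. user_agent VARCHAR,
  5. account_id VARCHAR
  6. ) WITH (
  7. 'topic' = 'login_attempts', 'value.format' = 'json', 'timestamp' = 'event_ts'
  8. );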

Cybersecurity Use Case: Detecting Suspicious User Login Activity

For our use case, we want to determine if a user is attempting to gain access to accounts they are not authorized to use. One common way attackers will try to gain access to accounts is by writing scripts or having bots attempt to log in to different accounts using commonly used passphrases. We can use our stream of login data to detect these malicious users. Based on our source Stream, we have fields ip_address and user_agent which can identify a particular user. The account_id field represents the account that a user is trying to log in to. If a particular user attempts to log in to 3 unique accounts in the span of 1 minute, then we want to flag this user as being suspicious. The following query does this by utilizing OVER Aggregation and writing the results into a new Stream called suspicious_user_login_activity.

Create Stream As Select Query

CSAS to create suspicious_user_login_activity Stream:
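
The sketch below shows one way such a CSAS could be written, assuming the login_attempts columns from the DDL above; the exact OVER-aggregation syntax may vary:

  1. CREATE STREAM suspicious_user_login_activity AS
  2. SELECT ip_address, user_agent, num_distinct_accounts_user_login_attempted
  3. FROM (
  4. SELECT
  5. ip_address,
  6. user_agent,
  7. COUNT(DISTINCT account_id) OVER (
  8. PARTITION BY ip_address, user_agent
  9. ORDER BY event_ts
  10. RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
  11. ) AS num_distinct_accounts_user_login_attempted
  12. FROM login_attempts
  13. )
  14. WHERE num_distinct_accounts_user_login_attempted = 3;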

In this query’s subquery, an OVER aggregation is used to count the number of unique accounts that a particular user has attempted to log in to. The outer query then filters for results where the projected field of the aggregation, num_distinct_accounts_user_login_attempted, is equal to 3. Thus, the output of the entire query contains the IP address and user agent information for suspicious users who have attempted to log in to 3 different accounts within a 1 minute window. The resulting event stream can then be ingested by downstream applications for further review or actions.

By submitting this SQL statement, a long-lived continuous query will be launched in the background. This continuous query will constantly ingest from the source Stream as new records arrive, process the data, then write the results to the sink Stream instantaneously. Any downstream applications reading from this sink Stream will then be able to act on these suspicious users right away.

Create Stream As Select Query Results

To get a better understanding of how the CSAS query behaves, we can inspect some records from our source login_attempts Stream and our results suspicious_user_login_activity Stream.

Records in the source Stream login_attempts:

Records in the sink Stream suspicious_user_login_activity:

In the results Stream, there is a record for a Windows user who tried to log in to 3 different accounts. Inspecting the source Stream, we can see that records 3 through 5 are associated with that output. Records 1, 2, and 6 are all from the same Android user, but this user only attempted to log in to 2 unique accounts, so there is no output record for this user, since we don’t deem this activity suspicious.

The Power of Stream Processing and Cybersecurity

Streaming and stream processing capabilities are incredibly helpful for tackling cybersecurity challenges. Having systems and processes that act on events with minimal latency can be the difference between a successful and an unsuccessful cyberattack. In this post, we showcased one example of how DeltaStream users can set up and deploy a streaming analytics pipeline to detect cyber threats as they’re happening. While this example is relatively simple, DeltaStream’s rich SQL feature set is capable of handling much more complex queries to support all kinds of cybersecurity use cases.

DeltaStream is the platform to unify, process, and govern all of your streaming data. If you want to learn more about DeltaStream, sign up for a free trial or schedule a demo with us.

20 Nov 2023

Min Read

Analyzing Real-Time NYC Bus Data with DeltaStream

Most of us who have lived in a big city have had some experience taking public transportation. While it’s extremely satisfying when everything works out, I know I’m not alone in my frustrations when subway time estimates aren’t accurate or buses just never show up. Standing there and waiting for a bus or train can be very stressful as you begin to wonder if the train has already left, if this is going to impact your transfer to another bus, or if you’ll make it to your destination on time (especially stressful if you’re trying to catch a plane). One way Google is helping address some of these challenges is with the General Transit Feed Specification (GTFS) and its real-time counterpart, GTFS Realtime. GTFS helps bring standardization to transit feeds by providing a framework for transportation companies to submit feeds and for developers to write applications to process these feeds.

In this blog post, we’ll be showcasing how you can use DeltaStream to process New York City’s real-time bus feed, which adopts the GTFS realtime specification, to identify buses that are becoming increasingly delayed in real time.

Setting up DeltaStream 

To set up our use case, first we need to load the bus data into Kafka:

  1. Sign up for an MTA BusTime Developer Key
  2. Have Kafka or a Kafka-compatible integration available, such as a Confluent Cloud or RedPanda cluster; Confluent Cloud and RedPanda both offer free trials of their products
  3. Clone this GitHub repository, then follow the instructions to build and run a Java program that will poll the bus feed and forward the events into your Kafka cluster

DeltaStream for real-time processing

Now that we have our bus feed data in Kafka, we can use DeltaStream to process the data. If you are new to DeltaStream and don’t have an account, you can sign up for a free trial here.

Connect to and inspect source data

First, add your Kafka cluster as a Store in DeltaStream. Adding the Store defines the connectivity between DeltaStream and your storage layer, in this case a Kafka cluster. You can choose any name you want for your Store, but for this use case let’s assume the Store is called “kafka_store”. From here, we can inspect the topics by printing them. The two topics we’ll be using for our example are the “nyc_bus_trip_updates” and “nyc_bus_vehicle_positions” topics.

Print the nyc_bus_trip_updates topic:

  1. db.public/kafka_store# PRINT TOPIC nyc_bus_trip_updates;
  2. {
  3. "trip": {
  4. "tripId": "FP_D3-Weekday-SDon-103300_B13_6",
  5. "startDate": "20231030",
  6. "routeId": "Q55",
  7. "directionId": 1
  8. },
  9. "stopTimeUpdate": [{
  10. "stopSequence": 0,
  11. "arrival": {
  12. "time": "1698701100"
  13. },
  14. "departure": {
  15. "time": "1698701100"
  16. },
  17. "stopId": "504354"
  18. }, {
  19. "stopSequence": 1,
  20. "arrival": {
  21. "time": "1698701144"
  22. },
  23. "departure": {
  24. "time": "1698701144"
  25. },
  26. "stopId": "504434"
  27. }, …],
  28. "vehicle": {
  29. "id": "MTA NYCT_1234"
  30. },
  31. "timestamp": "1698699097",
  32. "delay": 385
  33. }

Print the nyc_bus_vehicle_positions topic:

  1.  
  2. db.public/kafka_store# PRINT TOPIC nyc_bus_vehicle_positions;
  3. {
  4. "trip": {
  5. "tripId": "FP_D3-Weekday-SDon-103300_B13_6",
  6. "startDate": "20231030",
  7. "routeId": "Q55",
  8. "directionId": 1
  9. },
  10. "position": {
  11. "latitude": 40.69352,
  12. "longitude": -73.990486,
  13. "bearing": 63.434948
  14. },
  15. "timestamp": "1698700533",
  16. "stopId": "504434",
  17. "vehicle": {
  18. "id": "MTA NYCT_1234"
  19. }
  20. }

We can define Streams for nyc_bus_trip_updates and nyc_bus_vehicle_positions with the following DDL statements.

DDL to create nyc_bus_trip_updates Stream:

  1. CREATE STREAM nyc_bus_trip_updates (
  2. trip STRUCT <
  3. "tripId" VARCHAR,
  4. "startDate" VARCHAR,
  5. "routeId" VARCHAR,
  6. "directionId" TINYINT >,
  7. "stopTimeUpdate" ARRAY <
  8. STRUCT <
  9. "stopSequence" INTEGER,
  10. "arrival" STRUCT <
  11. "time" BIGINT >,
  12. "departure" STRUCT <
  13. "time" BIGINT >,
  14. "stopId" INTEGER >>,
  15. vehicle STRUCT <
  16. id VARCHAR >,
  17. "timestamp" BIGINT,
  18. delay INTEGER
  19. ) WITH ('topic' = 'nyc_bus_trip_updates', 'value.format'='JSON');

DDL to create nyc_bus_vehicle_positions Stream:

  1. CREATE STREAM nyc_bus_vehicle_positions (
  2. trip STRUCT <
  3. "tripId" VARCHAR,
  4. "startDate" VARCHAR,
  5. "routeId" VARCHAR,
  6. "directionId" TINYINT >,
  7. "position" STRUCT <
  8. "latitude" DOUBLE,
  9. "longitude" DOUBLE,
  10. "bearing" DOUBLE>,
  11. vehicle STRUCT <
  12. id VARCHAR >,
  13. "timestamp" BIGINT,
  14. "stopId" INTEGER
  15. ) WITH ('topic' = 'nyc_bus_vehicle_positions', 'value.format'='JSON');

Notice that both feeds have a field called trip which represents a particular trip a bus is taking. We’ll be using this field to join these Streams later on.

Also, since the timestamp field is given as epoch seconds, let’s make our data easier to read by defining new Streams that convert these fields to timestamps. We can do this with the following CREATE STREAM AS SELECT (CSAS) queries:

CSAS to create trip_updates:

  1. CREATE STREAM trip_updates AS
  2. SELECT
  3. trip,
  4. "stopTimeUpdate",
  5. vehicle,
  6. CAST(FROM_UNIXTIME("timestamp") AS TIMESTAMP) AS ts,
  7. "timestamp" AS epoch_secs,
  8. delay
  9. FROM
  10. nyc_bus_trip_updates;

CSAS to create vehicle_positions:

  1. CREATE STREAM vehicle_positions AS
  2. SELECT
  3. trip,
  4. "position",
  5. vehicle,
  6. CAST(FROM_UNIXTIME("timestamp") AS TIMESTAMP) AS ts,
  7. "stopId"
  8. FROM
  9. nyc_bus_vehicle_positions;

Detecting transportation delays

You may have noticed in the nyc_bus_trip_updates topic that there is a field called delay. This field represents the number of seconds that a bus is currently delayed from its original route. Reporting this data is really helpful, as it provides transparency to transit-goers on how late they’re going to be or how long they may need to wait for the bus. However, what’s not always clear is if delays are increasing. For our use case, this is exactly what we want to detect. Once we detect which bus trips are becoming increasingly delayed, we also want to provide additional information about where the bus is and where the bus has recently been so that city officials and bus planners can see where delays may be occurring.

Processing real-time bus data

For our use case, we’ll be splitting up the processing into two queries.

In the first query, we will analyze the trip_updates Stream to find trips where delays are significantly increasing. We consider three consecutive trip updates that each grow in delay by more than 30 seconds to be significant, so we can write a pattern recognition query to detect trips that match this requirement. Those trips will then be written to a Stream that serves as the input for our second query.

Query 1:

  1. CREATE STREAM trips_delay_increasing AS
  2. SELECT
  3. trip,
  4. vehicle,
  5. start_delay,
  6. end_delay,
  7. start_ts,
  8. end_ts,
  9. CAST(FROM_UNIXTIME((start_epoch_secs + end_epoch_secs) / 2) AS TIMESTAMP) AS avg_ts
  10. FROM trip_updates
  11. MATCH_RECOGNIZE(
  12. PARTITION BY trip
  13. ORDER BY "ts"
  14. MEASURES
  15. C.row_timestamp AS row_timestamp,
  16. C.row_key AS row_key,
  17. C.row_metadata AS row_metadata,
  18. C.vehicle AS vehicle,
  19. A.delay AS start_delay,
  20. C.delay AS end_delay,
  21. A.ts AS start_ts,
  22. C.ts AS end_ts,
  23. A.epoch_secs AS start_epoch_secs,
  24. C.epoch_secs AS end_epoch_secs
  25. ONE ROW PER MATCH
  26. AFTER MATCH SKIP TO LAST C
  27. PATTERN (A B C)
  28. DEFINE
  29. A AS delay > 0,
  30. B AS delay > A.delay + 30,
  31. C AS delay > B.delay + 30
  32. ) AS MR WITH ('timestamp'='ts')
  33. QUERY WITH ('state.ttl.millis'='3600000');

Pattern recognition query to find bus trips that are increasing in delay

In the second query, we will join the output of our first query with the vehicle_positions Stream on the trip field. When joining two Streams, we need to specify a WITHIN interval as part of the join condition (these kinds of joins are called Interval Joins). For our query, we’ll specify the timestamp field to be avg_ts, the middle point in our increasing delay interval that we identified from the first query. We’ll also use 3 minutes for our WITHIN interval, meaning positions for a trip with a timestamp 3 minutes before and 3 minutes after avg_ts will satisfy the join condition. The resulting records of this query will represent the positions of buses that are part of delayed trips.

Query 2:

  1. CREATE STREAM delayed_trip_positions AS
  2. SELECT
  3. t.trip,
  4. t.vehicle,
  5. t.start_delay,
  6. t.end_delay,
  7. t.start_ts,
  8. t.end_ts,
  9. p."position",
  10. p.ts AS position_ts
  11. FROM
  12. trips_delay_increasing t WITH ('timestamp'='avg_ts')
  13. JOIN vehicle_positions p WITH ('timestamp'='ts')
  14. WITHIN 3 MINUTES
  15. ON t.trip = p.trip;

Interval join query to join the bus trips that are growing in delay with bus locations

By submitting these queries, DeltaStream has launched long-lived jobs that will continually read from their sources, transform the data, then write to their sinks. So, as bus data arrives in our Kafka topics, we can expect data processing to happen immediately and the results to arrive nearly instantaneously.

Inspecting real-time results

Let’s inspect the contents of these Streams to see how our queries behaved.

Data in our source trip_updates Stream:

  1.  
  2. SELECT * FROM trip_updates WITH ('starting.position'='earliest');
  3. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:39:01","epoch_secs":1698802741,"delay":1689}
  4. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:41:31","epoch_secs":1698802891,"delay":1839}
  5. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"stopTimeUpdate":[...],"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:45:01","epoch_secs":1698803101,"delay":2049}

Data in our source vehicle_positions Stream:

  1. SELECT * FROM vehicle_positions WITH ('starting.position'='earliest');
  2. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76075,"longitude":-73.8282,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:39:31","stopId":505121}
  3. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76072,"longitude":-73.82832,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:42:31","stopId":505121}
  4. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"position":{"latitude":40.76073,"longitude":-73.828285,"bearing":13.835851},"vehicle":{"id":"MTA NYCT_8865"},"ts":"2023-11-01 01:45:01","stopId":505121}
  5.  

The results of Query 1 in our trips_delay_increasing Stream:

  1.  
  2. SELECT * FROM trips_delay_increasing WITH ('starting.position'='earliest');
  3. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","avg_ts":"2023-11-01 01:42:01"}

The results of Query 2 in our delayed_trip_positions Stream:

  1.  
  2. SELECT * FROM delayed_trip_positions WITH ('starting.position'='earliest');
  3. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76075,"longitude":-73.8282,"bearing":13.835851},"position_ts":"2023-11-01 01:39:31"}
  4. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76072,"longitude":-73.82832,"bearing":13.835851},"position_ts":"2023-11-01 01:42:31"}
  5. | {"trip":{"tripId":"CS_D3-Weekday-SDon-124200_Q28_655","startDate":"20231031","routeId":"Q28","directionId":1},"vehicle":{"id":"MTA NYCT_8865"},"start_delay":1689,"end_delay":2049,"start_ts":"2023-11-01 01:39:01","end_ts":"2023-11-01 01:45:01","position":{"latitude":40.76073,"longitude":-73.828285,"bearing":13.835851},"position_ts":"2023-11-01 01:45:01"}

In the results above, we can see the trip with a tripId of CS_D3-Weekday-SDon-124200_Q28_655 with increasing delays in a short period of time. Our first query identifies that this trip’s delay is growing, and outputs a record for this trip. Our second query ingests that record and finds the vehicle positions at the time of the delay.

By plotting the positions seen in our results for delayed_trip_positions above, we get the following map:

There must be some traffic on 39th Avenue!

In this example, we’ve highlighted two ways real-time processing can help provide a better experience to public transit users:

  1. Having real-time data about growing delays can help provide more accurate time estimates on bus arrival times
  2. Insights into locations where bus delays grow can help city planners better understand and improve traffic patterns in their cities

Conclusion

The GTFS real-time data feeds are great for building real-time transit applications. However, real-time computations that are complex or require stateful operations can be difficult to implement on top of the raw feeds. In this blog post, we showcased how you can easily build stateful real-time jobs on top of this data feed in minutes using DeltaStream. Further, as a fully managed serverless product, DeltaStream handles all of the operational overhead of running long-lived stream processing jobs.

If you want to learn more about DeltaStream or try it for yourself, you can request a demo or join our free trial.

18 Jul 2023

Min Read

Create Stream Processing Pipelines with Superblocks and DeltaStream

In the previous installments of the DeltaStream 101 blog series, we’ve covered the basics of creating stores and defining different types of relations, such as Streams, Changelogs, and Materialized Views, on top of the data records in streaming stores. In this blog post, we look into how we can quickly create an application to explore and visualize real-time changes in our data by integrating DeltaStream with a third-party data visualization platform such as Superblocks.

Let’s assume we are creating a live dashboard for a delivery company to let them track their vehicles and drivers in real-time. Each driver is identified via a unique identifier. Similarly, one can refer to a given vehicle using its unique ID. The company’s vehicles are equipped with sensors and GPS that keep sending various information about the vehicles to a configured streaming store, in this case an Apache Kafka cluster. The company is interested in tracking its drivers and vehicles, at any given time, via a live feed on a map, using either a vehicle’s or driver’s ID. The map shows the latest location of a vehicle based on its most recent speed and geographical location.

First, let’s take a look at an example of a “navigation record”, in JSON format, which captures information about a driver and their vehicle, along with the vehicle’s latest reported location in our system. A stream of such records, collected from different vehicles, is continuously ingested into a topic called “navigation” in a Kafka store.

  1. {
  2. "driver": {
  3. "id": "38200",
  4. "name": "James Smith",
  5. "license_id": "TX42191S",
  6. "car": {
  7. "id": "21820700",
  8. "model": "Ford",
  9. "plate": "2HTY910",
  10. "is_electric": false
  11. }
  12. },
  13. "route": [
  14. {
  15. "location": {
  16. "latitude": 38.083128,
  17. "longitude": -121.472887
  18. },
  19. "speed": 64
  20. },
  21. {
  22. "location": {
  23. "latitude": 38.339525,
  24. "longitude": -123.253794
  25. },
  26. "speed": 72
  27. }
  28. ]
  29. }

Sample navigation record in Json

DSQL statements to access and query the data

As the first step, we need to create a “store” in DeltaStream to access the data in the Kafka topic that collects navigation records. We also assume we have already defined our database and schema. Our previous blog post covers details on how one can define them.

In order to query the data, we create a stream, called “navigation”. In DeltaStream, we use the “STRUCT” data type to define records with nesting, and the “ARRAY” data type is used to define an ordered collection of data items of the same type.  As you can see in the DDL statement, shown in Statement 1 below, the navigation stream has two columns: ‘driver’ and ‘route’. The driver column’s data type is a struct, whose fields capture information on a driver’s ID, fullname, license_id and his/her car. The car field is defined as a nested struct, inside the driver’s struct, and shows various information about the driver’s car such as its id, model, etc. The route column lists a sequence of data items which report the latest location and speed of the driver’s car. Therefore, the data type for the route column is defined as an array of structs where each struct has two fields: location and speed. The location field is a nested struct containing latitude and longitude values, collected by the vehicle’s GPS, and the speed field is defined as integer.

  1. CREATE STREAM navigation (
  2. driver STRUCT<id VARCHAR, fullname VARCHAR, license_id VARCHAR, car STRUCT<id VARCHAR, model VARCHAR, plate VARCHAR, is_electric BOOLEAN>>,
  3.  
  4. route ARRAY<STRUCT<location STRUCT<latitude DOUBLE, longitude DOUBLE>, speed INTEGER>>
  5. ) WITH ('topic'='navigation', 'value.format'='json');

Statement 1. DDL to define navigation stream.

Now that we can access the data in Kafka, using the navigation stream we just created, we run a CSAS statement to extract the columns and nested fields that are relevant to our application. We are going to query the latest navigation information about a driver or car, using their IDs. Hence, we need to select the driver’s ID and his/her car’s ID from the driver column. We also select the driver’s name to show it on the dashboard. We pick the first element of the route column to show the latest location and speed of the car on the map. We unnest location coordinates and show them as separate columns along with the speed in the new stream. As you can see in Statement 2, in DeltaStream the `->` operator is used to access fields of a struct. Moreover, given that arrays are one-indexed (i.e., the first element of an array is at index 1), route[1] is fetching the very first element from a given route array.

  1. CREATE STREAM flat_data AS
  2. SELECT driver->id AS driver_id,
  3. driver->fullname AS driver_name,
  4. driver->car->id AS car_id,
  5. route[1]->location->latitude AS latitude,
  6. route[1]->location->longitude AS longitude,
  7. route[1]->speed AS speed
  8. FROM navigation;

Statement 2. CSAS to define flat_data stream

For a given driver or car ID, we can run a query on the flat_data stream and get the latest relevant data we need to show on the map. We are going to use Superblocks to visualize the location of the queried drivers or cars. Currently, Superblock’s available APIs do not let us directly send the result of our query to update the map. We can achieve this by creating a materialized view on top of the flat_data stream. Moreover, given that we are only interested in the most recent location of a car or a driver, when showing it on the map, we need to make sure our materialized view ingests the data in the “upsert” mode. This way, if a new record is inserted into the materialized view for an existing driver or car id, it overwrites the current record and updates the materialized view. We can use a changelog in DeltaStream to interpret the records in a given topic in the upsert mode. You can use the DDL statement in Statement 3 to define such a changelog. We define the primary key for the changelog as a composite key, using the driver_id and car_id columns.

  1. CREATE CHANGELOG navigation_logs (
  2. driver_id VARCHAR,
  3. car_id VARCHAR,
  4. driver_name VARCHAR,
  5. latitude DOUBLE,
  6. longitude DOUBLE,
  7. speed INTEGER,
  8. PRIMARY KEY (driver_id, car_id)
  9. ) WITH ('topic'='flat_data', 'value.format'='json');

Statement 3. DDL to define navigation_logs Changelog

Our final step to prepare the data for Superblocks is creating a materialized view, called “navigation_view”, by selecting all records from the navigation_logs changelog defined above. Now, for a given driver or car ID, we can run a simple filter query on navigation_view to fetch the latest location coordinates and speed of the queried driver or car. This query’s result is directly usable by Superblocks to update the map on our dashboard.

  1. CREATE MATERIALIZED VIEW navigation_view AS
  2. SELECT * FROM navigation_logs;

Statement 4. Statement to define navigation_view Materialized View
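
For a given driver and car, the filter query against navigation_view is then straightforward; using the IDs from the sample record above, it would look something like this:

  1. SELECT latitude, longitude, speed
  2. FROM navigation_view
  3. WHERE driver_id = '38200' AND car_id = '21820700';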

Visualize the data using Superblocks

Now, let’s use Superblocks to visualize the location of drivers and cars on a map, in real time. We can achieve this by creating an application in Superblocks that fetches the latest location of a driver or car from the navigation_view Materialized View we defined above.

Generate API Token in DeltaStream

DeltaStream uses API tokens to authenticate third-party applications and let them run queries and access the results securely. In order to generate an API token, on your DeltaStream Home page, click on your avatar icon on the main navigation bar, and under your profile select the “Api Token” tab. You need to pick a name for the new token, and DeltaStream will generate it for you. Let’s call our new API token “SuperblocksToken”. You won’t be able to access the content of a generated token once you exit this page; therefore, make sure you download the new token and save it in a safe place for future reference.

Create a Superblocks Application

The next step is creating a Superblocks application and connecting it to DeltaStream. Our new application receives the driver and car IDs as inputs from the user, generates a DSQL query, and submits it to DeltaStream to fetch the latest location of the car from the Materialized View. It then shows this location on a map. Log in to your Superblocks account and select the “new application” option to create one.

The first step is defining the input fields. Use the “Components” panel on the left to create two input boxes, named “Driver” and “Car”, and give them proper labels and placeholders. Make sure both fields are set as required.

The next step is creating the “Submit” button for the application. The Submit button calls DeltaStream’s REST API to run new queries and get their results. It puts the DeltaStream API token, generated earlier, in the header of its requests to authenticate for secure access. For this purpose, add a new button to the application and set its click handler to be a new REST API. The API should be configured as below to connect to DeltaStream:

  • Method: POST
  • URL: https://api.deltastream.io/run-statement
  • Headers: Authorization: Bearer <YOUR-API-TOKEN>
  • Body Content Type: Form
  • Set Form Data as following:
    • orgID: <YOUR-ORGANIZATION-ID>
    • roleName: sysadmin
    • storeName: <YOUR-STORE-NAME>
    • databaseName: <YOUR-DATABASE-NAME>
    • schemaName: <YOUR-SCHEMA-NAME>
    • statement: select * from navigation_view where driver_id = ‘{{Input1.value}}’ and car_id = ‘{{Input2.value}}’;

You can check your organization ID in your DeltaStream account’s Home page. Click on your avatar icon and find it under the “organizations” tab.

As you can see, the “Statement” defined in the above REST API configuration is the DSQL query that is generated using the input values for the Driver and Car IDs. Superblocks generates this query each time the “Submit” button is clicked and sends it to the configured DeltaStream endpoint. Go ahead and set valid input values for the Driver and Car IDs and submit a request. Once the query runs successfully, DeltaStream returns its results wrapped in a list. In order to show the latest location of the driver, we only need the very first element in that list. We define a function to pick that element from the returned results. Click on the plus button on the left panel and select the “JavaScript Function” option. This opens a new panel with a placeholder for the code defining a new function. Set the function code as: “return Step1.output[0].data;”.

The very last step is creating a map to show the fetched location on it. For this purpose, select the “Map” option from the component panel and configure it as:

  • Initial location: {{API1.response[0]}}
  • Default markers: {{API1.response}}

Now, once you put valid values for the driver and car ids in the input field boxes and submit a request, the latest location of the car is marked on the map.
