21 Feb 2024
Stream Processing for Blockchain Data
Cryptocurrencies, smart contracts, NFTs, and Web3 have infiltrated mainstream media as the newest hot tech (other than generative AI, of course). These technologies are backed by blockchains, which are distributed ledgers that rely on cryptography and decentralization to record transactions securely. In this blog post, we’ll build a stream processing application using DeltaStream to inspect the gas fees associated with Ethereum transactions.
Ethereum is one of the most popular cryptocurrencies. Using Ethereum, users can set up wallets, transfer ether between accounts, interact with smart contracts, and more. With over a million transactions occurring on the Ethereum network every day (see the latest Ethereum usage statistics), the network processes a lot of real-time data. Off-chain analytics can also play a role by extracting insights from the blockchain or from the metadata associated with it.
For this use case, we are going to run real-time analysis on the transaction data persisted to Ethereum’s blockchain. Any time an Ethereum user wants to perform an action in the network, a transaction needs to be created. This applies whether the user is running a smart contract or simply transferring ether from their wallet to another wallet. The user sends this transaction to what are called “validators,” who persist it to the blockchain. Once a transaction is part of a block on the blockchain, it is complete, since blocks are generally irreversible. However, each block has a gas limit, which caps the total gas, and therefore the number of transactions, that a block can hold. Each transaction requires a certain amount of gas; for example, a simple transfer of ether from one wallet to another costs 21,000 gas. Complex smart contracts require more gas and fill up a block’s gas limit more quickly. This means that not every transaction automatically gets persisted to the blockchain, because validators pick which set of transactions to include in the next block (read more about gas fees in Ethereum).
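Here is a quick back-of-the-envelope sketch that puts the gas limit in numbers. The 30,000,000 gas block limit is an assumed, illustrative value: the post does not state one, and validators can adjust the limit over time. The 500,000 gas contract call is also hypothetical. Only the 21,000 gas cost of a simple transfer comes from the text above.

```python
# Back-of-the-envelope: how many transactions of a given gas cost fit in one block?
# ASSUMPTION: a 30,000,000 gas block limit; this value is illustrative, not taken from the post.
BLOCK_GAS_LIMIT = 30_000_000
SIMPLE_TRANSFER_GAS = 21_000  # gas cost of a plain ether transfer


def max_txns_per_block(gas_per_txn: int, block_gas_limit: int = BLOCK_GAS_LIMIT) -> int:
    """Return how many transactions of `gas_per_txn` gas fit under the block gas limit."""
    if gas_per_txn <= 0:
        raise ValueError("gas_per_txn must be positive")
    return block_gas_limit // gas_per_txn


if __name__ == "__main__":
    print(max_txns_per_block(SIMPLE_TRANSFER_GAS))  # simple transfers per block
    print(max_txns_per_block(500_000))  # hypothetical 500k-gas contract calls per block
```

Under these assumptions, a block fits 1,428 simple transfers but only 60 of the heavier calls. This is why gas-hungry transactions compete harder for block space.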
After the Ethereum Improvement Proposal (EIP) 1559 upgrade, Ethereum’s gas fee structure changed to include priority fees. To make a transaction more attractive for validators to pick, users can attach a priority fee to it. This priority fee is a tip that the validator receives if they write your transaction to a block. So, the larger the priority fee, the more likely a validator is to include the transaction. The question we want to help answer in this post is: what should we set the priority fee to?
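The tip a validator actually receives is not always the maxPriorityFeePerGas the user sets. Under EIP-1559, each block also has a base fee per gas, which is burned. The tip is capped at whatever room remains under maxFeePerGas after that base fee. The sketch below encodes those rules for a single type 2 transaction. The function and field names are our own, and all values are assumed to be in wei.

```python
# Sketch of the EIP-1559 fee rules for a type 2 transaction (all values in wei).
from dataclasses import dataclass


@dataclass
class FeeBreakdown:
    effective_priority_fee_per_gas: int  # tip per gas actually paid to the validator
    effective_gas_price: int  # base fee + tip, per unit of gas
    burned: int  # base fee portion of the total fee, which is burned
    tip_to_validator: int  # priority fee portion of the total fee


def eip1559_fees(max_fee_per_gas: int, max_priority_fee_per_gas: int,
                 base_fee_per_gas: int, gas_used: int) -> FeeBreakdown:
    if max_fee_per_gas < base_fee_per_gas:
        raise ValueError("maxFeePerGas is below the base fee; the transaction cannot be included")
    # The tip is capped by the headroom left between maxFeePerGas and the base fee.
    tip = min(max_priority_fee_per_gas, max_fee_per_gas - base_fee_per_gas)
    return FeeBreakdown(
        effective_priority_fee_per_gas=tip,
        effective_gas_price=base_fee_per_gas + tip,
        burned=base_fee_per_gas * gas_used,
        tip_to_validator=tip * gas_used,
    )
```

For example, suppose the base fee is 30 gwei and maxFeePerGas is 40 gwei. A maxPriorityFeePerGas of 2 gwei is paid in full. A maxPriorityFeePerGas of 15 gwei, however, is capped to 10 gwei.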
Using real Ethereum transaction data from Infura.io, we want to look at which transactions are being persisted to Ethereum’s blockchain in real time and get a sense of how large their priority fees are. In the following sections, you’ll see how to build an application with DeltaStream that analyzes gas fees in real time. For Ethereum users, these insights can help set reasonable priority fees on transactions without overpaying.
Setup and Assumptions for Analyzing Real-Time Data
Using Infura.io’s APIs, we can retrieve block data for Ethereum’s entire blockchain. We wrote a small program that waits for new blocks, gets the transactions in each block, and writes these transactions as JSON-formatted payloads to Kafka. We’ll use these transactions as the source data for our use case. To keep the use case simple, we make a few assumptions, listed below for completeness:
- We are ignoring legacy transactions that are not type 2 (read more about Ethereum’s transaction types). Type 2 transactions are those that adhere to the post-EIP-1559 specifications and use priority fees. Since the EIP-1559 upgrade is backwards compatible, users can still send transactions using the old specifications, but we are ignoring these transactions to simplify our use case.
- For our transactions data, we are enriching each transaction payload with the respective block’s timestamp and the transaction’s hash. These fields are not part of the transaction message itself.
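- Users set the maxPriorityFeePerGas and the maxFeePerGas when making a transaction. The priority fee actually paid is capped at maxFeePerGas minus the block’s baseFeePerGas. While it’s not always the case, for simplicity we will assume that (maxPriorityFeePerGas + baseFeePerGas) <= maxFeePerGas, so that maxPriorityFeePerGas is the priority fee the validator receives.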
- Occasionally, transactions with maxPriorityFeePerGas set to 0 or very low values make it onto the blockchain. Validators likely choose these transactions because users have bribed them (learn more about bribes in Ethereum in this research paper). As you’ll see later in the setup, we filter out any transactions with maxPriorityFeePerGas <= 100 (fee values are in wei).
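The ingestion program mentioned above is not shown in this post, and neither are its block-fetching and Kafka-producing steps. What follows is a hypothetical sketch of just its transform step. It takes one block as returned by Ethereum’s eth_getBlockByNumber JSON-RPC method with full transaction objects, where numeric fields are hex strings. It keeps only type 2 transactions and builds payloads matching the eth_txns schema. Emitting block_ts in epoch milliseconds is our assumption about what the Stream’s 'timestamp' property expects.

```python
# Hypothetical transform step of the ingestion program: block JSON-RPC result -> Kafka payloads.
# Fetching blocks from Infura and publishing to Kafka are intentionally omitted.
import json
from typing import Any


def hex_to_int(value: str) -> int:
    """JSON-RPC encodes quantities as hex strings such as "0x5208"."""
    return int(value, 16)


def block_to_payloads(block: dict[str, Any]) -> list[str]:
    # ASSUMPTION: the Stream's 'timestamp' column expects epoch milliseconds,
    # so the block timestamp (epoch seconds) is scaled by 1000.
    block_ts_ms = hex_to_int(block["timestamp"]) * 1000
    payloads = []
    for txn in block["transactions"]:
        # Keep only EIP-1559 (type 2) transactions, per our assumptions.
        if hex_to_int(txn.get("type", "0x0")) != 2:
            continue
        record = {
            "txn_hash": txn["hash"],  # enrichment: transaction hash
            "block_ts": block_ts_ms,  # enrichment: block timestamp
            "blockNumber": hex_to_int(txn["blockNumber"]),
            "gas": hex_to_int(txn["gas"]),
            "maxFeePerGas": hex_to_int(txn["maxFeePerGas"]),
            "maxPriorityFeePerGas": hex_to_int(txn["maxPriorityFeePerGas"]),
            "value": hex_to_int(txn["value"]),
        }
        payloads.append(json.dumps(record))
    return payloads
```

Each returned string would then be produced as one record to the ethereum_transactions topic.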
Let’s get on with the setup of our use case. In DeltaStream, after setting up Kafka as a Store, we can create a Stream that is backed by the transaction data in our Kafka topic. The Stream that we create is metadata that tells DeltaStream how to deserialize the records in the Kafka topic. The following CREATE STREAM statement creates a new Stream called eth_txns that is backed by the ethereum_transactions topic.
CREATE STREAM eth_txns (
  "txn_hash" VARCHAR,
  "block_ts" BIGINT,
  "blockNumber" BIGINT,
  "gas" BIGINT,
  "maxFeePerGas" BIGINT,
  "maxPriorityFeePerGas" BIGINT,
  "value" BIGINT
) WITH (
  'topic' = 'ethereum_transactions',
  'value.format' = 'json',
  'timestamp' = 'block_ts'
);
Now that we have our eth_txns source Stream defined, we first need to filter out transactions that don’t fit our assumptions. We can write a CREATE STREAM AS SELECT (CSAS) query that continuously ingests data from the eth_txns Stream, filters out records that don’t meet our criteria, and sinks the resulting records to a new Stream backed by a new Kafka topic. The WHERE clause in the following query accepts only transactions that meet two conditions. The first, maxPriorityFeePerGas > 100, drops transactions likely chosen due to bribes. The second, maxPriorityFeePerGas < maxFeePerGas, drops transactions whose priority fee is clearly not accurately represented by maxPriorityFeePerGas. Since baseFeePerGas is not part of our Stream, this second check cannot catch every transaction whose tip was capped below maxPriorityFeePerGas; it only removes the clear-cut cases.
CSAS query to create the eth_txns_filtered Stream:
CREATE STREAM eth_txns_filtered AS
SELECT
  "txn_hash",
  "block_ts",
  "blockNumber",
  "maxPriorityFeePerGas",
  "gas"
FROM eth_txns WITH ('source.deserialization.error.handling' = 'IGNORE')
WHERE "maxPriorityFeePerGas" > 100
  AND "maxPriorityFeePerGas" < "maxFeePerGas";
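The per-record logic of this query can be mirrored in plain Python, which is handy for sanity-checking the filter against sample payloads. This sketch only imitates the semantics and says nothing about how DeltaStream executes the query. Treating records with missing or non-numeric fee fields like deserialization failures is our approximation of the 'IGNORE' setting.

```python
# Plain-Python mirror of the eth_txns_filtered query's per-record logic (a sketch, not DeltaStream).
import json
from typing import Iterable, Iterator

MIN_PRIORITY_FEE_WEI = 100  # at or below this, the transaction is treated as a likely bribe
KEPT_FIELDS = ("txn_hash", "block_ts", "blockNumber", "maxPriorityFeePerGas", "gas")


def filter_txns(raw_records: Iterable[str]) -> Iterator[dict]:
    for raw in raw_records:
        try:
            txn = json.loads(raw)
            priority = int(txn["maxPriorityFeePerGas"])
            max_fee = int(txn["maxFeePerGas"])
        except (ValueError, KeyError, TypeError):
            # Approximates 'source.deserialization.error.handling'='IGNORE': skip bad records.
            continue
        # Same predicate as the WHERE clause.
        if MIN_PRIORITY_FEE_WEI < priority < max_fee:
            # Same projection as the SELECT list.
            yield {field: txn.get(field) for field in KEPT_FIELDS}
```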
Analyzing Ethereum’s Blockchain Transaction Gas Fee Data
When forming the next block, Ethereum’s validators can select whichever transactions they want from a pool of pending transactions. Since a new block is added only about every 12 seconds and each block has a gas limit, we can expect validators to act in their own financial interest and choose the transactions with the highest priority fees per gas. So, we’ll analyze the maxPriorityFeePerGas field over time as transactions flow in to get a sense of what priority fees are currently being accepted.
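The selection behavior described above can be sketched as a greedy algorithm. This is a deliberately simplified toy model with hypothetical names. It assumes validators rank pending transactions only by priority fee per gas and count each transaction’s full gas against the block limit; real block building involves more than this.

```python
# Toy model of block building: take the highest-tipping pending transactions that still fit.
from typing import NamedTuple


class PendingTxn(NamedTuple):
    txn_hash: str
    gas: int
    priority_fee_per_gas: int  # wei


def select_txns(pending: list[PendingTxn], block_gas_limit: int) -> list[PendingTxn]:
    chosen = []
    gas_left = block_gas_limit
    # Highest priority fee per gas first.
    for txn in sorted(pending, key=lambda t: t.priority_fee_per_gas, reverse=True):
        if txn.gas <= gas_left:  # skip transactions that no longer fit, keep scanning
            chosen.append(txn)
            gas_left -= txn.gas
    return chosen
```

In this model, a low-fee transaction can still be included when it fits into leftover gas that a higher-paying but larger transaction could not use.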
The following query is a CREATE CHANGELOG AS SELECT (CCAS) query. It computes the transaction count along with the minimum, maximum, average, and sample standard deviation of priority fees. These are calculated over hopping 2-minute windows that advance every 15 seconds.
CCAS query to create the eth_txns_priority_fee_analysis Changelog:
CREATE CHANGELOG eth_txns_priority_fee_analysis AS
SELECT
  window_start,
  window_end,
  COUNT(*) AS txns_cnt,
  MIN("maxPriorityFeePerGas") AS min_priority_fee,
  MAX("maxPriorityFeePerGas") AS max_priority_fee,
  AVG("maxPriorityFeePerGas") AS avg_priority_fee,
  STDDEV_SAMP("maxPriorityFeePerGas") AS priority_fee_stddev
FROM HOP(eth_txns_filtered, SIZE 2 MINUTES, ADVANCE BY 15 SECONDS)
GROUP BY window_start, window_end;
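The Python sketch below reproduces this hopping-window aggregation outside DeltaStream, which makes the semantics concrete. It assumes epoch-second timestamps, windows aligned to multiples of the 15-second advance, and a window size that is a whole multiple of the advance. These alignment details are our assumptions, not documented DeltaStream behavior. With a 2-minute size and a 15-second advance, each transaction lands in 120 / 15 = 8 overlapping windows, so consecutive result records share most of their transactions.

```python
# Sketch of HOP(..., SIZE 2 MINUTES, ADVANCE BY 15 SECONDS) aggregation in plain Python.
# ASSUMPTIONS: epoch-second timestamps, windows aligned to multiples of ADVANCE, SIZE % ADVANCE == 0.
import statistics
from collections import defaultdict

SIZE = 120  # window length in seconds
ADVANCE = 15  # hop between consecutive window starts in seconds


def window_starts(ts: int, size: int = SIZE, advance: int = ADVANCE) -> list[int]:
    """All window start times whose [start, start + size) range contains ts."""
    last_start = ts - ts % advance
    first_start = last_start - size + advance
    return list(range(first_start, last_start + 1, advance))


def hop_aggregate(events: list[tuple[int, int]]) -> dict[tuple[int, int], dict]:
    """events: (timestamp_seconds, maxPriorityFeePerGas) pairs -> stats per (window_start, window_end)."""
    buckets = defaultdict(list)
    for ts, fee in events:
        for start in window_starts(ts):
            buckets[(start, start + SIZE)].append(fee)
    results = {}
    for window, fees in sorted(buckets.items()):
        results[window] = {
            "txns_cnt": len(fees),
            "min_priority_fee": min(fees),
            "max_priority_fee": max(fees),
            "avg_priority_fee": statistics.fmean(fees),
            # Sample standard deviation is undefined for one value; SQL engines typically return NULL.
            "priority_fee_stddev": statistics.stdev(fees) if len(fees) > 1 else None,
        }
    return results
```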
Let’s see some result records in the eth_txns_priority_fee_analysis topic:
{
  "window_start": "2023-12-18 21:14:45",
  "window_end": "2023-12-18 21:16:45",
  "txns_cnt": 368,
  "min_priority_fee": 50000000,
  "max_priority_fee": 32250000000,
  "avg_priority_fee": 859790456,
  "priority_fee_stddev": 97003259
}
{
  "window_start": "2023-12-18 21:15:00",
  "window_end": "2023-12-18 21:17:00",
  "txns_cnt": 514,
  "min_priority_fee": 50000000,
  "max_priority_fee": 219087884691,
  "avg_priority_fee": 1951491416,
  "priority_fee_stddev": 79308531
}
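The fee values in these records are easier to read in gwei. This small sketch converts them, assuming they are denominated in wei (the unit Ethereum’s JSON-RPC API uses for fee fields), where 1 gwei is 10^9 wei. Decimal keeps the conversion exact.

```python
# Convert wei-denominated fee fields to gwei for readability (1 gwei = 10**9 wei).
from decimal import Decimal

WEI_PER_GWEI = 10**9


def wei_to_gwei(wei: int) -> Decimal:
    return Decimal(wei) / WEI_PER_GWEI


if __name__ == "__main__":
    record = {"min_priority_fee": 50000000, "max_priority_fee": 32250000000, "avg_priority_fee": 859790456}
    for field, wei in record.items():
        print(field, wei_to_gwei(wei), "gwei")
```

For example, the first record’s minimum of 50000000 wei is 0.05 gwei, and its maximum of 32250000000 wei is 32.25 gwei.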
Using these results, users can make better-informed decisions when setting priority fees for their own transactions. For example, if a transaction is urgent, the user can set its priority fee above the average. Conversely, users who want to save money and don’t mind waiting for their transactions to make it onto the blockchain can choose a priority fee below the average. These results are also useful for follow-on use cases, such as tracking priority fees over longer periods of time. DeltaStream’s pattern recognition capabilities also let users track patterns in priority fees. For example, a pattern recognition query could detect when priority fees stop trending upward or suddenly drop off.
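The sketch below gives a rough illustration of both ideas. It turns one window’s statistics into a suggested priority fee and flags sudden drops between consecutive windows’ averages. Everything here is hypothetical. The “average plus or minus one standard deviation” rule and the 50% drop threshold are illustrative choices, not recommendations. The drop detector is a plain Python stand-in for what a DeltaStream pattern recognition query would express in SQL.

```python
# Hypothetical helpers built on the per-window statistics; thresholds are illustrative only.
from typing import Optional


def suggest_priority_fee(avg_fee: float, stddev: Optional[float], min_fee: int, urgent: bool) -> int:
    spread = stddev or 0.0  # treat a missing stddev (single-transaction window) as zero spread
    if urgent:
        return round(avg_fee + spread)  # bid above the recent average
    # Bid below the average, but never under the smallest fee recently accepted.
    return max(min_fee, round(avg_fee - spread))


def sudden_drops(avg_fees: list[float], drop_ratio: float = 0.5) -> list[int]:
    """Indexes i where window i's average fell below drop_ratio times window i-1's average."""
    return [i for i in range(1, len(avg_fees)) if avg_fees[i] < drop_ratio * avg_fees[i - 1]]
```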
Intersecting Web3 and Stream Processing
In this blog post, we put together a real-time streaming analytics pipeline to analyze Ethereum’s gas fees. With DeltaStream’s easy-to-use platform, we were able to solve the use case and deploy our pipeline within minutes, using only a few simple SQL queries. Although this is an entry-level example, it illustrates a use case at the intersection of two emerging technologies: Web3 and stream processing.
If you are interested in learning more about DeltaStream, schedule a demo with us or sign up for a free trial.