
Modules

In this chapter, we’ll explore the core components of modules in Substreams through practical examples, focusing on both map and store modules. We’ll walk through a Uniswap pool creation example to illustrate these concepts.

Maps

As previously discussed, map modules are used for stateless data extraction, filtering, and transformation. In the case of Uniswap, we can utilize a map module to track the V3 Pool Factory contract, and extract events relating to the creation of a Pool.

Signature & Manifest

map modules are defined using the #[substreams::handlers::map] attribute and return a Result containing the output Protobuf message. In this case, the output is a Protobuf message named Pools, representing all the Uniswap pools created within the block.

#[substreams::handlers::map]
fn map_pools_created(blk: eth::Block) -> Result<Pools, substreams::errors::Error> {
   ...
}

You’ll also notice that the map module we have defined takes a Block object as a parameter, which allows us to extract data from the block being processed by our Substreams package. We define this module, along with any subsequent modules we create, in our substreams.yaml manifest file.

modules:
  - name: map_pools_created
    kind: map
    initialBlock: 12369621
    inputs:
      - source: sf.ethereum.type.v2.Block
    output:
      type: proto:contract.v1.Pools

Event & Data Extraction

Typically map modules are used for extracting events from blocks. In our case, we would like to extract data relating to the PoolCreated event from the Uniswap V3 Factory contract, and create a Pool object for each of these events emitted within the block.

#[substreams::handlers::map]
fn map_pools_created(blk: eth::Block) -> Result<Pools, substreams::errors::Error> {
    Ok(Pools {
        pools: blk
            .events::<PoolCreated>(&[&UNISWAP_V3_FACTORY])
            .map(|(event, log)| Pool {
                address: Hex::encode(event.pool),
                token0: Hex::encode(event.token0),
                token1: Hex::encode(event.token1),
                created_at_tx_hash: Hex(&log.receipt.transaction.hash).to_string(),
                created_at_block_number: blk.number,
                created_at_timestamp: blk.timestamp_seconds(),
                log_ordinal: log.ordinal(),
            })
            .collect(),
    })
}

The Block object we have defined as an input to our module contains a helper function called events that allows us to extract specific events from the block. The event type is specified using the turbofish syntax ::<PoolCreated>, and the &[&UNISWAP_V3_FACTORY] parameter specifies that we only want events emitted by the Uniswap V3 Factory contract. From here we can use the map function on the iterator that events returns to create our Pool objects.
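
For completeness, UNISWAP_V3_FACTORY is simply the factory’s address as raw bytes. A typical definition might look like the following (the hex value is the Ethereum mainnet factory address; the hex_literal crate is one way to produce the byte array):

// The Uniswap V3 Factory address (Ethereum mainnet), as the 20 raw bytes
// expected by `events::<T>(&[&ADDRESS])`.
const UNISWAP_V3_FACTORY: [u8; 20] =
    hex_literal::hex!("1F98431c8aD98523631AE4a59f267346ea31F984");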

Often, the events emitted from a contract do not provide enough data on their own to build a meaningful object. Fortunately, Substreams provides us with a heap of data related to the event and the block we are processing. In this case, we use the log object associated with the event to extract the transaction hash that emitted the event and the log ordinal (more on these later), while the Block parameter gives us the block number and the timestamp of the block.
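
For reference, the prost-generated Rust types behind the Pools output would look roughly like this (a simplified sketch with the prost attributes omitted; the actual definitions are generated from the .proto files in your project):

// Simplified sketch of the generated types; field names mirror the handler above.
pub struct Pools {
    pub pools: Vec<Pool>,
}

pub struct Pool {
    pub address: String,
    pub token0: String,
    pub token1: String,
    pub created_at_tx_hash: String,
    pub created_at_block_number: u64,
    pub created_at_timestamp: u64,
    pub log_ordinal: u64,
}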

Stores

store modules are used for stateful transformations, allowing data persistence across blocks. Let’s extend our example by building a store module which will keep a count of all the Uniswap pools created across all blocks.

Signature & Manifest

store modules are defined using the #[substreams::handlers::store] attribute and interact with a stateful store of a specific data type. Like map modules, these modules have Protobuf inputs that can be the built-in Firehose provisioned types such as Block or Clock, or custom Protobuf messages defined in your project’s proto folder, such as Pools. The store module also takes an input parameter for the key-value store we will be writing to.

#[substreams::handlers::store]
pub fn store_pool_count(pools: Pools, store: StoreAddInt64) {
    ...
}

The module, like all other modules in a Substreams package, needs to be defined in the substreams.yaml manifest file.

- name: store_pool_count
  kind: store
  updatePolicy: add
  valueType: int64
  inputs:
    - map: map_pools_created

Update Policies & Value Types

The valueType property defined in our manifest file for the store module sets the type of data that the module will store. In our case, we want to keep a running count of pools created, so we set the value type to int64.

The updatePolicy property defines how the module will update the store. In our case, we want to sum up all the pools created, so we set the update policy to add.

The combination of the valueType and updatePolicy informs Substreams what type of data can be saved to the store, as well as what operations can be performed on it.

#[substreams::handlers::store]
pub fn store_pool_count(pools: Pools, store: StoreAddInt64) {
    for pool in pools.pools {
        store.add(pool.log_ordinal, "pools_count", 1)
    }
}

Given that the module definition’s valueType is int64 and its updatePolicy is add, the Rust module’s store parameter is of type StoreAddInt64.

A comprehensive list of all the available update policies and value types can be found in the StreamingFast store types documentation.
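
As another illustration of this pairing, consider a hypothetical store that tracks the largest number of pools created in any single block. Setting updatePolicy to max with a valueType of bigint maps to a StoreMaxBigInt parameter in Rust (a sketch only; this module is not part of our example project):

#[substreams::handlers::store]
pub fn store_max_pools_per_block(pools: Pools, store: StoreMaxBigInt) {
    // `max` only overwrites the stored value when the new value is larger.
    store.max(0, "max_pools_per_block", BigInt::from(pools.pools.len() as u64));
}

- name: store_max_pools_per_block
  kind: store
  updatePolicy: max
  valueType: bigint
  inputs:
    - map: map_pools_created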

Let’s consider another example where we store detailed information about each created pool using a different update policy and type.

#[substreams::handlers::store]
fn store_pools_created(pools: Pools, store: StoreSetProto<Pool>) {
    for pool in pools.pools {
        store.set(pool.log_ordinal, format!("pool:{}", &pool.address), &pool)
    }
}

This module is defined in the substreams.yaml manifest file as follows:

- name: store_pools_created
  kind: store
  updatePolicy: set
  valueType: proto:contract.types.v1.Pool
  inputs:
    - map: map_pools_created

Here, the updatePolicy is set to set, which means each new Pool object will replace the previous value associated with the same key. The valueType is a custom Protobuf type, Pool. This allows subsequent modules to extract pool-specific data from the store. An alternative to this updatePolicy would be set_if_not_exists, which only stores the new value if the key does not already exist in the store.

#[substreams::handlers::store]
fn store_pools_created(pools: Pools, store: StoreSetIfNotExistsProto<Pool>) {
    for pool in pools.pools {
        store.set_if_not_exists(pool.log_ordinal, format!("pool:{}", &pool.address), &pool)
    }
}

This alternative module is defined in the manifest as follows:

- name: store_pools_created
  kind: store
  updatePolicy: set_if_not_exists
  valueType: proto:contract.types.v1.Pool
  inputs:
    - map: map_pools_created

Factory Data Sources

If you come from a Subgraph development background, you will be aware of Data Source Templates. These allow Subgraphs to track “child” contracts that have been deployed using a Factory or Registry contract.

Understanding Factory Contracts

Factory contracts are a common practice in smart contract development. They are used to deploy multiple instances of a similar contract, known as child contracts. These child contracts inherit common properties and can be individually tracked for events and state changes. These are unique data sources, the addresses of which are not known up-front.

In the context of Uniswap, the V3 Factory contract is responsible for deploying numerous pool contracts, each representing a distinct trading pair. This pattern allows for the deployment of multiple similar contracts through a single factory contract.

Tracking Child Contracts in Substreams

So how do we track these child contracts in our Substreams? Let’s work through an example of how we can do this, building on our Uniswap Substreams.

Say we would like to track the Swap events associated with all of the Pools created by the Uniswap V3 Factory contract. We can do this by building a map module and utilizing our previously built store module.

#[substreams::handlers::map]
pub fn map_pool_events(
    blk: eth::Block,
    pool_store: StoreGetProto<Pool>,
) -> Result<PoolEvents, substreams::errors::Error> {
    let mut pool_events = PoolEvents::default();
 
    for trx in blk.transactions() {
        for (log, _call) in trx.logs_with_calls() {
            let pool_address = Hex::encode(&log.address);
 
            if let Some(pool) = pool_store.get_last(format!("pool:{}", &pool_address)) {
                if let Some(swap) = abi::pool_contract::events::Swap::match_and_decode(&log) {
                    pool_events.events.push(PoolEvent {
                        r#type: Some(pool_event::Type::SwapEvent(pool_event::SwapEvent {
                            sender: Hex::encode(&swap.sender),
                            recipient: Hex::encode(&swap.recipient),
                            amount0: swap.amount0.to_string(),
                            amount1: swap.amount1.to_string(),
                        })),
                        pool_address: pool.address,
                        tx_hash: Hex::encode(&trx.hash),
                        block_number: blk.number,
                        timestamp: blk.timestamp_seconds(),
                        log_ordinal: log.ordinal(),
                    })
                }
            }
        }
    }
    Ok(pool_events)
}

The module is defined in the substreams.yaml manifest file as follows:

- name: map_pool_events
  kind: map
  inputs:
    - source: sf.ethereum.type.v2.Block
    - store: store_pools_created
  output:
    type: proto:contract.v1.PoolEvents

Breaking Down the Module

  1. Module Inputs:
  • blk: eth::Block: Represents the block being processed.
  • pool_store: StoreGetProto<Pool>: A store module that allows us to access the pools previously saved in store_pools_created.
  2. Module Outputs:
  • PoolEvents: A custom Protobuf message containing all extracted Swap events.
  3. Processing Transactions:
  • The module iterates through each transaction (trx) in the block (blk).
  • For each transaction, it processes the logs and calls associated with it.
  4. Using the Store:
  • For each log in the transaction, it checks if the log address corresponds to a pool that exists in the pool_store. This ensures that we are only processing events for known pools.
  5. Matching and Decoding Events:
  • The module uses the match_and_decode function to extract Swap events from the logs.
  • If a Swap event is found, it creates a PoolEvent object and populates it with relevant details such as the sender, recipient, amounts, transaction hash, block number, and timestamp.

This map module is responsible for tracking all pools created via the Uniswap V3 Factory contract and extracting all Swap events that occur within these pools at each block. This approach leverages the previously built store module to maintain a dynamic data source of child contracts.

Understanding Ordinals

Ordinals in Substreams allow a key-value store to have multiple versions of a key within a single block. They are crucial for maintaining the order of updates and ensuring that downstream modules can correctly interpret the state of data at different points within a block.

Explanation with Example

Consider the example of tracking Uniswap pool swaps. Let’s extend our existing module to demonstrate ordinals:

#[substreams::handlers::store]
pub fn store_pool_swaps_count(pool_events: PoolEvents, store: StoreAddInt64) {
    for event in pool_events.events {
        if let Some(event_type) = event.r#type {
            match event_type {
                Type::SwapEvent(_) => {
                    store.add(
                        event.log_ordinal,
                        format!("pool_swaps_count:{}", &event.pool_address),
                        1,
                    );
                }
                // Other event types (e.g. Mint, Burn) are not counted here.
                _ => {}
            }
        }
    }
}

Here’s how it works:

  • Log Ordinal: Each event log has an associated ordinal, representing its order within the block. By using the log_ordinal associated with the Swap event from the upstream map_pool_events module when updating the store, we ensure that each update is correctly ordered.
  • Multiple Updates: If multiple swaps occur within the same block, the store will record each update separately, preserving the order of events.

Example in Context

Imagine a block with the following transactions:

  • Transaction A: Swap event (log ordinal 1)
  • Transaction B: Swap event (log ordinal 2)
  • Transaction C: Swap event (log ordinal 3)

The store_pool_swaps_count module will:

  1. Add the first swap event with ordinal 1.
  2. Add the second swap event with ordinal 2.
  3. Add the third swap event with ordinal 3.

This sequence ensures that any downstream module querying the store can understand the exact order of events within the block.

Using Ordinals Downstream

When a downstream module queries the store, it can specify an ordinal to retrieve the state of a key at a specific point within the block:

let swap_count = store.get_at(log_ordinal, "swaps_count");

So if we passed in a log ordinal of 2, we would retrieve the state of the swaps_count key at that point in the block. This capability gives Substreams developers granular control over what data they retrieve from a store within a block.

Clarifications and Points for Developers

  • Importance of Ordinals: Ordinals ensure that the state changes are recorded in the exact order they occur within a block. This is crucial for applications that require precise historical data, such as financial transactions or state changes in a smart contract.

  • Setting Ordinals: Ordinals must be set every time a key is updated. Ensure that ordinals are non-decreasing, that is, each new ordinal is greater than or equal to the previous one, to maintain data integrity (see the sketch after this list).

  • Querying with Ordinals: When querying the store, specifying an ordinal allows you to retrieve the state of the data at that exact point. This is useful for debugging, auditing, and ensuring the correctness of your data processing logic.

  • Real-life Use Case: In the context of token price tracking, knowing the price before and after specific transactions within the same block can be crucial for trading algorithms, arbitrage detection, and more.
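
As a minimal sketch of the non-decreasing rule, assuming the same PoolEvents type used earlier, a store handler (here given the hypothetical name store_event_counts) should take its ordinals from the events it processes rather than inventing its own:

#[substreams::handlers::store]
pub fn store_event_counts(pool_events: PoolEvents, store: StoreAddInt64) {
    // Events arrive in block order, so their log ordinals are already
    // non-decreasing; reusing them keeps the store's versions correctly ordered.
    for event in pool_events.events {
        store.add(event.log_ordinal, "events_count", 1);
    }
}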

Utilizing Stores Downstream

In Substreams, stores can be used in two modes: Get and Deltas. Each mode serves a different purpose and is used in different scenarios.

Get Mode

Get mode allows you to query the store and retrieve the current value associated with a key. This is the most common mode used in Substreams.

In the map_pool_events module, we use the StoreGetProto type to retrieve the pool details.

#[substreams::handlers::map]
pub fn map_pool_events(
    blk: eth::Block,
    pool_store: StoreGetProto<Pool>,
) -> Result<PoolEvents, substreams::errors::Error> {
    let mut pool_events = PoolEvents::default();
 
    for trx in blk.transactions() {
        for (log, _) in trx.logs_with_calls() {
            let pool_address = Hex::encode(&log.address);
            // Query the Pool store to get the Pool proto
            if let Some(pool) = pool_store.get_last(format!("pool:{}", &pool_address)) {
                substreams::log::debug!("Found pool: {}", &pool_address);
                if let Some(swap) = abi::pool_contract::events::Swap::match_and_decode(&log) {
                    ...
                }
                if let Some(mint) = abi::pool_contract::events::Mint::match_and_decode(&log) {
                    ...
                }
                if let Some(burn) = abi::pool_contract::events::Burn::match_and_decode(&log) {
                    ...
                }
            }
        }
    }
    Ok(pool_events)
}

In this example, we use pool_store.get_last to get the latest state of a pool by its address.

Stores in Get mode expose three methods:

  • get_last: Retrieves the latest store value. This method is the most efficient as it queries the store directly and is typically used when the most up-to-date value is needed. For instance, if a store undergoes multiple updates within a block, get_last provides the final state after all updates.

  • get_at: Retrieves the value associated with a specific ordinal. This is useful when you need the value at a specific point during a block. The ordinal needs to match the ordinal used when setting the value in the store upstream. For example, if a store is updated three times within a block, with ordinals 1, 2, and 3 respectively, you can retrieve the value at ordinal 2 by using store.get_at(2, key). The method works by starting with the latest value and iterating through the deltas in reverse order until it finds the specified ordinal.

  • get_first: Retrieves the initial value at the start of the block. It first checks the current block’s deltas in reverse order before querying the store. This is useful if the key being queried was mutated within the block. Essentially, it provides the old value of the first delta for the key in the current block, which reflects the final state of the store value from the previous block.
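
To make the distinction concrete, here is an illustrative sketch comparing the three reads against a hypothetical counts_store: StoreGetInt64 that was updated several times within the block:

// Final value after all updates in the current block.
let latest = counts_store.get_last("pools_count");
// Value as it stood at ordinal 2 within the current block.
let at_ordinal = counts_store.get_at(2, "pools_count");
// Value carried over from the end of the previous block.
let initial = counts_store.get_first("pools_count");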

Deltas Mode

Deltas mode provides the changes that occurred to a store’s keys during block processing. This mode is useful for understanding how data evolves within a single block, and can be utilized to identify if there were any changes to a store within a block.

Let’s extend our Substreams to track changes in token balances and calculate the TVL (Total Value Locked) for each pool using deltas.

#[substreams::handlers::store]
pub fn store_pool_tvl(
    pool_store: StoreGetProto<Pool>,
    balances_store: StoreGetBigInt,
    balances_deltas: Deltas<DeltaBigInt>,
    chainlink_prices: StoreGetBigDecimal,
    store: StoreSetBigDecimal,
) {
    // Iterate through balance deltas to identify changes in pool balances
    for delta in balances_deltas.deltas {
        let pool_address = key::segment_at(&delta.key, 1);
        let token_id = key::segment_at(&delta.key, 2);
 
        if let Some(pool) = pool_store.get_last(format!("pool:{}", &pool_address)) {
            let token0_balance: BigInt;
            let token1_balance: BigInt;
 
            if token_id == "0" {
                // Use `new_value` to avoid a store lookup
                token0_balance = delta.new_value;
                token1_balance = balances_store
                    .get_last(format!("pool_token_balance:{}:1", &pool_address))
                    .unwrap_or_else(|| BigInt::zero());
            } else {
                token0_balance = balances_store
                    .get_last(format!("pool_token_balance:{}:0", &pool_address))
                    .unwrap_or_else(|| BigInt::zero());
                // Use `new_value` to avoid a store lookup
                token1_balance = delta.new_value;
            }
 
            let token0 = pool.token0.unwrap();
            let token1 = pool.token1.unwrap();
 
            let token0_price = chainlink_prices
                .get_last(format!("price_by_symbol:{}:USD", &token0.symbol))
                .unwrap_or_else(|| BigDecimal::zero());
            let token1_price = chainlink_prices
                .get_last(format!("price_by_symbol:{}:USD", &token1.symbol))
                .unwrap_or_else(|| BigDecimal::zero());
 
            let token0_tvl = token0_balance.to_decimal(token0.decimals) * token0_price;
            let token1_tvl = token1_balance.to_decimal(token1.decimals) * token1_price;
            let tvl = token0_tvl + token1_tvl;
 
            store.set(delta.ordinal, format!("pool_tvl:{}", &pool_address), &tvl);
        }
    }
}

In this example:

  • We use balances_deltas to identify if any changes in token balances occurred during the block.
  • We use delta.new_value directly to avoid performing extra store lookups.
  • The TVL for each pool is calculated based on the latest prices and balances.
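
For reference, each DeltaBigInt carries roughly the following fields (a simplified sketch; see the substreams crate for the authoritative generic Delta<T> definition):

// Simplified sketch of a store delta.
pub struct DeltaBigInt {
    pub operation: Operation, // UNSET, CREATE, UPDATE, or DELETE
    pub ordinal: u64,         // position of this change within the block
    pub key: String,          // the store key that changed
    pub old_value: BigInt,    // value before the change
    pub new_value: BigInt,    // value after the change
}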

Additionally, each delta contains an operation field. We can use pattern matching on this field to perform custom logic based on the operation type of the delta. The operation field is an enum which can be one of UNSET, CREATE, UPDATE, or DELETE.

for delta in balances_deltas.deltas {
    match delta.operation {
        Operation::CREATE => {
            // Custom logic for CREATE operation
        }
        Operation::UPDATE => {
            // Custom logic for UPDATE operation
        }
        Operation::DELETE => {
            // Custom logic for DELETE operation
        }
        _ => {}
    }
}

This lets you tailor the logic applied to each delta to its operation type, based on the needs of your Substreams.

More information on deltas mode can be found in the StreamingFast documentation.

Importing Packages

As we have discussed in prior sections, Substreams packages are designed to be composable, allowing you to import and utilize the outputs of other Substreams packages in your own. This enables developers to build on top of existing work and leverage the data and transformations provided by other packages. Let’s explore how to import and use other Substreams packages in your project.

You may have noticed that we utilize a chainlink_prices store in the code snippet in Deltas Mode. This store module comes from an external package we have imported into our Substreams project. The source code for this module can be found in the Graph BuildersDAO GitHub.

Importing the package into our Substreams is as simple as updating the imports field in our substreams.yaml manifest file.

imports:
  ...
  chainlink_prices: https://github.com/Graph-BuildersDAO/substreams/releases/download/chainlink-prices-v1.0.2/chainlink-price-substream-v1.0.2.spkg

Once we have done this, we can also override the initialBlock associated with the package’s modules in our substreams.yaml manifest file.

networks:
  mainnet:
    initialBlock:
      chainlink_prices:store_confirmed_feeds: 12369621
      chainlink_prices:get_chainlink_answers: 12369621
      chainlink_prices:chainlink_price_store: 12369621
      chainlink_prices:graph_out: 12369621

This will set the initialBlock for the chainlink_prices modules to 12369621, the same as our modules.

We then need to update the module definition in our substreams.yaml file for any module that wants to utilize a module from the imported package.

- name: store_pool_tvl
  kind: store
  updatePolicy: set
  valueType: bigdecimal
  inputs:
    - store: store_pools_created
    - store: store_pool_token_balances
    - store: store_pool_token_balances
      mode: deltas
    - store: chainlink_prices:chainlink_price_store

The - store: chainlink_prices:chainlink_price_store line specifies that we want to use the chainlink_price_store module from the chainlink_prices package in our store_pool_tvl module.

At this point, we need to know what type of store the chainlink_price_store module is. We could inspect the codebase; however, a quicker way is to run substreams info once we have imported the package and built our project. substreams info provides us with information about each module in our package. Pretty neat, right?

Name: chainlink_prices:chainlink_price_store
Initial block: 12369621
Kind: store
Input: map: chainlink_prices:get_chainlink_answers
Value Type: bigdecimal
Update Policy: set
Hash: b7a00d531c40e3bb0226f74d304ddf2740c66f55

Using this information, we can add the store as a parameter to our Rust module.

#[substreams::handlers::store]
pub fn store_pool_tvl(
    pool_store: StoreGetProto<Pool>,
    balances_store: StoreGetBigInt,
    balances_deltas: Deltas<DeltaBigInt>,
    chainlink_prices: StoreGetBigDecimal,
    store: StoreSetBigDecimal,
) {
    ...
}

Within the module we can then query this store to fetch prices for tokens in our pools, and use these values to calculate the TVL for each pool.

let token0_price = chainlink_prices
    .get_last(format!("price_by_symbol:{}:USD", &token0.symbol))
    .unwrap_or_else(|| BigDecimal::zero());
let token1_price = chainlink_prices
    .get_last(format!("price_by_symbol:{}:USD", &token1.symbol))
    .unwrap_or_else(|| BigDecimal::zero());
 
let token0_tvl = token0_balance.to_decimal(token0.decimals) * token0_price;
let token1_tvl = token1_balance.to_decimal(token1.decimals) * token1_price;
let tvl = token0_tvl + token1_tvl;
 
store.set(delta.ordinal, format!("pool_tvl:{}", &pool_address), &tvl);

In this section, we have demonstrated the power of composability in Substreams by importing and utilizing an external package to enhance our project. This approach allows developers to build on top of existing work, leverage shared data and transformations, and create more sophisticated, feature-rich applications without reinventing the wheel.