DynamoDB Filtering and Aggregation Queries Using SQL on Rockset

September 13, 2022


The challenges

Customer expectations and the corresponding demands on applications have never been higher. Users expect applications to be fast, reliable, and available. Further, data is king, and users want to be able to slice and dice aggregated data as needed to find insights. Users don't want to wait for data engineers to provision new indexes or build new ETL chains. They want unfettered access to the freshest data available.

But handling all of your application needs is a tall task for any single database. For the database, optimizing for frequent, low-latency operations on individual records is different from optimizing for less-frequent aggregations or heavy filtering across many records. Many times, we try to handle both patterns with the same database and deal with the inconsistent performance as our application scales. We think we are optimizing for minimal effort or cost, when in fact we are doing the opposite. Running analytics on an OLTP database usually requires that we overprovision a database to account for peaks in traffic. This ends up costing a lot of money and usually fails to provide a pleasing end user experience.

In this walkthrough, we'll see how to handle the high demands of users with both of these access patterns. We'll be building a financial application in which users are recording transactions and viewing recent transactions while also wanting complex filtering or aggregations on their past transactions.

A hybrid approach

To handle our application needs, we'll be using Amazon DynamoDB with Rockset. DynamoDB will handle our core transaction access patterns -- recording transactions plus providing a feed of recent transactions for users to browse. Rockset will supplement DynamoDB to handle our data-heavy, "delightful" access patterns. We'll let our users filter by time, merchant, category, or other fields to find the relevant transactions, or to perform powerful aggregations to view trends in spending over time.

As we work through these patterns, we will see how each of these systems is suited to the job at hand. DynamoDB excels at core OLTP operations -- reading or writing an individual item, or fetching a range of sequential items based on known filters. Due to the way it partitions data based on the primary key, DynamoDB is able to provide consistent performance for these types of queries at any scale.

Conversely, Rockset excels at continuous ingestion of large amounts of data and employing multiple indexing strategies on that data to provide highly selective filtering, real-time or query-time aggregations, and other patterns that DynamoDB cannot handle easily.

As we work through this example, we'll learn both the fundamental concepts underlying the two systems as well as practical steps to accomplish our goals. You can follow along with the application using the GitHub repo.

Implementing core features with DynamoDB

We will start this walkthrough by implementing the core features of our application. This is a common starting point for any application, as you build the standard "CRUDL" operations to provide the ability to manipulate individual records and list a set of related records.

For an e-commerce application, this would be the functionality to place an order and view previous orders. For a social media application, this would be creating posts, adding friends, or viewing the people you follow. This functionality is typically implemented by databases that specialize in online transactional processing (OLTP) workloads that emphasize many concurrent operations against a small number of rows.

For this example, we are building a business finance application where a user can make and receive payments, as well as view the history of their transactions.

The example will be intentionally simplified for this walkthrough, but you can think of three core access patterns for our application:

  • Record transaction, which will store a record of a payment made or received by the business;
  • View transactions by date range, which will allow users to see the most recent payments made and received by a business; and
  • View individual transaction, which will allow a user to drill into the specifics of a single transaction.

Each of these access patterns is a critical, high-volume access pattern. We will constantly be recording transactions for users, and the transaction feed will be the first view when they open the application. Further, each of these access patterns will use known, consistent parameters to fetch the relevant record(s).

We'll use DynamoDB to handle these access patterns. DynamoDB is a NoSQL database provided by AWS. It's a fully managed database, and it has growing popularity in both high-scale applications and in serverless applications.

One of DynamoDB's defining features is how it provides consistent performance at any scale. Whether your table is 1 megabyte or 1 petabyte, you should see the same response time for your operations. This is a desirable quality for core, OLTP use cases like the ones we're implementing here. It is a valuable engineering achievement, but it is important to understand that it was achieved by being selective about the kinds of queries that will perform well.

DynamoDB is able to provide this consistent performance through two core design decisions. First, each record in your DynamoDB table must include a primary key. This primary key is made up of a partition key as well as an optional sort key. The second key design decision for DynamoDB is that the API heavily enforces the use of the primary key - more on this later.

In the image below, we have some sample transaction data in our FinTech application. Our table uses a partition key of the organization name in our application, plus a ULID-based sort key that provides the uniqueness of a UUID along with sortability by creation time, which allows us to make time-based queries.

[Image: sample transaction data in the DynamoDB table]

The records in our table include other attributes, like merchant name, category, and amount, that are useful in our application but aren't as critical to DynamoDB's underlying architecture. The important part is in the primary key, and specifically the partition key.
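
To make this concrete, here's a rough sketch of what a single transaction item might look like in code, using the ulid package to generate the sort key. The attribute names are illustrative and may differ slightly from the sample repository:

import { ulid } from "ulid";

interface Transaction {
  organization: string;      // partition key: the organization name
  transactionId: string;     // sort key: a ULID, sortable by creation time
  merchantName: string;
  category: string;
  amount: number;
  transactionTime: string;
}

const transaction: Transaction = {
  organization: "Vandelay Industries",
  transactionId: ulid(),     // e.g. "01GCW4ZB6R4D4V4K0S5E8DQ0FZ"
  merchantName: "Chipotle",
  category: "Meals",
  amount: 27.45,
  transactionTime: new Date().toISOString(),
};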

Under the hood, DynamoDB will split your data into multiple storage partitions, each containing a subset of the data in your table. DynamoDB uses the partition key element of the primary key to assign a given record to a particular storage partition.

[Image: DynamoDB splitting table data across storage partitions]

As the amount of data in your table or traffic against your table increases, DynamoDB will add partitions as a way to horizontally scale your database.

As mentioned above, the second key design decision for DynamoDB is that the API heavily enforces the use of the primary key. Almost all API actions in DynamoDB require at least the partition key of your primary key. Because of this, DynamoDB is able to quickly route any request to the proper storage partition, no matter the number of partitions and total size of the table.

With these two tradeoffs, there are necessarily limitations in how you use DynamoDB. You must carefully plan and design for your access patterns upfront, as your primary key must be involved in your access patterns. Changing your access patterns later can be difficult and may require some manual migration steps.

When your use cases fall within DynamoDB's core competencies, you will reap the benefits. You'll receive consistent, predictable performance no matter the scale, and you won't see long-term degradation of your application over time. Further, you'll get a fully managed experience with low operational burden, allowing you to focus on what matters to the business.

The core operations in our example fit perfectly with this model. When retrieving a feed of transactions for an organization, we will have the organization ID available in our application that will allow us to use the DynamoDB Query operation to fetch a contiguous set of records with the same partition key. To retrieve additional details on a specific transaction, we will have both the organization ID and the transaction ID available to make a DynamoDB GetItem request to fetch the desired item.
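
As a sketch of what these two operations look like with the AWS SDK v3 Document Client (the table and attribute names are assumptions for illustration; the sample repository wires these up through its own service classes):

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  DynamoDBDocumentClient,
  QueryCommand,
  GetCommand,
} from "@aws-sdk/lib-dynamodb";

const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Transaction feed: the 20 most recent transactions for an organization.
// ScanIndexForward: false returns items in descending sort-key order, which
// is newest-first because the ULID sort key is time-ordered.
export async function getTransactionFeed(organization: string) {
  const result = await docClient.send(
    new QueryCommand({
      TableName: "Transactions",
      KeyConditionExpression: "#org = :org",
      ExpressionAttributeNames: { "#org": "organization" },
      ExpressionAttributeValues: { ":org": organization },
      ScanIndexForward: false,
      Limit: 20,
    })
  );
  return result.Items;
}

// Transaction detail: a single item fetched by its full primary key.
export async function getTransaction(organization: string, transactionId: string) {
  const result = await docClient.send(
    new GetCommand({
      TableName: "Transactions",
      Key: { organization, transactionId },
    })
  );
  return result.Item;
}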

You can see these operations in action with the sample application. Follow the instructions to deploy the application and seed it with sample data. Then, make HTTP requests to the deployed service to fetch the transaction feed for individual users. These operations will be fast, efficient operations regardless of the number of concurrent requests or the size of your DynamoDB table.

Supplementing DynamoDB with Rockset

So far, we've used DynamoDB to handle our core access patterns. DynamoDB is great for these patterns as its key-based partitioning will provide consistent performance at any scale.

However, DynamoDB is not great at handling other access patterns. DynamoDB does not allow you to efficiently query by attributes other than the primary key. You can use DynamoDB's secondary indexes to reindex your data by additional attributes, but it can still be problematic if you have many different attributes that may be used to index your data.

Additionally, DynamoDB does not provide any aggregation functionality out of the box. You can calculate your own aggregates using DynamoDB, but it may be with reduced flexibility or with unoptimized read consumption as compared to a solution that designs for aggregation up front.

To handle these patterns, we will supplement DynamoDB with Rockset.

Rockset is best thought of as a secondary set of indexes on your data. Rockset uses only these indexes at query time and does not place any load back on DynamoDB during a read. Rather than individual, transactional updates from your application clients, Rockset is designed for continuous, streaming ingestion from your primary data store. It has direct connectors for a number of primary data stores, including DynamoDB, MongoDB, Kafka, and many relational databases.

[Image: Rockset ingesting data from a primary data store like DynamoDB]

As Rockset ingests data from your primary database, it indexes your data in a Converged Index, which combines a row index, an inverted index, and a columnar index. Additional indexes, such as range, type, and geospatial indexes, are automatically created based on the data types ingested. We'll discuss the specifics of these indexes below, but this Converged Index allows for more flexible access patterns on your data.

This is the core concept behind Rockset -- it is a secondary index on your data using a fully managed, near-real-time ingestion pipeline from your primary datastore.

Teams have long been extracting data from DynamoDB to insert into another system to handle additional use cases. Before we move into the specifics of how Rockset ingests data from your table, let's briefly discuss how Rockset differs from other options in this space. There are a few core differences between Rockset and other approaches.

Firstly, Rockset is fully managed. Not only are you not required to manage the database infrastructure, but also you don't need to maintain the pipeline to extract, transform, and load data into Rockset. With many other solutions, you're in charge of the "glue" code between your systems. These systems are critical yet failure-prone, as you must defensively guard against any changes in the data structure. Upstream changes can result in downstream pain for those maintaining these systems.

Secondly, Rockset can handle real-time data in a mutable way. With many other systems, you get one or the other. You can choose to perform periodic exports and bulk-loads of your data, but this results in stale data between loads. Alternatively, you can stream data into your data warehouse in an append-only fashion, but you can't perform in-place updates on changing data. Rockset is able to handle updates on existing items as quickly and efficiently as it inserts new data and thus can give you a real-time look at your changing data.

Thirdly, Rockset generates its indexes automatically. Other 'fully managed' solutions still require you to configure indexes as you need them to support new queries. Rockset's query engine is designed to use one set of indexes to support any and all queries. As you add more and more queries to your system, you do not need to add additional indexes, taking up more and more space and computational resources. This also means that ad hoc queries can fully leverage the indexes as well, making them fast without waiting for an administrator to add a bespoke index to support them.

How Rockset ingests data from DynamoDB

Now that we know the basics of what Rockset is and how it helps us, let's connect our DynamoDB table to Rockset. In doing so, we will learn how the Rockset ingestion process works and how it differs from other options.

Rockset has purpose-built connectors for a number of data sources, and the specific connector implementation depends on the specifics of the upstream data source.

For connecting with DynamoDB, Rockset relies on DynamoDB Streams. DynamoDB Streams is a change data capture feature from DynamoDB where details of each write operation against a DynamoDB table are recorded in the stream. Consumers of the stream can process these changes in the same order they happened against the table to update downstream systems.

[Image: DynamoDB Streams capturing changes from a DynamoDB table]

A DynamoDB Stream is great for Rockset to stay up-to-date with a DynamoDB table in near real time, but it's not the full story. A DynamoDB Stream only contains records of write operations that occurred after the Stream was enabled on the table. Further, a DynamoDB Stream retains records for only 24 hours. Operations that occurred before the stream was enabled or more than 24 hours ago will not be present in the stream.
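
Enabling a stream on an existing table is a small, online configuration change. A minimal sketch with the AWS SDK v3 looks like this (the table name is an assumption, and in practice you would typically define the stream in your infrastructure-as-code alongside the table):

import { DynamoDBClient, UpdateTableCommand } from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient({});

// NEW_AND_OLD_IMAGES records both the before and after image of each change,
// which is the richest view for downstream consumers.
await client.send(
  new UpdateTableCommand({
    TableName: "Transactions",
    StreamSpecification: {
      StreamEnabled: true,
      StreamViewType: "NEW_AND_OLD_IMAGES",
    },
  })
);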

But Rockset needs not only the most recent data, but all of the data in your database in order to answer your queries correctly. To handle this, it does an initial bulk export (using either a DynamoDB Scan or an export to S3, depending on your table size) to grab the initial state of your table.

Thus, Rockset's DynamoDB connection process has two parts:

  1. An initial, bootstrapping process to export your full table for ingestion into Rockset;
  2. A subsequent, continuous process to consume updates from your DynamoDB Stream and update the data in Rockset.

Notice that both of these processes are fully managed by Rockset and transparent to you as a user. You won't be in charge of maintaining these pipelines and responding to alerts if there's an error.

Further, if you choose the S3 export method for the initial ingestion process, neither of the Rockset ingestion processes will consume read capacity units from your main table. Thus, Rockset won't compete with your application use cases for throughput or affect production availability.

Application: Connecting DynamoDB to Rockset

Before moving on to using Rockset in our application, let's connect Rockset to our DynamoDB table.

First, we need to create a new integration between Rockset and our table. We'll walk through the high-level steps below, but you can find more detailed step-by-step instructions in the application repository if needed.

In the Rockset console, navigate to the new integration wizard to start this process.

In the integration wizard, choose Amazon DynamoDB as your integration type. Then, click Start to move to the next step.

The DynamoDB integration wizard has step-by-step instructions for authorizing Rockset to access your DynamoDB table. This requires creating an IAM policy, an IAM role, and an S3 bucket for your table export.

You can follow those instructions to create the resources manually if you prefer. In the serverless world, we prefer to create things via infrastructure-as-code as much as possible, and that includes these supporting resources.

The example repository includes the infrastructure-as-code necessary to create the Rockset integration resources. To use these, first find the Rockset Account ID and External ID values at the bottom of the Rockset integration wizard.

[Image: the Rockset Account ID and External ID shown in the integration wizard]

Copy and paste these values into the relevant sections of the custom block of the serverless.yml file. Then, uncomment the resources on lines 71 to 122 of the serverless.yml to create these resources.
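
Those resources boil down to a standard cross-account trust relationship: the IAM role you create trusts Rockset's AWS account, and the External ID protects against the confused-deputy problem. As a rough sketch of the role's trust policy (the values are placeholders; the serverless.yml in the repository is the source of truth):

const rocksetTrustPolicy = {
  Version: "2012-10-17",
  Statement: [
    {
      Effect: "Allow",
      Principal: {
        // Rockset's AWS account, from the integration wizard
        AWS: "arn:aws:iam::<ROCKSET_ACCOUNT_ID>:root",
      },
      Action: "sts:AssumeRole",
      Condition: {
        // The External ID guards against the confused-deputy problem
        StringEquals: { "sts:ExternalId": "<ROCKSET_EXTERNAL_ID>" },
      },
    },
  ],
};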

Redeploy your application to create these new resources. In the outputs from the deploy, copy and paste the S3 bucket name and the IAM role ARN into the appropriate places in the Rockset console.

[Image: entering the S3 bucket name and IAM role ARN in the Rockset console]

Then, click the Save Integration button to save your integration.

After you have created your integration, you will need to create a Rockset collection from the integration. Navigate to the collection creation wizard in the Rockset console and follow the steps to use your integration to create a collection. You can also find step-by-step instructions to create a collection in the application repository.

Once you have completed this connection, inserts, updates, or deletes to data in DynamoDB will generally be reflected in Rockset's index and available for querying in less than two seconds on a properly sized set of instances.

Using Rockset for complex filtering

Now that we have connected Rockset to our DynamoDB table, let's see how Rockset can enable new access patterns on our existing data.

Recall from our core features section that DynamoDB is heavily focused on your primary keys. You must use your primary key to efficiently access your data. Accordingly, we structured our table to use the organization name and the transaction time in our primary keys.

[Image: sample transaction data in the DynamoDB table]

This structure works for our core access patterns, but we may want to provide a more flexible way for users to browse their transactions. There are a number of other attributes -- category, merchant name, amount, etc. -- that would be useful for filtering.

We could use DynamoDB's secondary indexes to enable filtering on more attributes, but that's still not a great fit here. DynamoDB's primary key structure does not easily allow for flexible querying that involves combinations of many optional attributes. You could have a secondary index for filtering by merchant name and date, but you would need another secondary index if you wanted to allow filtering by merchant name, date, and amount. An access pattern that filters on category would require a third secondary index.

Rather than deal with that complexity, we'll lean on Rockset here.

We saw before that Rockset uses a Converged Index to index your data in multiple ways. One of those ways is an inverted index. With an inverted index, Rockset indexes each attribute directly.

[Image: Rockset's inverted index mapping attribute name and value pairs to document IDs]

Notice how this index is organized. Each attribute name and value is used as the key of the index, and the value is a list of document IDs that include the corresponding attribute name and value. The keys are constructed so that their natural sort order can support range queries efficiently.

An inverted index is great for queries that have selective filter conditions. Imagine we want to allow our users to filter their transactions to find those that match certain criteria. Someone in the Vandelay Industries organization is interested in how many times they've ordered Chipotle recently.

You could find this with a query as follows:

SELECT *
FROM transactions
WHERE organization = 'Vandelay Industries'
AND merchant_name = 'Chipotle'

Because we're applying selective filters on the organization and merchant name, we can use the inverted index to quickly find the matching documents.

Rockset will look up both attribute name and value pairs in the inverted index to find the lists of matching documents.

[Image: looking up the attribute name and value pairs in the inverted index]

Once it has these two lists, it can merge them to find the set of records that match both sets of conditions, and return the results back to the client.

[Image: merging the two lists to find documents that match both conditions]

Just like DynamoDB's partition-based indexing is efficient for operations that use the partition key, Rockset's inverted index gives you efficient lookups on any field in your data set, even on attributes of embedded objects or on values inside of embedded arrays.

Application: Using the Rockset API in your application

Now that we know how Rockset can efficiently execute selective queries against our dataset, let's walk through the practical aspects of integrating Rockset queries into our application.

Rockset exposes RESTful services that are protected by an authorization token. SDKs are also available for popular programming languages. This makes it a great fit for integrating with serverless applications because you don't need to set up complicated private networking configuration to access your database.

In order to interact with the Rockset API in our application, we will need a Rockset API key. You can create one in the API keys section of the Rockset console. Once you've done so, copy its value into your serverless.yml file and redeploy to make it available to your application.

Side note: For simplicity, we're using this API key as an environment variable. In a real application, you should use something like Parameter Store or AWS Secrets Manager to store your secret and avoid environment variables.
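
If you do go that route, a minimal sketch of reading the key from Parameter Store looks like the following. The parameter name is hypothetical, and the resolved value would then be passed into the Rockset client used by the service class below:

import { SSMClient, GetParameterCommand } from "@aws-sdk/client-ssm";

const ssm = new SSMClient({});

// Reads the Rockset API key from SSM Parameter Store instead of an
// environment variable. The parameter name is an assumption.
export async function getRocksetApiKey(): Promise<string> {
  const result = await ssm.send(
    new GetParameterCommand({
      Name: "/finance-app/rockset-api-key", // hypothetical parameter name
      WithDecryption: true,                 // required for SecureString parameters
    })
  );
  return result.Parameter!.Value!;
}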

Look at our TransactionService class to see how we interact with the Rockset API. The class initialization takes in a Rockset client object that will be used to make calls to Rockset.

In the filterTransactions method in our service class, we have the following query to interact with Rockset:

    const response = await this._rocksetClient.queries.query({
      sql: {
        query: `
              SELECT *
              FROM Transactions
              WHERE organization = :organization
              AND category = :category
              AND amount BETWEEN :minAmount AND :maxAmount
              ORDER BY transactionTime DESC
              LIMIT 20`,
        parameters: [
          {
            name: "organization",
            type: "string",
            value: organization,
          },
          {
            name: "category",
            type: "string",
            value: category,
          },
          {
            name: "minAmount",
            type: "float",
            value: minAmount,
          },
          {
            name: "maxAmount",
            type: "float",
            value: maxAmount,
          },
        ],
      },
    });

There are two things to note about this interaction. First, we are using named parameters in our query when handling input from users. This is a common practice with SQL databases to avoid SQL injection attacks.

Second, the SQL code is intermingled with our application code, and it can be difficult to track over time. While this can work, there is a better way. As we apply our next use case, we'll look at how to use Rockset Query Lambdas in our application.

Using Rockset for aggregation

To this point, we've reviewed the indexing strategies of DynamoDB and Rockset by discussing how each database finds an individual record or set of records that match a particular filter predicate. For example, we saw that DynamoDB pushes you towards using a primary key to find a record, whereas Rockset's inverted index can efficiently find records using highly selective filter conditions.

In this final section, we'll switch gears a bit to focus on data layout rather than indexing directly. In thinking about data layout, we'll contrast two approaches: row-based vs. column-based.

Row-based databases, like the name implies, arrange their data on disk in rows. Most relational databases, like PostgreSQL and MySQL, are row-based databases. So are many NoSQL databases, like DynamoDB, even if their records aren't technically "rows" in the relational database sense.

Row-based databases are great for the access patterns we've looked at so far. When fetching an individual transaction by its ID or a set of transactions according to some filter conditions, we generally want all of the fields to come back for each of the transactions. Because all the fields of the record are stored together, it generally takes a single read to return the record. (Note: some nuance on this coming in a bit).

Aggregation is a different story altogether. With aggregation queries, we want to calculate an aggregate -- a count of all transactions, a sum of the transaction totals, or an average spend by month for a set of transactions.

Returning to the user from the Vandelay Industries organization, imagine they want to look at the last three months and find the total spend by category for each month. A simplified version of that query would look as follows:

SELECT 
  category, 
  EXTRACT(month FROM transactionTime) AS month, 
  sum(amount) AS amount
FROM transactions
WHERE organization = 'Vandelay Industries' 
AND transactionTime > CURRENT_TIMESTAMP() - INTERVAL 3 MONTH
GROUP BY category, month
ORDER BY category, month DESC

For this query, there could be a large number of records that need to be read to calculate the result. However, notice that we don't need many of the fields for each of our records. We need only four -- category, transactionTime, organization, and amount -- to determine this result.

Thus, not only do we need to read a lot more records to satisfy this query, but also our row-based layout will read a bunch of fields that are unnecessary to our result.

Conversely, a column-based layout stores data on disk by column. Rockset's Converged Index includes a columnar index that uses this layout: an individual record is shredded into its constituent columns, and the values for each column are stored together.

If my query needs to do an aggregation to sum the "amount" attribute for a large number of records, Rockset can do so by simply scanning the "amount" portion of the columnar index. This vastly reduces the amount of data read and processed as compared to row-based layouts.

Note that, by default, Rockset's columnar index is not going to order the attributes within a column. Because we have user-facing use cases that will operate on a particular customer's data, we would prefer to organize our columnar index by customer to reduce the amount of data to scan while using the columnar index.

Rockset provides data clustering on your columnar index to help with this. With clustering, we can indicate that we want our columnar index to be clustered by the "organization" attribute. This will group all column values by the organization within the columnar indexes. Thus, when Vandelay Industries is doing an aggregation on their data, Rockset's query processor can skip the portions of the columnar index for other customers.
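
As a loose sketch of the idea, assuming clustering is expressed with a CLUSTER BY clause in the collection's ingest transformation (check Rockset's collection-creation documentation for the exact syntax):

// Loose sketch only: this assumes clustering is configured via a CLUSTER BY
// clause in the collection's ingest transformation, applied to the incoming
// documents (_input) at collection-creation time.
const ingestTransformation = `
  SELECT *
  FROM _input
  CLUSTER BY organization
`;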

How Rockset's row-based index helps processing

Before we move on to using the columnar index in our application, I want to talk about another aspect of Rockset's Converged Index.

Earlier, I mentioned that row-based layouts were used when retrieving full records and indicated that both DynamoDB and our Rockset inverted-index queries were using these layouts.

That's only partially true. The inverted index has some similarities with a column-based index, as it stores column names and values together for efficient lookups by any attribute. Each index entry includes a pointer to the IDs of the records that include the given column name and value combination. Once the relevant ID or IDs are discovered from the inverted index, Rockset can retrieve the full record using the row index. Rockset uses dictionary encoding and other advanced compression techniques to minimize the data storage size.

Thus, we've now seen how Rockset's Converged Index fits together:

  • The column-based index is used for quickly scanning large numbers of values in a particular column for aggregations;
  • The inverted index is used for selective filters on any column name and value;
  • The row-based index is used to fetch any additional attributes that may be referenced in the projection clause.

Under the hood, Rockset's powerful indexing and querying engine is tracking statistics on your data and generating optimal plans to execute your query efficiently.

Application: Using Rockset Query Lambdas in your application

Let's implement our Rockset aggregation query that uses the columnar index.

For our previous query, we wrote our SQL query directly against the Rockset API. While this can be the right approach for highly customizable user interfaces, there is a better option when the SQL code is more static. We would like to avoid maintaining our messy SQL code in the middle of our application logic.

To help with this, Rockset has a feature called Query Lambdas. Query Lambdas are named, versioned, parameterized queries that are registered in the Rockset console. After you have configured a Query Lambda in Rockset, you will receive a fully managed, scalable endpoint for the Query Lambda that you can call with your parameters to be executed by Rockset. Further, you'll even get monitoring statistics for each Query Lambda, so you can track how your Query Lambda is performing as you make changes.

You can learn more about Query Lambdas here, but let's set up our first Query Lambda to handle our aggregation query. A full walkthrough can be found in the application repository.

Navigate to the Query Editor section of the Rockset console. Paste the following query into the editor:

SELECT
    category,
    EXTRACT(
        month
        FROM
            transactionTime
    ) as month,
    EXTRACT(
        year
        FROM
            transactionTime
    ) as year,
    TRUNCATE(sum(amount), 2) AS amount
FROM
    Transactions
WHERE
    organization = :organization
    AND transactionTime > CURRENT_TIMESTAMP() - INTERVAL 3 MONTH
GROUP BY
    category,
    month,
    year
ORDER BY
    category,
    month,
    year DESC

This query will group transactions over the last three months for a given organization into buckets based on the given category and the month of the transaction. Then, it will sum the values for a category by month to find the total amount spent during each month.

Notice that it includes a parameter for the "organization" attribute, as indicated by the ":organization" syntax in the query. This indicates an organization value must be passed up to execute the query.

Save the query as a Query Lambda in the Rockset console. Then, look at the fetchTransactionsByCategoryAndMonth code in our TransactionService class. It calls the Query Lambda by name and passes up the "organization" property that was given by a user.
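
As a rough sketch of what that call looks like inside the service method (the workspace and Query Lambda names here are assumptions, and the exact client method signature may vary slightly between versions of the Rockset Node client; see the repository for the working implementation):

const response = await this._rocksetClient.queryLambdas.executeQueryLambda(
  "commons",                         // workspace
  "transactionsByCategoryAndMonth",  // Query Lambda name -- an assumption
  "1",                               // version to execute
  {
    parameters: [
      {
        name: "organization",
        type: "string",
        value: organization,
      },
    ],
  }
);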

This is much simpler code to handle in our application. Further, Rockset provides version control and query-specific monitoring for each Query Lambda. This makes it easier to maintain your queries over time and understand how changes in the query syntax affect performance.

Conclusion

In this post, we saw how to use DynamoDB and Rockset together to build a fast, delightful application experience for our users. In doing so, we learned both the conceptual foundations and the practical steps to implement our application.

First, we used DynamoDB to handle the core functionality of our application. This includes access patterns like retrieving a transaction feed for a particular customer or viewing an individual transaction. Because of DynamoDB's primary-key-based partitioning strategy, it is able to provide consistent performance at any scale.

But DynamoDB's design also limits its flexibility. It can't handle selective queries on arbitrary fields or aggregations across a large number of records.

To handle these patterns, we used Rockset. Rockset provides a fully managed secondary index to power data-heavy applications. We saw how Rockset maintains a continuous ingestion pipeline from your primary data store that indexes your data in a Converged Index, which combines inverted, columnar and row indexing. As we walked through our patterns, we saw how each of Rockset's indexing techniques work together to handle delightful user experiences. Finally, we went through the practical steps to connect Rockset to our DynamoDB table and interact with Rockset in our application.




Alex DeBrie is an AWS Data Hero and the author of The DynamoDB Book, a comprehensive guide to data modeling with DynamoDB. He works with teams to provide data modeling, architectural, and performance advice on cloud-based architectures on AWS.