Using DynamoDB streams to run additional processing

02 Nov, 2022

DynamoDB offers a feature named streams. We tried streams to find out whether they are a good fit for performing additional processing after updates or inserts. In short: they are, for cases where processing is driven by changes and timing is not essential.

Each table can have an associated stream. Changes are published on the stream. These changes may then be used to run processes that take a while to complete. In our case we wanted to analyze a data set whenever a new record was added to the set or an existing record was changed. Because this analysis may take quite some time, we didn’t want it to interfere with response times.
Streams offer an easy way to do this kind of processing.

The use case was that whenever a customer finished playing a game, we wanted to find out if this play earned the customer a badge. A badge may be something like ‘You have played this game 10 days in a row’ or ‘You have played this game for the first time’. So our API would accept the result of a play and then send out an event indicating the play was completed. This event is picked up by a lambda. Processing involves reading all previous plays to find out if a new badge was earned. If so, new achievements are stored in a table, and the next time a player queries their data, the badge shows up on their personal page.
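As an aside, the analysis itself could look roughly like this. This is a minimal sketch, not our production code; the table names ‘Plays’ and ‘Achievements’, the key ‘playerId’, and the badge logic are all assumptions for illustration.

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
plays = dynamodb.Table('Plays')                # hypothetical table holding all plays
achievements = dynamodb.Table('Achievements')  # hypothetical table holding earned badges

def evaluate_badges(player_id, game_id):
    # Read all plays of this player; 'playerId' as partition key is an assumption.
    result = plays.query(KeyConditionExpression=Key('playerId').eq(player_id))
    game_plays = [p for p in result['Items'] if p.get('gameId') == game_id]
    if len(game_plays) == 1:
        # The play that triggered us is the only one: award the 'first time' badge.
        achievements.put_item(Item={
            'playerId': player_id,
            'badge': 'first-play#' + game_id
        })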

To test the streams offered by DynamoDB, we created a simple table named JansTable. After creation, the table looks like this:

[Screenshot: table definition]

Note the ‘Exports and streams’ tab. Under ‘DynamoDB stream details’ it offers an ‘Enable’ button.
Clicking this button shows a new page where you have to select the type of stream. DynamoDB can place messages on the stream with only the new data, only the old data, or both new and old data.
I’ve selected new and old images and now the stream is enabled.
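Enabling the stream can also be done from code. A minimal boto3 sketch, assuming the table from this post:

import boto3

client = boto3.client('dynamodb')

# Enable the stream on the existing table, publishing both new and old images.
client.update_table(
    TableName='JansTable',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'  # alternatives: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE
    }
)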

[Screenshot: stream details]

To actually do something with the messages on the stream, we need a lambda that will be triggered whenever a new message arrives on the stream. So, click ‘Create trigger’, then select ‘Create new’ to create a new lambda function. Select ‘DynamoDB-process-stream-python’ in the filter box and then ‘Process updates made to a DDB table’. In the next dialog, choose a function name and role and select the ARN for the table from the list. Stub code to process messages is generated; after creation, this code can be modified.
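Under the hood, this trigger is an event source mapping between the stream and the lambda. If you prefer to set it up from code, a boto3 sketch could look like this; the function name is a placeholder and the stream ARN follows the format shown later in this post:

import boto3

lambda_client = boto3.client('lambda')

STREAM_ARN = 'arn:aws:dynamodb:eu-central-1:[yourAccountID]:table/JansTable/stream/2022-10-28T15:51:58.343'

# Connect the stream to the lambda; the function name is a placeholder.
lambda_client.create_event_source_mapping(
    EventSourceArn=STREAM_ARN,
    FunctionName='process-jans-table-stream',
    StartingPosition='LATEST',  # only read changes made from now on
    BatchSize=100               # deliver up to 100 stream records per invocation
)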

It is helpful to slightly modify the generated code to print events so we can find out if the stream works. I entered this code:

import json

def lambda_handler(event, context):
    # Log the raw event on a single line so it is easy to read in CloudWatch.
    print("Received event: " + json.dumps(event))
    for record in event['Records']:
        # Each record describes one change (insert, update or delete) to the table.
        print("do something useful")
    return 'Successfully processed {} records.'.format(len(event['Records']))

This is basically the default with the print statements cleaned up. I also removed the JSON formatting to make sure each message appears on a single line. If you accept the default, it may be hard to read the logs, because the contents of the messages received by the lambda will be spread over multiple lines.

One essential change is to give the lambda’s execution role the correct permissions to read from the stream.
The policy statement should look like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
      ],
      "Resource": [
        "arn:aws:dynamodb:eu-central-1:[yourSubscriptionID]:table/JansTable/stream/2022-10-28T15:51:58.343"
      ],
      "Effect": "Allow"
    }
  ]
}

Replace the resource ARN with the ARN of your stream (it can be found in the ‘DynamoDB stream details’ section where the stream is defined).
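If you don’t want to hand-craft the statement, AWS also offers the managed policy AWSLambdaDynamoDBExecutionRole, which grants these four actions on all streams. A sketch of attaching it with boto3; the role name is a placeholder:

import boto3

iam = boto3.client('iam')

# Attach the AWS-managed policy that grants the four stream actions above
# (on all streams); 'jans-stream-lambda-role' is a placeholder role name.
iam.attach_role_policy(
    RoleName='jans-stream-lambda-role',
    PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole'
)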

Now we can test by inserting a record in the table. Use the ‘Explore table items’ button to go to the list of records and add a new record. The contents are not important; I entered:

{
  "JansPartition": {
    "S": "hello world"
  }
}
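The same insert can also be done from code; a minimal boto3 sketch:

import boto3

client = boto3.client('dynamodb')

# Insert the same test record from code; this produces an INSERT event on the stream.
client.put_item(
    TableName='JansTable',
    Item={'JansPartition': {'S': 'hello world'}}
)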

The message should show up in the logs:

[Screenshot: new and old message in the logs]

Note that this first event is an insert, so the message contains a ‘NewImage’ key like this:

{
  "NewImage": {
    "JansPartition": {
      "S": "hello world"
    }
  }
}

Updates would show up with both a NewImage and an OldImage key like this:

{
  "NewImage": {
    "JansPartition": {
      "S": "hello world, update"
    },
    "moreData": {
      "S": "hello again"
    }
  },
  "OldImage": {
    "JansPartition": {
      "S": "hello world"
    }
  }
}
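In the lambda, this means the loop body can branch on the event type. A sketch of how the earlier handler could use the images; note that in the full stream record the images live under the record’s ‘dynamodb’ key:

def lambda_handler(event, context):
    for record in event['Records']:
        images = record['dynamodb']  # the images live under the 'dynamodb' key
        if record['eventName'] == 'INSERT':
            print('inserted:', images['NewImage'])
        elif record['eventName'] == 'MODIFY':
            print('changed from', images.get('OldImage'), 'to', images['NewImage'])
        elif record['eventName'] == 'REMOVE':
            print('deleted:', images.get('OldImage'))
    return 'Successfully processed {} records.'.format(len(event['Records']))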

While writing this blog I made a small mistake in the lambda code, which caused processing to fail with an error message in the logs. The error shows up a couple of times, at increasing intervals: AWS will keep retrying delivery until the record expires from the stream, which happens after 24 hours.
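If 24 hours of retries is more than you want, the retry behaviour can be tuned on the event source mapping. A hedged boto3 sketch; the UUID of the mapping is a placeholder:

import boto3

lambda_client = boto3.client('lambda')

# Tune retry behaviour on the event source mapping created for the trigger;
# the UUID identifies that mapping and is a placeholder here.
lambda_client.update_event_source_mapping(
    UUID='[yourMappingUUID]',
    MaximumRetryAttempts=5,          # give up on a failing batch after 5 retries
    MaximumRecordAgeInSeconds=3600,  # skip records older than one hour
    BisectBatchOnFunctionError=True  # split the batch to isolate a poison record
)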

I think streams are a promising solution for cases where processing is needed based on changes, as long as timing is not essential.

Jan Vermeir
Developing software and infrastructure in teams, doing whatever it takes to get stable, safe and efficient systems in production.