300 - From API Gateway to DynamoDB through Kinesis

In this lab we are going to build an architecture that is going to allow us to ingest data from API Gateway to DynamoDB through Kinesis Data Streams.

As this is a 300-level lab, only high level guidance will be provided.

If you do not have hands-on experience with AWS, we recommend finding a buddy with more AWS experience to assist.

Let’s start by describing the scenario.

Scenario

We were asked to build a platform that is going to receive orders for buying and selling financial assets from the stock and Forex markets.

Users send orders via their brokers, which in turn are going to be integrated with our system.

We need to be able to ingest the data in real-time, and extract statistics in real-time, to answer questions like these:

  • What is the volume traded in the last 1/5/10 minutes?
  • What are the top 10 traded assets within a certain time-frame?
  • Any outlier orders, in terms of traded volumes (quantity)?
  • What is the orders’ incoming rate over time?

We expect new use cases to appear around running analytics over the incoming data.

End-users send their orders via different platforms/applications, but for each platform the user must have a single identifier.

For example, if you have the Mobile app for Bank A on two mobile phones, the client identifier for the platform is going to be Bank A, and the user is going to have the same identity in both apps. In this way, the pair (client, user) uniquely identifies the owner of a certain order.

The requests are going to come in this format:

{
    "User" : <String uniquely identifying a user in the client platform>,
    "Client" : <String uniquely indentifying an application/platform>,
    "Timestamp" : <UTC timestamp in the format YYYY-MM-DDTHH:mm:ss.mmmZ",
    "Order" : {
        "Symbol" : <String identifying a symbol>,
        "Volume" : <Number, positive for buying, negative for selling>,
        "Price" : <current price>
    }
}

As an example, a request might look like this:

{
    "User" : "theuser@amazon.com",
    "Client" : "13522bac-89fb-4f14-ac37-92642eec2b06",
    "Timestamp" : "2021-02-01T18:42:35.903Z",
    "Order" : {
        "Symbol" : "USDJPY",
        "Volume" : 200000,
        "BuyingPrice" : 104.987
    }
}

So, our idea is to expose a REST API so different customers can integrate with us, and given the streaming nature of the data, we are thinking about building something like this:

  • API Gateway is going to be the ingestion façade for our customers;
  • We are going to have Kinesis Data Streams directly integrated to API Gateway to realize the streaming nature of the platform;
  • We are going to select Lambda to process the data, and store it in a database;
  • We are going to select DynamoDB for the database layer. It seems to be a good fit for our needs.

Let’s do a POC to validate this architecture! Let’s use the AWS Console to test out the idea. Then, we can write an Infrastructure-as-code specification for it.

Let’s start with some whiteboarding to see how we can build this.


Thinking about the architecture - how to build it?

Well, where should we start? In accordance with our architecture, these are the steps we need to take:

  1. We need to build an API integrated to a Kinesis Data Streams (KDS) resource.
    • So, to build the integration, we need:
      • to know which methods of the KDS API we need to call,
      • to know how to prepare the payload to send to it.
  2. We need to have a KDS resource up and running, obviously.
  3. We need to have a Lambda function properly integrated to KDS to consume the records, and to send them to DynamoDB.
  4. And, of course, we need a DynamoDB table.
  5. Finally, we need to provide visibility somehow (logs, metrics), and take care of possible failures (What do we do if a Lambda function fails? What other errors could occur?)

Let’s try this strategy:

  1. To start, let’s not worry about Kinesis and DynamoDB right now. Let’s focus on API Gateway and Lambda, and use the logs in CloudWatch Logs to be sure that the integration with Kinesis is going to work when we bring it up. Clearly we are going to need to test our VTL mapping templates, and CloudWatch Logs is going to be very helpful for that. This is going to be our TASK 1.
  2. Then, let’s focus on the Lambda function; on what it is going to receive from KDS, and what is going to be written to DynamoDB. By knowing the payload template that is going to be sent to the Lambda function from the integration with Kinesis, and by knowing what we need to write to DynamoDB, we can write and test the Lambda function accordingly before implementing the integrations. This is going to be our TASK 2.
  3. Then, we can create the DynamoDB table, and as we already know what we are going to write to DynamoDB, we can test the integration between Lambda and DynamoDB. This is going to be our TASK 3.
  4. For our TASK 4, knowing that everything is (hopefully) working properly, we are going to connect all services:
    1. Spin up a Kinesis Data Stream resource.
    2. Set up the integration between API Gateway and Kinesis.
    3. Set up the integration between Kinesis and Lambda.
    4. Test the API, by sending a request to API Gateway and checking if the record appeared on DynamoDB.
  5. For our TASK 5, we are going to do some initial volume tests, preparing for a future test at scale.

Looks like we have a plan. Let’s do it.


Building the architecture


Task 1 - Building the API and testing it with the Lambda function

Step 1 - Building the API

  1. Visit the API Gateway console.
  2. Create a new public REST API. Use the name that you want for it (if you are not feeling creative, use broker).
    • Regional or Edge Optimized? You tell me!
  3. Create a resource named putorder.
    • Should you enable CORS? Yes? No? Why?
  4. Create a method POST under putorder.
  5. On the page /putorder - POST - Setup we would normally implement the integration between API Gateway and the Lambda function, but we don’t have the Lambda function yet. So, let’s select Mock for Integration type.

Click on Save and let’s move forward. Let’s create the Lambda function, and then get back to connect it to API Gateway.

Step 2 - Creating the Lambda function, initially only to log data to CloudWatch Logs

  1. Visit the Lambda Function console.

  2. Using Author from scratch, create a function with the following specifications:

    1. For Function name input StoreOrders.
    2. For Runtime use Node.js 14.x.
    3. Click on Create function.
    4. Go to the section Function code, and double click the index.js file that is shown.
    5. Replace everything with the following code:
    exports.handler = async (event) => {
        // let's log the incoming event
        let payloadAsString = JSON.stringify(event);
        console.log(payloadAsString);
        const response = {
            statusCode: 200,
            body: `we received ${payloadAsString}`
        };
        return response;
    };
    
    6. Click on Deploy.

    Great. We have our Lambda function. Now let’s get back to API Gateway to connect it to the Lambda function.

Step 3 - Integration between API Gateway and your Lambda function

  1. Visit the API Gateway console, and get back to the API that you have created.
  2. Go to the method POST under the resource putorder.
  3. Click on Integration request.
  4. Reconfigure the integration like this:
    1. For Integration type, now select Lambda Function.
    2. For Lambda Region, select the same region where your API is.
    3. For Lambda Function, start typing the name of the function that we have created (StoreOrders).
    4. Click on Save.
      1. A pop-up window with the message “Are you sure that you want to switch to a Lambda Integration?” is going to show up. Click Ok.
      2. A second pop-up window with the message “You are about to give API Gateway permission to invoke your Lambda function” will also show up. Click Ok again.
  5. Test the API call with the following payload:
{
    "User" : "theuser@amazon.com",
    "Client" : "13522bac-89fb-4f14-ac37-92642eec2b06",
    "Timestamp" : "2021-02-01T18:42:35.903Z",
    "Order" : {
        "Symbol" : "USDJPY",
        "Volume" : 200000,
        "Price" : 104.987
    }
}

If everything went well, you should see a Response Body (top of the page) like this:

{
  "statusCode": 200,
  "body": "we received {\"User\":\"theuser@amazon.com\",\"Client\":\"13522bac-89fb-4f14-ac37-92642eec2b06\",\"Timestamp\":\"2021-02-01T18:42:35.903Z\",\"Order\":{\"Symbol\":\"USDJPY\",\"Volume\":200000,\"Price\":104.987}}"
}

Perfect. We have API Gateway and Lambda properly integrated. Now let’s work with the (future) integration with Kinesis and DynamoDB.


Task 2 - Adjusting the function to process incoming data from Kinesis and to store it in DynamoDB.

Here we need to identify how to send an appropriate payload to Kinesis, and what is the payload that we need to send to DynamoDB.

Step 1 - Sending data in the format expected by the Amazon Kinesis Data Streams (KDS)

Well, to accomplish the goal of this step, we first need to know the format of the payload that we have to send to KDS.

To do that, you need to look at the Amazon Kinesis Data Streams Service API Reference, and find the Action that you are going to use.

Take a look at it, and you are going to discover that it is PutRecord.

So, we know that PutRecord needs to receive, mandatorily:

  • The PartitionKey.
  • The Data blob, which is the Base64-encoded payload that we need to send.
  • The StreamName, which identifies the KDS resource to which we want to send the payload (the Data).
    • By the way, we can choose that name right now. Let’s say it is going to be TradingStream.

So, if the producer (the API Gateway client) is sending the payload that we have shown before (Task 1, step 3), we need to send something like this to KDS:

{
    "PartitionKey" : "theuser@amazon.com#13522bac-89fb-4f14-ac37-92642eec2b06#USDJPY#200000#104.987#2021-02-01T18:42:35.903Z",
    "Data" : "ewogICAgIlVzZXIiIDogInRoZXVzZXJAYW1hem9uLmNvbSIsCiAgICAiQ2xpZW50IiA6ICIxMzUyMmJhYy04OWZiLTRmMTQtYWMzNy05MjY0MmVlYzJiMDYiLAogICAgIlRpbWVzdGFtcCIgOiAiMjAyMS0wMi0wMVQxODo0MjozNS45MDNaIiwKICAgICJPcmRlciIgOiB7CiAgICAgICAgIlN5bWJvbCIgOiAiVVNESlBZIiwKICAgICAgICAiVm9sdW1lIiA6IDIwMDAwMCwKICAgICAgICAiUHJpY2UiIDogMTA0Ljk4NwogICAgfQp9",
    "StreamName" : "TradingStream"
}

Why that partition key? Is it a good choice? Yes? No? Why?

Do you want to check the Base64-encoding? Visit https://www.base64decode.org/ and decode it.
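
If you prefer to check it locally, a couple of lines of Node.js do the same job (a throwaway snippet, not part of the lab code; the encoded string below is the Data value from the example above):

// Quick local check of the Base64 encoding (save as decode.js and run with: node decode.js)
const encoded = "ewogICAgIlVzZXIiIDogInRoZXVzZXJAYW1hem9uLmNvbSIsCiAgICAiQ2xpZW50IiA6ICIxMzUyMmJhYy04OWZiLTRmMTQtYWMzNy05MjY0MmVlYzJiMDYiLAogICAgIlRpbWVzdGFtcCIgOiAiMjAyMS0wMi0wMVQxODo0MjozNS45MDNaIiwKICAgICJPcmRlciIgOiB7CiAgICAgICAgIlN5bWJvbCIgOiAiVVNESlBZIiwKICAgICAgICAiVm9sdW1lIiA6IDIwMDAwMCwKICAgICAgICAiUHJpY2UiIDogMTA0Ljk4NwogICAgfQp9";
const decoded = Buffer.from(encoded, 'base64').toString('utf-8');
console.log(decoded); // should print the original order JSON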

Okay. Let’s implement the integration:

  1. Get back to the configuration of the POST method of our API, and click on Integration Request.
  2. Scroll down to the Mapping Templates, and
    1. For Request body passthrough, click on When there are no templates defined (recommended).
    2. Click on Add mapping template, and for Content-Type input application/json.
    3. Scroll down to the template configuration, and input this:
    #set($inputPath = $input.path('$'))
    {
    "PartitionKey" : "$inputPath.User#$inputPath.Client#$inputPath.Order.Symbol#$inputPath.Order.Volume#$inputPath.Order.Price#$inputPath.Timestamp",
    "Data" : "$util.base64Encode("$input.json('$')")",
    "StreamName" : "TradingStream"
    }
    
    4. Click on Save right below that text-box.
  3. Test it again with the payload that we used before. If everything went well, you should receive something like this as a response:
    {
    "statusCode": 200,
    "body": "we received {\"PartitionKey\":\"theuser@amazon.com#13522bac-89fb-4f14-ac37-92642eec2b06#USDJPY#200000#104.987#2021-02-01T18:42:35.903Z\",\"Data\":\"eyJVc2VyIjoidGhldXNlckBhbWF6b24uY29tIiwiQ2xpZW50IjoiMTM1MjJiYWMtODlmYi00ZjE0LWFjMzctOTI2NDJlZWMyYjA2IiwiVGltZXN0YW1wIjoiMjAyMS0wMi0wMVQxODo0MjozNS45MDNaIiwiT3JkZXIiOnsiU3ltYm9sIjoiVVNESlBZIiwiVm9sdW1lIjoyMDAwMDAsIlByaWNlIjoxMDQuOTg3fX0=\",\"StreamName\":\"TradingStream\"}"
    }
    

Our Lambda function is showing us that the payload expected by KDS is being properly sent by the API Gateway.

This shows that API Gateway is ready to be integrated to Kinesis Data Streams.

Step 2 - Preparing the Lambda function to process the Kinesis Data Streams records

Let’s now work with the Lambda Function and prepare it to send data to DynamoDB.

To go into the details and understand what is the format of the event sent to the Lambda function through this integration, take a look here: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html.

Take a look at the “Example Kinesis record event” in that page:

{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}

We are going to use it to test our Lambda function.

So, follow these steps:

  1. Get back to your Lambda function, and input the following code. Don’t forget to Deploy it.
    exports.handler = async (event) => {
        // let's log the incoming event
        let payloadAsString = JSON.stringify(event);
        console.log(payloadAsString);
        try{
            // event contains Records, which is an array with a certain structure
            for(let i=0;i<event.Records.length;i++) {
                //let's get the Record
                let record = event.Records[i];
                // let's decode the base64-encoded payload that was sent
                let data = Buffer.from(record.kinesis.data,'base64').toString('utf-8');
                // let's show the data
                console.log(`Data received: ${data}`);
                // let's show the timestamp in which it was received (approximately)
                let receivedTst = new Date(record.kinesis.approximateArrivalTimestamp*1000);
                console.log(`Received tst: ${receivedTst}`);
            }
        } catch(e) {
            //let's handle the errors, if any
            console.log("Error:",e);
            const response = {
                statusCode : 200,
                body : e
            };
            return response;
        }
        // this is a successful processing
        const response = {
            statusCode: 200
        };
        return response;
    };
    
  2. Right above the code, there is a drop-down button named Test. Click to open it, and then select Configure test event.
  3. For Event Name input StandardKinesisRecords.
  4. Clear the text box where there is {"key1": "value1", "key2": "value2", "key3": "value3"} and replace it with the “Example Kinesis record event” that we have shown before.
  5. Click on Test.
  6. If everything went well, the Lambda console should open a new tab named Execution results with something like what is shown below (copy and paste it to a file to see it better if necessary):
Response
{
  "statusCode": 200
}

Function Logs
START RequestId: f48324d4-495f-43a7-813a-6872d4af1114 Version: $LATEST
2018-12-17T22:11:52.227Z	f48324d4-495f-43a7-813a-6872d4af1114	INFO	{"Records":[{"kinesis":{"kinesisSchemaVersion":"1.0","partitionKey":"1","sequenceNumber":"49590338271490256608559692538361571095921575989136588898","data":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==","approximateArrivalTimestamp":1545084650.987},"eventSource":"aws:kinesis","eventVersion":"1.0","eventID":"shardId-000000000006:49590338271490256608559692538361571095921575989136588898","eventName":"aws:kinesis:record","invokeIdentityArn":"arn:aws:iam::123456789012:role/lambda-role","awsRegion":"us-east-2","eventSourceARN":"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"},{"kinesis":{"kinesisSchemaVersion":"1.0","partitionKey":"1","sequenceNumber":"49590338271490256608559692540925702759324208523137515618","data":"VGhpcyBpcyBvbmx5IGEgdGVzdC4=","approximateArrivalTimestamp":1545084711.166},"eventSource":"aws:kinesis","eventVersion":"1.0","eventID":"shardId-000000000006:49590338271490256608559692540925702759324208523137515618","eventName":"aws:kinesis:record","invokeIdentityArn":"arn:aws:iam::123456789012:role/lambda-role","awsRegion":"us-east-2","eventSourceARN":"arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"}]}
2018-12-17T22:11:52.227Z	f48324d4-495f-43a7-813a-6872d4af1114	INFO	Data received: "Hello, this is a test."
2018-12-17T22:11:52.250Z	f48324d4-495f-43a7-813a-6872d4af1114	INFO	Received tst: Mon Dec 17 2018 22:10:50 GMT+0000 (Coordinated Universal Time)
2018-12-17T22:11:52.251Z	f48324d4-495f-43a7-813a-6872d4af1114	INFO	Data received: "This is only a test."
2018-12-17T22:11:52.251Z	f48324d4-495f-43a7-813a-6872d4af1114	INFO	Received tst: Mon Dec 17 2018 22:11:51 GMT+0000 (Coordinated Universal Time)
END RequestId: f48324d4-495f-43a7-813a-6872d4af1114
REPORT RequestId: f48324d4-495f-43a7-813a-6872d4af1114	Duration: 43.67 ms	Billed Duration: 44 ms	Memory Size: 128 MB	Max Memory Used: 67 MB	Init Duration: 176.31 ms

Request ID
f48324d4-495f-43a7-813a-6872d4af1114

See that "Hello, this is a test." and "This is only a test." are the messages encoded in Base64. In the case of our order processing system, we would have there the data that we need to store in DynamoDB.

You can also visit CloudWatch Logs to check it.

Step 3 - Preparing the Lambda function to send data to DynamoDB

Something that we need to be aware of is that writing a batch of records to DynamoDB is more efficient than writing them one by one. If you visit the DynamoDB API Reference you are going to find BatchWriteItem, which shows that you can indeed write data in batches to DynamoDB.

However, be aware that the AWS SDK for your programming language may offer helper libraries that simplify the programming task. In our case, as we are using the AWS SDK for JavaScript, the batchWrite function of the DocumentClient provides a simpler interface for our integration.

Taking the payload that we first used to test our API, assuming that we have already decoded it from the Kinesis Data Streams record, and that the DynamoDB table is named Orders, we would need to provide the following parameters to batchWrite:

var params = {
    RequestItems: {
        'Orders': [
            {
                PutRequest: {
                    Item: {
                        "TransactionId" : "<some transaction identifier>",
                        "User" : "theuser@amazon.com",
                        "Client" : "13522bac-89fb-4f14-ac37-92642eec2b06",
                        "Timestamp" : "2021-02-01T18:42:35.903Z",
                        "Order" : {
                            "Symbol" : "USDJPY",
                            "Volume" : 200000,
                            "Price"  : 104.987
                        }
                    }
                }
            }
        ]
    }
};

So, basically we need to get the records coming from Kinesis, decode them, put them into a batchWrite request, and then submit it to DynamoDB.

However, if you read the documentation, you are going to see that batchWrite has some constraints on the data volume and on the number of records inside a single request (at most 25 put requests per call). So, the code is going to need to cope with that.
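
As an illustration only (not the function we are about to write for the lab), one way to cope with those limits is to cap each batch at 25 put requests and retry whatever the service returns in UnprocessedItems. The sketch below assumes the aws-sdk v2 DocumentClient and our Orders table:

// Sketch: write one batch (max 25 put requests) and retry unprocessed items a few times
const DynamoDB = require('aws-sdk/clients/dynamodb');
const dynamoDBclient = new DynamoDB.DocumentClient();

async function writeBatchWithRetries(putRequests, maxAttempts = 3) {
    // putRequests is an array of { PutRequest: { Item: {...} } }, at most 25 entries
    let remaining = { "Orders": putRequests };
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        const result = await dynamoDBclient.batchWrite({ RequestItems: remaining }).promise();
        // When everything is written, UnprocessedItems comes back empty
        if (!result.UnprocessedItems || Object.keys(result.UnprocessedItems).length === 0) {
            return;
        }
        // Retry only what DynamoDB left behind
        remaining = result.UnprocessedItems;
    }
    throw new Error("Some items were not written after retries");
}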

Like we did before, let’s first test the code looking at the CloudWatch Logs.

  1. Visit your Lambda code and replace it with this code:
exports.handler = async (event) => {
    // let's log the incoming event
    let payloadAsString = JSON.stringify(event);
    console.log(payloadAsString);
    try{
        let batch = [];
        let putRequestBatches = [];
        // event contains Records, which is an array with a certain structure
        for(let i=0;i<event.Records.length;i++) {
            //let's get the Record
            let record = event.Records[i];
            // let's decode the base64-encoded payload that was sent
            let data = Buffer.from(record.kinesis.data,'base64').toString('utf-8');
            // let's show the data
            console.log(`Data received: ${data}`);
            // let's show the timestamp in which it was received (approximately)
            let receivedTst = new Date(record.kinesis.approximateArrivalTimestamp*1000);
            console.log(`Received tst: ${receivedTst}`);
            //-----
            // The following part of the code deals with the DynamoDB batches
            //-----
            // 
            let dataForDynamoDB = {
                "Data": data
            };
            dataForDynamoDB["TransactionId"] = i.toString();
            // put data into the current batch
            batch.push(
                {
                    "PutRequest" : { "Item" : dataForDynamoDB }
                }
            );
            // Batches are limited to 25 items; so, we "close" a batch when we reach 25 items.
            if (batch.length == 25 || i == event.Records.length-1) {
                putRequestBatches.push(batch);
                batch = [];
            }
        }
        // Here we have in putRequestBatches an array of batches
        for (let i=0; i< putRequestBatches.length; i++) {
            let params = {
                "RequestItems" : {
                    "Orders" : putRequestBatches[i]
                }
            };
            console.log(`Writing to dynamodb: ${JSON.stringify(params)}`);
        }
    } catch(e) {
        //let's handle the errors, if any
        console.log("Error:",e);
        const response = {
            statusCode : 200,
            body : e
        };
        return response;
    }
    const response = {
        statusCode: 200
    };
    return response;
};
  2. Test the code with the previously created StandardKinesisRecords test event. If everything is OK, look at the Execution results tab. You will find this line at a certain point:
Writing to dynamodb: {"RequestItems":{"Orders":[{"PutRequest":{"Item":{"Data":"Hello, this is a test.","TransactionId":"0"}}},{"PutRequest":{"Item":{"Data":"This is only a test.","TransactionId":"1"}}}]}}

This puts us close to the point to write the data to DynamoDB. But first, we need a table.


Task 3 - Implementing the integration between Lambda and DynamoDB

Step 1 - Create the Orders table on DynamoDB

  1. Visit your DynamoDB console.
  2. Create a new table with the following configuration.
    • For Table name*, input Orders.
    • For Primary key, input TransactionId as the Partition key.
    • Leave everything as is and hit Create.
  3. Take note of the ARN of your table; it is in the Overview tab, down on the bottom.
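
If you later want to script this step instead of clicking through the console (for example when you turn the POC into the Infrastructure-as-code specification we mentioned), the equivalent call with the AWS SDK for JavaScript would be roughly the following; it assumes your local credentials and region are configured, and the on-demand billing mode is an assumption, not necessarily the console default:

// Rough SDK equivalent of the console steps above (aws-sdk v2)
const DynamoDB = require('aws-sdk/clients/dynamodb');
const dynamodb = new DynamoDB();

dynamodb.createTable({
    TableName: 'Orders',
    AttributeDefinitions: [{ AttributeName: 'TransactionId', AttributeType: 'S' }],
    KeySchema: [{ AttributeName: 'TransactionId', KeyType: 'HASH' }],
    BillingMode: 'PAY_PER_REQUEST' // assumption: on-demand capacity is enough for the POC
}).promise()
    .then(data => console.log('Table ARN:', data.TableDescription.TableArn))
    .catch(err => console.error(err));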

Step 2 - Adjust the code to write data to DynamoDB

  1. Give the permissions to your Lambda Function to write to DynamoDB
    1. Visit the Permissions tab of your Lambda function.
    2. Click on the link that is bound to the Role name.
    3. Add a new policy to the role, with the following configuration:
      • For Service, select DynamoDB.
      • For Actions, select BatchWriteItem.
      • For Resources, put the ARN of your DynamoDB table.
  2. Replace the code of your Lambda function with the following one:
// NEW LINE - IMPORTING THE DYNAMODB LIBRARY
const DynamoDB = require('aws-sdk/clients/dynamodb');
// NEW LINE - INSTANTIATING THE DOCUMENT CLIENT
const dynamoDBclient = new DynamoDB.DocumentClient();

exports.handler = async (event) => {
    // let's log the incoming event
    let payloadAsString = JSON.stringify(event);
    console.log(payloadAsString);
    try{
        let batch = [];
        let putRequestBatches = [];
        // event contains Records, which is an array with a certain structure
        for(let i=0;i<event.Records.length;i++) {
            //let's get the Record
            let record = event.Records[i];
            // let's decode the base64-encoded payload that was sent
            let data = Buffer.from(record.kinesis.data,'base64').toString('utf-8');
            // let's show the data
            console.log(`Data received: ${data}`);
            // let's show the timestamp in which it was received (approximately)
            let receivedTst = new Date(record.kinesis.approximateArrivalTimestamp*1000);
            console.log(`Received tst: ${receivedTst}`);
            //-----
            // The following part of the code deals with the DynamoDB batches
            //-----
            // 
            let dataForDynamoDB = {
                "Data": data
            };
            dataForDynamoDB["TransactionId"] = i.toString();
            // put data into the current batch
            batch.push(
                {
                    "PutRequest" : { "Item" : dataForDynamoDB }
                }
            );
            // Batches are limited to 25 items; so, we "close" a batch when we reach 25 items.
            if (batch.length == 25 || i == event.Records.length-1) {
                putRequestBatches.push(batch);
                batch = [];
            }
        }
        // Here we have in putRequestBatches an array of batches
        for (let i=0; i< putRequestBatches.length; i++) {
            let params = {
                "RequestItems" : {
                    "Orders" : putRequestBatches[i]
                }
            };
            console.log(`Writing to dynamodb: ${JSON.stringify(params)}`);
            // NEW LINE - SENDING THE REQUEST TO DYNAMODB
            let ddbResponse = await dynamoDBclient.batchWrite(params).promise();
            console.log(ddbResponse);
        }
    } catch(e) {
        //let's handle the errors, if any
        console.log("Error:",e);
        const response = {
            statusCode : 200,
            body : e
        };
        return response;
    }
    const response = {
        statusCode: 200
    };
    return response;
};
  3. Run the test with the StandardKinesisRecords test event.
  4. If everything went well, your table is going to have 2 records. Check it.
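
If you want to check from code instead of the console, a quick Scan is enough for a two-item POC table (illustration only; Scan reads the entire table, so it is not something to rely on at scale):

// Quick verification scan (POC only)
const DynamoDB = require('aws-sdk/clients/dynamodb');
const dynamoDBclient = new DynamoDB.DocumentClient();

dynamoDBclient.scan({ TableName: 'Orders' }).promise()
    .then(data => console.log(`Items found: ${data.Count}`, JSON.stringify(data.Items, null, 2)))
    .catch(err => console.error(err));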

Step 3 - Adjust the code to send to DynamoDB the Orders received via Kinesis/API Gateway

  1. Replace the code of your Lambda function with the following one:
const DynamoDB = require('aws-sdk/clients/dynamodb');
const dynamoDBclient = new DynamoDB.DocumentClient();

exports.handler = async (event) => {
    // let's log the incoming event
    let payloadAsString = JSON.stringify(event);
    console.log(payloadAsString);
    try{
        let batch = [];
        let putRequestBatches = [];
        // event contains Records, which is an array with a certain structure
        for(let i=0;i<event.Records.length;i++) {
            //let's get the Record
            let record = event.Records[i];
            // let's decode the base64-encoded payload that was sent
            let data = Buffer.from(record.kinesis.data,'base64').toString('utf-8');
            // let's show the data
            console.log(`Data received: ${data}`);
            // let's show the timestamp in which it was received (approximately)
            let receivedTst = new Date(record.kinesis.approximateArrivalTimestamp*1000);
            console.log(`Received tst: ${receivedTst}`);
            //-----
            // The following part of the code deals with the DynamoDB batches
            //-----
            // 
            // THE FOLLOWING TWO STATEMENTS WERE ADAPTED TO PREPARE
            // THE RECORD FOR DYNAMODB
            let dataForDynamoDB = JSON.parse(data);
            dataForDynamoDB["TransactionId"] = 
                dataForDynamoDB.User+"#"+
                dataForDynamoDB.Client+"#"+
                dataForDynamoDB.Order.Symbol+"#"+
                dataForDynamoDB.Order.Volume+"#"+
                dataForDynamoDB.Order.Price+"#"+
                dataForDynamoDB.Timestamp;
            // put data into the current batch
            batch.push(
                {
                    "PutRequest" : { "Item" : dataForDynamoDB }
                }
            );
            // Batches are limited to 25 items; so, we "close" a batch when we reach 25 items.
            if (batch.length == 25 || i == event.Records.length-1) {
                putRequestBatches.push(batch);
                batch = [];
            }
        }
        // Here we have in putRequestBatches an array of batches
        for (let i=0; i< putRequestBatches.length; i++) {
            let params = {
                "RequestItems" : {
                    "Orders" : putRequestBatches[i]
                }
            };
            console.log(`Writing to dynamodb: ${JSON.stringify(params)}`);
            let ddbResponse = await dynamoDBclient.batchWrite(params).promise();
            console.log(ddbResponse);
        }
    } catch(e) {
        //let's handle the errors, if any
        console.log("Error:",e);
        const response = {
            statusCode : 200,
            body : e
        };
        return response;
    }
    const response = {
        statusCode: 200
    };
    return response;
};
  2. Create the following new test event, naming it KinesisRecordsWithOrders:
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "ewogICAgIlVzZXIiIDogInRoZXVzZXJAYW1hem9uLmNvbSIsCiAgICAiQ2xpZW50IiA6ICIxMzUyMmJhYy04OWZiLTRmMTQtYWMzNy05MjY0MmVlYzJiMDYiLAogICAgIlRpbWVzdGFtcCIgOiAiMjAyMS0wMi0wMVQxODo0MjozNS45MDNaIiwKICAgICJPcmRlciIgOiB7CiAgICAgICAgIlN5bWJvbCIgOiAiVVNESlBZIiwKICAgICAgICAiVm9sdW1lIiA6IDIwMDAwMCwKICAgICAgICAiUHJpY2UiIDogMTA0Ljk4NwogICAgfQp9",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "ewogICAgIlVzZXIiIDogInNlY29uZFVzZXJAYW1hem9uLmNvbSIsCiAgICAiQ2xpZW50IiA6ICIxMzUyMmJhYy04OWZiLTRmMTQtYWMzNy05MjY0MmVlYzJiMDYiLAogICAgIlRpbWVzdGFtcCIgOiAiMjAyMS0wMi0wMVQyMjowMDowMC4wMDBaIiwKICAgICJPcmRlciIgOiB7CiAgICAgICAgIlN5bWJvbCIgOiAiVVNEQ05IIiwKICAgICAgICAiVm9sdW1lIiA6IDY3MDAwLAogICAgICAgICJQcmljZSIgOiA2LjQ2MTYxCiAgICB9Cn0=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
  3. Test your Lambda function with the new code, and with the new event.
  4. Check the records in DynamoDB.
  5. Delete the records from DynamoDB.

Both the architecture and the code still have some problems, but let’s solve them in the future, when we test at scale. This was only a POC to validate our idea, and it seems to work.


Task 4 - Creating the Kinesis Data Streams resource, and integrating it with API Gateway and Lambda

Now we are going to disconnect API Gateway from the Lambda function, connect one end of a Kinesis Data Stream to API Gateway, so API Gateway can put data into the stream, and connect the other end to AWS Lambda, so the service can pull data from the stream and deliver it to your function.

So, clearly, we are going to need to execute the following steps:

  1. Create the Kinesis Data Stream.
  2. Create a Role that will give API Gateway permissions to write to Kinesis.
  3. Connect API Gateway to Kinesis.
  4. Update the Role of your Lambda function, so it can read data from Kinesis.
  5. Configure the integration between Lambda and Kinesis.

So, let’s do it!

Step 1 - Creating the Kinesis Data Stream

  1. Visit your Amazon Kinesis console.
  2. Under Data Stream, click on Create data stream.
  3. For Data stream name, input the name that we have previously defined when we implemented our API.
    • TradingStream, remember?
  4. For Number of open shards, input 1.
  5. Click on Create data stream.
  6. Take note of the stream ARN.
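
Optionally, before wiring anything else, you can confirm the stream accepts records with a one-off script run from your own machine (this assumes your local AWS credentials and region are configured, and it is not part of the lab code). Do it before connecting the Lambda trigger, otherwise the test record will also reach your function:

// One-off sanity check: put a single record into TradingStream (aws-sdk v2)
const Kinesis = require('aws-sdk/clients/kinesis');
const kinesis = new Kinesis();

kinesis.putRecord({
    StreamName: 'TradingStream',
    PartitionKey: 'sanity-check',
    // The SDK Base64-encodes the Data blob for us
    Data: JSON.stringify({ Test: 'hello from the POC' })
}).promise()
    .then(data => console.log('Shard:', data.ShardId, 'Sequence number:', data.SequenceNumber))
    .catch(err => console.error(err));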

Step 2 - Preparing a Role for API Gateway

  1. On IAM, create a new Role for API Gateway.
  2. Attach an inline policy to the role, with the following configuration:
    1. For Service, select Kinesis.
    2. For Actions, select PutRecord.
    3. For Resources, put the ARN of your Kinesis Data Stream.
  3. Take note of the ARN of this role.

Step 3 - Integrating API Gateway and Kinesis

We already know that we are going to use PutRecord, right? So, revisit the API documentation and take a look at the example.

  1. Get back to your API Gateway console, and select your API.
  2. On the method POST under /putorder, click on Integration Request.
  3. Let’s reconfigure the integration:
    1. For Integration type, select AWS Service.
    2. For AWS Region, select the same region where you have your API Gateway.
    3. For Service, select Kinesis.
    4. For AWS Subdomain, leave it blank.
    5. For HTTP Method, select POST.
    6. If there is a Path override configuration in that integration, we need to change it to Action:
      • Click on it.
      • Select Use action name.
      • For Action, input PutRecord.
      • You are not going to be able to confirm it until you update the role.
    7. For Execution role, input the ARN for the role that we created for API Gateway.
    8. For Credentials cache, leave Do not add caller credentials to cache key selected.
    9. For Content Handling, select Passthrough.

As we have already configured the Mapping Template, we are good.

Step 4 - Update the role of your Lambda function, to allow it to get records from Kinesis

First, let’s see what happens if we try to integrate it to Kinesis without making any changes.

  1. Get back to your Lambda function console.
  2. On the top, in the Designer section, click on Add trigger.
  3. At the Add Trigger page
    1. In the drop-down list Select a trigger, select Kinesis.
    2. For Kinesis stream, select the stream that you have created.
    3. For Batch window, select 1.
    4. Click Add.
      • You should get an error like this: “An error occurred when creating the trigger: Cannot access stream arn:aws:kinesis:[region]:[account]:stream/[streamName]. Please ensure the role can perform the GetRecords, GetShardIterator, DescribeStream, ListShards, and ListStreams Actions on your stream in IAM. (Service: AWSLambda; Status Code: 400; Error Code: InvalidParameterValueException; Request ID: 4ab7c487-7f7f-4217-93f5-22eac1c3f81a; Proxy: null)"
      • See that the message gives you all the hints needed to configure your Lambda role in IAM. Let’s do it.
    5. Click on Cancel to get back to the Lambda page.
  4. Go to the tab Permissions, and open the Lambda role.
  5. Add an inline policy with the Actions indicated above.
  6. For the ARN, input the ARN of your Kinesis Data Streams.
  7. Save the policy and the changes to the Lambda Role.
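
If you prefer writing the policy document by hand instead of using the visual editor, it would look roughly like this (region and account are placeholders; note that kinesis:ListStreams does not support resource-level permissions, hence the wildcard in the second statement):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:<region>:<account>:stream/TradingStream"
        },
        {
            "Effect": "Allow",
            "Action": "kinesis:ListStreams",
            "Resource": "*"
        }
    ]
}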

Step 5 - Create the trigger integrating Kinesis with Lambda

  1. Get back to the configuration of your Lambda function, and configure the trigger for Kinesis.
  2. Remove the API Gateway trigger that is still attached to the function.

Step 6 - Test the integration

  1. Get back to API Gateway and test it with the following payload:
{
    "User" : "theuser@amazon.com",
    "Client" : "13522bac-89fb-4f14-ac37-92642eec2b06",
    "Timestamp" : "2021-02-01T18:42:35.903Z",
    "Order" : {
        "Symbol" : "USDJPY",
        "Volume" : 200000,
        "Price" : 104.987
    }
}
  2. Go to DynamoDB. If the record is there, celebrate!
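
The console Test feature is all we need for this POC, but if you also deploy the API to a stage (not covered in this lab), you can exercise it from outside the console, which is a first step towards the volume tests planned for TASK 5. A minimal sketch in Node.js follows; the API id, region and stage in the URL are placeholders you would replace with your own:

// Minimal external test of a deployed API (placeholders: <api-id>, <region>, <stage>)
const https = require('https');

const payload = JSON.stringify({
    User: 'theuser@amazon.com',
    Client: '13522bac-89fb-4f14-ac37-92642eec2b06',
    Timestamp: '2021-02-01T18:42:35.903Z',
    Order: { Symbol: 'USDJPY', Volume: 200000, Price: 104.987 }
});

const req = https.request({
    hostname: '<api-id>.execute-api.<region>.amazonaws.com',
    path: '/<stage>/putorder',
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(payload) }
}, res => {
    let body = '';
    res.on('data', chunk => body += chunk);
    res.on('end', () => console.log(res.statusCode, body));
});
req.on('error', console.error);
req.write(payload);
req.end();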

The architecture and the code here are not perfect. They need to be tested at scale and against corner cases.

We need to think about these points:

  • What if a non-compliant payload passes through API Gateway?
  • As we move between very high and low ingestion rates, how do we scale up/down Kinesis?
  • What is going to happen to our Lambda function if we send too many batchWrite requests to DynamoDB? Are we going to be throttled?
  • Things fail all the time. What if our Lambda function fails? What is going to happen to the records in processing?
  • Did we make the best choices for the partition keys, both for Kinesis and DynamoDB?

Lab Author: Fabian Da Silva