This is the third part of my Serverless Webhooks series. You can find Part 2 here, where we integrated SQS, and Part 1 here, where we built the Lambda function handler.

Let’s look back at the architecture. So far, we have the Lambda function handler poc-data-feed-handler acting as the webhook that receives the data. It stores the raw data in the DynamoDB table poc-data-feed under a generated unique id, txn_id, and passes the same txn_id to the SQS queue poc-data-feed-queue, where it waits to be processed.

Next, we will build the poc-data-processor application, which takes each txn_id from the queue, retrieves the corresponding item from the table, processes it, and updates it.

Architecture

[Figure: architecture-container]

Create the data processor application

Let’s go straight to building the application. Here we will use Java and Spring, but you can choose your own stack; the approach is similar, since AWS provides SDKs for many programming languages.

Spring and Java code

Create a new Java project, poc-data-processor. We will use Spring Boot as the application framework, Spring Cloud AWS to access the SQS queue, and the AWS SDK for Java to access the DynamoDB table.

Add the dependencies

Set the Maven dependencies as below.

    <properties>
        <spring-cloud-version>2.1.3.RELEASE</spring-cloud-version>
        <aws-java-sdk-version>1.11.699</aws-java-sdk-version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-aws-context</artifactId>
                <version>${spring-cloud-version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws-messaging</artifactId>
            <version>${spring-cloud-version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws</artifactId>
            <version>${spring-cloud-version}</version>
        </dependency>

        <!-- No versions on the spring-boot-* artifacts: this assumes the POM
             inherits from spring-boot-starter-parent (not shown here). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-dynamodb</artifactId>
            <version>${aws-java-sdk-version}</version>
        </dependency>

    </dependencies>

Listen to the queue

With Spring Cloud AWS, annotating a method of a Spring-managed bean (here, the DataProcessor class) with @SqsListener is enough to make it listen to an SQS queue.

    @SqsListener("poc-data-feed-queue")
    public void dataFeedListener(String data) {
        System.out.println("message received: " + data);
        processItem(data);
    }

The dataFeedListener method receives the SQS message body, which in this case is the txn_id generated by the Lambda function.

Sample log of receiving a message:

message received: e51c16cb-41d0-4fff-8de4-de32de42205e

It then calls processItem, passing the txn_id as the key of the data to process.
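Since the message body is expected to be a txn_id (a UUID, judging by the sample log), one option is to check the body before querying DynamoDB, so that a malformed message fails fast. Below is a minimal sketch using only the JDK; TxnIdParser and parseTxnId are hypothetical names, and it assumes the Lambda function always sends the id in canonical UUID form.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper: checks that an SQS message body looks like a txn_id.
// Assumes txn_id values are canonical UUID strings, as in the sample log.
final class TxnIdParser {

    private TxnIdParser() {}

    static Optional<String> parseTxnId(String body) {
        if (body == null) {
            return Optional.empty();
        }
        String trimmed = body.trim();
        try {
            // UUID.fromString accepts some non-canonical inputs (e.g. "1-1-1-1-1"),
            // so require the round-tripped string to match the input.
            String canonical = UUID.fromString(trimmed).toString();
            return canonical.equalsIgnoreCase(trimmed) ? Optional.of(canonical) : Optional.empty();
        } catch (IllegalArgumentException e) {
            // Thrown for strings that are not UUIDs at all.
            return Optional.empty();
        }
    }
}
```

In dataFeedListener, an empty result could be logged and skipped instead of being passed to processItem.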

Process the data

The processItem method retrieves the item from the DynamoDB table poc-data-feed. Here we will use the AWS SDK for Java directly. The Spring ecosystem also offers DynamoDB support, but I find the AWS SDK simpler for this PoC.

First, set up a bean for the DynamoDB client.

    // Read the region from the same property Spring Cloud AWS uses
    @Value("${cloud.aws.region.static}")
    private String region;

    @Bean
    public AmazonDynamoDB amazonDynamoDB() {
        // No explicit credentials: the builder falls back to the default AWS credentials chain
        return AmazonDynamoDBClientBuilder.standard()
                .withRegion(region)
                .build();
    }

Then add the processItem, getItem and updateItem methods to the DataProcessor class.

    // Must match the name of the DynamoDB table the Lambda function writes to
    static final String TABLE_NAME = "poc-data-feed";

    @Autowired
    private AmazonDynamoDB dynamoDB;

    private void processItem(String txnId) {
        Map<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put("uuid", new AttributeValue(txnId));

        Map<String, AttributeValue> item = getItem(itemKey);
        System.out.println("data payload: " + item);

        Map<String, AttributeValueUpdate> updatedItem = new HashMap<>();
        updatedItem.put("status", new AttributeValueUpdate(new AttributeValue("COMPLETED"), AttributeAction.PUT));

        updateItem(itemKey, updatedItem);
        System.out.println("updated payload: " + getItem(itemKey));
    }

    private Map<String, AttributeValue> getItem(Map<String, AttributeValue> itemKey) {
        GetItemRequest request = new GetItemRequest()
                .withKey(itemKey)
                .withTableName(TABLE_NAME);
        Map<String, AttributeValue> item = dynamoDB.getItem(request).getItem();
        return item;
    }

    private void updateItem(Map<String, AttributeValue> itemKey, Map<String, AttributeValueUpdate> updatedItem) {
        dynamoDB.updateItem(TABLE_NAME, itemKey, updatedItem);
    }

In processItem, we first create the itemKey map, which maps the table's key attribute uuid to the txnId. We then use this key to get and update the item.

The data type Map<String, AttributeValue> is part of the AWS SDK for Java. It represents an item in the DynamoDB table as attribute names mapped to their typed values; the same type is used for the itemKey.

In getItem, we retrieve the entire item from the table using the itemKey map. Once we have the data, we can process it. In this PoC, we simply set the status to COMPLETED to indicate that the data has been received and processed as necessary.

As you may have noticed in the previous post, we updated the Lambda function to add a new attribute, status="PENDING", to every new item it receives and stores in DynamoDB.
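The PENDING status can also act as a guard against duplicate processing. If poc-data-feed-queue is a standard queue (an assumption; FIFO queues behave differently), SQS delivers each message at least once, so the same txn_id may occasionally arrive twice. Below is an in-memory sketch of the rule "only move PENDING to COMPLETED", using ConcurrentMap.replace(key, oldValue, newValue), which checks and updates atomically. StatusStore is a hypothetical name; in DynamoDB itself, the equivalent would be a conditional update on the status attribute.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory stand-in mapping txn_id -> status (illustration only).
final class StatusStore {

    static final String PENDING = "PENDING";
    static final String COMPLETED = "COMPLETED";

    private final ConcurrentMap<String, String> statusByTxnId = new ConcurrentHashMap<>();

    // Mirrors what the Lambda function does when it stores a new item.
    void markPending(String txnId) {
        statusByTxnId.put(txnId, PENDING);
    }

    // Atomically moves PENDING -> COMPLETED. Returns false if the txn_id is
    // unknown or already COMPLETED, e.g. on a duplicate delivery.
    boolean complete(String txnId) {
        return statusByTxnId.replace(txnId, PENDING, COMPLETED);
    }

    String statusOf(String txnId) {
        return statusByTxnId.get(txnId);
    }
}
```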

Then, in updateItem, we pass the itemKey and the updatedItem map to commit the change to the DynamoDB table.

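The data type Map<String, AttributeValueUpdate> is also part of the AWS SDK for Java. It maps each attribute name to its new value and the action to apply, here AttributeAction.PUT, which adds the attribute or replaces its existing value.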
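To see the get, process and update steps without AWS in the loop, here is a minimal stdlib-only sketch. InMemoryDataFeedTable and ProcessItemSketch are hypothetical stand-ins, not part of the post's code: items are plain string maps keyed by their uuid value, and putAttribute mimics UpdateItem with AttributeAction.PUT. One detail is worth modeling: DynamoDB's UpdateItem creates the item if the key does not exist, so the sketch checks for a missing item before updating, a check the processItem above does not make.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the poc-data-feed table (illustration only).
// Items are Map<String, String> instead of Map<String, AttributeValue>.
final class InMemoryDataFeedTable {

    private final Map<String, Map<String, String>> items = new HashMap<>();

    void putItem(Map<String, String> item) {
        items.put(item.get("uuid"), new HashMap<>(item));
    }

    // Like GetItemResult.getItem() in the SDK, returns null when no item matches.
    Map<String, String> getItem(String txnId) {
        Map<String, String> item = items.get(txnId);
        return item == null ? null : new HashMap<>(item);
    }

    // Like UpdateItem with AttributeAction.PUT: adds or replaces one attribute,
    // and creates the item if the key does not exist yet.
    void putAttribute(String txnId, String name, String value) {
        Map<String, String> item = items.computeIfAbsent(txnId, key -> {
            Map<String, String> created = new HashMap<>();
            created.put("uuid", key);
            return created;
        });
        item.put(name, value);
    }
}

// Hypothetical processItem flow with a null check before the update.
final class ProcessItemSketch {

    private ProcessItemSketch() {}

    static boolean processItem(InMemoryDataFeedTable table, String txnId) {
        Map<String, String> item = table.getItem(txnId);
        if (item == null) {
            // Skip unknown ids instead of creating a stray {uuid, status} item.
            return false;
        }
        // Real processing of the payload in `item` would go here.
        table.putAttribute(txnId, "status", "COMPLETED");
        return true;
    }
}
```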

That’s all we need for the application to process the data. For more details, refer to the official AWS SDK for Java documentation on working with DynamoDB tables.

Test the application locally

Let’s run the application locally before we containerize it and deploy it to ECS. When testing locally, we have to set up credentials to access the AWS resources; in our case, the application needs access to the SQS queue and the DynamoDB table.

Check that ~/.aws/credentials is already set up on your machine. This file stores your AWS access key ID and secret access key, which we need for local testing. Alternatively, you can pass the credentials via the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

Also set the Spring Cloud AWS property cloud.aws.credentials.useDefaultAwsCredentialsChain=true in the application.properties of the Spring Boot app. This property tells Spring Cloud AWS to use the AWS credentials chain as defined in the SDK. Later, when we deploy this application to ECS, the chain will pick up credentials from the instance profile (or from the ECS task role, if one is assigned).

So regardless of where you put your credentials, be it in environment variables, an instance profile or a credentials profile in ~/.aws/credentials, they are resolved in the order defined by the AWS credentials chain. Just make sure you never store credentials in your code or on your instances.
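The precedence idea can be modeled in a few lines: providers are tried in a fixed order and the first one that yields credentials wins. In the AWS SDK for Java v1, the default chain checks environment variables before the ~/.aws/credentials profile and falls back to ECS container or EC2 instance profile credentials last. The sketch below is a simplified stdlib-only model of that pattern, not the SDK's implementation; CredentialChainSketch and its methods are hypothetical, and only the environment-variable provider is spelled out.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Simplified model of a credentials provider chain (illustration only).
final class CredentialChainSketch {

    private CredentialChainSketch() {}

    // Minimal credentials holder that also records where the keys came from.
    static final class Credentials {
        final String source;
        final String accessKeyId;

        Credentials(String source, String accessKeyId) {
            this.source = source;
            this.accessKeyId = accessKeyId;
        }
    }

    // Tries each provider in order and returns the first credentials found.
    static Optional<Credentials> resolve(List<Supplier<Optional<Credentials>>> providers) {
        for (Supplier<Optional<Credentials>> provider : providers) {
            Optional<Credentials> found = provider.get();
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }

    // Example provider reading the standard AWS environment variables.
    // Pass System.getenv() in real use; a plain map keeps it testable.
    static Optional<Credentials> fromEnvironment(Map<String, String> env) {
        String accessKeyId = env.get("AWS_ACCESS_KEY_ID");
        String secretKey = env.get("AWS_SECRET_ACCESS_KEY");
        if (accessKeyId == null || secretKey == null) {
            return Optional.empty();
        }
        return Optional.of(new Credentials("environment", accessKeyId));
    }
}
```

In this model, when both environment variables are set, a profile provider placed later in the list is never consulted, which is why the same jar can run locally and on ECS without code changes.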

Now, build and run the app.

    mvn clean package
    java -jar target/poc-data-processor-1.0-SNAPSHOT.jar

Fire some requests using the same Postman payload as in the previous post.

[Figure: postman-sqs]

Sample log of processing the data:

message received: a6e7965a-f2fb-4e76-ac26-75d2e221ea13
data payload: {referralId={S: sazed55,}, name={S: Sazed,}, emailId={S: sazed@gmail.com,}, uuid={S: a6e7965a-f2fb-4e76-ac26-75d2e221ea13,}, age={N: 50,}, status={S: PENDING,}}
updated payload: {referralId={S: sazed55,}, name={S: Sazed,}, emailId={S: sazed@gmail.com,}, uuid={S: a6e7965a-f2fb-4e76-ac26-75d2e221ea13,}, age={N: 50,}, status={S: COMPLETED,}}

Notice that when we retrieved the item, its status was PENDING and it was updated to COMPLETED after the processing.

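Note that if the processing fails for some reason, the SQS message may already have been consumed: with the default deletion policy of @SqsListener (NO_REDRIVE) and no redrive policy on the queue, Spring Cloud AWS deletes the message even when the listener throws an exception. Either send the message back to the queue when processing fails, or retry the processing in the application.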
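For the second option, retrying in the application, a small helper with exponential backoff is often enough for transient errors such as throttling. This is a minimal sketch, not the author's code: RetrySketch, withRetry and its attempt and delay parameters are hypothetical, and a real processor would likely retry only errors known to be transient before giving up.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper with exponential backoff (illustration only).
final class RetrySketch {

    private RetrySketch() {}

    // Runs task up to maxAttempts times, doubling the sleep after each failure,
    // and rethrows the last exception if every attempt fails.
    static <T> T withRetry(Callable<T> task, int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMillis = initialDelayMillis;
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                    delayMillis *= 2;
                }
            }
        }
        throw lastFailure;
    }
}
```

Wrapping the processItem call this way keeps transient failures from losing the message's work, while persistent failures still surface after the last attempt.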

That’s it for our data processing application. We’ve tested it and shown that it can consume the SQS message and update the data in the DynamoDB table.

In my next post, we will containerize this application and deploy it to ECS.

12Jan20 Update: The amazonDynamoDB bean has been modified to include the AWS Region. I have also uploaded the full source code of the poc-data-processor application on GitHub here.