Serverless Webhooks using AWS Lambda - Part 3
This is the third part of my Serverless Webhooks series. You can find Part 2 here, where we integrated SQS, and Part 1 here, where we built the Lambda function handler.
Let’s look back at the architecture. So far we have the Lambda function handler poc-data-feed-handler as the webhook that receives the data. It stores the raw data in the DynamoDB table poc-data-feed under the generated unique id txn_id. This unique id is also pushed to an SQS message queue, poc-data-feed-queue, where it waits to be processed.
What’s next is to build the poc-data-processor application, which reads the txn_id from the queue, fetches the corresponding data from the table, then processes and updates it.
[Architecture diagram]
Create the data processor application
Let’s go straight to building the application. Here we will use Java and Spring, but you can choose your own stack; the approach is similar, and AWS provides SDKs for a wide range of programming languages.
Spring and Java code
Create a new Java project poc-data-processor. We will use Spring Boot as the application framework, Spring Cloud to access the SQS queue, and the AWS SDK for Java to access the DynamoDB table.
Add the dependencies
Set the Maven dependencies as below.
<properties>
    <spring-cloud-version>2.1.3.RELEASE</spring-cloud-version>
    <aws-java-sdk-version>1.11.699</aws-java-sdk-version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws-context</artifactId>
            <version>${spring-cloud-version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-aws-messaging</artifactId>
        <version>${spring-cloud-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-aws</artifactId>
        <version>${spring-cloud-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
        <version>${aws-java-sdk-version}</version>
    </dependency>
</dependencies>
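Before the processing logic, the app also needs a standard Spring Boot entry point. A minimal sketch is below; the package and class names are my assumptions, not from the original project.

// Hypothetical entry point for poc-data-processor; package and class names are assumptions.
package com.example.pocdataprocessor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class PocDataProcessorApplication {
    public static void main(String[] args) {
        SpringApplication.run(PocDataProcessorApplication.class, args);
    }
}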
Listen to the queue
In Spring Cloud, just add the @SqsListener annotation to enable your method to listen to the SQS queue.
@SqsListener("poc-data-feed-queue")
public void dataFeedListener(String data) {
    System.out.println("message received: " + data);
    processItem(data);
}
The dataFeedListener method will receive the SQS message body. In this case, it will be the txn_id generated by the Lambda function.
Sample log of receiving a message:
message received: e51c16cb-41d0-4fff-8de4-de32de42205e
It then invokes the method to process the data, passing the txn_id as the key.
Process the data
The processItem method retrieves the item from the DynamoDB table poc-data-feed. Here we will use the AWS SDK for Java directly. Spring Boot supports accessing DynamoDB tables too, but I find the AWS SDK easier and simpler for this PoC.
First, set up a bean for the DynamoDB client.
@Value("${cloud.aws.region.static}")
private String region;

@Bean
public AmazonDynamoDB amazonDynamoDB() {
    return AmazonDynamoDBClientBuilder.standard().withRegion(region).build();
}
Then add the processItem, getItem and updateItem methods in the DataProcessor class.
final static String TABLE_NAME = "poc-data-feed";

@Autowired
private AmazonDynamoDB dynamoDB;

private void processItem(String txnId) {
    // Build the key map from the txn_id; "uuid" is the table's partition key.
    Map<String, AttributeValue> itemKey = new HashMap<>();
    itemKey.put("uuid", new AttributeValue(txnId));

    Map<String, AttributeValue> item = getItem(itemKey);
    System.out.println("data payload: " + item);

    // Mark the item as processed by overwriting its status attribute.
    Map<String, AttributeValueUpdate> updatedItem = new HashMap<>();
    updatedItem.put("status", new AttributeValueUpdate(new AttributeValue("COMPLETED"), AttributeAction.PUT));
    updateItem(itemKey, updatedItem);
    System.out.println("updated payload: " + getItem(itemKey));
}

private Map<String, AttributeValue> getItem(Map<String, AttributeValue> itemKey) {
    GetItemRequest request = new GetItemRequest()
            .withKey(itemKey)
            .withTableName(TABLE_NAME);
    return dynamoDB.getItem(request).getItem();
}

private void updateItem(Map<String, AttributeValue> itemKey, Map<String, AttributeValueUpdate> updatedItem) {
    dynamoDB.updateItem(TABLE_NAME, itemKey, updatedItem);
}
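For reference, the DynamoDB classes used above come from the AWS SDK for Java v1; a typical import list for this class would be:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;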
In processItem, we first create the itemKey map using the txnId. We then get and update the data using this key.
The data type Map<String, AttributeValue> is part of the AWS SDK for Java. It represents an item in the DynamoDB table.
In getItem, we retrieve the entire data payload from the table using the itemKey map. After getting the data, we can do the processing. In this PoC, we simply update the status to COMPLETED to indicate that we have received the data, processed it as necessary and updated its status.
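As a quick illustration (not part of the original code), here is how you might pull individual attributes out of the returned item map:

// Hypothetical snippet: reading typed values from the item map.
String status = item.get("status").getS(); // string attribute
String age = item.get("age").getN();       // number attributes are returned as strings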
As you may recall from the previous post, we updated the Lambda function to add a new attribute status="PENDING" for every new data item it receives and stores in DynamoDB.
Then in updateItem, we pass back the itemKey and the new updatedItem to commit the changes to the DynamoDB table.
The data type Map<String, AttributeValueUpdate> is part of the AWS SDK for Java. It represents an update to an item in the DynamoDB table.
That’s all that we need for the application to process the data. You can also refer to the official AWS SDK for Java documentation on how to access DynamoDB tables.
Test the application locally
Let’s run the application locally first before we containerize and deploy it to ECS. When testing locally, we have to set up the credentials to access the AWS resources. In our case, the application needs access to the SQS queue and the DynamoDB table.
Check that you already have ~/.aws/credentials set up locally. This is where your AWS access key and secret are stored, and we will need it for local testing. Another option is to pass the credentials via environment variables.
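For reference, a minimal ~/.aws/credentials file has this shape (placeholders, not real values):

[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY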
Also set the Spring Cloud property cloud.aws.credentials.useDefaultAwsCredentialsChain=true in the application.properties of the Spring Boot app. This property tells Spring Cloud to use the AWS credentials provider chain as defined in the SDK.
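Putting it together, the application.properties might look like the sketch below. The region value is only an example, and cloud.aws.stack.auto=false is an assumption I add to disable Spring Cloud AWS's CloudFormation stack auto-detection when running outside AWS:

# application.properties (example values; adjust the region to your own)
cloud.aws.credentials.useDefaultAwsCredentialsChain=true
cloud.aws.region.static=us-east-1
# assumption: disable CloudFormation stack auto-detection for local runs
cloud.aws.stack.auto=false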
Later, when we deploy this application to ECS, the chain will use the instance profile to get the credentials.
So regardless of where you put your credentials, be it in environment variables, the instance profile or a credential profile in ~/.aws/credentials, the precedence will follow the AWS credential chain. Just ensure you do not store the credentials in your code or on your instances.
Now, build and run the app.
mvn clean package
java -jar target/poc-data-processor-1.0-SNAPSHOT.jar
Fire some requests using the same Postman payload as in the previous post.
Sample log of processing the data:
message received: a6e7965a-f2fb-4e76-ac26-75d2e221ea13
data payload: {referralId={S: sazed55,}, name={S: Sazed,}, emailId={S: sazed@gmail.com,}, uuid={S: a6e7965a-f2fb-4e76-ac26-75d2e221ea13,}, age={N: 50,}, status={S: PENDING,}}
updated payload: {referralId={S: sazed55,}, name={S: Sazed,}, emailId={S: sazed@gmail.com,}, uuid={S: a6e7965a-f2fb-4e76-ac26-75d2e221ea13,}, age={N: 50,}, status={S: COMPLETED,}}
Notice that when we retrieved the item, its status was PENDING, and it was updated to COMPLETED after the processing.
Note that if the processing fails for some reason, the SQS message has already been consumed. Either return the message to the queue when processing fails, or retry the processing within the application; a sketch of one option follows.
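One way to handle this with Spring Cloud AWS (a sketch, not what we implemented above) is the listener's deletion policy: with ON_SUCCESS, the message is only deleted from the queue when the method returns without throwing, so a failed run leaves it there to be redelivered after the visibility timeout.

// Sketch: delete the message only after successful processing.
// Requires: import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
@SqsListener(value = "poc-data-feed-queue", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void dataFeedListener(String data) {
    System.out.println("message received: " + data);
    processItem(data); // an exception here keeps the message on the queue
}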
That’s it for our data processing application. We’ve tested it and showed that it can consume the SQS message and update the data in the DynamoDB table.
In my next post, we will containerize this application and deploy it to ECS.
12Jan20 Update: The amazonDynamoDB bean has been modified to include the AWS Region. I have also uploaded the full source code of the poc-data-processor application on GitHub here.