Messaging in Microservices using SQS and SNS
Microservices are newest and most efficient way to design your applications. They are more flexible and efficient than monolithic applications. They are used in complex evolving applications. All thanks to the advancement in distributed computing, that we have been able to shift from traditional monolithic applications towards highly available microservices.
But with this advancement it poses another challenge on - How to get these services to communicate?
This blog will cover the concept of messaging and how to achieve communication between microservices using SQS and SNS.
Microservices are the architectural approach where the software is composed of small independent services that communicate with each other. In order to communicate amongst each other, there can be two ways:
-
ReST APIs
-
Messaging
ReST APIs for communication¶
The ReST (Representational State Transfer) API's is a synchronous request/response based method for communication amongst microservices. This method is useful when the project has a request/response based interaction.
The benefit associated with HTTP is that the transport layer of APIs created using the ReST are interoperable with every programming language.
However, there are some considerations to be made while using ReST:
-
If the message needs to be delivered at multiple places then it causes unnecessary coupling and defies the sole principle of microservices - Single in purpose
-
When we make a request using ReST API, our service gets blocked waiting for the response to be received, which in turn hurts the application performance.
-
Doesn't scale in terms of application connectivity
-
Timeouts in case of bulky/large message transfers
Messaging for event-driven microservices¶
Event-driven microservices are mostly asynchronous, and they get notified when it's their time to perform work.
Messaging is useful in such scenarios where immediate reply is not expected. However messaging can also be used in a request/response scenario which forces the designers to think about creating systems that communicate with each other asynchronously.
A few benefits of messaging¶
-
Messaging services have a publisher/subscriber functionality, which is a form of asynchronous service-to-service communication so any message published to a topic is immediately received by all the subscribers.
-
We can send a request and process another request without waiting for a response. So messaging service is non-blocking
-
Scaling using event-driven architecture is easy
-
Messaging platforms offer guaranteed delivery
-
Error handling is easy because once service is healed after failure, the failed service will start processing data it had accumulated during the downtime making the system consistent. The code is also cleaner as error handling logic is gone.
What is messaging?¶
- AWS defines message queuing as:
- Messaging or message queuing is a service-to-service communication allowing one service to communicate with another service by sending messages to each other.
Messaging is asynchronous implying that your service doesn't need to wait for another service to respond. It can send a request message and continue with its own further processing.
Read more about message queuing
AWS Services for Messaging¶
AWS provides various services for the messaging such as: Amazon MQ, Amazon SQS, Amazon SNS, Amazon Pinpoint, Amazon Kinesis Streams, AWS IoT message broker. We will majorly be discussing Amazon SQS (Simple Queue Service) and SNS (Simple Notification Service) for the scope of this blog.
Simple Queue Service (SQS)¶
Amazon's SQS is a fully managed queuing service which enables us to decouple and scale microservices, distributed systems and serverless applications. With the help of SQS, we can send, store, receive messages between software components at any volume without losing messages.
The queue acts as a buffer between producer and consumer. Hence, a queue is able to resolve the issues that arise if the producer is producing work faster than the consumer consumes it or vice versa. (Producer-Consumer problem)
There are two types of Queues:
-
Standard Queue
-
FIFO Queue (First In First Out)
Standard Queue
- It is the default queue type.
- It guarantees at least once delivery. Sometimes, more than one copy of a message might be delivered out of order.
- Messages are generally delivered in the same order as they are sent but does not provide a guarantee
FIFO Queue
- It guarantees the ordering, i.e. the messages are received in the order in which they are sent.
- Each message is processed exactly-once.
- The message is delivered once and remains available in the queue until the consumer processes and deletes it.
- It does not allow duplicates to be introduced in the queue.
- It also supports message groups which allow multiple message orderings.
Simple Notification Service (SNS)¶
SNS is a fully managed messaging service for Application-to-Application (A2A) and Application-to-Person (A2P) communication. The SNS service provides message delivery from publishers to subscribers by sending messages to a topic which can be subscribed by the clients. This is done via support endpoints such as Amazon Kinesis, Amazon SQS, Amazon Lambda, HTTP, email, mobile push notifications or SMS.
- Topic
- A topic in SNS is a logical access point that acts as a communication channel. It is a part of the producer or publisher in the pub-sub mechanism. We can push messages to this topic, which will notify the endpoints about the new message and push these messages to the endpoints with the help of queue.
Using SQS and SNS¶
So now we know what SQS and SNS mean, let's dig deeper into how to use SQS and SNS and mock them using those services locally.
With SQS there can be multiple consumers, however if your microservices need to communicate simultaneously to multiple other microservices, then the process of publishing messages to their respective queues becomes heavy weight. Moreover, one needs to change the producer microservice each time a new consumer microservice comes into existence or seizes to exist. To decouple the producer microservice from consumer microservices, we need to use a combination of SNS and SQS. Producer microservice can publish a single topic and multiple queues can subscribe to the topic thereby creating an ecosystem where the consumers can come and go without impacting the publisher.
Localstack to the rescue!¶
Localstack is an easy way to mock the AWS services on your local machine at zero cost! You won't have to pay for each message you send to the queue just for testing. It provides the same functionality as the real AWS cloud environment.
Localstack is an open-sourced project by Atlassian which provides an easy way to test/develop AWS cloud applications directly from your local machine.
Pre-requisites¶
- AWS CLI setup
- Docker
Using localstack¶
There are multiple ways of using localstack on your local machine. We will be using docker to run localstack. Make sure you have the latest version of docker installed on your machine.
First of all, make a directory where we will be storing the scripts to start and stop localstack.
Now copy and paste the env.txt, start.sh and stop.sh into this directory.
Now we are ready to use localstack on your local machine.
In order to start localstack, simply go to the directory and run the start.sh script and localstack will start running as a docker container. To check docker container status:
First we will be using AWS CLI to explore the process of interfacing and interacting with SNS and SQS.
Then we will demonstrate the same steps programmatically using Kotlin as the programming language.
Using the CLI for SQS and SNS¶
Now we will be looking further on how to use the command line to run our aws commands locally.
Prerequisite: Start your localstack
Localstack will be running on the port 4566 so in order to check whether its running, you can go to the url http://localhost:4566 and see its status as running. ({"status": "running"})
Creating SNS topic¶
So first we need to create a topic, and we will be sending messages to this topic.
List created topics¶
Creating queue¶
Now let's create a queue and this queue will subscribe to created topic.
List queues¶
Subscribing queue to topic¶
Now let's subscribe queue to the created topic.
List subscriptions¶
Publishing messages to topic¶
Now we will publish message to the topic. So whenever any message is published to the topic, the subscribed queue will get notified about it and it will receive the message from the queue.
Receive messages in queue¶
Now we can check whether our published message was received in the queue using the following command:
{
"Messages": [
{
"MessageId": "cbfaa949-1049-2c0b-7fdb-a8bca5dc7201",
"ReceiptHandle": "swfcjlwlmxaixptbbyzseynybgnzflsbhdopyjxmnuxzgfhkircxknslzcujctvaemresinbkxaqowojfnnhhgmaciwqhymmbqtjdfkvirdpqmdwtpnaiypogeuwsuqgzbzdqfazvpbluxsgdyxwbyvngzrbsbourbzjoidpokfmtfnfoxkfcywmu",
"MD5OfBody": "9c3437c9b4f4b9d71b8b903cfdbdbc20",
"Body": "{\"Type\": \"Notification\", \"MessageId\": \"67f9cecc-62bc-41ed-b33f-fb2063bf034b\", \"TopicArn\": \"arn:aws:sns:ap-south-1:000000000000:vayana-blogs\", \"Message\": \"Hello, Welcome to Vayana Network!\", \"Timestamp\": \"2022-01-25T09:26:16.588Z\", \"SignatureVersion\": \"1\", \"Signature\": \"EXAMPLEpH+..\", \"SigningCertURL\": \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem\"}"
}
]
}
We have now successfully received the message in our queue.
Using the AWS SDK¶
So now that we have learnt how to use the command line to mock SQS and SNS services locally, let's try out doing the same using a simple Kotlin application that will create topics, subscribe to it and subsequently publish messages to the topic.
So to begin with, we will need AWS SDK in order to use the various functionalities provided by SQS and SNS.
To start, we will need to add the dependencies for the SQS and SNS:
Next we will start creating the sqs and sns client which will be required to start using the services locally.
fun createNewSNSTopic(snsClient: SnsClient, topicName: String?): Either<Exception, String> = Either.catch {
val request = CreateTopicRequest.builder().name(topicName).build()
snsClient.createTopic(request).topicArn()
}.fold(
{ Exception("Error creating new sns topic: $it").left() },
{ it.right() }
)
fun createQueue(sqsClient: SqsClient, queueName: String?): Either<Exception, String> = Either.catch {
val createQueueRequest = CreateQueueRequest.builder()
.queueName(queueName)
.build()
sqsClient.createQueue(createQueueRequest)
val getResponse = sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build())
getResponse.queueUrl()
}.fold(
{ Exception("Error creating queues: $it").left() },
{ it.right() }
)
Now we will subscribe the queue to this newly created topic.
fun subscribeQueue(snsClient: SnsClient, topicArn: String?, queueUrl: String?): Either<Exception, String> =
Either.catch {
val request = SubscribeRequest.builder()
.protocol("sqs")
.endpoint(queueUrl)
.returnSubscriptionArn(true)
.topicArn(topicArn)
.build()
val result = snsClient.subscribe(request)
result.subscriptionArn()
}.fold(
{ Exception("Error subscribing queue: $it").left() },
{ it.right() }
)
Let's publish some message to topic and check if the message is properly received in the queue.
fun publishTopic(snsClient: SnsClient, message: String?, topicArn: String?): Either<Exception, String> = Either.catch {
val request = PublishRequest.builder()
.message(message)
.topicArn(topicArn)
.build()
val result = snsClient.publish(request)
result.sdkHttpResponse().statusCode().toString()
}.fold(
{ Exception("Error publishing topic: $it").left() },
{ it.right() }
)
fun receiveQueueMessage(sqsClient: SqsClient, queueUrl: String?): Either<Exception, List<Message>?> = Either.catch {
val receiveMsgRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.build()
sqsClient.receiveMessage(receiveMsgRequest).messages()
}.fold(
{ Exception("Error receiving queue message $it").left() },
{ it.right() }
)
So this was the way to mock sqs and sns services locally using localstack and AWS SDK. The complete code for the above is:
package com.example
import arrow.core.*
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.CreateTopicRequest
import software.amazon.awssdk.services.sns.model.PublishRequest
import software.amazon.awssdk.services.sns.model.SubscribeRequest
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
fun createNewSNSTopic(snsClient: SnsClient, topicName: String?): Either<Exception, String> = Either.catch {
val request = CreateTopicRequest.builder().name(topicName).build()
snsClient.createTopic(request).topicArn()
}.fold(
{ Exception("Error creating new sns topic: $it").left() },
{ it.right() }
)
fun createQueue(sqsClient: SqsClient, queueName: String?): Either<Exception, String> = Either.catch {
val createQueueRequest = CreateQueueRequest.builder()
.queueName(queueName)
.build()
sqsClient.createQueue(createQueueRequest)
val getResponse = sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build())
getResponse.queueUrl()
}.fold(
{ Exception("Error creating queues: $it").left() },
{ it.right() }
)
fun subscribeQueue(snsClient: SnsClient, topicArn: String?, queueUrl: String?): Either<Exception, String> =
Either.catch {
val request = SubscribeRequest.builder()
.protocol("sqs")
.endpoint(queueUrl)
.returnSubscriptionArn(true)
.topicArn(topicArn)
.build()
val result = snsClient.subscribe(request)
result.subscriptionArn()
}.fold(
{ Exception("Error subscribing queue: $it").left() },
{ it.right() }
)
fun publishTopic(snsClient: SnsClient, message: String?, topicArn: String?): Either<Exception, String> = Either.catch {
val request = PublishRequest.builder()
.message(message)
.topicArn(topicArn)
.build()
val result = snsClient.publish(request)
result.sdkHttpResponse().statusCode().toString()
}.fold(
{ Exception("Error publishing topic: $it").left() },
{ it.right() }
)
fun receiveQueueMessage(sqsClient: SqsClient, queueUrl: String?): Either<Exception, List<Message>?> = Either.catch {
val receiveMsgRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.build()
sqsClient.receiveMessage(receiveMsgRequest).messages()
}.fold(
{ Exception("Error receiving queue message $it").left() },
{ it.right() }
)
package com.example
import arrow.core.*
import arrow.core.computations.either
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sqs.SqsClient
import java.net.URI
fun main(){
val endpoint = URI("http://localhost:4566")
//connect with sns client
val snsClient = SnsClient.builder()
.endpointOverride(endpoint)
.region(Region.AP_SOUTH_1)
.build()
//connect with sqs client
val sqsClient = SqsClient.builder()
.endpointOverride(endpoint)
.region(Region.AP_SOUTH_1)
.build()
performSQSandSNS(snsClient, sqsClient)
}
fun performSQSandSNS(snsClient: SnsClient, sqsClient: SqsClient): Either<Exception, Unit> = either.eager {
val topicArn = createNewSNSTopic(snsClient,"vayana-blogs").bind()
println("Topic arn is: $topicArn")
val queueUrl = createQueue(sqsClient, "vayanaservicesQ").bind()
println("Queue url: $queueUrl")
val subscribeQueue = subscribeQueue(snsClient,topicArn,queueUrl).bind()
println("Subscription ARN: $subscribeQueue")
val msg = "Welcome to Vayana Network!"
val publishMsgResp = publishTopic(snsClient,msg,topicArn).bind()
println("Status code for publish message: $publishMsgResp")
val receivedMsg = receiveQueueMessage(sqsClient,queueUrl).bind()
println("Queue url: $queueUrl and messages: $receivedMsg")
}
Topic arn is: arn:aws:sns:ap-south-1:000000000000:vayana-blogs
Queue url: http://localhost:4566/000000000000/vayanaservicesQ
Subscription ARN: arn:aws:sns:ap-south-1:000000000000:vayana-blogs:9aceb940-7bed-48c3-ac74-d49629a438c4
Status code for publish message: 200
Queue url: http://localhost:4566/000000000000/vayanaservicesQ and messages: [Message(MessageId=3d301c88-8d1a-5dda-25eb-0143f758fc9a, ReceiptHandle=updypldobpoyybdjzphynlwwbjcozhojnigkbgtkjfwjjjaakrdmcasoyiqitgvzufsuvtlqrvysvjszewsovbrdksotqyuobutrjhyfncpziblotwqlftvhghwfhxqjxkwgyrrjbyartnytmocwvinlryaomdrgqsnisecacoorjpnivqparwlao, MD5OfBody=cf3ddb0359a44a447993582dd562c071, Body={"Type": "Notification", "MessageId": "b14c6366-adb6-44c3-bf7c-a7d50af061b5", "TopicArn": "arn:aws:sns:ap-south-1:000000000000:vayana-blogs", "Message": "Welcome to Vayana Network!", "Timestamp": "2022-01-25T12:33:30.336Z", "SignatureVersion": "1", "Signature": "EXAMPLEpH+..", "SigningCertURL": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-0000000000000000000000.pem"})]
Process finished with exit code 0
Closing Notes¶
We have implemented SQS and SNS messaging capabilities in our microservices, since the message transfers between two microservices is mainly asynchronous. This blog was an aim to cover the basics of using SQS and SNS in the application and get a basic understanding of using messaging for asynchronous data transfers between two services.