First of all, what is an event-driven architecture? It's a software design pattern in which microservices react to changes in state, called events. These events can either carry state (such as the price of an item) or be identifiers (a notification that an order was received or shipped, for example).
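To make that distinction concrete, here are two hypothetical event payloads, sketched as Python dictionaries (the field names are mine, not part of any standard):

# A state-carrying event: the payload contains the new state itself.
price_changed = {'type': 'PRICE_CHANGED', 'item': 'book-123', 'price': 19.99}

# An identifier event: a pure notification that something happened.
order_shipped = {'type': 'ORDER_SHIPPED', 'order_id': 42}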
Why Event Streaming?
Event-driven systems reflect how modern businesses actually work: thousands of small changes happening all day, every day.
Streaming data represents a constant flow of events.
No matter what your company does (selling products online, managing clients' investments, selling insurance, etc.), you build software. The core business is no longer only selling insurance or shipping goods; building the software that sells that insurance has become a key aspect.
Tesla, for example, is a software company that knows how to build cars and put software in them.
Innovation is key to the survival of your company. If you don't innovate, your business will fail.
Collecting data in a batch process at night (as was the case in many companies not so long ago), processing it, and working on it the day after is no longer good enough. You need the data as soon as it happens, to represent accurately what is going on right now.
Imagine this situation: you're having an argument with your wife (or husband) and you say something wrong, something that hurts. You cannot undo that. It is done, it is said. The event already happened (you wish you could undo it...).
Let's use an example to demonstrate this.
The taxi as we used to know it has become software. Uber and Lyft are good examples.
It is very convenient. You don't need to know the phone number of a cab company: you just take your phone, install the app, ask for a ride, check the cost, and wait a few minutes for the pickup (the app tells you exactly when the car will arrive). You already gave your credit card details, so you don't need to pay with cash. This innovation completely changes the way we see (and use) this business.
All of this is built on a foundation of data, with real-time visibility into these events.
Another example: if you have ever used Waze in your car, you can see, as you drive around, how it shows different ads on your screen based on your location, your current coordinates. This information is merged with data from advertisers to display the ad (a contextual event-driven app).
This happens in real time and can only be done with data that arrives really fast. If the data arrives 5 minutes after you passed that great pizza restaurant, it's too late.
These are more examples of how important it is to receive data in real time.
Event Streaming is the future of our data. Perceiving data as a continuous stream of events is a fundamental paradigm shift.
The idea is to rethink data as neither stored records (a database) nor transient messages, but as a continuously updating stream of events.
An event records the fact that something happened: an order was placed, a payment was made, an item's price changed. These facts happened in real life and cannot be changed; they are immutable. Systems will have to be built to collect sequences of facts like these.
A Stream is a sequence of ordered events or facts. These events can be stored in a streaming platform in the order in which they were produced.
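As a toy illustration of those two properties (immutability and ordering), a stream behaves like an append-only log. This little sketch is mine; real streams live in platforms like Kafka or Kinesis:

# A stream as an append-only, ordered log: events are appended, never mutated.
stream = []

def publish(event):
    stream.append(event)

publish({'type': 'ORDER_RECEIVED', 'order_id': 42})
publish({'type': 'ORDER_SHIPPED', 'order_id': 42})

# Consumers read the events in the exact order they were produced.
for event in stream:
    print(event)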
Events change the way we think.
How to build Event-Streaming architectures with Serverless Framework, Python and AWS
Now we can start to build a small architecture that reflects these ideas. We will use Python, the Serverless Framework, and AWS.
We will:
- Stream our data in real time as events.
- Store our event streams.
- Process and analyze our event streams.
Here is the example we will build, a currencies stream: multiple Microservices that show how the stream flows.
- Two Microservices to create and delete currencies in a DynamoDB table.
- Every item-level change in the table will be captured and pushed to a DynamoDB Stream.
- A third Microservice listens to this Stream and publishes a message on the currency SNS topic, whether it's a new currency, an update to an existing one, or an action to remove a currency.
- A fourth Microservice listens to the SNS topic and updates the value in a local map (the values could also be sent to Grafana or Elastic to create a nice dashboard with the stats).
The Microservices work together to achieve a common goal, but they don't need to know anything about each other except the format of the events they exchange.
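That shared format is the whole contract. As a preview, a hypothetical message in our system could look like this (it matches the messages our listener will publish later in this post):

# The only contract between the microservices: the shape of the message.
example_message = {
    'type': 'INSERT',                         # one of INSERT, MODIFY, REMOVE
    'data': {'symbol': 'USD', 'rate': '100'}  # the currency payload
}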
For this example you need to have:
- An AWS account.
- Python installed.
- The Serverless Framework installed (here's how to do it).
Serverless is a great framework and easy to work with. We will use it to create and deploy our Lambdas to AWS. This framework was recommended by one of the best DevOps engineers I've ever worked with, Yassine Zeroual.
Remember: each Lambda is a Microservice.
We will then create the following Lambda functions:
- create_currency
- delete_currency
- listen_currency
- consume_topic
create_currency receives POST requests to add (or update) items in the DynamoDB currency table.
delete_currency deletes items from the same table.
listen_currency is the Microservice responsible for receiving the Stream from the table and publishing a message on the currency_topic.
Finally, consume_topic consumes the messages from currency_topic and updates a local currency map.
DynamoDB Stream
You could create the DynamoDB table using the Serverless Framework; I prefer to do it in the AWS console. The table key will be symbol and the second column is called rate. You have to enable the streaming option on the table.
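If you prefer code over the console, here is a sketch of creating the same table with boto3, stream included (the on-demand billing mode is my choice here; adjust it and the region to your needs):

import boto3

# Create the currency table with a stream that emits the new item image.
dynamodb = boto3.client('dynamodb')
dynamodb.create_table(
    TableName='currency',
    AttributeDefinitions=[{'AttributeName': 'symbol', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'symbol', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',
    StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'NEW_IMAGE'}
)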
But what exactly are DynamoDB Streams? How do they work?
A DynamoDB Stream can be described as a stream of observed changes in data, technically called Change Data Capture (CDC). Once enabled, whenever you perform a write operation on the DynamoDB table (put, update, or delete), a corresponding event containing information such as which record was changed and what was changed is saved to the Stream in near-real time.
Characteristics:
- Events are stored for up to 24 hours.
- Ordered: the sequence of events in the stream reflects the actual sequence of operations on the table.
- Near-real time: events are available in the stream within less than a second of the write operation.
- Deduplicated: each modification corresponds to exactly one record within the stream.
When creating the stream you have a few options for what data should be pushed to it. The options are:
- OLD_IMAGE: stream records will contain the item as it was before it was modified.
- NEW_IMAGE: stream records will contain the item as it is after it was modified.
- NEW_AND_OLD_IMAGES: stream records will contain both pre- and post-change snapshots.
- KEYS_ONLY: self-explanatory.
For our case we will choose NEW_IMAGE.
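With NEW_IMAGE enabled, each stream record our Lambda receives looks roughly like this (a trimmed-down sketch; real records carry more metadata, and REMOVE events have no NewImage, only the Keys):

sample_event = {
    'Records': [{
        'eventName': 'INSERT',  # or MODIFY / REMOVE
        'dynamodb': {
            'Keys': {'symbol': {'S': 'USD'}},
            'NewImage': {'symbol': {'S': 'USD'}, 'rate': {'S': '100'}},
            'StreamViewType': 'NEW_IMAGE'
        }
    }]
}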
The Serverless Framework
Back to the Serverless Framework: all of the Lambda functions in your serverless service can be found in serverless.yml under the functions property.
service: currencies
frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.8
  region: ca-central-1
  environment:
    SNS_TOPIC: arn:aws:sns:ca-central-1:XXXXXXXXX:currency-topic
    DYNAMODB_TABLE: currency
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:Query
            - dynamodb:GetItem
            - dynamodb:PutItem
            - dynamodb:UpdateItem
            - dynamodb:DeleteItem
            - dynamodb:DescribeStream
            - dynamodb:GetRecords
            - dynamodb:GetShardIterator
            - dynamodb:ListStreams
          Resource: arn:aws:dynamodb:ca-central-1:XXXXXXXXX:table/currency
        - Effect: Allow
          Action:
            - sns:Publish
          Resource: arn:aws:sns:ca-central-1:XXXXXXXX:currency-topic

functions:
  create_currency:
    handler: handler.create_currency
    events:
      - http:
          path: currency
          method: post
          cors: true
  delete_currency:
    handler: handler.delete_currency
    events:
      - http:
          path: currency/{symbol}
          method: delete
          cors: true
  listen_currency:
    handler: listener.listen_currency
    events:
      - stream: arn:aws:dynamodb:ca-central-1:XXXXXXXX:table/currency/stream/2022-03-21T19:53:41.766
  consume_topic:
    handler: consumer.consume_topic
    events:
      - sns: arn:aws:sns:ca-central-1:XXXXXXXX:currency-topic
Several things to note here:
- The handler property (inside functions) points to the file and module containing the code you want to run in your function (i.e. create_currency is defined inside the handler.py file).
- The Lambda functions need IAM permissions to use other AWS services. For example, create_currency needs permission to put and update items in the DynamoDB table.
- The listen_currency Lambda needs the stream ARN of the DynamoDB table, defined inside the events property.
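If you created the table in the console, one way to look up that stream ARN is with boto3 (a small sketch):

import boto3

# Fetch the ARN of the table's latest stream to paste into serverless.yml.
info = boto3.client('dynamodb').describe_table(TableName='currency')
print(info['Table']['LatestStreamArn'])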
Here is the code for the Lambdas, starting with the handler.py file:
import json
import logging
import os

import boto3


def create_currency(event, context):
    data = json.loads(event['body'])
    # Validate that both required fields are present in the request body.
    if 'symbol' not in data or 'rate' not in data:
        logging.error("Validation Failed")
        raise Exception("Couldn't create the currency item.")

    table = boto3.resource('dynamodb').Table(os.environ['DYNAMODB_TABLE'])
    item = {
        'symbol': data['symbol'],
        'rate': data['rate']
    }
    # put_item creates the item, or replaces it if the key already exists.
    table.put_item(Item=item)

    return {
        "statusCode": 200,
        "body": json.dumps(item)
    }


def delete_currency(event, context):
    table = boto3.resource('dynamodb').Table(os.environ['DYNAMODB_TABLE'])
    table.delete_item(
        Key={
            'symbol': event['pathParameters']['symbol']
        }
    )
    return {"statusCode": 200}
We only need two Lambdas to handle the HTTP POST and DELETE methods: if the record already exists, DynamoDB's put_item overwrites it with the new value, so POST covers both create and update.
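You can smoke-test create_currency locally before deploying. A minimal sketch, assuming your AWS credentials and the DYNAMODB_TABLE environment variable are set:

import json
from handler import create_currency

# Simulate the API Gateway event that carries the POST body.
fake_event = {'body': json.dumps({'symbol': 'USD', 'rate': '100'})}
print(create_currency(fake_event, None))  # expect a statusCode 200 response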
The second file, listener.py:
import json
import os

import boto3


def listen_currency(event, context):
    client = boto3.client('sns')
    for record in event.get('Records', []):
        event_name = record.get('eventName')
        msg = {'type': event_name}
        if event_name in ('INSERT', 'MODIFY'):
            # NEW_IMAGE gives us the item as it looks after the change.
            msg['data'] = {
                'symbol': record['dynamodb']['NewImage']['symbol']['S'],
                'rate': record['dynamodb']['NewImage']['rate']['S']
            }
        elif event_name == 'REMOVE':
            # A removed item only exposes its key attributes.
            msg['data'] = {
                'symbol': record['dynamodb']['Keys']['symbol']['S']
            }
        client.publish(TopicArn=os.environ['SNS_TOPIC'], Message=json.dumps(msg))
listen_currency receives the Stream from the table: DynamoDB tells us what kind of record we're receiving (INSERT, MODIFY, etc.). We add this information to the message we publish on the currency topic.
The last file, consumer.py:
import json
import logging

# Module-level state: survives only as long as this Lambda container does.
currency_map = dict()


def consume_topic(event, context):
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    for record in event['Records']:
        message = json.loads(record['Sns']['Message'])
        symbol = message['data']['symbol']
        if message['type'] in ('INSERT', 'MODIFY'):
            currency_map[symbol] = message['data']['rate']
        elif message['type'] == 'REMOVE':
            if symbol in currency_map:
                currency_map.pop(symbol)
            else:
                logger.info(f'symbol: {symbol} not in map')
    logger.info(f'currency map: {currency_map}')
This Lambda consumes the messages from the SNS topic currency_topic.
Once a message is received it will insert, update, or delete the received currency in a local dictionary.
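Because consume_topic is plain Python, you can exercise it locally with a hand-built SNS event (a sketch, no AWS needed):

import json
from consumer import consume_topic, currency_map

# Simulate the SNS envelope that wraps our published message.
fake_sns_event = {'Records': [{'Sns': {'Message': json.dumps(
    {'type': 'INSERT', 'data': {'symbol': 'EUR', 'rate': '999'}})}}]}

consume_topic(fake_sns_event, None)
assert currency_map == {'EUR': '999'}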
Now, in the directory where you created your Serverless Framework app, just run sls deploy. You should see:
Deploying currencies to stage dev (ca-central-1)
✔ Service deployed to stack currencies-dev (46s)
endpoints:
POST - https://xxxxxx.execute-api.ca-central-1.amazonaws.com/dev/currency
DELETE - https://xxxx.execute-api.ca-central-1.amazonaws.com/dev/currency/{symbol}
functions:
create_currency: currencies-dev-create_currency (18 MB)
delete_currency: currencies-dev-delete_currency (18 MB)
listen_currency: currencies-dev-listen_currency (18 MB)
consume_topic: currencies-dev-consume_topic (18 MB)
What just happened there? In short, the Serverless Framework created our new Lambda functions and configured an AWS API Gateway to point to them. The endpoints returned are the ones provided by API Gateway.
You can also check your apps on the Serverless Dashboard (by running serverless).
Let's try our serverless functions, first creating a new currency:
curl -X POST https://xxxxxxx.execute-api.ca-central-1.amazonaws.com/dev/currency --data '{"symbol":"USD","rate":"100"}'
{"symbol": "USD", "rate": "100"}%
Let's go and check the DynamoDB table. The message was sent to the SNS topic, and the consumer "consumed" it and inserted it into the local map. Let's check the logs in AWS CloudWatch; go to your AWS dashboard. On the left you will see "Log groups"; click on that link and you will see all the Lambdas you have deployed. Click on consume_topic and you will see the log of the map that was just updated.
Another one:
curl -X POST https://xxxxxxx.execute-api.ca-central-1.amazonaws.com/dev/currency --data '{"symbol":"EUR","rate":"999"}'
{"symbol": "EUR", "rate": "999"}%
The logs:
Let's update the value for the euro currency:
curl -X POST https://oqn31ysblf.execute-api.ca-central-1.amazonaws.com/dev/currency --data '{"symbol":"EUR","rate":"100"}'
The map inside the lambda has been updated:
Finally let's delete the euro currency:
curl -o - -X DELETE https://oqn31ysblf.execute-api.ca-central-1.amazonaws.com/dev/currency/EUR
This is just an exercise: an idle Lambda lives only around 5 minutes without activity; after that it is recreated on the next call, so the dict is recreated (empty) as well.
For a more permanent solution you should use an external in-memory cache to preserve the values. Or, as mentioned before, you can send them to Grafana or Elastic and create a nice dashboard.
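For illustration, a minimal sketch of that idea using Redis (assuming a reachable Redis instance and the redis-py package; the host name is a placeholder):

import redis

# Connect to an external cache that outlives any single Lambda container.
r = redis.Redis(host='my-cache.example.com', port=6379, decode_responses=True)

def save_rate(symbol, rate):
    r.hset('currency_map', symbol, rate)   # persist one currency rate

def remove_rate(symbol):
    r.hdel('currency_map', symbol)         # drop a deleted currency

def all_rates():
    return r.hgetall('currency_map')       # the whole map, like our local dict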
That's it!
You can see my GitHub repo with this example.
Of course, this whole example could be built using Apache Kafka. In fact, I have another post on my blog doing exactly that.
Sources:
- Serverless Framework
- What is an event-driven architecture?
- Python
- Event-driven architectures
- Event-Driven architectures by Google
- Stream Processing by Confluent
- Serverless Framework AWS Python Example
- DynamoDB Streams