Into the event-driven world: how to create a stream of information with Apache Kafka and Spring Boot







This post is heavily based on the amazing presentation given at SpringOne in October 2019 by Viktor Gamov (developer advocate at Confluent) and Gary Russell (project lead for Spring for Apache Kafka).

Why Event Streaming?

No matter what your company does (selling products online, managing clients' investments, selling insurance, etc.), you build software. The core business is no longer only selling insurance or luxury clothes; building the software to sell them has become a key aspect.
Tesla, for example, is a software company that knows how to build cars and put software in them.
Innovation is key to your company's survival. If you don't innovate, your business will fail.

Collecting the data in a batch process at night (as was the case in many companies not so long ago), processing it and then working on it the day after is no longer good enough. You need the data as soon as it is produced, so that it accurately represents what is happening in everyday life.

Let's use an example to demonstrate this.
The taxi as we used to know it has become software. Uber and Lyft are good examples.

It is very convenient. You don't need to know the phone number of a cab company to call one: you just install the app on your mobile, ask for a ride, check the cost, and wait a few minutes for the pickup (the app will tell you exactly when the car will arrive). You have already entered your credit card, so you don't need to pay with physical money, and you can watch on the app as the car approaches. This innovation completely changes the way we see (and use) this business.
All of this is built on a foundation of data: there is real-time visibility into these events.

Another example: if you have ever used Waze in your car, you can see how, as you drive around, it shows different ads on your screen based on your location. Your current coordinates are merged with data from the advertisers to pop up the ad (this is a contextual, event-driven app).

This happens in real time and can only be done with data arriving really fast. If the data arrives 5 minutes after you've passed that great pizza restaurant, it's too late.

These examples show how important it is to receive data in real time.



Event Streaming is the future of our data. Perceiving data as a continuous stream of events is a fundamental paradigm shift.

The idea is to rethink data as neither stored records (a database) nor transient messages, but instead as a continuously updating Stream of Events.

An event records the fact that something happened: a ride was requested, a payment was made, a currency rate was updated.

These facts happened in real life and cannot be changed; they are immutable. Systems have to be built to collect sequences of facts like the ones I just mentioned.

A Stream is an ordered sequence of events or facts. These events are stored in a streaming platform in the order in which they were produced.

Events change the way we think.
















How to build Event-Streaming Architectures with Kafka

Now we can start to think about how to build our Kafka producer client in our favorite language.
We chose Java + Spring Boot + Spring Kafka (the latter framework already comes with a lot of classes that simplify the connection between systems), but you can do it in the language of your preference; that doesn't change the outcome or the architecture.






















We also chose Apache Kafka as our streaming platform for this example.
We will be:

1 - Streaming our data in real time as Events.
2 - Storing our Event Streams.
3 - Processing and analyzing our Event Streams.


1 - For streaming data in real time we can create a Spring Boot client that integrates the Spring for Apache Kafka framework, which gives us many convenient building blocks for sending and receiving messages (like KafkaTemplate and @KafkaListener), as sketched below.
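
For instance, here is a minimal sketch of sending and receiving messages with Spring for Apache Kafka (the class names and the quotes topic are illustrative only; Spring Boot auto-configures the KafkaTemplate, and String values use the default serializers):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
class QuoteProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    QuoteProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String quote) {
        // KafkaTemplate hides the low-level producer plumbing from us
        kafkaTemplate.send("quotes", quote);
    }
}

@Component
class QuoteConsumer {

    // Spring creates the consumer, subscribes it to the topic and calls this method for every record
    @KafkaListener(topics = "quotes", groupId = "quote-group")
    public void listen(String quote) {
        System.out.println("Received: " + quote);
    }
}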

2 - Kafka can be used as a streaming platform where a message can be read by lots of different consumers. But there are use cases where it can also be used as a database: take the example of The New York Times, where the platform is used as a source of truth to store every article published since 1851.




The articles can then be retrieved by multiple systems (like content management systems) from a single point and displayed according to the format of each specific consumer.
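
The key to using Kafka as long-term storage is the topic's retention configuration: with retention disabled, Kafka keeps every event forever. Here is a minimal sketch using the Kafka AdminClient (the articles topic name and the single-broker settings are assumptions):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateArticlesTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // retention.ms=-1 disables time-based deletion: the topic keeps every event ever written
            NewTopic articles = new NewTopic("articles", 1, (short) 1)
                    .configs(Map.of(TopicConfig.RETENTION_MS_CONFIG, "-1"));
            admin.createTopics(List.of(articles)).all().get();
        }
    }
}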


3 - To process and analyze our Event Streams we can use the latest tools on the market:

  • With streaming SQL (see the KSQL sketch below).
  • With apps and microservices (like Apache Kafka Streams, or your own app written in Java, Kotlin, Go, Node.js, etc.).
  • With separate frameworks (like Apache Spark or Apache Storm).
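
As a taste of the streaming SQL option, with KSQL (which we will also use later just to peek at topics) you can register a stream over Maxwell's JSON change events and filter it with plain SQL. This is only a sketch against the maxwell topic described below, and the exact syntax varies between versions (recent ksqlDB requires EMIT CHANGES at the end of the query):

CREATE STREAM currency_changes (data STRUCT<symbol VARCHAR, rate DOUBLE>)
  WITH (KAFKA_TOPIC='maxwell', VALUE_FORMAT='JSON');

SELECT data->symbol, data->rate FROM currency_changes;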

An example use case: the currencies stream

Here is the example we will build today: multiple modules that show you how the stream flows.
  • First of all, a REST application for updating currency conversion rates: it uses Spring Data JPA and will auto-create the schema for the currency table; its application.properties is configured to use a MySQL database named cdc (see the sketch after this list).
  • A daemon called Zendesk Maxwell that reads the MySQL binlogs and writes row updates as JSON to Kafka, Kinesis or other streaming platforms.
  • A second app (listener) that receives the changes from Zendesk Maxwell via Kafka (topic maxwell). Currency changes are forwarded to another topic, currency; deletions are forwarded as tombstone records.
  • Finally, a third app that receives currency events and stores them in a map for instant use within the application (from here we could also feed the history to something like Elasticsearch for dashboards).
In this way we decouple the different modules of this solution.
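
As a sketch, the relevant part of update-currency's application.properties might look like this (the credentials are placeholders; the port matches the curl calls we will issue later):

server.port=8091
spring.datasource.url=jdbc:mysql://localhost:3306/cdc
spring.datasource.username=root
spring.datasource.password=secret
# let JPA/Hibernate auto-create and update the currency table
spring.jpa.hibernate.ddl-auto=update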


You can find the entire example in my GitHub. Apart from Zendesk Maxwell, you need to install MySQL and Kafka on your local machine. If you're curious (like me) and want to see the flow of messages arriving in Kafka, you can also install KSQL, which is a tool to query the streaming platform, but it's not necessary to run this use case.

The first application, named update-currency, is very simple.



It is a simple REST controller that receives our calls and, using a JPA entity (Currency), updates the database with the new value of the currency or deletes it. Remember: every time we make a change to the database, Maxwell will generate a JSON update and send it to Kafka (to the maxwell topic).
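
A minimal sketch of what that controller might look like (the controller and repository names are assumptions; the endpoints match the curl calls we will issue later):

import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
class CurrencyController {

    // Spring Data JPA repository for the Currency entity, keyed by the currency symbol
    private final CurrencyRepository repository;

    CurrencyController(CurrencyRepository repository) {
        this.repository = repository;
    }

    // Insert or update a currency rate; Maxwell will pick the change up from the binlog
    @PostMapping("/update/{symbol}/{rate}")
    public void update(@PathVariable String symbol, @PathVariable long rate) {
        repository.save(new Currency(symbol, rate));
    }

    // Delete a currency; Maxwell will turn this into a "delete" change event
    @PostMapping("/delete/{symbol}")
    public void delete(@PathVariable String symbol) {
        repository.deleteById(symbol);
    }
}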


The second component is a Spring Kafka application called Listener. This one consumes from the topic that Maxwell is writing to.


We will use a ChangeEvent class to map the messages arriving from Kafka (Spring Kafka has an automatic converter for @KafkaListener methods: it detects the JSON format and tries to map it to the object declared as a parameter of the listen method). The listener only cares about ChangeEvent messages: when one arrives, it checks whether the table name for that event is currency and, if so, whether it is an insert, an update or a delete event.
For inserts and updates it then calculates the new value for the currency and sends it to a new topic, this time currency. For deletes it sends a tombstone record to that topic.
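
Here is a sketch of the core of that listener, assuming ChangeEvent maps just the table, type and data fields of Maxwell's JSON, and that JSON conversion for both the consumer and the Double-valued producer is configured (the division by 1000 mirrors the 803 -> 0.803 values we will see in the logs):

import java.util.Map;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Minimal mapping of Maxwell's JSON payload (only the fields we need)
class ChangeEvent {
    private String table;
    private String type; // "insert", "update" or "delete"
    private Map<String, Object> data;
    public String getTable() { return table; }
    public void setTable(String table) { this.table = table; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public Map<String, Object> getData() { return data; }
    public void setData(Map<String, Object> data) { this.data = data; }
}

@Component
class MaxwellListener {

    private final KafkaTemplate<String, Double> kafkaTemplate;

    MaxwellListener(KafkaTemplate<String, Double> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "maxwell", groupId = "listener")
    public void listen(ChangeEvent event) {
        if (!"currency".equals(event.getTable())) {
            return; // we only care about changes to the currency table
        }
        String symbol = (String) event.getData().get("symbol");
        if ("delete".equals(event.getType())) {
            kafkaTemplate.send("currency", symbol, null); // tombstone record
        } else { // insert or update
            double rate = ((Number) event.getData().get("rate")).doubleValue() / 1000.0;
            kafkaTemplate.send("currency", symbol, rate);
        }
    }
}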



The third component is called currencyreader. It consumes the messages from the currency topic and adds them to a ConcurrentHashMap to cache the currencies locally.
In the old way of doing things, you would have to query the database to obtain the latest currency values; here, instead, the change events are pushed into the in-memory cache via Kafka as soon as they are created, one by one.
Afterwards, you could add a component to process the history of these values - this is where Elasticsearch can come in handy - and create a nice dashboard.
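
A sketch of that cache (the listener id matches the currency1 thread name visible in the logs below; configuring a numeric/JSON value deserializer is assumed):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
class CurrencyCache {

    private static final Logger log = LoggerFactory.getLogger(CurrencyCache.class);

    // local, thread-safe cache of the latest rate per currency symbol
    private final Map<String, Double> currencies = new ConcurrentHashMap<>();

    @KafkaListener(id = "currency1", topics = "currency")
    public void listen(ConsumerRecord<String, Double> record) {
        if (record.value() == null) {
            currencies.remove(record.key()); // tombstone: the currency was deleted upstream
        } else {
            currencies.put(record.key(), record.value());
        }
        log.info("Currencies now: {}", currencies);
    }
}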



Now let's try these components!
First we connect to MySQL (the server should already be running):

./mysql -h localhost -u root -p

There were some authentication issues with MySQL 8 (check this link for more details), but basically what you need to do is:

CREATE USER 'maxwell'@'localhost' IDENTIFIED WITH 'mysql_native_password' BY 'maxwell';
GRANT ALL PRIVILEGES ON maxwell.* TO 'maxwell'@'localhost';
GRANT REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';


Time to start Kafka:

./confluent local start

And then Zendesk Maxwell:

./maxwell --user maxwell --password maxwell --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell

And we start KSQL (remember, this is not needed; it's just to check the incoming messages in Kafka):

LOG_DIR=./ksql_logs ksql

Once KSQL has started, you can see the messages with:

print 'maxwell';

Start the 3 Spring Boot apps. Remember to change the port on at least two of them.

Now we post our first event, the euro value:
curl -X POST http://localhost:8091/update/EUR/803

And then we delete it:
curl -X POST http://localhost:8091/delete/EUR

More currencies:
curl -X POST http://localhost:8091/update/GBP/782
curl -X POST http://localhost:8091/update/USD/1000

This is what we see in KSQL (which in turn reads from the Kafka topics):

ksql> print 'maxwell';
Format:JSON
{"ROWTIME":1580081951255,"ROWKEY":"{\"database\":\"currencyUpd\",\"table\":\"currency\",\"pk.symbol\":\"EUR\"}","database":"currencyUpd","table":"currency","type":"insert","ts":1580081951,"xid":1456,"commit":true,"data":{"symbol":"EUR","rate":803}}
{"ROWTIME":1580082015878,"ROWKEY":"{\"database\":\"currencyUpd\",\"table\":\"currency\",\"pk.symbol\":\"EUR\"}","database":"currencyUpd","table":"currency","type":"delete","ts":1580082015,"xid":1549,"commit":true,"data":{"symbol":"EUR","rate":803}}
{"ROWTIME":1580082376687,"ROWKEY":"{\"database\":\"currencyUpd\",\"table\":\"currency\",\"pk.symbol\":\"GBP\"}","database":"currencyUpd","table":"currency","type":"insert","ts":1580082376,"xid":2025,"commit":true,"data":{"symbol":"GBP","rate":782}}
{"ROWTIME":1580082397968,"ROWKEY":"{\"database\":\"currencyUpd\",\"table\":\"currency\",\"pk.symbol\":\"USD\"}","database":"currencyUpd","table":"currency","type":"update","ts":1580082397,"xid":2060,"commit":true,"data":{"symbol":"USD","rate":1000},"old":{"rate":8000}}

And this is the log in currencyreader:

INFO 4211 --- [currency1-0-C-1] c.e.c.CurrencyreaderApplication          : Currencies now: {EUR=0.803}
INFO 4211 --- [currency1-0-C-1] c.e.c.CurrencyreaderApplication          : Currencies now: {}
INFO 4211 --- [currency1-0-C-1] c.e.c.CurrencyreaderApplication          : Currencies now: {GBP=0.782}
INFO 4211 --- [currency1-0-C-1] c.e.c.CurrencyreaderApplication          : Currencies now: {GBP=0.782, USD=1.0}


As you can see, this is a very powerful way to improve performance, as database change events are pushed directly to the applications interested in them.
Again, if you have any doubts, check my GitHub for the configuration of the three Boot apps.

I hope you've enjoyed this post.

