Welcome to the Reactive Revolution














Yes, this is a revolution. Nothing less.
What's all this fuzz about? What does reactive mean?
Let's put some light to the "reactive" term which has now become an overloaded word.
According to O'Reilly, when people talk about "reactive" in the context of software development and design, they generally mean these two things:

  • Reactive systems: an architectural style that allows multiple individual applications to work as a single unit, reacting to its surroundings, while remaining aware of each other.
  • Reactive programming: is a subset of asynchronous programming and a paradigm where the availability of new information drives the logic forward rather than having control flow driven by a thread-of-execution.

It is possible to write a single application in a reactive style (ie. using reactive programming); however that's just one piece of the puzzle. All the components of that system should be reactive, to be qualified as one.
In this article we will be referring to both terms: reactive programming and reactive systems.


Now, to be more precise, reactive programming refers to models that are built around reacting to change - network components reacting to I/O events, U/I controllers reacting to mouse events and others.
In the Java world particularly, there was a need of a non-blocking web stack that could handle concurrency with a small number of threads and scale with fewer hardware resources.

Reactive programming is basically programming with asynchronous data streams. These are going to be the center for your application. Events, messages, calls, and even failures are going to be transported by a data stream. You observe these streams and react when a value is emitted.
Programming in a reactive way is going to change the way you design and write your code.

These are some basic features you will get when you do Reactive programming:
  • Non blocking and Concurrent
  • Hyper efficient applications
  • Functional programming
  • Resilient & Interruptible. 
  • We can use Asynchronous or Synchronous threads. 
Here's a very interesting presentation The Value of Reactive Systems : it explains each one of these features explaining why you should change from imperative to reactive.

This post will be based on Project reactor which is a reactive library based on the Reactive Streams specification for building non-blocking applications. Reactive Streams is an initiative (a joint collaboration between engineers from Netflix, Pivotal, Red Hat, Twitter and many others) to provide a standard for asynchronous stream processing with non-blocking backpressure.
Say what?

What is backpressure?












 






In this model the requester asks for messages and the responder gives back a stream of data. If the requester receives too much information, it sends a signal back saying "this is too much, just gives me ten messages", and the responder can adjust the stream of information by producing and sending only the 10 messages requested. 
To do this we use RSocket which is a network protocol that uses reactive features across the network. RSocket is a bi-directional, multiplexed, message-based, binary protocol based on Reactive Streams back pressure. Out of the box provides four interaction models:

  • Request-Response (very similar to the HHTP equivalent)
  • Fire-and-Forget - the requester sends a message but it does not wait for anything back.
  • Request-Stream - requester sends a message and get a stream of messages back. 
  • Request-Channel - a connection is established and from that point on either party  on either side of the connection can send a request and receive responses back. 

If you think better, once you have the possibility to use a backpressure model you don't need, for example, the circuit-breaker model.
RSocket can establish single, long-lived connections to send several messages, no need to create a new connection for each request.
It is important to notice that RSocket is not a replacement for HTTP. It all depends on the type of solution you're working on. For example, HTTP is much better in security while RSocket works pretty well inside the firewall.

Having in mind this protocol we can now see how we can connect all applications in a reactive way (the reactive system we were referring at the beginning of this article).
We can build not only a reactive Microservice but, most important, each component of the stack we need to design our solution will be reactive, ie. non-blocking.



Projet Reactor

Reactive Relational Database Connectivity (PostgreSQL, H2,  SAP, SQL Server, MySQL, Hana)





RSocket.io
Spring WebFlux




                

We can build a Microservice using Spring-Webflux as the framework, which comes with Netty server by default. We can send messages from requester to responder using RSocket. On top of that we can use a reactive database either relational or non-relational. Each of these components are reactive, isn't that cool ?


Now let's dive into our example, which will shed some light in this topic.
We will create a simple SpringBoot RSocket server and a client. If you want to test this code you need to install PostgreSQL.
As usual, you can find the code of this example on my GitHub .

The SpringBoot app will expose a service which will provide a list of airports.




A list of airports is included in Json format. Also the scripts to load the data into the Airport table.
For this server app we will use Spring WebFlux, PostgreSQL and RSocket.
Here are the most important dependencies



You can see the starters for RSocket, WebFlux and R2DBC, including the driver for PostgreSQL. As usual, you can start your own on  Spring initializer .
The  AirportsConfig  class has the @EnableR2dbcRepositories  annotation to tell Spring to look for and enable the R2DBC repositories that it will find in his path.



This is our only Repository, it extends from  ReactiveCrudRepository  and has only one method findAirportByCode  which returns a Mono of an Airport. The rest of the methods already comes from the parent class. 



Our service will have two methods: one to find a a particular Airport by his code, and another one which will return a Flux  of Airports. Each one of the services will call the respective method in the repository which in time calls the database. Remember: there is no blocking code when we're calling all of our components! 




Finally the controller:


This might look as a regular controller but this time is for message mapping. We're exposing two services, one that will return a specific Airport (mapping the specific code we receive into the method that will call the service), and another which will return all the Airports on the database. 

Also in your properties file we're telling SpringBoot to start the RSocket server in port 9898 using the TCP transport by default (remember we already defined the Rsocket-starter as a dependency in our pom file).


Now if you want to test your RSocket app you'll need this Rsocket client which aims to be a curl for RSocket (thanks to Toshiaki Maki for this amazing client). 
Remember: we're no longer in the HTTP world here, we cannot do GETs and POSTs, this is a different paradigm. 
Let's start the app (don't forget to start before your DB):


 
SpringBoot is telling us in the logs that it has started Netty and RSocket in their respective ports.
Let's call our Server app with the RSocket client, like this

java -jar rsc-0.5.0.jar --request  --route find.airport.YUL   tcp://localhost:9898 


We're calling the server app with --request mode, telling the --route we're looking for. And then we receive 

{"code":"YUL","name":"Montreal / Pierre Elliott Trudeau International Airport","latitude":null,"longitude":null}


Let's try now a stream of Airports:

java -jar rsc-0.5.0.jar --stream  --route list.all.airports   tcp://localhost:9898 --delayElements 300


We're telling now the client to call the RSocket server with a --stream option, with the parameter of the route that points to our controller method. Also take a look at the --delayElements option: this is how we use the backpressure we mentioned before, this is how we tell the server "wait!! send me the data more slowly".

And then we receive a very long list....

{"code":"UTK","name":"Utirik Airport","latitude":null,"longitude":null}

{"code":"WKK","name":"Aleknagik / New Airport","latitude":null,"longitude":null}

{"code":"FOB","name":"Fort Bragg Airport","latitude":null,"longitude":null}

{"code":"ABP","name":"Atkamba Airport","latitude":null,"longitude":null}

{"code":"ADC","name":"Andakombe Airport","latitude":null,"longitude":null}

{"code":"TJP","name":"Areopuerto Internacional Michael Gonzalez","latitude":null,"longitude":null}

{"code":"AEK","name":"Aseki Airport","latitude":null,"longitude":null}

{"code":"OLR","name":"Salerno Landing Zone Airport","latitude":null,"longitude":null}

{"code":"AFR","name":"Afore Airstrip","latitude":null,"longitude":null}

{"code":"AFT","name":"Afutara Aerodrome","latitude":null,"longitude":null}

{"code":"ATD","name":"Uru Harbour Airport","latitude":null,"longitude":null}

{"code":"VEV","name":"Barakoma Airport","latitude":null,"longitude":null}

{"code":"GEF","name":"Geva Airport","latitude":null,"longitude":null}

{"code":"AGG","name":"Angoram Airport","latitude":null,"longitude":null}

{"code":"AKS","name":"Auki Airport","latitude":null,"longitude":null}

{"code":"BAS","name":"Ballalae Airport","latitude":null,"longitude":null}

{"code":"FRE","name":"Fera/Maringe Airport","latitude":null,"longitude":null}

{"code":"HIR","name":"Honiara International Airport","latitude":null,"longitude":null}

{"code":"MBU","name":"Babanakira Airport","latitude":null,"longitude":null}

{"code":"IRA","name":"Ngorangora Airport","latitude":null,"longitude":null}

{"code":"SCZ","name":"Santa Cruz/Graciosa Bay/Luova Airport","latitude":null,"longitude":null}

{"code":"MUA","name":"Munda Airport","latitude":null,"longitude":null}

{"code":"GZO","name":"Nusatupe Airport","latitude":null,"longitude":null}

{"code":"MNY","name":"Mono Airport","latitude":null,"longitude":null}

{"code":"PRS","name":"Parasi Airport","latitude":null,"longitude":null}

{"code":"RNL","name":"Rennell/Tingoa Airport","latitude":null,"longitude":null}

{"code":"EGM","name":"Sege Airport","latitude":null,"longitude":null}

{"code":"RUS","name":"Marau Airport","latitude":null,"longitude":null}

{"code":"VAO","name":"Suavanao Airport","latitude":null,"longitude":null}

{"code":"AGK","name":"Kagua Airport","latitude":null,"longitude":null}

{"code":"KGE","name":"Kagau Island Airport","latitude":null,"longitude":null}

{"code":"AGL","name":"Wanigela Airport","latitude":null,"longitude":null}

{"code":"RIN","name":"Ringi Cove Airport","latitude":null,"longitude":null}

{"code":"RBV","name":"Ramata Airport","latitude":null,"longitude":null}

{"code":"AHY","name":"Ambatolhy Airport","latitude":null,"longitude":null}



I just cut the process since there are thousands of airports in the list...

Now if you want to create a client app which will call this RSocket server app, all you need to do is:


1 - Define an RSocketRequester on your service (or controller). This is a thin wrapper around a sending RSocket with a fluent API accepting and returning higher level Objects for input and for output.

2 - You will create an instance of this requester with the builder, defining the media type as an APPLICATION_CBOR and the address of the server.

3 - This requester will be used each time you need to call the server. For example to get an specific Airport, you use this requester, defining the route, and adding the variable if needed, then using the  retrieveMono  method.

4 - You will use the same request for a stream, this time using the retrieveFlux method.    


Finally when you're building your apps you will need to add a dependency to BlockHound: this is a component that can intercept blocking calls if they are performed from threads marked as "non-blocking operations only". If and when this happens, an error will be thrown. 



To summarize: in this post you learned about the principles of Reactive programming and Reactive systems. You learned about the RSocket protocol and how to build your SpringBoot app to test this principles. 
The Reactive topic is huge. This is just the beginning.  

I hope you enjoyed this post.










Comments

Popular Posts