Looking for an Expert Development Team? Take two weeks Trial! Try Now or Call: +91.9824127020

RSocket Communication Protocol for Reactive Apps

RSocket is a binary, point-to-point communication protocol that support Reactive Streams. It provides an alternative to other protocols like HTTP.

It is bi-directional, multiplexed, message-based on Reactive back pressure.

It is mainly developed to work with Reactive style applications, which are non-blocking applications. The meaning of Reactive Pressure is Publisher will not send the message to subscriber, unless subscriber will send the message, indicating it is ready and how to many messages that it can handle, then publisher will send the messages to subscriber.

One of the drawbacks of HTTP Protocol is its one directional, as our data is modified there is no way to communicate to the client that the data is updated, at the client end we need to handle all retry logic, timeouts, circuit breakers etc. If the application is built using Reactive Architecture can easily scale, avoid the above HTTP failures.

Difference between RSocket and HTTP Protocol

  1. Fire and Forget: When the response is not needed, such as non-critical event logging, sending email, etc.
  2. Request/Response: When we send the Request, and receive the response like HTTP, it also adds advantages over HTTP, through multiplexing and asynchronous.
  3. Request/Stream: Similar to Request/Response, returning the collection, the collection will stream for next results until it completes. It will be used in large data transfer between applications, like by giving the bank account number, it will respond with all bank account transactions.
  4. Channel: a bi-directional stream of messages allowing for arbitrary interaction models.

Rscocket will also supports resumption, using the previous connection id, if the stream in memory we can resume the consumption of the stream.

This is particularly useful for mobile‹–›server communication when network connections drop, switch, and reconnect frequently.

Maven Dependencies:

Create a Spring boot application from https://start.spring.io/ and select Lombok and ReactiveWeb as dependencies.

And manually add the below dependencies related to RSocket.

<dependency>
                                    <groupId>io.rsocket</groupId>
                                    <artifactId>rsocket-transport-netty</artifactId>
                                    <version>${rsocket.version}</version>
                                </dependency>

And add the below property in properties section:

<rsocket.version>0.11.14</rsocket.version>

As we know Netty is the popular server for Reactive applications

1. Request/Stream:

Request/stream model, subscriber will send a request, and publisher will start sending the response for un-limited time.

The requestStream method returns a Flux stream.

Create the below classes in our application:

@Component class Producer implements Ordered, ApplicationListener < ApplicationReadyEvent > 
{
privatestaticfinal Logger log = LogManager.getLogger(Producer.class);@Override publicintgetOrder() 
{ returnOrdered.HIGHEST_PRECEDENCE; } 
Flux < String > notifications(String name) 
{ returnFlux.fromStream(Stream.generate(() - > "Hello " + name + "@" + Instant.now().toString())).delayElements(Duration.ofSeconds(2)); } @Override publicvoidonApplicationEvent(ApplicationReadyEventevent) 
{
finalSocketAcceptorsocketAcceptor = (connectionSetupPayload, sendingSocket) - > 
{
finalAbstractRSocketabstractRSocket = newAbstractRSocket() 
{
@Override public Flux < Payload > requestStream(Payload payload) 
{
final String name = payload.getDataUtf8();
log.info("got request from consumer with payload: " + name);
return notifications(name).map(DefaultPayload::create);
}
};returnMono.just(abstractRSocket);
};
finalTcpServerTransporttransport = TcpServerTransport.create(7000);
RSocketFactory.receive().acceptor(socketAcceptor).transport(transport).start().block();
}
}
@Component class Consumer implements Ordered, ApplicationListener < ApplicationReadyEvent > 
{
privatestaticfinal Logger log = LogManager.getLogger(Consumer.class);@Override publicintgetOrder() 
{ returnOrdered.LOWEST_PRECEDENCE; } @Override publicvoidonApplicationEvent(ApplicationReadyEventevent) 
{
finalTcpClientTransporttransport = TcpClientTransport.create(7000);
RSocketFactory.connect().transport(transport).start().flatMapMany(sender - > sender.requestStream(DefaultPayload.create("sravan"))).map(Payload::getDataUtf8).subscribe(result - > log.info(" consumed new result " + result));
}
}

SocketAcceptor is one of the class provided by RSocket to accept the requests from Consumer. AbstractRsocket is the abstract implementation of RSocket. As we are implementing requestStream will implement the same in this class. And notifications is the class which produces stream of the string with 2 seconds delay. For each 2 seconds, the publisher will send the messages to Rsocket server, and consumer is connecting the same server which is running on port 7000, to consume the message. We used Order interface, so that Publisher will create RSocket server before Consumer connect to the RSocket server.

2. Request/Channel:

The channel provides bi-directional communication, messages will flow continuously from consumer to publisher then publisher to consumer and it is going on…

Lets the update the ApplicationReadyEvent handler method with below:

Publisher Event Handler method:

publicvoidonApplicationEvent(ApplicationReadyEventevent) 
{
finalSocketAcceptorsocketAcceptor = (connectionSetupPayload, sendingSocket) - > 
{ finalAbstractRSocketabstractRSocket = newAbstractRSocket() 
{ @Override public Flux < Payload > requestChannel(Publisher < Payload > payloads) 
{ returnFlux.from(payloads).map(Payload::getDataUtf8).doOnNext(str - > log.info("received " + str + " in " + this.getClass().getName())).map(RequestResponse::reply).map(DefaultPayload::create); } };
returnMono.just(abstractRSocket); };
finalTcpServerTransporttransport = TcpServerTransport.create(7000);
RSocketFactory.receive().acceptor(socketAcceptor).transport(transport).start().block(); 
}

Consumer EventHandler method:

@Override
                                        publicvoidonApplicationEvent(ApplicationReadyEventevent) {
                                        log.info("started " + this.getClass().getName());
                                        finalTcpClientTransporttransport = TcpClientTransport.create(7000);
                                        RSocketFactory.connect().transport(transport).start()
                                        .flatMapMany(socket ->socket
                                        .requestChannel(Flux.interval(Duration.ofSeconds(2)).map(l ->DefaultPayload.create("ping")))
                                        .map(payload ->payload.getDataUtf8())
                                        .doOnNext(string ->log.info("Received " + string + " in class " + this.getClass().getName()))
                                        .take(10).doFinally(signal ->socket.dispose()))
                                        .then().subscribe(result ->log.info(" consumed new result " + result));
                                        }

And static reply method as follows:

static String reply(String message) {
                                        if (message.equalsIgnoreCase("ping")) {
                                        return"pong";
                                        } elseif (message.equalsIgnoreCase("pong")) {
                                        return"ping";
                                        } else {
                                        thrownewIllegalArgumentException("input must either 'ping' or 'pong'");
                                        }
                                        }

For each 2 seconds consumer will send the “ping” to publisher and publisher will respond with “pong” message.

We can observe the same in log:

2019-02-06 19:39:45.829  INFO 21268 --- [actor-tcp-nio-6] org.sravan.rsocket.channel.Pong          : received ping in org.sravan.rsocket.channel.Pong$1
                                        2019-02-06 19:39:45.832  INFO 21268 --- [actor-tcp-nio-4] org.sravan.rsocket.channel.Ping          : Received pong in class org.sravan.rsocket.channel.Ping
                                        2019-02-06 19:39:47.807  INFO 21268 --- [actor-tcp-nio-6] org.sravan.rsocket.channel.Pong          : received ping in org.sravan.rsocket.channel.Pong$1
                                        2019-02-06 19:39:47.812  INFO 21268 --- [actor-tcp-nio-4] org.sravan.rsocket.channel.Ping          : Received pong in class org.sravan.rsocket.channel.Ping

Request/Response: the consumer will request the publisher to send the message, and client will block the thread unless it receives the thread from publisher.

Fire-and-Forget: the client will ping the server and it will not receive any response from publisher, if we want to do things independently with another thread on Publisher then we can use this method.

Conclusion

RSocket is one of the protocols on top HTTP, for faster communication in an Asynchronous way, and it also provides different ways to communicate the data from consumer to publisher. It mainly used Reactive applications.

To discuss more on this blog please contact to hire Java Developers.

Post Tags

#HTTP Protocol  #RSocket

Aegis Infoways

Aegis Infoways is a leading software development company that provides a wide range of business solutions like software development, data warehouse, or web development for specific business needs.

Related Posts

10 Top B2B E-commerce Websites to Find Buyers...

10 Top B2B E-commerce Websites to Find Buyers...

Businesses must have B2B eCommerce platforms to expand operations to other nations, build relationships with dependable suppliers, and source goods more efficiently. As we approach 2025, these platforms will facilitate seamless global trade, whether for small...

CompletableFuture in Java

CompletableFuture in Java

Technology CompletableFuture is used for asynchronous programming in Java. Asynchronous Programming means running tasks in a separate thread, other than the main thread, and notifying the execution progress like completion or failure. It helps improve application...

10 Eclipse Java Plug-ins You Can’t Do Witho...

10 Eclipse Java Plug-ins You Can’t Do Witho...

Eclipse is the most widely used integrated development environment for Java. Used to develop the Java applications, Eclipse is also often used to develop applications. Its extensive plug-ins give it the flexibility to be customized. This open-source software has...

×