Thrift: Bidirectional Async RPC
Source on GitHub: http://github.com/JoelPM/BidiThrift
A reader posted to the thrift-user mailing list asking whether a Thrift RPC server can send messages to its clients. The responses suggested two approaches: polling the server for updates, or hosting another Thrift server in the client to receive RPCs from the server (which requires opening another port on the client and dealing with firewalls). I responded with a technique I'd used to accomplish something similar, and the replies made me think it would be worth writing up and posting some example code.
Here's my email to the mailing list that describes what I'm doing:
I think I've done something similar to what you're trying to do, and as long as you can commit to using only async messages it's possible to pull it off without having to start a server on the client to accept RPCs from the server.
When your RPC is marked as async the server doesn't send a response and the client doesn't try to read one. So, if all your RPC calls from the client to the server are async you have effectively freed up the inbound half of the socket connection. That means that you can use it for receiving async messages from the server - the only catch is that you have to start a new thread to read and dispatch the incoming async RPC calls.
In a typical Thrift RPC system you'd create a MyService.Processor on your server and a MyService.Client on your client. To do bidirectional async message sending you'll need to go a step further and create a MyService.Client on your server for each client that connects (this can be accomplished by providing your own TProcessorFactory) and then create a MyService.Processor on each client. (This assumes that you've gone with a generic MyService definition like you described above that has a bunch of optional messages. Another option would be to define separate service definitions for the client and server.) With two clients connected, the objects in existence would look something like this:
Server:
  MyService.Processor mainProcessor - handles incoming async RPCs
  MyService.Client clientA - used to send outgoing async RPCs to ClientA
  MyService.Client clientB - used to send outgoing async RPCs to ClientB
ClientA:
  MyService.Client - used to send messages to Server
  MyService.Processor clientProcessor - used (by a separate thread) to process incoming async RPCs
ClientB:
  MyService.Client - used to send messages to Server
  MyService.Processor clientProcessor - used (by a separate thread) to process incoming async RPCs
Hopefully that explains the concept. If you need example code I can try and pull something together (it will be in Java). The nice thing about this method is that you don't have to establish two connections, so you can get around the firewall issues others have mentioned. I've been using this method on a service in production and haven't had any problems. When you have a separate thread in your client running a Processor you're basically blocking on a read, waiting for a message from the server. The benefit of this is that you're notified immediately when the server shuts down instead of having to wait until you send a message and then finding out that the TCP connection was reset.
Cheers,
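To make the idea in the email concrete, here is a minimal sketch that uses plain Java sockets and newline-delimited text in place of Thrift's transport and protocol. It only illustrates the shape of the technique: the client writes fire-and-forget messages on the outbound half of one connection while a separate thread blocks on the inbound half, dispatches whatever the server sends, and notices right away when the server closes the connection. The class OneWayDuplexDemo and its helper methods are made up for this illustration and are not part of the BidiThrift source. (In current Thrift IDL the keyword for such calls is oneway rather than async.)

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: text lines over a socket stand in for one-way Thrift calls.
// All names here are hypothetical and do not come from the BidiThrift project.
class OneWayDuplexDemo {

    static BufferedReader readerFor(Socket s) throws IOException {
        return new BufferedReader(
                new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    static PrintWriter writerFor(Socket s) throws IOException {
        // autoflush = true, so println() pushes each message out immediately
        return new PrintWriter(
                new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    /** Runs one exchange and returns what the client's reader thread received. */
    static List<String> runDemo() throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch disconnected = new CountDownLatch(1);

        try (ServerSocket listener = new ServerSocket(0, 1, loopback)) { // port 0: any free port
            // "Server": reads one message, sends two of its own, then closes.
            Thread server = new Thread(() -> {
                try (Socket s = listener.accept()) {
                    String msg = readerFor(s).readLine();
                    PrintWriter out = writerFor(s);
                    out.println("server saw: " + msg);
                    out.println("server push");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            server.start();

            try (Socket socket = new Socket(loopback, listener.getLocalPort())) {
                // Reader thread: blocks on the inbound half of the same connection.
                Thread readerThread = new Thread(() -> {
                    try {
                        BufferedReader in = readerFor(socket);
                        String line;
                        while ((line = in.readLine()) != null) {
                            received.add(line); // "dispatch" the incoming message
                        }
                    } catch (IOException e) {
                        // a reset connection also counts as a disconnect
                    } finally {
                        disconnected.countDown(); // end of stream: the server went away
                    }
                });
                readerThread.start();

                // Outbound half: send and do not wait for any reply.
                writerFor(socket).println("hello");

                disconnected.await(5, TimeUnit.SECONDS);
            }
            server.join();
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

With Thrift, the reader thread's loop would instead call the generated Processor repeatedly on the connection's input protocol, and the outbound writes would be calls on the generated Client.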
Here's an example app that sends messages between clients connected to a server. It's similar to a chat app.
First, define your Thrift objects and the service. Our object and service are extremely simple:
In this case the service is generic enough that both the client and server will use the same service definition. We could also create a ClientMessageService and a ServerMessageService if we needed different functionality.
On the server side, when a client connection is accepted we want to create a MessageService.Client object that we'll use to send messages back to the client. We can accomplish this by creating our own TProcessorFactory and using the getProcessor method as an opportunity to get access to the transport being used between the client and server:
As you can see above, we're using the same MessageDistributor for each new processor that we create. Before we create and return the processor we create a new client and add it to the list of clients that the MessageDistributor is aware of. The server is pretty simple and you can take a look at the code to see how the MessageDistributor uses the clients to send messages back.
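For readers who want the shape of this without opening the repository, here is a rough, Thrift-free sketch of the pattern: a per-connection hook registers an outbound handle with one shared distributor, and the distributor fans messages out to every registered handle. Everything in it is an assumption made for illustration. MessageSink stands in for the generated MessageService.Client, ConnectionHook plays the role of the custom TProcessorFactory's getProcessor method, and the details of MessageDistributor (echoing to all clients including the sender, dropping clients whose send fails) are guesses rather than the project's actual behavior.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins; not the classes from the BidiThrift source.
class DistributorSketch {

    /** Stand-in for a generated client: one outgoing one-way call. */
    interface MessageSink {
        void sendMessage(String message) throws IOException; // Thrift would throw TException
    }

    /** One instance shared by every connection; fans messages out to all clients. */
    static class MessageDistributor {
        // Copy-on-write so a client can be removed while we iterate.
        private final List<MessageSink> clients = new CopyOnWriteArrayList<>();

        void addClient(MessageSink client) {
            clients.add(client);
        }

        int clientCount() {
            return clients.size();
        }

        void broadcast(String message) {
            for (MessageSink client : clients) {
                try {
                    client.sendMessage(message);
                } catch (IOException e) {
                    clients.remove(client); // assume a failed send means the client is gone
                }
            }
        }
    }

    /** Plays the role of getProcessor(transport): invoked once per accepted connection. */
    static class ConnectionHook {
        private final MessageDistributor distributor;

        ConnectionHook(MessageDistributor distributor) {
            this.distributor = distributor;
        }

        /** Registers the outbound handle, then returns the handler for inbound messages. */
        MessageSink onConnection(MessageSink outboundToClient) {
            distributor.addClient(outboundToClient);
            return distributor::broadcast; // every inbound message is rebroadcast
        }
    }

    public static void main(String[] args) throws IOException {
        MessageDistributor distributor = new MessageDistributor();
        ConnectionHook hook = new ConnectionHook(distributor);

        List<String> inboxA = new ArrayList<>();
        List<String> inboxB = new ArrayList<>();
        MessageSink fromA = hook.onConnection(inboxA::add);
        hook.onConnection(inboxB::add);

        fromA.sendMessage("hello from A");
        System.out.println("A received: " + inboxA);
        System.out.println("B received: " + inboxB);
    }
}
```

In the real server the outbound handle would be a MessageService.Client built on a protocol wrapping the transport that getProcessor receives, so no second connection is needed.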
On the client side things are a little more complex because we have to create a separate thread to read incoming messages (this is handled by the TThreadPoolServer on the server side). Here's the class that reads incoming messages:
It extends a utility class called ConnectionRequiredRunnable that provides utility methods for handling server disconnects and reconnects, but on the whole it's pretty simple because we pass in a separate class that actually handles the incoming messages. We also create a MessageService.Client, but we wrap it in a separate thread and use a blocking queue so that other components in the system wanting to send a message can do so very quickly - or at least, have the message handed off for delivery extremely quickly.
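The queue hand-off described above can be sketched in a few lines. This is not the project's class: MessageSink again stands in for the generated MessageService.Client, the connection handling that ConnectionRequiredRunnable provides is left out, and dropping a message when a send fails is an assumption made to keep the example short.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a queue-backed sender; names are illustrative only.
class QueuedSenderSketch {

    /** Stand-in for the generated client's one-way call. */
    interface MessageSink {
        void sendMessage(String message) throws IOException; // Thrift would throw TException
    }

    static class MessageSender implements Runnable {
        private final BlockingQueue<String> outbox = new LinkedBlockingQueue<>();
        private final MessageSink sink;

        MessageSender(MessageSink sink) {
            this.sink = sink;
        }

        /** Callable from any thread; returns as soon as the message is queued. */
        void send(String message) {
            outbox.add(message);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String next = outbox.take(); // blocks until a message is available
                    try {
                        sink.sendMessage(next);  // the only thread that touches the sink
                    } catch (IOException e) {
                        // Assumption: log and drop. A real sender might re-queue
                        // the message and wait for a reconnect instead.
                        System.err.println("send failed, dropping message: " + e);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: shut down quietly
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MessageSender sender = new MessageSender(m -> System.out.println("sent: " + m));
        Thread thread = new Thread(sender, "message-sender");
        thread.start();

        sender.send("first");
        sender.send("second");

        Thread.sleep(200);  // give the sender thread time to drain the queue
        thread.interrupt(); // stop the sender
        thread.join();
    }
}
```

Because only the sender thread ever calls the sink, callers never block on network I/O and the underlying client is never used from two threads at once.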
Here's the class that handles our message sending:
This class also extends ConnectionRequiredRunnable since it can't send messages without a connection. Here's the main method of the Client that establishes the connection to the server:
It actually looks pretty simple since all the different pieces are organized in separate classes. The ConnectionStatusMonitor class is responsible for opening the actual connection and notifying the MessageSender and MessageReceiver when the connection has been established, at which point they'll start sending and receiving messages. If the server dies both of those processes will stop and wait until a connection has been re-established (a task the ConnectionStatusMonitor is responsible for). Here's sample output from the server:
And here's output from client1:
And here's from client2:
You can see that client1 and client2 paused when the server was terminated. Had the server restarted, the clients would have reconnected and begun sending messages again.
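The pause-and-resume behavior can be modeled with a small gate that worker threads wait on until a connection exists. The sketch below is only a guess at the general shape of what ConnectionRequiredRunnable and ConnectionStatusMonitor do together; ConnectionGate and its methods are hypothetical names, and the real classes may be structured quite differently.

```java
// Hypothetical sketch: a gate that parks workers while there is no connection.
class ConnectionGateSketch {

    static class ConnectionGate {
        private boolean connected = false;

        /** Called by whatever owns the connection once it has been (re)established. */
        synchronized void connected() {
            connected = true;
            notifyAll(); // wake every worker waiting for a connection
        }

        /** Called when the connection is lost. */
        synchronized void disconnected() {
            connected = false;
        }

        /** Blocks the calling worker until a connection is available. */
        synchronized void awaitConnection() throws InterruptedException {
            while (!connected) {
                wait();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectionGate gate = new ConnectionGate();

        // Worker: stands in for a sender or receiver that needs a connection.
        Thread worker = new Thread(() -> {
            try {
                for (int i = 1; ; i++) {
                    gate.awaitConnection(); // parks here while disconnected
                    System.out.println("sent message " + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                // interrupted: shut down
            }
        });
        worker.start();

        gate.connected();     // connection established: worker starts sending
        Thread.sleep(350);
        gate.disconnected();  // server goes away: worker pauses
        System.out.println("server gone");
        Thread.sleep(300);
        gate.connected();     // server is back: worker resumes
        Thread.sleep(250);

        worker.interrupt();
        worker.join();
    }
}
```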
The source is built using Maven and requires that you've installed libthrift.jar in your local Maven repo (see the README for details). I'm also including a tgz with the compiled jar files for those who can't build the source.
Source: BidiMessages.tgz
Jars: BidiMessagesJars.tgz