Getting started

Application structure

In this step we will set up the minimal code required for both the server and the client.

The application will be composed of 3 modules:

  • shared.py - code shared between client and server
  • server.py - server side
  • client.py - client side and example usage

We will start with server.py and client.py. shared.py will be empty for now.

See resulting code on GitHub

Server side

We will set up a simple server that accepts connections and responds to a client which sends a user's name. The server will listen on TCP port 6565.

Below is the code for the server.py module:

import asyncio
import logging

from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import create_future, utf8_decode
from rsocket.local_typing import Awaitable
from rsocket.payload import Payload
from rsocket.request_handler import BaseRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP

class Handler(BaseRequestHandler):
    async def request_response(self, payload: Payload) -> Awaitable[Payload]:
        username = utf8_decode(payload.data)
        return create_future(Payload(ensure_bytes(f'Welcome to chat, {username}')))

async def run_server():
    def session(*connection):
        RSocketServer(TransportTCP(*connection), handler_factory=Handler)

    async with await asyncio.start_server(session, 'localhost', 6565) as server:
        await server.serve_forever()

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.run(run_server())

Lines 21-22 start a TCP server listening on localhost:6565 using asyncio.

Lines 18-19 define the session callback, which asyncio calls for each new connection. The callback handles the connection by instantiating an RSocketServer.

The 2 parameters passed are:

  • transport: An instance of a supported connection method. In this case it is an adapter over the TCP connection.
  • handler_factory: A callable which returns a request handler instance (here, the Handler class itself). The handler is used to respond to the client's requests.

There is no need to specify anything else here, since the RSocketServer starts internal tasks which listen for requests and respond accordingly. The session will close when the connection is lost.
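To make the session callback less opaque, here is a minimal sketch using only asyncio, with no RSocket involved. It shows that asyncio.start_server calls the callback once per accepted connection with a (reader, writer) pair, which is what *connection unpacks into TransportTCP above. The demo function, the 127.0.0.1 address, and the use of port 0 (let the OS pick a free port) are assumptions made for illustration.

```python
import asyncio


async def demo():
    received = []

    def session(*connection):
        # asyncio passes (StreamReader, StreamWriter) for each accepted connection.
        reader, writer = connection
        received.append((type(reader).__name__, type(writer).__name__))
        writer.close()

    # Port 0 asks the OS for any free port (illustration only; the guide uses 6565).
    server = await asyncio.start_server(session, '127.0.0.1', 0)
    async with server:
        port = server.sockets[0].getsockname()[1]
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        await reader.read()  # Returns once the server side closes the connection.
        writer.close()
        await writer.wait_closed()
    return received


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In the real server the callback hands the pair to TransportTCP instead of closing the writer, and RSocketServer takes over from there.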

In the example, the handler factory (Line 12) is a subclass of BaseRequestHandler. In this class, we can implement any of the methods which handle the 4 RSocket request types:

  • request_response
  • request_stream
  • request_channel
  • fire_and_forget

Check the BaseRequestHandler for other methods which can be implemented.

Lines 13-15 implement the request_response handler, which welcomes the user. This method receives a single argument: the payload, an instance of the Payload class which contains the data and metadata of the request. The username is taken from the data property of the Payload. The data property's type is always bytes. In our case it is a UTF-8 encoded string, so we use the utf8_decode helper to decode it.

A response is created using helper methods:

  • create_future: Creates a future which already contains the response payload.
  • ensure_bytes: All values in a response must be of type bytes. This helper encodes a string to bytes using UTF-8.
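The decode, encode, and future steps of the handler can be sketched with the standard library alone. The functions decode_utf8, to_bytes, resolved_future, and handle below are hypothetical stand-ins written for this illustration. They are not the rsocket helpers and may differ from them in detail.

```python
import asyncio


def decode_utf8(data: bytes) -> str:
    # Stand-in for utf8_decode: request data always arrives as bytes.
    return data.decode('utf-8')


def to_bytes(value) -> bytes:
    # Stand-in for ensure_bytes: encode strings, pass bytes through unchanged.
    if isinstance(value, str):
        return value.encode('utf-8')
    return value


def resolved_future(value) -> asyncio.Future:
    # Stand-in for create_future: a future that already holds its result.
    future = asyncio.get_running_loop().create_future()
    future.set_result(value)
    return future


async def handle(data: bytes) -> bytes:
    username = decode_utf8(data)
    future = resolved_future(to_bytes(f'Welcome to chat, {username}'))
    # Awaiting an already-resolved future returns its result.
    return await future


if __name__ == '__main__':
    print(asyncio.run(handle(b'George')))
```

Returning a future lets a handler answer either immediately, as here, or later by setting the result once the data is available.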

Next we will look at a simple client which connects to this server.

Client side

The client will connect to the server, send a single request_response request, and disconnect.

Below is the code for the client.py module:

import asyncio
import logging

from rsocket.helpers import single_transport_provider, utf8_decode
from rsocket.payload import Payload
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP

async def main():
    connection = await asyncio.open_connection('localhost', 6565)

    async with RSocketClient(single_transport_provider(TransportTCP(*connection))) as client:
        response = await client.request_response(Payload(data=b'George'))

        print(f"Server response: {utf8_decode(response.data)}")

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

Line 10 opens an asyncio TCP connection to localhost on port 6565.

Line 12 instantiates an RSocketClient using an async with statement, to ensure the client closes the TCP connection when done.

Line 13 sends the request with a Payload containing the username as the data. The data value must be of type bytes.

The response is a Payload instance, the data of which is printed (Line 15).
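As an illustration of why the client is used in an async with statement, the sketch below shows a small asynchronous context manager that closes its connection on exit. ClosingClient, the echo server, and the demo function are hypothetical and exist only for this example. They are not part of the rsocket API and make no claim about how RSocketClient is implemented.

```python
import asyncio


class ClosingClient:
    """Hypothetical stand-in: wraps a connection and closes it on exit."""

    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs when the async with block ends, even if the body raised.
        self.writer.close()
        await self.writer.wait_closed()


async def demo():
    async def echo(reader, writer):
        # Tiny echo server used only to have something to talk to.
        writer.write(await reader.readline())
        await writer.drain()
        writer.close()

    server = await asyncio.start_server(echo, '127.0.0.1', 0)
    async with server:
        port = server.sockets[0].getsockname()[1]
        connection = await asyncio.open_connection('127.0.0.1', port)
        async with ClosingClient(*connection) as client:
            client.writer.write(b'George\n')
            await client.writer.drain()
            reply = await client.reader.readline()
        # After the block, the connection has been closed.
        return reply, client.writer.is_closing()


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The same pattern means the request code never has to close the connection explicitly.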