Websocket

Up until now, the transport used was TCP. Let's change the transport to WebSocket. For this purpose we will use an aiohttp server and client. You can make sure the package is installed by using the rsocket pip extra (aiohttp):

pip3 install rsocket[aiohttp]
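
To confirm the install worked before moving on, a small check along these lines can help. This is only a sketch: the helper name is_installed is made up for illustration, and it relies solely on the standard library to look up the two top-level modules this step depends on.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    # find_spec returns None when a top-level module cannot be located.
    return importlib.util.find_spec(module_name) is not None


# Report on the two packages this tutorial step needs.
for name in ("rsocket", "aiohttp"):
    print(name, "installed" if is_installed(name) else "missing")
```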

See resulting code on GitHub

Server side

We will make changes to the server startup. The RSocket handler itself does not need to be modified. Below is the new server startup in the run_server method:

from aiohttp import web

from rsocket.rsocket_server import RSocketServer
from rsocket.transports.aiohttp_websocket import TransportAioHttpWebsocket

def run_server():
    def websocket_handler_factory(**kwargs):
        async def websocket_handler(request):
            websocket = web.WebSocketResponse()
            await websocket.prepare(request)

            transport = TransportAioHttpWebsocket(websocket)
            RSocketServer(transport, **kwargs)
            await transport.handle_incoming_ws_messages()

            return websocket

        return websocket_handler

    app = web.Application()
    app.add_routes([web.get('/chat', websocket_handler_factory(handler_factory=handler_factory,
                                                               fragment_size_bytes=1_000_000))])

    web.run_app(app, port=6565)

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run_server()

Lines 20-24 set up the web server and attach the application to the /chat URL. This is done using the websocket_handler_factory helper defined in lines 7-18.

Lines 9-10 set up the websocket. Lines 12-13 wrap it in the TransportAioHttpWebsocket transport adapter and pass that adapter to an RSocketServer. Line 14 then handles the messages arriving on the websocket.
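
The factory pattern used by websocket_handler_factory can be illustrated without aiohttp. The sketch below is a simplified stand-in, not the tutorial's code: handler_factory_sketch and the string "requests" are hypothetical. It shows that the keyword arguments are captured once, when the route is registered, and are then shared by every call to the returned handler.

```python
import asyncio


def handler_factory_sketch(**kwargs):
    # kwargs are captured once, at registration time.
    async def handler(request):
        # Each connection triggers a fresh call that sees the same settings.
        return {"request": request, "settings": dict(kwargs)}

    return handler


handler = handler_factory_sketch(fragment_size_bytes=1_000_000)
first = asyncio.run(handler("connection-1"))
second = asyncio.run(handler("connection-2"))
print(first["settings"], second["request"])
```

In the server code, this is the mechanism by which each new websocket connection gets its own transport and RSocketServer while reusing the same handler_factory and fragment_size_bytes.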

Since aiohttp controls the event loop (web.run_app starts it), lines 26-28 now call run_server directly as a regular function.
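
Why the entry point is a plain call can be sketched with the standard library alone. In the example below, run_app_sketch is a hypothetical stand-in for a blocking runner that starts its own event loop (it is not aiohttp's implementation). Calling it directly works, while trying to start another loop from inside an already running one is rejected by asyncio.

```python
import asyncio


async def serve_once():
    await asyncio.sleep(0)
    return "served"


def run_app_sketch():
    # Hypothetical stand-in for a blocking runner: it starts an event loop
    # itself and returns once the work is done.
    return asyncio.run(serve_once())


async def wrong_entry_point():
    # Starting a second loop from inside a running one raises RuntimeError.
    coro = serve_once()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"error: {exc}"
    return "unexpected"


direct = run_app_sketch()
nested = asyncio.run(wrong_entry_point())
print(direct)
print(nested)
```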

Client side

As with the server, only the connection code needs to be modified.

We will define a new connect() method to simplify the async with hierarchy in the code:

from contextlib import asynccontextmanager

import aiohttp
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.helpers import single_transport_provider
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.aiohttp_websocket import TransportAioHttpClient

@asynccontextmanager
async def connect():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect('ws://localhost:6565/chat') as websocket:
            async with RSocketClient(
                    single_transport_provider(TransportAioHttpClient(websocket=websocket)),
                    metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
                    fragment_size_bytes=1_000_000) as client:
                yield client

Lines 11-12 set up the websocket itself, while Lines 13-16 set up the client using the same settings as the previous tutorial step, this time using the TransportAioHttpClient transport. Note that the server and the client use different transport classes.

Line 17 yields the client. Make sure to decorate the method with asynccontextmanager, as shown in line 9.
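
How asynccontextmanager flattens the nested async with blocks can be seen in a standard-library sketch. The resource helper and the names "session", "websocket" and "client" below are hypothetical stand-ins for the real aiohttp and rsocket objects. The point is the ordering: everything is opened on entry, the caller's body runs at the yield, and the resources are closed in reverse order on exit.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def resource(name):
    # Hypothetical stand-in for ClientSession, ws_connect or RSocketClient.
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


@asynccontextmanager
async def connect_sketch():
    # Three nested contexts are hidden behind a single one.
    async with resource("session"):
        async with resource("websocket"):
            async with resource("client") as client:
                yield client


async def demo():
    async with connect_sketch() as client:
        events.append(f"use {client}")


asyncio.run(demo())
print(events)
```

Callers only need a single async with, and cleanup of all three layers still happens when that block exits.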

Test the new functionality

Finally, we will update the main() method to use the new client:

async def main():
    async with connect() as client1:
        async with connect() as client2:
            user1 = ChatClient(client1)
            user2 = ChatClient(client2)

            await user1.login('user1')
            await user2.login('user2')

            ...