File upload/download
In this section we will add very basic file upload/download functionality. All files will be stored in memory, and downloadable by all users.
See resulting code on GitHub
Shared
First, define a mimetype which will represent file names in the payloads. This will be used by both server and client, so place it in the shared module:
chat_filename_mimetype = b'chat/file-name'
Server side
Data-classes
Next, we need a place to store the files in memory. Add a dictionary to the ChatData class to store the files.
The keys will be the file names, and the values the file content.
from dataclasses import dataclass, field
from typing import Dict

@dataclass(frozen=True)
class ChatData:
    ...
    files: Dict[str, bytes] = field(default_factory=dict)
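Note that frozen=True only prevents rebinding the attributes of the dataclass; the dictionary itself stays mutable, which is what allows files to be added after the instance is created. A minimal standalone sketch of that behavior follows. StorageSketch is a hypothetical stand-in reduced to the files field, not the tutorial's ChatData class.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from typing import Dict


@dataclass(frozen=True)
class StorageSketch:
    # Hypothetical stand-in for ChatData, reduced to the files field.
    files: Dict[str, bytes] = field(default_factory=dict)


storage = StorageSketch()

# Mutating the dictionary in place is allowed on a frozen dataclass.
storage.files['notes.txt'] = b'hello'

# Rebinding the attribute itself is rejected.
try:
    storage.files = {}
    rebinding_allowed = True
except FrozenInstanceError:
    rebinding_allowed = False

# default_factory gives every instance its own dictionary.
other = StorageSketch()
```

Using default_factory=dict rather than a shared default value means two instances never share the same files dictionary.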
Helper methods
Next, define a helper method which extracts the filename from the upload/download payload:
from shared import chat_filename_mimetype
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.helpers import utf8_decode

def get_file_name(composite_metadata: CompositeMetadata):
    return utf8_decode(composite_metadata.find_by_mimetype(chat_filename_mimetype)[0].content)
This helper uses the find_by_mimetype method of CompositeMetadata to get a list of metadata items with the specified mimetype, and decodes the content of the first item in that list.
Endpoints
Next, register the request-response endpoints for uploading and downloading files, and for retrieving a list of available files:
from typing import Awaitable

from shared import chat_filename_mimetype
from reactivestreams.publisher import Publisher
from rsocket.extensions.composite_metadata import CompositeMetadata
from rsocket.extensions.helpers import composite, metadata_item
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import create_response
from rsocket.payload import Payload
from rsocket.routing.request_router import RequestRouter
from rsocket.streams.stream_from_generator import StreamFromGenerator

class ChatUserSession:

    def router_factory(self):
        router = RequestRouter(payload_mapper=decode_payload)

        ...

        @router.response('file.upload')
        async def upload_file(payload: Payload, composite_metadata: CompositeMetadata) -> Awaitable[Payload]:
            # Store the payload body under the file name found in the metadata.
            chat_data.files[get_file_name(composite_metadata)] = payload.data
            return create_response()

        @router.response('file.download')
        async def download_file(composite_metadata: CompositeMetadata) -> Awaitable[Payload]:
            # Return the stored content, echoing the file name back as metadata.
            file_name = get_file_name(composite_metadata)
            return create_response(chat_data.files[file_name],
                                   composite(metadata_item(ensure_bytes(file_name), chat_filename_mimetype)))

        @router.stream('files')
        async def get_file_names() -> Publisher:
            # Yield (payload, is_last) tuples, one per stored file name.
            count = len(chat_data.files)
            generator = ((Payload(ensure_bytes(file_name)), index == count) for (index, file_name) in
                         enumerate(chat_data.files.keys(), 1))
            return StreamFromGenerator(lambda: generator)
The upload_file and download_file methods extract the filename from the metadata using the helper method we created, and respectively set and get the file content in the chat_data storage.
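The storage semantics behind these two endpoints can be sketched without RSocket. The functions below are hypothetical stand-ins, not part of the tutorial code: uploading under an existing name replaces the earlier content, and a plain dictionary lookup of an unknown name raises KeyError, which download_file does not handle explicitly. The sketch shows one possible defensive lookup.

```python
from typing import Dict, Optional

# Stand-in for chat_data.files.
files: Dict[str, bytes] = {}


def store_file(name: str, content: bytes) -> None:
    # Same operation as upload_file: a later upload replaces an earlier one.
    files[name] = content


def load_file(name: str) -> Optional[bytes]:
    # Defensive variant of the lookup in download_file: returns None
    # for an unknown name instead of raising KeyError.
    return files.get(name)


store_file('a.txt', b'first')
store_file('a.txt', b'second')
```

Whether a missing file should produce an empty response or an error is a design choice left open here.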
In this section we introduce the second argument which can be passed to routed endpoints. If the session is set up to use composite metadata, the composite_metadata parameter will contain a parsed structure of the metadata in the request payload.
The get_file_names method uses the StreamFromGenerator helper, which creates a stream Publisher/Subscription from a generator factory.
The generator must return a tuple of two values for each iteration:
- Payload instance
- boolean value denoting if it is the last element in the generator.
The argument for the helper class is a method which returns a generator, not the generator itself.
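The tuple shape described above can be illustrated with plain Python. In this sketch, raw bytes stand in for Payload instances and file_name_items is a hypothetical helper, so nothing here depends on the library. It also shows one plausible reason for passing a factory: a generator object can only be consumed once, while a factory can produce a fresh one on each call.

```python
from typing import Callable, Dict, Iterator, Tuple


def file_name_items(files: Dict[str, bytes]) -> Iterator[Tuple[bytes, bool]]:
    # Hypothetical helper: yields (encoded file name, is_last) pairs.
    count = len(files)
    for index, file_name in enumerate(files.keys(), 1):
        yield file_name.encode('utf-8'), index == count


files = {'a.txt': b'1', 'b.txt': b'22', 'c.txt': b'333'}

# A factory returns a fresh generator on every call.
factory: Callable[[], Iterator[Tuple[bytes, bool]]] = lambda: file_name_items(files)
first_pass = list(factory())
second_pass = list(factory())

# A single generator object is exhausted after one pass.
single_generator = file_name_items(files)
consumed_once = list(single_generator)
consumed_twice = list(single_generator)
```

Only the final tuple carries True, which is how the consumer of the generator can tell that the stream is complete.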
Large file support
In the download_file method, files larger than the 16MB frame size limit can still be downloaded.
To allow this, fragmentation must be enabled. This is done by adding the fragment_size_bytes argument to the RSocketServer instantiation:
from rsocket.rsocket_server import RSocketServer
from rsocket.transports.tcp import TransportTCP

def session(*connection):
    RSocketServer(TransportTCP(*connection),
                  handler_factory=handler_factory,
                  fragment_size_bytes=1_000_000)
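Conceptually, fragmentation splits one large payload into several frames that each stay under the configured size, and the receiving side reassembles them. The sketch below only illustrates that idea on raw bytes. split_into_fragments is a hypothetical helper, and the library's actual framing (frame headers, follow flags, metadata handling) is not modelled.

```python
from typing import List


def split_into_fragments(data: bytes, fragment_size_bytes: int) -> List[bytes]:
    # Hypothetical helper: cut the data into chunks of at most fragment_size_bytes.
    if fragment_size_bytes <= 0:
        raise ValueError('fragment_size_bytes must be positive')
    return [data[offset:offset + fragment_size_bytes]
            for offset in range(0, len(data), fragment_size_bytes)]


file_content = bytes(2_500_000)  # 2.5 million zero bytes as sample content
fragments = split_into_fragments(file_content, 1_000_000)

# The receiver restores the original content by joining the chunks in order.
reassembled = b''.join(fragments)
```

With a fragment size of 1_000_000, the sample content is split into two full chunks and one shorter final chunk.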
Client side
Methods
On the client side, we will add 3 methods to access the new server functionality:
- upload
- download
- list_files
from typing import List

from rsocket.awaitable.awaitable_rsocket import AwaitableRSocket
from rsocket.extensions.helpers import composite, route, metadata_item
from rsocket.frame_helpers import ensure_bytes
from rsocket.helpers import utf8_decode
from rsocket.payload import Payload
from shared import chat_filename_mimetype

class ChatClient:

    async def upload(self, file_name: str, content: bytes):
        await self._rsocket.request_response(Payload(content, composite(
            route('file.upload'),
            metadata_item(ensure_bytes(file_name), chat_filename_mimetype)
        )))

    async def download(self, file_name: str):
        return await self._rsocket.request_response(Payload(
            metadata=composite(
                route('file.download'),
                metadata_item(ensure_bytes(file_name), chat_filename_mimetype)
            )))

    async def list_files(self) -> List[str]:
        request = Payload(metadata=composite(route('files')))
        response = await AwaitableRSocket(self._rsocket).request_stream(request)
        return [utf8_decode(payload.data) for payload in response]
The upload method sends a request-response whose Payload consists of a body with the file's contents, and metadata which contains the routing and the filename. To specify the filename, the custom mimetype chat/file-name is used.
This mimetype is used to create a metadata item with the metadata_item method. The composite method then combines the two metadata items into the complete metadata of the payload.
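To make the role of the custom mimetype more concrete, the sketch below encodes a single metadata item by hand. It follows our reading of the RSocket composite metadata extension for custom (not well-known) mimetypes: one byte holding the mimetype length minus one, the mimetype itself, a 3-byte big-endian content length, then the content. Treat the exact byte layout as an assumption to check against the specification. encode_custom_metadata_item is a hypothetical helper, not the library's implementation.

```python
def encode_custom_metadata_item(mime_type: bytes, content: bytes) -> bytes:
    # Hypothetical helper illustrating the assumed wire layout of one item.
    if not 1 <= len(mime_type) <= 128:
        raise ValueError('mimetype must be 1 to 128 bytes long')
    if len(content) >= 1 << 24:
        raise ValueError('content does not fit in a 24-bit length')
    # High bit clear marks a custom mimetype; the low 7 bits hold length - 1.
    header = bytes([len(mime_type) - 1])
    content_length = len(content).to_bytes(3, 'big')
    return header + mime_type + content_length + content


encoded = encode_custom_metadata_item(b'chat/file-name', b'file_name_1.txt')
```

In the tutorial code this work is done by metadata_item and composite, so the helper above is only needed to understand what travels over the wire.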
The download method is similar to the upload method, except for the absence of the payload data, and a different route: 'file.download'.
The list_files method, like the list_channels method in the previous section, uses the request-stream 'files' endpoint to get a list of files.
Large file support
Same as on the server side, fragmentation must be enabled to allow uploading files larger than 16MB.
This is done by adding the fragment_size_bytes argument to the RSocketClient instantiation. Do this for both clients:
from rsocket.extensions.mimetypes import WellKnownMimeTypes
from rsocket.helpers import single_transport_provider
from rsocket.rsocket_client import RSocketClient
from rsocket.transports.tcp import TransportTCP

async with RSocketClient(single_transport_provider(TransportTCP(*connection1)),
                         metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
                         fragment_size_bytes=1_000_000) as client1:
    ...
Test the new functionality
We will try out the new functionality with the following code:
async def files_example(user1: ChatClient, user2: ChatClient):
    file_contents = b'abcdefg1234567'
    file_name = 'file_name_1.txt'

    await user1.upload(file_name, file_contents)

    print(f'Files: {await user1.list_files()}')

    download = await user2.download(file_name)

    if download.data != file_contents:
        raise Exception('File download failed')
    else:
        print(f'Downloaded file: {len(download.data)} bytes')
Call the files_example method from the main client method.