File Transfer
This recipe looks at the file_transfer package, a template included with kit and also copied here.
To create the template use
kit n file_transfer -t file_transfer
The file_transfer package shows off a few parts of Kinode userspace:
- It makes use of the VFS to store files on disk.
- It uses a manager-worker pattern (see conceptual discussion here and here) to enable multiple concurrent uploads/downloads without sacrificing code readability.
- It exports its WIT API so that other packages can easily build in file transfer functionality in a library-like manner, as demonstrated in another recipe.
Protocol
The main file_transfer process is a thin wrapper over the file_transfer_worker_api.
The main process manages transfers and exposes a ListFiles Request variant that, when requested, returns the files that are available for download.
The file_transfer_worker_api makes calling the file_transfer_worker ergonomic.
Specifically, it provides a function, start_download(), which spins up a worker to download a file from a given node.
When called on the node serving the file, it spins up a worker to upload the requested file to the requestor.
Downloading a file proceeds as follows:
- Requestor calls start_download(), which:
  - spawn()s a file_transfer_worker.
  - Passes file_transfer_worker a Download Request variant.
  - file_transfer_worker forwards a modified Download Request variant to the target.
- Provider receives Download Request variant, calls start_download(), which:
  - spawn()s a file_transfer_worker.
  - Passes file_transfer_worker the Download Request variant.
  - Sends chunks of file to the requestor's file_transfer_worker.
Thus, a worker is responsible for downloading/uploading a single file, and then exits.
All longer-term state and functionality is the responsibility of the main process, here, file_transfer.
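The two roles in the steps above can be reduced to a small decision function. The following is a minimal sketch in plain Rust, not the template's code: `Action` and `on_download` are hypothetical names introduced here for illustration, and the real worker sends actual messages rather than returning a list.

```rust
/// What a worker does after its parent hands it a Download request.
/// These names are illustrative; they are not part of the Kinode API.
#[derive(Debug, PartialEq)]
enum Action {
    /// Forward the request to the remote node with `is_requestor: false`.
    ForwardDownload,
    /// Tell the receiving worker how large the file is.
    SendSize,
    /// Stream the file to the receiving worker in chunks.
    SendChunks,
}

/// Returns the worker's actions and whether it exits right afterwards.
fn on_download(is_requestor: bool) -> (Vec<Action>, bool) {
    if is_requestor {
        // Requestor side: ask the provider, then stay alive to receive chunks.
        (vec![Action::ForwardDownload], false)
    } else {
        // Provider side: send the size and the chunks, then exit.
        (vec![Action::SendSize, Action::SendChunks], true)
    }
}
```

The single `is_requestor` flag is what lets the same worker binary serve both ends of a transfer.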
Files are transferred from and to the file_transfer:template.os/files drive.
If you use the file_transfer_worker or file_transfer_worker_api in your own package, replace that first part of the path with your package's package id.
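As a rough illustration of how that path is put together, here is a sketch using plain strings. The helper names `files_drive_path` and `file_path` are assumptions made for this example; the template itself derives the prefix from `our.package_id()`.

```rust
/// Build the drive path `<package_name>:<publisher_node>/files`.
/// Hypothetical helper: the template formats `our.package_id()` instead.
fn files_drive_path(package_name: &str, publisher_node: &str) -> String {
    format!("{package_name}:{publisher_node}/files")
}

/// Build the path of a single file inside the drive.
fn file_path(drive_path: &str, name: &str) -> String {
    format!("{drive_path}/{name}")
}
```

For a package named `my_package` published by `my_node.os` (both made up), the drive would be `my_package:my_node.os/files`.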
WIT API
interface file-transfer {
    variant request {
        list-files,
    }

    variant response {
        list-files(list<file-info>),
    }

    record file-info {
        name: string,
        size: u64,
    }
}

interface file-transfer-worker {
    use standard.{address};

    /// external-facing requests
    variant request {
        /// download starts a download.
        /// * used by requestor to start whole process
        /// * used by provider to spin up worker to serve request
        download(download-request),
        /// progress is from worker to parent
        /// * acks not required, but provided for completeness
        progress(progress-request),
    }

    variant response {
        download(result<_, string>),
        /// ack: not required, but provided for completeness
        progress,
    }

    /// requests used between workers to transfer the file
    /// parent will not receive these, so need not handle them
    variant internal-request {
        chunk(chunk-request),
        size(u64),
    }

    record download-request {
        name: string,
        target: address,
        is-requestor: bool,
    }

    record progress-request {
        name: string,
        progress: u64,
    }

    record chunk-request {
        name: string,
        offset: u64,
        length: u64,
    }

    /// easiest way to use file-transfer-worker
    /// handle file-transfer-worker::request by calling this helper function
    start-download: func(
        our: address,
        source: address,
        name: string,
        target: address,
        is-requestor: bool,
    ) -> result<_, string>;
}

world file-transfer-worker-api-v0 {
    export file-transfer-worker;
}

world file-transfer-template-dot-os-v0 {
    import file-transfer;
    import file-transfer-worker;
    include process-v0;
}
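For orientation, the `file-transfer` interface corresponds to Rust types roughly like the ones below. This is a hand-written approximation, assuming the kebab-case to CamelCase renaming that the Rust listings in this recipe rely on (`list-files` becomes `ListFiles`, `file-info` becomes `FileInfo`). The real types are generated by `wit_bindgen::generate!`, and `respond` is a hypothetical helper that exists only in this sketch.

```rust
/// Approximation of the `file-info` record.
#[derive(Debug, Clone, PartialEq)]
struct FileInfo {
    name: String,
    size: u64,
}

/// Approximation of the `request` variant of `file-transfer`.
#[derive(Debug, Clone, PartialEq)]
enum Request {
    ListFiles,
}

/// Approximation of the `response` variant of `file-transfer`.
#[derive(Debug, Clone, PartialEq)]
enum Response {
    ListFiles(Vec<FileInfo>),
}

/// Hypothetical helper: answer a request from an in-memory file list.
fn respond(request: &Request, available: &[FileInfo]) -> Response {
    match request {
        Request::ListFiles => Response::ListFiles(available.to_vec()),
    }
}
```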
Main Process
use crate::kinode::process::file_transfer::{
    FileInfo, Request as TransferRequest, Response as TransferResponse,
};
use crate::kinode::process::file_transfer_worker::{
    start_download, DownloadRequest, ProgressRequest, Request as WorkerRequest,
    Response as WorkerResponse,
};
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use kinode_process_lib::{
    await_message, call_init, println,
    vfs::{create_drive, metadata, open_dir, Directory, FileType},
    Address, Message, ProcessId, Response,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-template-dot-os-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    TransferRequest(TransferRequest),
    WorkerRequest(WorkerRequest),

    // responses
    TransferResponse(TransferResponse),
    WorkerResponse(WorkerResponse),
}

impl From<Address> for WitAddress {
    fn from(address: Address) -> Self {
        WitAddress {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<ProcessId> for WitProcessId {
    fn from(process: ProcessId) -> Self {
        WitProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

fn ls_files(files_dir: &Directory) -> anyhow::Result<Vec<FileInfo>> {
    let entries = files_dir.read()?;
    let files: Vec<FileInfo> = entries
        .iter()
        .filter_map(|file| match file.file_type {
            FileType::File => match metadata(&file.path, None) {
                Ok(metadata) => Some(FileInfo {
                    name: file.path.clone(),
                    size: metadata.len,
                }),
                Err(_) => None,
            },
            _ => None,
        })
        .collect();
    Ok(files)
}

fn handle_transfer_request(request: &TransferRequest, files_dir: &Directory) -> anyhow::Result<()> {
    match request {
        TransferRequest::ListFiles => {
            let files = ls_files(files_dir)?;
            Response::new()
                .body(TransferResponse::ListFiles(files))
                .send()?;
        }
    }
    Ok(())
}

fn handle_worker_request(
    our: &Address,
    source: &Address,
    request: &WorkerRequest,
) -> anyhow::Result<()> {
    match request {
        WorkerRequest::Download(DownloadRequest {
            ref name,
            ref target,
            is_requestor,
        }) => {
            match start_download(
                &our.clone().into(),
                &source.clone().into(),
                name,
                target,
                *is_requestor,
            ) {
                Ok(_) => {}
                Err(e) => return Err(anyhow::anyhow!("{e}")),
            }
        }
        WorkerRequest::Progress(ProgressRequest { name, progress }) => {
            println!("{} progress: {}%", name, progress);
            Response::new().body(WorkerResponse::Progress).send()?;
        }
    }
    Ok(())
}

fn handle_transfer_response(source: &Address, response: &TransferResponse) -> anyhow::Result<()> {
    match response {
        TransferResponse::ListFiles(ref files) => {
            println!(
                "{}",
                files.iter().fold(
                    format!("{source} available files:\nFile\t\tSize (bytes)\n"),
                    |mut msg, file| {
                        msg.push_str(&format!(
                            "{}\t\t{}",
                            file.name.split('/').last().unwrap(),
                            file.size,
                        ));
                        msg
                    }
                )
            );
        }
    }
    Ok(())
}

fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<()> {
    match response {
        WorkerResponse::Download(ref result) => {
            if let Err(e) = result {
                return Err(anyhow::anyhow!("{e}"));
            }
        }
        WorkerResponse::Progress => {}
    }
    Ok(())
}

fn handle_message(our: &Address, message: &Message, files_dir: &Directory) -> anyhow::Result<()> {
    match message.body().try_into()? {
        // requests
        Msg::TransferRequest(ref tr) => handle_transfer_request(tr, files_dir),
        Msg::WorkerRequest(ref wr) => handle_worker_request(our, message.source(), wr),

        // responses
        Msg::TransferResponse(ref tr) => handle_transfer_response(message.source(), tr),
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr),
    }
}

call_init!(init);
fn init(our: Address) {
    println!("begin");

    let drive_path = create_drive(our.package_id(), "files", None).unwrap();
    let files_dir = open_dir(&drive_path, false, None).unwrap();

    loop {
        match await_message() {
            Err(send_error) => println!("got SendError: {send_error}"),
            Ok(ref message) => match handle_message(&our, message, &files_dir) {
                Ok(_) => {}
                Err(e) => println!("got error while handling message: {e:?}"),
            },
        }
    }
}
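The file listing printed for a `ListFiles` response is built with a `fold` over the returned entries. Below is a standalone sketch of that formatting step using only the standard library. `FileInfo` is redeclared locally, and `format_listing` is a hypothetical name. Ending each row with a newline and using `rsplit` to take the last path segment are choices of this sketch.

```rust
/// Local stand-in for the generated `FileInfo` type.
struct FileInfo {
    name: String,
    size: u64,
}

/// Render a header followed by one `name<TAB><TAB>size` row per file.
/// Only the last path segment of each name is shown.
fn format_listing(source: &str, files: &[FileInfo]) -> String {
    files.iter().fold(
        format!("{source} available files:\nFile\t\tSize (bytes)\n"),
        |mut msg, file| {
            // `rsplit` always yields at least one item, so the fallback is defensive.
            let short_name = file.name.rsplit('/').next().unwrap_or(file.name.as_str());
            msg.push_str(&format!("{short_name}\t\t{}\n", file.size));
            msg
        },
    )
}
```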
Worker
use crate::kinode::process::file_transfer_worker::{
    ChunkRequest, DownloadRequest, InternalRequest, ProgressRequest, Request as WorkerRequest,
    Response as WorkerResponse,
};
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use kinode_process_lib::{
    await_message, call_init, get_blob, println,
    vfs::{open_dir, open_file, Directory, File, SeekFrom},
    Address, Message, ProcessId, Request, Response,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-template-dot-os-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    WorkerRequest(WorkerRequest),
    InternalRequest(InternalRequest),

    // responses
    WorkerResponse(WorkerResponse),
}

impl From<WitAddress> for Address {
    fn from(address: WitAddress) -> Self {
        Address {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<WitProcessId> for ProcessId {
    fn from(process: WitProcessId) -> Self {
        ProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

const CHUNK_SIZE: u64 = 1048576; // 1MB

fn handle_worker_request(
    request: &WorkerRequest,
    file: &mut Option<File>,
    files_dir: &Directory,
) -> anyhow::Result<bool> {
    match request {
        WorkerRequest::Download(DownloadRequest {
            name,
            target,
            is_requestor,
        }) => {
            Response::new()
                .body(WorkerResponse::Download(Ok(())))
                .send()?;

            // open/create empty file in both cases.
            let mut active_file = open_file(&format!("{}/{}", files_dir.path, &name), true, None)?;

            if *is_requestor {
                *file = Some(active_file);
                Request::new()
                    .expects_response(5)
                    .body(WorkerRequest::Download(DownloadRequest {
                        name: name.to_string(),
                        target: target.clone(),
                        is_requestor: false,
                    }))
                    .target::<Address>(target.clone().into())
                    .send()?;
            } else {
                // we are sender: chunk the data, and send it.
                let size = active_file.metadata()?.len;
                let num_chunks = (size as f64 / CHUNK_SIZE as f64).ceil() as u64;

                // give receiving worker file size so it can track download progress
                Request::new()
                    .body(InternalRequest::Size(size))
                    .target(target.clone())
                    .send()?;

                active_file.seek(SeekFrom::Start(0))?;

                for i in 0..num_chunks {
                    let offset = i * CHUNK_SIZE;
                    let length = CHUNK_SIZE.min(size - offset);

                    let mut buffer = vec![0; length as usize];
                    active_file.read_at(&mut buffer)?;

                    Request::new()
                        .body(InternalRequest::Chunk(ChunkRequest {
                            name: name.clone(),
                            offset,
                            length,
                        }))
                        .target(target.clone())
                        .blob_bytes(buffer)
                        .send()?;
                }
                return Ok(true);
            }
        }
        WorkerRequest::Progress(_) => {
            return Err(anyhow::anyhow!(
                "worker: got unexpected WorkerRequest::Progress",
            ));
        }
    }
    Ok(false)
}

fn handle_internal_request(
    request: &InternalRequest,
    file: &mut Option<File>,
    size: &mut Option<u64>,
    parent: &Option<Address>,
) -> anyhow::Result<bool> {
    match request {
        InternalRequest::Chunk(ChunkRequest {
            name,
            offset,
            length,
        }) => {
            // someone sending a chunk to us
            let file = match file {
                Some(file) => file,
                None => {
                    return Err(anyhow::anyhow!(
                        "worker: receive error: no file initialized"
                    ));
                }
            };

            let bytes = match get_blob() {
                Some(blob) => blob.bytes,
                None => {
                    return Err(anyhow::anyhow!("worker: receive error: no blob"));
                }
            };

            file.write_all(&bytes)?;

            // if sender has sent us a size, give a progress update to main transfer
            let Some(ref parent) = parent else {
                return Ok(false);
            };
            if let Some(size) = size {
                let progress = ((offset + length) as f64 / *size as f64 * 100.0) as u64;

                Request::new()
                    .expects_response(5)
                    .body(WorkerRequest::Progress(ProgressRequest {
                        name: name.to_string(),
                        progress,
                    }))
                    .target(parent)
                    .send()?;

                if progress >= 100 {
                    return Ok(true);
                }
            }
        }
        InternalRequest::Size(incoming_size) => {
            *size = Some(*incoming_size);
        }
    }
    Ok(false)
}

fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<bool> {
    match response {
        WorkerResponse::Download(ref result) => {
            if let Err(e) = result {
                return Err(anyhow::anyhow!("{e}"));
            }
        }
        WorkerResponse::Progress => {}
    }
    Ok(false)
}

fn handle_message(
    message: &Message,
    file: &mut Option<File>,
    files_dir: &Directory,
    size: &mut Option<u64>,
    parent: &mut Option<Address>,
) -> anyhow::Result<bool> {
    return Ok(match message.body().try_into()? {
        // requests
        Msg::WorkerRequest(ref wr) => {
            *parent = Some(message.source().clone());
            handle_worker_request(wr, file, files_dir)?
        }
        Msg::InternalRequest(ref ir) => handle_internal_request(ir, file, size, parent)?,

        // responses
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr)?,
    });
}

call_init!(init);
fn init(our: Address) {
    println!("worker: begin");
    let start = std::time::Instant::now();

    let drive_path = format!("{}/files", our.package_id());
    let files_dir = open_dir(&drive_path, false, None).unwrap();

    let mut file: Option<File> = None;
    let mut size: Option<u64> = None;
    let mut parent: Option<Address> = None;

    loop {
        match await_message() {
            Err(send_error) => println!("worker: got SendError: {send_error}"),
            Ok(ref message) => {
                match handle_message(message, &mut file, &files_dir, &mut size, &mut parent) {
                    Ok(exit) => {
                        if exit {
                            println!("worker: done: exiting, took {:?}", start.elapsed());
                            break;
                        }
                    }
                    Err(e) => println!("worker: got error while handling message: {e:?}"),
                }
            }
        }
    }
}
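The chunking and progress arithmetic in the worker can be checked in isolation. The sketch below reproduces it with the standard library only. `chunk_plan` and `progress_percent` are hypothetical names, and `chunk_plan` uses integer `div_ceil` where the worker rounds up through `f64`.

```rust
/// Same value as the worker's CHUNK_SIZE (1 MiB).
const CHUNK_SIZE: u64 = 1_048_576;

/// Return the (offset, length) pair of every chunk for a file of `size` bytes.
fn chunk_plan(size: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    // Integer ceiling division; the worker computes this via f64 instead.
    let num_chunks = size.div_ceil(chunk_size);
    (0..num_chunks)
        .map(|i| {
            let offset = i * chunk_size;
            // The last chunk may be shorter than chunk_size.
            (offset, chunk_size.min(size - offset))
        })
        .collect()
}

/// Percentage of the file received once the chunk at `offset` has been written.
fn progress_percent(offset: u64, length: u64, size: u64) -> u64 {
    ((offset + length) as f64 / size as f64 * 100.0) as u64
}
```

For a 2.5 MiB file (2,621,440 bytes) this yields three chunks, the last one 524,288 bytes long. A zero-byte file yields no chunks at all, so, judging from the worker listing alone, the receiving worker would never compute a progress of 100 and would not exit on its own.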
API
use crate::exports::kinode::process::file_transfer_worker::{
    DownloadRequest, Guest, Request as WorkerRequest, Response as WorkerResponse,
};
use crate::kinode::process::standard::Address as WitAddress;
use kinode_process_lib::{our_capabilities, spawn, Address, OnExit, Request, Response};

wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-worker-api-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

fn start_download(
    our: &WitAddress,
    source: &WitAddress,
    name: &str,
    target: &WitAddress,
    is_requestor: bool,
) -> anyhow::Result<()> {
    // spin up a worker, initialize based on whether it's a downloader or a sender.
    let our_worker = spawn(
        None,
        &format!(
            "{}:{}/pkg/file_transfer_worker.wasm",
            our.process.package_name, our.process.publisher_node,
        ),
        OnExit::None,
        our_capabilities(),
        vec![],
        false,
    )?;

    let target = if is_requestor { target } else { source };
    let our_worker_address = Address {
        node: our.node.clone(),
        process: our_worker,
    };

    Response::new()
        .body(WorkerResponse::Download(Ok(())))
        .send()?;

    Request::new()
        .expects_response(5)
        .body(WorkerRequest::Download(DownloadRequest {
            name: name.to_string(),
            target: target.clone(),
            is_requestor,
        }))
        .target(&our_worker_address)
        .send()?;

    Ok(())
}

struct Api;
impl Guest for Api {
    fn start_download(
        our: WitAddress,
        source: WitAddress,
        name: String,
        target: WitAddress,
        is_requestor: bool,
    ) -> Result<(), String> {
        match start_download(&our, &source, &name, &target, is_requestor) {
            Ok(result) => Ok(result),
            Err(e) => Err(format!("{e:?}")),
        }
    }
}
export!(Api);
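Two small decisions inside `start_download()` are easy to miss: where the worker binary is looked up, and which address the worker is told to talk to. The sketch below isolates both using plain strings in place of address types. `worker_wasm_path` and `worker_target` are hypothetical names used only here.

```rust
/// Path of the worker binary inside the calling package,
/// following the format string used in `start_download()`.
fn worker_wasm_path(package_name: &str, publisher_node: &str) -> String {
    format!("{package_name}:{publisher_node}/pkg/file_transfer_worker.wasm")
}

/// Pick the peer the new worker should talk to.
/// The requestor's worker contacts the requested `target`;
/// the provider's worker replies to whoever sent the request (`source`).
fn worker_target<'a>(is_requestor: bool, target: &'a str, source: &'a str) -> &'a str {
    if is_requestor {
        target
    } else {
        source
    }
}
```

Because the path is built from the caller's own package name and publisher, a package that reuses this API needs the worker Wasm file in its own `pkg/` directory, which is what the `-a` flag in the build step below takes care of.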
Example Usage
Build
# Start fake nodes.
kit f
kit f -o /tmp/kinode-fake-node-2 -p 8081 -f fake2.dev
# Create & build file_transfer.
## The `-a` adds the worker Wasm file to the API so it can be exported properly.
kit n file_transfer -t file_transfer
kit b file_transfer -a file_transfer/pkg/file_transfer_worker.wasm
# Start file_transfer on fake nodes.
kit s file_transfer
kit s file_transfer -p 8081
Usage
# First, put a file into `/tmp/kinode-fake-node-2/vfs/file_transfer:template.os/files/`, e.g.:
echo 'hello world' > /tmp/kinode-fake-node-2/vfs/file_transfer:template.os/files/my_file.txt
# In fake.dev terminal, check if file exists.
list_files:file_transfer:template.os fake2.dev
# In fake.dev terminal, download the file.
download:file_transfer:template.os my_file.txt fake2.dev
# Confirm file was downloaded:
cat /tmp/kinode-fake-node/vfs/file_transfer:template.os/files/my_file.txt