File Transfer

This recipe looks at the file_transfer package, a template included with kit and also copied here. To create a package from the template, run:

kit n file_transfer -t file_transfer

The file_transfer package shows off a few parts of Kinode userspace:

  1. It makes use of the VFS to store files on disk.
  2. It uses a manager-worker pattern (see conceptual discussion here and here) to enable multiple concurrent uploads/downloads without sacrificing code readability.
  3. It exports its WIT API so that other packages can easily build in file transfer functionality in a library-like manner, as demonstrated in another recipe.

Protocol

The main file_transfer process is a thin wrapper over the file_transfer_worker_api. It manages transfers and handles the ListFiles Request variant by responding with the list of files that are available for download.

The file_transfer_worker_api makes calling the file_transfer_worker ergonomic. Specifically, it provides a function, start_download(), which spins up a worker to download a file from a given node. When called on the node serving the file, it spins up a worker to upload the requested file to the requestor.

Downloading a file proceeds as follows:

  1. Requestor calls start_download(), which:
    1. spawn()s a file_transfer_worker.
    2. Passes file_transfer_worker a Download Request variant.
    3. file_transfer_worker forwards a modified Download Request variant to the target.
  2. Provider receives Download Request variant, calls start_download(), which:
    1. spawn()s a file_transfer_worker.
    2. Passes file_transfer_worker the Download Request variant.
    3. file_transfer_worker sends chunks of the file to the requestor's file_transfer_worker.

Thus, a worker is responsible for downloading or uploading a single file, after which it exits. All longer-term state and functionality are the responsibility of the main process, here file_transfer.
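The download flow above can be modelled in plain Rust, without any Kinode dependencies. This is only a minimal sketch: Address is a simplified stand-in rather than the kinode_process_lib type, and worker_target and forward_to_provider are hypothetical helper names that mirror the target selection and request forwarding found in the API and worker code on this page.

```rust
/// Simplified stand-in for a Kinode address (hypothetical, for illustration).
#[derive(Clone, Debug, PartialEq)]
struct Address {
    node: String,
    process: String,
}

/// Mirrors the fields of the `download-request` record in the WIT API.
#[derive(Clone, Debug, PartialEq)]
struct DownloadRequest {
    name: String,
    target: Address,
    is_requestor: bool,
}

/// Which address a freshly spawned worker should talk to.
/// On the requestor side it is the target named in the request;
/// on the provider side it is whoever sent the request.
fn worker_target(is_requestor: bool, target: &Address, source: &Address) -> Address {
    if is_requestor {
        target.clone()
    } else {
        source.clone()
    }
}

/// The modified request that the requestor's worker forwards to the provider:
/// same file and target, but no longer flagged as the requestor side.
fn forward_to_provider(request: &DownloadRequest) -> DownloadRequest {
    DownloadRequest {
        is_requestor: false,
        ..request.clone()
    }
}
```

In this model, the provider's worker ends up targeting the source of the forwarded request, which is the requestor's worker, so the chunks flow directly between the two workers.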

Files are transferred from and to the file_transfer:template.os/files drive. If you use the file_transfer_worker or file_transfer_worker_api in your own package, replace the file_transfer:template.os part of the path with your own package's package ID.
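As a rough illustration of how the drive path depends on the package ID, the sketch below builds the same kind of paths the worker code on this page builds with format!. The helper names files_drive_path and file_path are hypothetical, and the package ID my_package:my_node.os in the usage note is a made-up example.

```rust
/// Build the path of the `files` drive for a package.
/// `package_id` is expected in `package_name:publisher_node` form,
/// e.g. "file_transfer:template.os".
fn files_drive_path(package_id: &str) -> String {
    format!("{package_id}/files")
}

/// Build the path of a single file inside that drive.
fn file_path(package_id: &str, file_name: &str) -> String {
    format!("{}/{}", files_drive_path(package_id), file_name)
}
```

For a package with the ID my_package:my_node.os, file_path("my_package:my_node.os", "my_file.txt") gives my_package:my_node.os/files/my_file.txt.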

WIT API

interface file-transfer {
    variant request {
        list-files,
    }

    variant response {
        list-files(list<file-info>),
    }

    record file-info {
        name: string,
        size: u64,
    }
}

interface file-transfer-worker {
    use standard.{address};

    /// external-facing requests
    variant request {
        /// download starts a download.
        /// * used by requestor to start whole process
        /// * used by provider to spin up worker to serve request
        download(download-request),
        /// progress is from worker to parent
        /// * acks not required, but provided for completeness
        progress(progress-request),
    }

    variant response {
        download(result<_, string>),
        /// ack: not required, but provided for completeness
        progress,
    }

    /// requests used between workers to transfer the file
    /// parent will not receive these, so need not handle them
    variant internal-request {
        chunk(chunk-request),
        size(u64),
    }

    record download-request {
        name: string,
        target: address,
        is-requestor: bool,
    }

    record progress-request {
        name: string,
        progress: u64,
    }

    record chunk-request {
        name: string,
        offset: u64,
        length: u64,
    }

    /// easiest way to use file-transfer-worker
    /// handle file-transfer-worker::request by calling this helper function
    start-download: func(
        our: address,
        source: address,
        name: string,
        target: address,
        is-requestor: bool,
    ) -> result<_, string>;
}

world file-transfer-worker-api-v0 {
    export file-transfer-worker;
}

world file-transfer-template-dot-os-v0 {
    import file-transfer;
    import file-transfer-worker;
    include process-v0;
}

Main Process

#![allow(unused)]
fn main() {
use crate::kinode::process::file_transfer::{
    FileInfo, Request as TransferRequest, Response as TransferResponse,
};
use crate::kinode::process::file_transfer_worker::{
    start_download, DownloadRequest, ProgressRequest, Request as WorkerRequest,
    Response as WorkerResponse,
};
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use kinode_process_lib::{
    await_message, call_init, println,
    vfs::{create_drive, metadata, open_dir, Directory, FileType},
    Address, Message, ProcessId, Response,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-template-dot-os-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    TransferRequest(TransferRequest),
    WorkerRequest(WorkerRequest),

    // responses
    TransferResponse(TransferResponse),
    WorkerResponse(WorkerResponse),
}

impl From<Address> for WitAddress {
    fn from(address: Address) -> Self {
        WitAddress {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<ProcessId> for WitProcessId {
    fn from(process: ProcessId) -> Self {
        WitProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

fn ls_files(files_dir: &Directory) -> anyhow::Result<Vec<FileInfo>> {
    let entries = files_dir.read()?;
    let files: Vec<FileInfo> = entries
        .iter()
        .filter_map(|file| match file.file_type {
            FileType::File => match metadata(&file.path, None) {
                Ok(metadata) => Some(FileInfo {
                    name: file.path.clone(),
                    size: metadata.len,
                }),
                Err(_) => None,
            },
            _ => None,
        })
        .collect();
    Ok(files)
}

fn handle_transfer_request(request: &TransferRequest, files_dir: &Directory) -> anyhow::Result<()> {
    match request {
        TransferRequest::ListFiles => {
            let files = ls_files(files_dir)?;
            Response::new()
                .body(TransferResponse::ListFiles(files))
                .send()?;
        }
    }
    Ok(())
}

fn handle_worker_request(
    our: &Address,
    source: &Address,
    request: &WorkerRequest,
) -> anyhow::Result<()> {
    match request {
        WorkerRequest::Download(DownloadRequest {
            ref name,
            ref target,
            is_requestor,
        }) => {
            match start_download(
                &our.clone().into(),
                &source.clone().into(),
                name,
                target,
                *is_requestor,
            ) {
                Ok(_) => {}
                Err(e) => return Err(anyhow::anyhow!("{e}")),
            }
        }
        WorkerRequest::Progress(ProgressRequest { name, progress }) => {
            println!("{} progress: {}%", name, progress);
            Response::new().body(WorkerResponse::Progress).send()?;
        }
    }
    Ok(())
}

fn handle_transfer_response(source: &Address, response: &TransferResponse) -> anyhow::Result<()> {
    match response {
        TransferResponse::ListFiles(ref files) => {
            println!(
                "{}",
                files.iter().fold(
                    format!("{source} available files:\nFile\t\tSize (bytes)\n"),
                    |mut msg, file| {
                        // end each row with a newline so files print one per line
                        msg.push_str(&format!(
                            "{}\t\t{}\n",
                            file.name.split('/').last().unwrap(),
                            file.size,
                        ));
                        msg
                    }
                )
            );
        }
    }
    Ok(())
}

fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<()> {
    match response {
        WorkerResponse::Download(ref result) => {
            if let Err(e) = result {
                return Err(anyhow::anyhow!("{e}"));
            }
        }
        WorkerResponse::Progress => {}
    }
    Ok(())
}

fn handle_message(our: &Address, message: &Message, files_dir: &Directory) -> anyhow::Result<()> {
    match message.body().try_into()? {
        // requests
        Msg::TransferRequest(ref tr) => handle_transfer_request(tr, files_dir),
        Msg::WorkerRequest(ref wr) => handle_worker_request(our, message.source(), wr),

        // responses
        Msg::TransferResponse(ref tr) => handle_transfer_response(message.source(), tr),
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr),
    }
}

call_init!(init);
fn init(our: Address) {
    println!("begin");

    let drive_path = create_drive(our.package_id(), "files", None).unwrap();
    let files_dir = open_dir(&drive_path, false, None).unwrap();

    loop {
        match await_message() {
            Err(send_error) => println!("got SendError: {send_error}"),
            Ok(ref message) => match handle_message(&our, message, &files_dir) {
                Ok(_) => {}
                Err(e) => println!("got error while handling message: {e:?}"),
            },
        }
    }
}
}
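The listing printed by handle_transfer_response can be tried out in isolation. The following is a sketch under assumptions: FileInfo is a hand-written stand-in for the generated file-info type, format_listing is a hypothetical helper name, and each row is terminated with a newline so that files appear one per line.

```rust
/// Hand-written stand-in for the `file-info` record (illustration only).
struct FileInfo {
    name: String,
    size: u64,
}

/// Render a listing: a header, then one row per file containing the
/// file's base name (last path segment) and its size in bytes.
fn format_listing(source: &str, files: &[FileInfo]) -> String {
    let mut msg = format!("{source} available files:\nFile\t\tSize (bytes)\n");
    for file in files {
        // `rsplit` yields the last path segment first; fall back to the full name.
        let base = file.name.rsplit('/').next().unwrap_or(file.name.as_str());
        msg.push_str(&format!("{base}\t\t{}\n", file.size));
    }
    msg
}
```

Passing the node name of the provider as source and the files from a ListFiles response yields a small tab-separated table that can be handed to println!.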

Worker

#![allow(unused)]
fn main() {
use crate::kinode::process::file_transfer_worker::{
    ChunkRequest, DownloadRequest, InternalRequest, ProgressRequest, Request as WorkerRequest,
    Response as WorkerResponse,
};
use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId};
use kinode_process_lib::{
    await_message, call_init, get_blob, println,
    vfs::{open_dir, open_file, Directory, File, SeekFrom},
    Address, Message, ProcessId, Request, Response,
};

wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-template-dot-os-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

#[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)]
#[serde(untagged)] // untagged as a meta-type for all incoming messages
enum Msg {
    // requests
    WorkerRequest(WorkerRequest),
    InternalRequest(InternalRequest),

    // responses
    WorkerResponse(WorkerResponse),
}

impl From<WitAddress> for Address {
    fn from(address: WitAddress) -> Self {
        Address {
            node: address.node,
            process: address.process.into(),
        }
    }
}

impl From<WitProcessId> for ProcessId {
    fn from(process: WitProcessId) -> Self {
        ProcessId {
            process_name: process.process_name,
            package_name: process.package_name,
            publisher_node: process.publisher_node,
        }
    }
}

const CHUNK_SIZE: u64 = 1048576; // 1 MiB (1024 * 1024 bytes)

fn handle_worker_request(
    request: &WorkerRequest,
    file: &mut Option<File>,
    files_dir: &Directory,
) -> anyhow::Result<bool> {
    match request {
        WorkerRequest::Download(DownloadRequest {
            name,
            target,
            is_requestor,
        }) => {
            Response::new()
                .body(WorkerResponse::Download(Ok(())))
                .send()?;

            // open/create empty file in both cases.
            let mut active_file = open_file(&format!("{}/{}", files_dir.path, &name), true, None)?;

            if *is_requestor {
                *file = Some(active_file);
                Request::new()
                    .expects_response(5)
                    .body(WorkerRequest::Download(DownloadRequest {
                        name: name.to_string(),
                        target: target.clone(),
                        is_requestor: false,
                    }))
                    .target::<Address>(target.clone().into())
                    .send()?;
            } else {
                // we are sender: chunk the data, and send it.
                let size = active_file.metadata()?.len;
                let num_chunks = (size as f64 / CHUNK_SIZE as f64).ceil() as u64;

                // give receiving worker file size so it can track download progress
                Request::new()
                    .body(InternalRequest::Size(size))
                    .target(target.clone())
                    .send()?;

                active_file.seek(SeekFrom::Start(0))?;

                for i in 0..num_chunks {
                    let offset = i * CHUNK_SIZE;
                    let length = CHUNK_SIZE.min(size - offset);

                    let mut buffer = vec![0; length as usize];
                    active_file.read_at(&mut buffer)?;

                    Request::new()
                        .body(InternalRequest::Chunk(ChunkRequest {
                            name: name.clone(),
                            offset,
                            length,
                        }))
                        .target(target.clone())
                        .blob_bytes(buffer)
                        .send()?;
                }
                return Ok(true);
            }
        }
        WorkerRequest::Progress(_) => {
            return Err(anyhow::anyhow!(
                "worker: got unexpected WorkerRequest::Progress",
            ));
        }
    }
    Ok(false)
}

fn handle_internal_request(
    request: &InternalRequest,
    file: &mut Option<File>,
    size: &mut Option<u64>,
    parent: &Option<Address>,
) -> anyhow::Result<bool> {
    match request {
        InternalRequest::Chunk(ChunkRequest {
            name,
            offset,
            length,
        }) => {
            // someone sending a chunk to us
            let file = match file {
                Some(file) => file,
                None => {
                    return Err(anyhow::anyhow!(
                        "worker: receive error: no file initialized"
                    ));
                }
            };

            let bytes = match get_blob() {
                Some(blob) => blob.bytes,
                None => {
                    return Err(anyhow::anyhow!("worker: receive error: no blob"));
                }
            };

            file.write_all(&bytes)?;

            // if sender has sent us a size, give a progress update to main transfer
            let Some(ref parent) = parent else {
                return Ok(false);
            };
            if let Some(size) = size {
                let progress = ((offset + length) as f64 / *size as f64 * 100.0) as u64;

                Request::new()
                    .expects_response(5)
                    .body(WorkerRequest::Progress(ProgressRequest {
                        name: name.to_string(),
                        progress,
                    }))
                    .target(parent)
                    .send()?;

                if progress >= 100 {
                    return Ok(true);
                }
            }
        }
        InternalRequest::Size(incoming_size) => {
            *size = Some(*incoming_size);
        }
    }
    Ok(false)
}

fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<bool> {
    match response {
        WorkerResponse::Download(ref result) => {
            if let Err(e) = result {
                return Err(anyhow::anyhow!("{e}"));
            }
        }
        WorkerResponse::Progress => {}
    }
    Ok(false)
}

fn handle_message(
    message: &Message,
    file: &mut Option<File>,
    files_dir: &Directory,
    size: &mut Option<u64>,
    parent: &mut Option<Address>,
) -> anyhow::Result<bool> {
    return Ok(match message.body().try_into()? {
        // requests
        Msg::WorkerRequest(ref wr) => {
            *parent = Some(message.source().clone());
            handle_worker_request(wr, file, files_dir)?
        }
        Msg::InternalRequest(ref ir) => handle_internal_request(ir, file, size, parent)?,

        // responses
        Msg::WorkerResponse(ref wr) => handle_worker_response(wr)?,
    });
}

call_init!(init);
fn init(our: Address) {
    println!("worker: begin");
    let start = std::time::Instant::now();

    let drive_path = format!("{}/files", our.package_id());
    let files_dir = open_dir(&drive_path, false, None).unwrap();

    let mut file: Option<File> = None;
    let mut size: Option<u64> = None;
    let mut parent: Option<Address> = None;

    loop {
        match await_message() {
            Err(send_error) => println!("worker: got SendError: {send_error}"),
            Ok(ref message) => {
                match handle_message(message, &mut file, &files_dir, &mut size, &mut parent) {
                    Ok(exit) => {
                        if exit {
                            println!("worker: done: exiting, took {:?}", start.elapsed());
                            break;
                        }
                    }
                    Err(e) => println!("worker: got error while handling message: {e:?}"),
                }
            }
        }
    }
}
}
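The chunking and progress arithmetic used by the worker can be checked separately from any messaging. The sketch below is an assumption-labelled restatement, not the worker's code: chunk_plan and progress_percent are hypothetical helper names, and the chunk count uses integer ceiling division instead of the floating-point ceil() above.

```rust
const CHUNK_SIZE: u64 = 1_048_576; // 1 MiB, the same value the worker uses

/// Split a file of `size` bytes into (offset, length) pairs,
/// each at most `chunk_size` bytes long. `chunk_size` must be non-zero.
fn chunk_plan(size: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    // integer ceiling division: number of chunks needed to cover `size`
    let num_chunks = (size + chunk_size - 1) / chunk_size;
    (0..num_chunks)
        .map(|i| {
            let offset = i * chunk_size;
            // the final chunk may be shorter than `chunk_size`
            (offset, chunk_size.min(size - offset))
        })
        .collect()
}

/// Percentage of the file received once the chunk at `offset` with
/// `length` bytes has been written, truncated to a whole number.
fn progress_percent(offset: u64, length: u64, size: u64) -> u64 {
    ((offset + length) as f64 / size as f64 * 100.0) as u64
}
```

A 2.5 MiB file, for example, is planned as two full chunks followed by one half-sized chunk, and the last chunk brings the progress to 100, which is the condition the receiving worker uses to exit. Note that an empty file produces an empty plan, so in this model no chunk would ever report 100.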

API

#![allow(unused)]
fn main() {
use crate::exports::kinode::process::file_transfer_worker::{
    DownloadRequest, Guest, Request as WorkerRequest, Response as WorkerResponse,
};
use crate::kinode::process::standard::Address as WitAddress;
use kinode_process_lib::{our_capabilities, spawn, Address, OnExit, Request, Response};

wit_bindgen::generate!({
    path: "target/wit",
    world: "file-transfer-worker-api-v0",
    generate_unused_types: true,
    additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto],
});

fn start_download(
    our: &WitAddress,
    source: &WitAddress,
    name: &str,
    target: &WitAddress,
    is_requestor: bool,
) -> anyhow::Result<()> {
    // spin up a worker, initialize based on whether it's a downloader or a sender.
    let our_worker = spawn(
        None,
        &format!(
            "{}:{}/pkg/file_transfer_worker.wasm",
            our.process.package_name, our.process.publisher_node,
        ),
        OnExit::None,
        our_capabilities(),
        vec![],
        false,
    )?;

    let target = if is_requestor { target } else { source };
    let our_worker_address = Address {
        node: our.node.clone(),
        process: our_worker,
    };

    Response::new()
        .body(WorkerResponse::Download(Ok(())))
        .send()?;

    Request::new()
        .expects_response(5)
        .body(WorkerRequest::Download(DownloadRequest {
            name: name.to_string(),
            target: target.clone(),
            is_requestor,
        }))
        .target(&our_worker_address)
        .send()?;

    Ok(())
}

struct Api;
impl Guest for Api {
    fn start_download(
        our: WitAddress,
        source: WitAddress,
        name: String,
        target: WitAddress,
        is_requestor: bool,
    ) -> Result<(), String> {
        match start_download(&our, &source, &name, &target, is_requestor) {
            Ok(result) => Ok(result),
            Err(e) => Err(format!("{e:?}")),
        }
    }
}
export!(Api);
}

Example Usage

Build

# Start fake nodes.
kit f
kit f -o /tmp/kinode-fake-node-2 -p 8081 -f fake2.dev

# Create & build file_transfer.
## The `-a` adds the worker Wasm file to the API so it can be exported properly.
kit n file_transfer -t file_transfer
kit b file_transfer -a file_transfer/pkg/file_transfer_worker.wasm

# Start file_transfer on fake nodes.
kit s file_transfer
kit s file_transfer -p 8081

Usage

# First, put a file into `/tmp/kinode-fake-node-2/vfs/file_transfer:template.os/files/`, e.g.:
echo 'hello world' > /tmp/kinode-fake-node-2/vfs/file_transfer:template.os/files/my_file.txt

# In fake.dev terminal, check if file exists.
list_files:file_transfer:template.os fake2.dev

# In fake.dev terminal, download the file.
download:file_transfer:template.os my_file.txt fake2.dev

# Confirm file was downloaded:
cat /tmp/kinode-fake-node/vfs/file_transfer:template.os/files/my_file.txt