Exporting Workers in Package APIs
Kinode packages can export workers and expose them in easy-to-use ways. Exporting and importing functions is discussed in the previous recipe. This recipe focuses on:
- A simple example of exporting a worker and exposing it in an ergonmoic API.
- A simple example of importing a worker.
- Demonstrations of
kit
tooling for the above.
Exporting a Worker
Exporting or importing a worker is much the same as exporting or importing an API as usual as discussed in the previous recipe.
The main difference, in general, is that the exporter must include the worker when kit build
ing — see below.
In the specific example here, the exporter also exports a function that makes use of the worker ergonomic: that function, start_download()
, spawn()
s the worker.
In addition, in this specific example, the importer handles the message types of the worker.
Example: File Transfer
WIT API
#![allow(unused)] fn main() { ... interface file-transfer-worker { use standard.{address}; /// external-facing requests variant request { /// download starts a download. /// * used by requestor to start whole process /// * used by provider to spin up worker to serve request download(download-request), /// progress is from worker to parent /// * acks not required, but provided for completeness progress(progress-request), } variant response { download(result<_, string>), /// ack: not required, but provided for completeness progress, } /// requests used between workers to transfer the file /// parent will not receive these, so need not handle them variant internal-request { chunk(chunk-request), size(u64), } record download-request { name: string, target: address, is-requestor: bool, } record progress-request { name: string, progress: u64, } record chunk-request { name: string, offset: u64, length: u64, } /// easiest way to use file-transfer-worker /// handle file-transfer-worker::request by calling this helper function start-download: func( our: address, source: address, name: string, target: address, is-requestor: bool, ) -> result<_, string>; } world file-transfer-worker-api-v0 { export file-transfer-worker; } ... }
API Function Definitions
The API here spawn()
s a worker, and so the worker is part of the API.
API
#![allow(unused)] fn main() { use crate::exports::kinode::process::file_transfer_worker::{ DownloadRequest, Guest, Request as WorkerRequest, Response as WorkerResponse, }; use crate::kinode::process::standard::Address as WitAddress; use kinode_process_lib::{our_capabilities, spawn, Address, OnExit, Request, Response}; wit_bindgen::generate!({ path: "target/wit", world: "file-transfer-worker-api-v0", generate_unused_types: true, additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto], }); fn start_download( our: &WitAddress, source: &WitAddress, name: &str, target: &WitAddress, is_requestor: bool, ) -> anyhow::Result<()> { // spin up a worker, initialize based on whether it's a downloader or a sender. let our_worker = spawn( None, &format!( "{}:{}/pkg/file_transfer_worker.wasm", our.process.package_name, our.process.publisher_node, ), OnExit::None, our_capabilities(), vec![], false, )?; let target = if is_requestor { target } else { source }; let our_worker_address = Address { node: our.node.clone(), process: our_worker, }; Response::new() .body(WorkerResponse::Download(Ok(()))) .send()?; Request::new() .expects_response(5) .body(WorkerRequest::Download(DownloadRequest { name: name.to_string(), target: target.clone(), is_requestor, })) .target(&our_worker_address) .send()?; Ok(()) } struct Api; impl Guest for Api { fn start_download( our: WitAddress, source: WitAddress, name: String, target: WitAddress, is_requestor: bool, ) -> Result<(), String> { match start_download(&our, &source, &name, &target, is_requestor) { Ok(result) => Ok(result), Err(e) => Err(format!("{e:?}")), } } } export!(Api); }
Worker
#![allow(unused)] fn main() { use crate::kinode::process::file_transfer_worker::{ ChunkRequest, DownloadRequest, InternalRequest, ProgressRequest, Request as WorkerRequest, Response as WorkerResponse, }; use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId}; use kinode_process_lib::{ await_message, call_init, get_blob, println, vfs::{open_dir, open_file, Directory, File, SeekFrom}, Address, Message, ProcessId, Request, Response, }; wit_bindgen::generate!({ path: "target/wit", world: "{package_name_kebab}-{publisher_dotted_kebab}-v0", generate_unused_types: true, additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto], }); #[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)] #[serde(untagged)] // untagged as a meta-type for all incoming messages enum Msg { // requests WorkerRequest(WorkerRequest), InternalRequest(InternalRequest), // responses WorkerResponse(WorkerResponse), } impl From<WitAddress> for Address { fn from(address: WitAddress) -> Self { Address { node: address.node, process: address.process.into(), } } } impl From<WitProcessId> for ProcessId { fn from(process: WitProcessId) -> Self { ProcessId { process_name: process.process_name, package_name: process.package_name, publisher_node: process.publisher_node, } } } const CHUNK_SIZE: u64 = 1048576; // 1MB fn handle_worker_request( request: &WorkerRequest, file: &mut Option<File>, files_dir: &Directory, ) -> anyhow::Result<bool> { match request { WorkerRequest::Download(DownloadRequest { name, target, is_requestor, }) => { Response::new() .body(WorkerResponse::Download(Ok(()))) .send()?; // open/create empty file in both cases. let mut active_file = open_file(&format!("{}/{}", files_dir.path, &name), true, None)?; if *is_requestor { *file = Some(active_file); Request::new() .expects_response(5) .body(WorkerRequest::Download(DownloadRequest { name: name.to_string(), target: target.clone(), is_requestor: false, })) .target::<Address>(target.clone().into()) .send()?; } else { // we are sender: chunk the data, and send it. let size = active_file.metadata()?.len; let num_chunks = (size as f64 / CHUNK_SIZE as f64).ceil() as u64; // give receiving worker file size so it can track download progress Request::new() .body(InternalRequest::Size(size)) .target(target.clone()) .send()?; active_file.seek(SeekFrom::Start(0))?; for i in 0..num_chunks { let offset = i * CHUNK_SIZE; let length = CHUNK_SIZE.min(size - offset); let mut buffer = vec![0; length as usize]; active_file.read_at(&mut buffer)?; Request::new() .body(InternalRequest::Chunk(ChunkRequest { name: name.clone(), offset, length, })) .target(target.clone()) .blob_bytes(buffer) .send()?; } return Ok(true); } } WorkerRequest::Progress(_) => { return Err(anyhow::anyhow!( "worker: got unexpected WorkerRequest::Progress", )); } } Ok(false) } fn handle_internal_request( request: &InternalRequest, file: &mut Option<File>, size: &mut Option<u64>, parent: &Option<Address>, ) -> anyhow::Result<bool> { match request { InternalRequest::Chunk(ChunkRequest { name, offset, length, }) => { // someone sending a chunk to us let file = match file { Some(file) => file, None => { return Err(anyhow::anyhow!( "worker: receive error: no file initialized" )); } }; let bytes = match get_blob() { Some(blob) => blob.bytes, None => { return Err(anyhow::anyhow!("worker: receive error: no blob")); } }; file.write_all(&bytes)?; // if sender has sent us a size, give a progress update to main transfer let Some(ref parent) = parent else { return Ok(false); }; if let Some(size) = size { let progress = ((offset + length) as f64 / *size as f64 * 100.0) as u64; Request::new() .expects_response(5) .body(WorkerRequest::Progress(ProgressRequest { name: name.to_string(), progress, })) .target(parent) .send()?; if progress >= 100 { return Ok(true); } } } InternalRequest::Size(incoming_size) => { *size = Some(*incoming_size); } } Ok(false) } fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<bool> { match response { WorkerResponse::Download(ref result) => { if let Err(e) = result { return Err(anyhow::anyhow!("{e}")); } } WorkerResponse::Progress => {} } Ok(false) } fn handle_message( our: &Address, message: &Message, file: &mut Option<File>, files_dir: &Directory, size: &mut Option<u64>, parent: &mut Option<Address>, ) -> anyhow::Result<bool> { return Ok(match message.body().try_into()? { // requests Msg::WorkerRequest(ref wr) => { *parent = Some(message.source().clone()); handle_worker_request(wr, file, files_dir)? } Msg::InternalRequest(ref ir) => handle_internal_request(ir, file, size, parent)?, // responses Msg::WorkerResponse(ref wr) => handle_worker_response(wr)?, }); } call_init!(init); fn init(our: Address) { println!("worker: begin"); let start = std::time::Instant::now(); let drive_path = format!("{}/files", our.package_id()); let files_dir = open_dir(&drive_path, false, None).unwrap(); let mut file: Option<File> = None; let mut size: Option<u64> = None; let mut parent: Option<Address> = None; loop { match await_message() { Err(send_error) => println!("worker: got SendError: {send_error}"), Ok(ref message) => { match handle_message(&our, message, &mut file, &files_dir, &mut size, &mut parent) { Ok(exit) => { if exit { println!("worker: done: exiting, took {:?}", start.elapsed()); break; } } Err(e) => println!("worker: got error while handling message: {e:?}"), } } } } } }
Process
The file_transfer
process imports and uses the exported start_download()
:
#![allow(unused)] fn main() { use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId}; use crate::kinode::process::file_transfer_worker::{start_download, Request as WorkerRequest, Response as WorkerResponse, DownloadRequest, ProgressRequest}; use crate::kinode::process::{package_name}::{Request as TransferRequest, Response as TransferResponse, FileInfo}; use kinode_process_lib::{ await_message, call_init, println, vfs::{create_drive, metadata, open_dir, Directory, FileType}, Address, Message, ProcessId, Response, }; wit_bindgen::generate!({ path: "target/wit", world: "{package_name_kebab}-{publisher_dotted_kebab}-v0", generate_unused_types: true, additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto], }); #[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)] #[serde(untagged)] // untagged as a meta-type for all incoming messages enum Msg { // requests TransferRequest(TransferRequest), WorkerRequest(WorkerRequest), // responses TransferResponse(TransferResponse), WorkerResponse(WorkerResponse), } impl From<Address> for WitAddress { fn from(address: Address) -> Self { WitAddress { node: address.node, process: address.process.into(), } } } impl From<ProcessId> for WitProcessId { fn from(process: ProcessId) -> Self { WitProcessId { process_name: process.process_name, package_name: process.package_name, publisher_node: process.publisher_node, } } } fn ls_files(files_dir: &Directory) -> anyhow::Result<Vec<FileInfo>> { let entries = files_dir.read()?; let files: Vec<FileInfo> = entries .iter() .filter_map(|file| match file.file_type { FileType::File => match metadata(&file.path, None) { Ok(metadata) => Some(FileInfo { name: file.path.clone(), size: metadata.len, }), Err(_) => None, }, _ => None, }) .collect(); Ok(files) } fn handle_transfer_request( request: &TransferRequest, files_dir: &Directory, ) -> anyhow::Result<()> { match request { TransferRequest::ListFiles => { let files = ls_files(files_dir)?; Response::new() .body(TransferResponse::ListFiles(files)) .send()?; } } Ok(()) } fn handle_worker_request( our: &Address, source: &Address, request: &WorkerRequest, ) -> anyhow::Result<()> { match request { WorkerRequest::Download(DownloadRequest { ref name, ref target, is_requestor }) => { match start_download( &our.clone().into(), &source.clone().into(), name, target, *is_requestor, ) { Ok(_) => {} Err(e) => return Err(anyhow::anyhow!("{e}")), } } WorkerRequest::Progress(ProgressRequest { name, progress }) => { println!("{} progress: {}%", name, progress); Response::new() .body(WorkerResponse::Progress) .send()?; } } Ok(()) } fn handle_transfer_response(source: &Address, response: &TransferResponse) -> anyhow::Result<()> { match response { TransferResponse::ListFiles(ref files) => { println!( "{}", files.iter(). fold(format!("{source} available files:\nFile\t\tSize (bytes)\n"), |mut msg, file| { msg.push_str(&format!( "{}\t\t{}", file.name.split('/').last().unwrap(), file.size, )); msg }) ); } } Ok(()) } fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<()> { match response { WorkerResponse::Download(ref result) => { if let Err(e) = result { return Err(anyhow::anyhow!("{e}")) } } WorkerResponse::Progress => {} } Ok(()) } fn handle_message( our: &Address, message: &Message, files_dir: &Directory, ) -> anyhow::Result<()> { match message.body().try_into()? { // requests Msg::TransferRequest(ref tr) => handle_transfer_request(tr, files_dir), Msg::WorkerRequest(ref wr) => handle_worker_request(our, message.source(), wr), // responses Msg::TransferResponse(ref tr) => handle_transfer_response(message.source(), tr), Msg::WorkerResponse(ref wr) => handle_worker_response(wr), } } call_init!(init); fn init(our: Address) { println!("begin"); let drive_path = create_drive(our.package_id(), "files", None).unwrap(); let files_dir = open_dir(&drive_path, false, None).unwrap(); loop { match await_message() { Err(send_error) => println!("got SendError: {send_error}"), Ok(ref message) => match handle_message(&our, message, &files_dir) { Ok(_) => {} Err(e) => println!("got error while handling message: {e:?}"), } } } } }
Importing an API
Dependencies
metadata.json
The metadata.json
file has a properties.dependencies
field.
When the dependencies
field is populated, kit build
will fetch that dependency from a Kinode hosting it.
See previous recipe for more discussion of dependencies.
Example: Chat with File Transfer
The example here is the kit n chat
chat template with the small addition of file transfer functionality.
The addition of file transfer requires changes to the WIT API (to import the file-transfer-worker
interface
, e.g.) as well as to the process itself to make use of the imported types and functions.
Compare the process with the unmodified kit n chat
process.
WIT API
#![allow(unused)] fn main() { interface chat-with-file-transfer { variant request { send(send-request), /// history of chat with given node history(string), } variant response { send, history(list<chat-message>), } record send-request { target: string, message: string, } record chat-message { author: string, content: string, } } world chat-with-file-transfer-template-dot-os-v0 { import chat-with-file-transfer; import file-transfer-worker; include process-v0; } }
Process
#![allow(unused)] fn main() { use std::collections::HashMap; use crate::kinode::process::chat_with_file_transfer::{ ChatMessage, Request as ChatRequest, Response as ChatResponse, SendRequest, }; use crate::kinode::process::file_transfer_worker::{ start_download, DownloadRequest, ProgressRequest, Request as WorkerRequest, Response as WorkerResponse, }; use crate::kinode::process::standard::{Address as WitAddress, ProcessId as WitProcessId}; use kinode_process_lib::{ await_message, call_init, get_capability, println, vfs::{create_drive, open_file}, Address, Message, ProcessId, Request, Response, }; wit_bindgen::generate!({ path: "target/wit", world: "chat-with-file-transfer-template-dot-os-v0", generate_unused_types: true, additional_derives: [serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto], }); #[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)] #[serde(untagged)] // untagged as a meta-type for all incoming messages enum Msg { // requests ChatRequest(ChatRequest), WorkerRequest(WorkerRequest), // responses WorkerResponse(WorkerResponse), } impl From<Address> for WitAddress { fn from(address: Address) -> Self { WitAddress { node: address.node, process: address.process.into(), } } } impl From<ProcessId> for WitProcessId { fn from(process: ProcessId) -> Self { WitProcessId { process_name: process.process_name, package_name: process.package_name, publisher_node: process.publisher_node, } } } type MessageArchive = HashMap<String, Vec<ChatMessage>>; fn handle_chat_request( our: &Address, source: &Address, request: &ChatRequest, message_archive: &mut MessageArchive, ) -> anyhow::Result<()> { match request { ChatRequest::Send(SendRequest { ref target, ref message, }) => { if target == &our.node { println!("{}: {}", source.node, message); let message = ChatMessage { author: source.node.clone(), content: message.into(), }; message_archive .entry(source.node.clone()) .and_modify(|e| e.push(message.clone())) .or_insert(vec![message]); } else { let _ = Request::new() .target(Address { node: target.clone(), process: "chat_with_file_transfer:chat_with_file_transfer:template.os" .parse()?, }) .body(request) .send_and_await_response(5)? .unwrap(); let message = ChatMessage { author: our.node.clone(), content: message.into(), }; message_archive .entry(target.clone()) .and_modify(|e| e.push(message.clone())) .or_insert(vec![message]); } Response::new().body(ChatResponse::Send).send().unwrap(); } ChatRequest::History(ref node) => { Response::new() .body(ChatResponse::History( message_archive .get(node) .map(|msgs| msgs.clone()) .unwrap_or_default(), )) .send() .unwrap(); } } Ok(()) } fn handle_worker_request( our: &Address, source: &Address, request: &WorkerRequest, ) -> anyhow::Result<()> { match request { WorkerRequest::Download(DownloadRequest { ref name, ref target, is_requestor, }) => { match start_download( &our.clone().into(), &source.clone().into(), name, target, *is_requestor, ) { Ok(_) => {} Err(e) => return Err(anyhow::anyhow!("{e}")), } } WorkerRequest::Progress(ProgressRequest { name, progress }) => { println!("{} progress: {}%", name, progress); Response::new().body(WorkerResponse::Progress).send()?; } } Ok(()) } fn handle_worker_response(response: &WorkerResponse) -> anyhow::Result<()> { match response { WorkerResponse::Download(ref result) => { if let Err(e) = result { return Err(anyhow::anyhow!("{e}")); } } WorkerResponse::Progress => {} } Ok(()) } fn handle_message( our: &Address, message: &Message, message_archive: &mut MessageArchive, ) -> anyhow::Result<()> { match message.body().try_into()? { // requests Msg::ChatRequest(ref cr) => handle_chat_request(our, message.source(), cr, message_archive), Msg::WorkerRequest(ref wr) => handle_worker_request(our, message.source(), wr), // responses Msg::WorkerResponse(ref wr) => handle_worker_response(wr), } } #[cfg(feature = "test")] #[derive(Debug, serde::Deserialize, serde::Serialize, process_macros::SerdeJsonInto)] #[cfg(feature = "test")] enum Setup { Caps, WriteFile { name: String, contents: Vec<u8> }, } #[cfg(feature = "test")] fn handle_tester_setup(our: &Address, drive_path: &str) -> anyhow::Result<()> { println!("awaiting setup..."); let Ok(message) = await_message() else { return Err(anyhow::anyhow!("a")); }; // TODO: confirm its from tester match message.body().try_into()? { Setup::Caps => { println!("got caps..."); let vfs_read_cap = serde_json::json!({ "kind": "read", "drive": drive_path, }) .to_string(); let vfs_address = Address { node: our.node.clone(), process: "vfs:distro:sys".parse()?, }; let read_cap = get_capability(&vfs_address, &vfs_read_cap).unwrap(); Response::new() .body(vec![]) .capabilities(vec![read_cap]) .send() .unwrap(); println!("sent caps"); } Setup::WriteFile { ref name, ref contents, } => { println!("got write file..."); let file = open_file(&format!("{drive_path}/{name}"), true, None)?; file.write(contents)?; } } println!("setup done"); Ok(()) } call_init!(init); fn init(our: Address) { println!("begin"); let drive_path = create_drive(our.package_id(), "files", None).unwrap(); let mut message_archive = HashMap::new(); #[cfg(feature = "test")] handle_tester_setup(&our, &drive_path).unwrap(); loop { match await_message() { Err(send_error) => println!("got SendError: {send_error}"), Ok(ref message) => match handle_message(&our, message, &mut message_archive) { Ok(_) => {} Err(e) => println!("got error while handling message: {e:?}"), }, } } } }
Chat with File Transfer Usage Example
Build
# Start fake nodes.
kit f
kit f -o /tmp/kinode-fake-node-2 -p 8081 -f fake2.dev
# Create & build file_transfer dependency.
## The `-a` adds the worker Wasm file to the API so it can be exported properly.
kit n file_transfer -t file_transfer
kit b file_transfer -a file_transfer/pkg/file_transfer_worker.wasm
# Build chat_with_file_transfer.
## The `-l` satisfies the dependency using a local path.
kit b src/code/chat_with_file_transfer -l file_transfer
# Start chat_with_file_transfer on fake nodes.
kit s src/code/chat_with_file_transfer
kit s src/code/chat_with_file_transfer -p 8081
Usage
# First, put a file into `/tmp/kinode-fake-node-2/vfs/chat_with_file_transfer:template.os/files/`, e.g.:
echo 'hello world' > /tmp/kinode-fake-node-2/vfs/chat_with_file_transfer:template.os/files/my_file.txt
# In fake.dev terminal, download the file.
download:chat_with_file_transfer:template.os my_file.txt fake2.dev
# Confirm file was downloaded:
cat /tmp/kinode-fake-node/vfs/chat_with_file_transfer:template.os/files/my_file.txt