Tokio Background Worker Patterns
Problem
Background processing (embedding indexing, sync, bulk operations) requires careful orchestration to avoid blocking the async runtime, leaking memory, doing duplicate work, or leaving zombie tasks on shutdown.
Symptoms:
- UI freezes during background indexing (CPU work on async runtime)
- Memory grows unbounded when work is queued faster than it is processed
- Same page embedded 5 times because user saved rapidly
- Worker keeps running after workspace switch
- No visibility into indexing progress
Solution
A production-grade background worker combines five patterns. The embedding pipeline (INK-174, INK-205/206) implements all five.
Pattern 1: Bounded Channels with Backpressure
```rust
const CHANNEL_CAPACITY: usize = 256;

let (tx, mut rx) = mpsc::channel::<Command>(CHANNEL_CAPACITY);

// Producer: non-blocking send with explicit backpressure
pub fn queue_work(&self, item: WorkItem) {
    match self.tx.try_send(Command::Process(item)) {
        Ok(()) => {}
        Err(TrySendError::Full(_)) => {
            tracing::warn!("work queue full, dropping request");
        }
        Err(TrySendError::Closed(_)) => {
            tracing::warn!("worker shut down, dropping request");
        }
    }
}
```

Why `try_send()` not `send()`: The producer (Tauri command handler) must never block waiting for queue space. Dropping a non-critical request is better than freezing the UI. For embedding, a dropped page will be caught on the next bulk reindex.

Why not unbounded: `mpsc::unbounded_channel()` grows without limit. If the producer outpaces the consumer (e.g., a bulk import triggers thousands of embeds), memory exhaustion follows.
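The non-blocking producer behavior can be demonstrated with the standard library's bounded channel, which exposes the same `try_send`/`Full` semantics as Tokio's; `try_queue` is a hypothetical helper for illustration:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Returns how many of `n` items a bounded queue accepts before
// reporting Full, with no consumer draining it.
fn try_queue(capacity: usize, n: usize) -> usize {
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i as u32) {
            Ok(()) => accepted += 1,
            // Drop instead of blocking -- the caller never waits.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    // With capacity 4 and an idle consumer, the 5th send is rejected.
    assert_eq!(try_queue(4, 10), 4);
}
```

The producer returns immediately in every case, which is exactly the property the Tauri command handler needs.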
Pattern 2: Debounced Processing with Dedup
```rust
let mut pending = HashSet::new();
let debounce = tokio::time::sleep(Duration::from_secs(2));
tokio::pin!(debounce);

loop {
    tokio::select! {
        Some(cmd) = rx.recv() => {
            match cmd {
                Command::Process(item) => {
                    pending.insert(item.id); // HashSet deduplicates
                    debounce.as_mut().reset(Instant::now() + Duration::from_secs(2));
                }
                Command::Shutdown => break,
            }
        }
        _ = &mut debounce, if !pending.is_empty() => {
            let batch: Vec<_> = pending.drain().collect();
            process_batch(batch).await;
        }
    }
}
```

Why debounce: A user saving a page 3 times in 1.5s should trigger one embedding, not three. The 2-second window collapses rapid saves into a single batch.

Why `HashSet`: If page A is queued twice before the debounce fires, it's processed once. A `Vec` would process it twice.
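The dedup step on its own can be sketched with std only; the ids and the `collect_pending` helper are illustrative:

```rust
use std::collections::HashSet;

// Simulates one debounce window: the same page id queued multiple
// times collapses to a single pending entry.
fn collect_pending(events: &[u32]) -> Vec<u32> {
    let mut pending: HashSet<u32> = HashSet::new();
    for &id in events {
        pending.insert(id); // re-inserting an existing id is a no-op
    }
    // drain() empties the set, mirroring the batch handoff
    let mut batch: Vec<u32> = pending.drain().collect();
    batch.sort(); // deterministic order for inspection only
    batch
}

fn main() {
    // Page 7 saved three times in one window is processed once.
    assert_eq!(collect_pending(&[7, 7, 3, 7]), vec![3, 7]);
}
```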
Pattern 3: spawn_blocking for CPU Work
```rust
async fn process_batch(&self, items: Vec<Uuid>) {
    let provider = self.provider.clone(); // Arc<dyn EmbeddingProvider>
    let repo = self.repo.clone();         // Arc<dyn EmbeddingRepository>
    // texts / entries are derived from `items` (loading elided here)

    let result = tokio::task::spawn_blocking(move || {
        // CPU-intensive: ONNX inference, tokenization, similarity
        let embeddings = provider.embed_batch(&texts)?;
        repo.upsert_batch(&entries)?;
        Ok::<_, Error>(())
    })
    .await;

    match result {
        Ok(Ok(())) => tracing::debug!("batch processed"),
        Ok(Err(e)) => tracing::error!("batch failed: {e}"),
        Err(e) => tracing::error!("task panicked: {e}"),
    }
}
```

Why `spawn_blocking`: ONNX inference is CPU-bound (10-100ms per batch). Running it on the async runtime starves I/O tasks (WebSocket messages, Tauri commands). `spawn_blocking` moves it to a dedicated thread pool.

Why clone `Arc`: `spawn_blocking` requires `'static + Send`. `Arc`-wrapped dependencies are cheap to clone and satisfy both bounds.
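The same ownership pattern can be shown with `std::thread::spawn`, which imposes the identical `'static + Send` bounds on its closure; `Provider` and `run_on_worker` are hypothetical stand-ins:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical shared dependency. Wrapping it in Arc makes the
// clone cheap and gives the worker thread an owned, 'static handle.
struct Provider {
    name: String,
}

fn run_on_worker(provider: Arc<Provider>) -> String {
    let p = provider.clone(); // clones the Arc pointer, not the Provider
    thread::spawn(move || format!("embedded via {}", p.name))
        .join()
        .expect("worker panicked")
}

fn main() {
    let provider = Arc::new(Provider { name: "onnx".into() });
    assert_eq!(run_on_worker(provider), "embedded via onnx");
}
```

Because the closure owns its own `Arc`, the original handle stays usable on the calling side, which is what lets the worker keep long-lived references to the provider and repository.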
Pattern 4: Dual Shutdown Paths
```rust
pub struct BackgroundWorker {
    tx: mpsc::Sender<Command>,
    worker_handle: Option<JoinHandle<()>>,
}

impl BackgroundWorker {
    /// Graceful: send shutdown and wait for worker to finish current batch
    pub async fn shutdown_and_wait(&mut self) {
        let _ = self.tx.try_send(Command::Shutdown);
        if let Some(handle) = self.worker_handle.take() {
            let _ = handle.await;
        }
    }
}

impl Drop for BackgroundWorker {
    /// Fire-and-forget: signal shutdown but don't await
    fn drop(&mut self) {
        let _ = self.tx.try_send(Command::Shutdown);
        // Cannot await in Drop — JoinHandle is dropped, task will be cancelled
    }
}
```

Why two paths:

- `shutdown_and_wait()` — used during workspace switch to ensure pending work completes
- `Drop` — safety net if the manager is dropped without explicit shutdown; cannot `await` in Drop, so fire-and-forget is the only option

Warning: Drop alone is not sufficient for graceful shutdown. Always call `shutdown_and_wait()` in the happy path.
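The Drop safety net can be sketched with a std channel; `WorkerHandle` here is a simplified stand-in for the real manager, and the worker side is reduced to the receiver:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

enum Command {
    Shutdown,
}

// Simplified stand-in: dropping the handle fires a non-blocking
// Shutdown signal without waiting on the worker.
struct WorkerHandle {
    tx: SyncSender<Command>,
}

impl Drop for WorkerHandle {
    fn drop(&mut self) {
        let _ = self.tx.try_send(Command::Shutdown); // fire-and-forget
    }
}

fn drop_sends_shutdown() -> bool {
    let (tx, rx): (SyncSender<Command>, Receiver<Command>) = sync_channel(1);
    {
        let _handle = WorkerHandle { tx };
    } // handle dropped here; Drop runs and signals the worker
    matches!(rx.try_recv(), Ok(Command::Shutdown))
}

fn main() {
    assert!(drop_sends_shutdown());
}
```

Note the `try_send` in Drop: even the safety net must not block, since Drop can run on any thread, including inside the async runtime.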
Pattern 5: Watch Channel for Status Broadcast
```rust
let (status_tx, status_rx) = watch::channel(IndexingStatus::Idle);

// Worker updates status during bulk indexing
status_tx.send(IndexingStatus::Indexing { completed: 42, total: 100 }).ok();

// UI polls status (Tauri command)
pub fn get_status(&self) -> IndexingStatus {
    self.status_rx.borrow().clone()
}
```

Why `watch` not `broadcast`: Status is "latest value wins" — the UI only cares about current progress, not every intermediate update. `watch` keeps exactly one value; `broadcast` would queue messages the UI never reads.
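A rough std-only model of the "latest value wins" semantics, assuming a `StatusCell` stand-in for `watch` (the real `watch` channel also notifies waiters of changes, which this sketch omits):

```rust
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, PartialEq)]
enum IndexingStatus {
    Idle,
    Indexing { completed: u32, total: u32 },
}

// Single shared slot: each send overwrites the previous value,
// so readers only ever observe the latest status.
struct StatusCell(Arc<Mutex<IndexingStatus>>);

impl StatusCell {
    fn new() -> Self {
        StatusCell(Arc::new(Mutex::new(IndexingStatus::Idle)))
    }
    fn send(&self, s: IndexingStatus) {
        *self.0.lock().unwrap() = s;
    }
    fn borrow(&self) -> IndexingStatus {
        self.0.lock().unwrap().clone()
    }
}

fn main() {
    let cell = StatusCell::new();
    cell.send(IndexingStatus::Indexing { completed: 10, total: 100 });
    cell.send(IndexingStatus::Indexing { completed: 42, total: 100 });
    // The intermediate update is gone; only the latest survives.
    assert_eq!(
        cell.borrow(),
        IndexingStatus::Indexing { completed: 42, total: 100 }
    );
}
```

A `broadcast` channel, by contrast, would have buffered both updates and forced the UI to drain a backlog it never needed.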
Frontend integration: A React hook polls `get_embedding_status` every 3s while the command palette is open, showing "Improving search…" during indexing. Polling stops when status returns to `Idle`.
Prevention
Checklist for New Background Workers
- Channel is bounded with named capacity constant
- Producer uses `try_send()` with explicit Full/Closed handling
- Rapid triggers are debounced (if applicable)
- Duplicate work is deduplicated (HashSet or similar)
- CPU-intensive work uses `spawn_blocking()`
- Dependencies are `Arc`-wrapped for cheap cloning into `spawn_blocking`
- Graceful shutdown via explicit method + Drop safety net
- JoinHandle stored for `shutdown_and_wait()`
- Progress observable via `watch` channel (if UI needs it)
- Errors logged but don't crash the worker
Warning Signs
- Memory growth — check if channel is unbounded or debounce HashSet grows without drain
- UI freezes — check if CPU work runs on async runtime instead of `spawn_blocking`
- Zombie workers — check if Drop fires shutdown signal
- Duplicate processing — check if dedup is in place before debounce fires
References
- Commits: `880712b` (INK-174), `d23858b` (INK-205/206/207/208/220/221)
- Files: `apps/desktop/src-tauri/src/embedding.rs`, `crates/application/src/embedding/pipeline.rs`
- Tokio docs: https://docs.rs/tokio/latest/tokio/sync/mpsc/