Tokio Background Worker Patterns

Problem

Background processing (embedding indexing, sync, bulk operations) requires careful orchestration to avoid blocking the async runtime, leaking memory, doing duplicate work, or leaving zombie tasks on shutdown.

Symptoms:

  • UI freezes during background indexing (CPU work on async runtime)
  • Memory grows unbounded when the producer queues work faster than the consumer processes it
  • Same page embedded 5 times because user saved rapidly
  • Worker keeps running after workspace switch
  • No visibility into indexing progress

Solution

A production-grade background worker combines five patterns. The embedding pipeline (INK-174, INK-205/206) implements all five.

Pattern 1: Bounded Channels with Backpressure

const CHANNEL_CAPACITY: usize = 256;
let (tx, mut rx) = mpsc::channel::<Command>(CHANNEL_CAPACITY);

// Producer: non-blocking send with explicit backpressure
pub fn queue_work(&self, item: WorkItem) {
    match self.tx.try_send(Command::Process(item)) {
        Ok(()) => {}
        Err(TrySendError::Full(_)) => {
            tracing::warn!("work queue full, dropping request");
        }
        Err(TrySendError::Closed(_)) => {
            tracing::warn!("worker shut down, dropping request");
        }
    }
}

Why try_send() not send(): The producer (Tauri command handler) must never block waiting for queue space. Dropping a non-critical request is better than freezing the UI. For embedding, a dropped page will be caught on the next bulk reindex.

Why not unbounded: mpsc::unbounded_channel() grows without limit. If the producer outpaces the consumer (e.g., bulk import triggers thousands of embeds), memory exhaustion follows.
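The backpressure behavior can be seen in isolation with the standard library's `sync_channel`, which is the blocking-world analogue of `tokio::sync::mpsc::channel` (a sketch; the capacity of 2 is arbitrary):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Bounded channel with capacity 2: the std analogue of mpsc::channel(2).
    let (tx, rx) = sync_channel::<u32>(2);
    assert!(tx.try_send(1).is_ok());
    assert!(tx.try_send(2).is_ok());
    // Third try_send fails immediately instead of blocking or growing memory.
    assert!(matches!(tx.try_send(3), Err(TrySendError::Full(3))));
    // Once the consumer drains a slot, the producer can send again.
    assert_eq!(rx.recv().unwrap(), 1);
    assert!(tx.try_send(3).is_ok());
}
```

The key property is the same in both APIs: `try_send` returns `Err(Full)` rather than waiting, so the caller stays responsive and decides what to drop.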

Pattern 2: Debounced Processing with Dedup

let mut pending = HashSet::new();
let debounce = tokio::time::sleep(Duration::from_secs(2));
tokio::pin!(debounce);

loop {
    tokio::select! {
        Some(cmd) = rx.recv() => {
            match cmd {
                Command::Process(item) => {
                    pending.insert(item.id); // HashSet deduplicates
                    debounce.as_mut().reset(Instant::now() + Duration::from_secs(2));
                }
                Command::Shutdown => break,
            }
        }
        _ = &mut debounce, if !pending.is_empty() => {
            let batch: Vec<_> = pending.drain().collect();
            process_batch(batch).await;
        }
    }
}

Why debounce: A user saving a page 3 times in 1.5s should trigger one embedding, not three. The 2-second window collapses rapid saves into a single batch.

Why HashSet: If page A is queued twice before the debounce fires, it’s processed once. Vec would process it twice.
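The dedup step needs nothing async; it can be checked with `HashSet` alone (a minimal sketch with made-up page ids):

```rust
use std::collections::HashSet;

fn main() {
    // Simulate page A being saved twice before the debounce window fires.
    let saves = ["page-a", "page-b", "page-a"];

    let mut pending = HashSet::new();
    for id in saves {
        pending.insert(id); // second "page-a" is a no-op
    }

    // The drained batch holds each id once; a Vec would hold "page-a" twice.
    let batch: Vec<_> = pending.drain().collect();
    assert_eq!(batch.len(), 2);
}
```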

Pattern 3: spawn_blocking for CPU Work

async fn process_batch(&self, items: Vec<Uuid>) {
    let provider = self.provider.clone(); // Arc<dyn EmbeddingProvider>
    let repo = self.repo.clone();         // Arc<dyn EmbeddingRepository>

    let result = tokio::task::spawn_blocking(move || {
        // `texts` and `entries` are derived from `items` (elided here)
        // CPU-intensive: ONNX inference, tokenization, similarity
        let embeddings = provider.embed_batch(&texts)?;
        repo.upsert_batch(&entries)?;
        Ok::<_, Error>(())
    })
    .await;

    match result {
        Ok(Ok(())) => tracing::debug!("batch processed"),
        Ok(Err(e)) => tracing::error!("batch failed: {e}"),
        Err(e) => tracing::error!("task panicked: {e}"),
    }
}

Why spawn_blocking: ONNX inference is CPU-bound (10-100ms per batch). Running it on the async runtime starves I/O tasks (WebSocket messages, Tauri commands). spawn_blocking moves it to a dedicated thread pool.

Why clone Arc: spawn_blocking requires 'static + Send. Arc-wrapped dependencies are cheap to clone and satisfy both bounds.
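The `'static + Send` requirement is the same one `std::thread::spawn` imposes, so the Arc-cloning move can be shown with std alone (a sketch; `EmbeddingProvider` and `DummyProvider` are hypothetical stand-ins for the real trait and implementation, and `thread::spawn` stands in for `spawn_blocking`):

```rust
use std::sync::Arc;
use std::thread;

trait EmbeddingProvider: Send + Sync {
    fn embed(&self, text: &str) -> Vec<f32>;
}

struct DummyProvider;
impl EmbeddingProvider for DummyProvider {
    fn embed(&self, text: &str) -> Vec<f32> {
        vec![text.len() as f32] // placeholder "embedding"
    }
}

fn main() {
    let provider: Arc<dyn EmbeddingProvider> = Arc::new(DummyProvider);

    // Cloning the Arc bumps a refcount; the moved clone is 'static + Send,
    // so it can cross onto the blocking thread without borrowing &self.
    let p = provider.clone();
    let handle = thread::spawn(move || p.embed("hello"));

    assert_eq!(handle.join().unwrap(), vec![5.0]);
    // The original `provider` is still usable here.
}
```

Note the `Send + Sync` supertraits on the trait: without them, `Arc<dyn EmbeddingProvider>` would not be `Send` and the closure could not be moved to another thread.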

Pattern 4: Dual Shutdown Paths

pub struct BackgroundWorker {
    tx: mpsc::Sender<Command>,
    worker_handle: Option<JoinHandle<()>>,
}

impl BackgroundWorker {
    /// Graceful: send shutdown and wait for worker to finish current batch
    pub async fn shutdown_and_wait(&mut self) {
        let _ = self.tx.try_send(Command::Shutdown);
        if let Some(handle) = self.worker_handle.take() {
            let _ = handle.await;
        }
    }
}

impl Drop for BackgroundWorker {
    /// Fire-and-forget: signal shutdown but don't await
    fn drop(&mut self) {
        let _ = self.tx.try_send(Command::Shutdown);
        // Cannot await in Drop; dropping the JoinHandle detaches the task,
        // which exits on its own when it receives Shutdown
    }
}

Why two paths:

  • shutdown_and_wait() — used during workspace switch to ensure pending work completes
  • Drop — safety net if the manager is dropped without explicit shutdown; cannot await in Drop, so fire-and-forget is the only option

Warning: Drop alone is not sufficient for graceful shutdown. Always call shutdown_and_wait() in the happy path.
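The Drop safety net can be demonstrated without tokio by replacing the shutdown `Command` with an `AtomicBool` (a hypothetical stand-in chosen so the sketch stays std-only):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in worker handle: the shutdown signal is a shared flag instead of
// an mpsc Command, but the Drop pattern is identical.
struct Worker {
    shutdown: Arc<AtomicBool>,
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Fire-and-forget: flip the flag, never block or await in Drop.
        self.shutdown.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    {
        let _w = Worker { shutdown: flag.clone() };
        assert!(!flag.load(Ordering::SeqCst));
    } // _w dropped here, even though nobody called shutdown explicitly

    // The signal fired anyway: the safety net caught the missed shutdown.
    assert!(flag.load(Ordering::SeqCst));
}
```

This is exactly why Drop alone is insufficient: the signal fires, but nothing waits for in-flight work, so the happy path must still call `shutdown_and_wait()`.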

Pattern 5: Watch Channel for Status Broadcast

let (status_tx, status_rx) = watch::channel(IndexingStatus::Idle);

// Worker updates status during bulk indexing
status_tx.send(IndexingStatus::Indexing { completed: 42, total: 100 }).ok();

// UI polls status (Tauri command)
pub fn get_status(&self) -> IndexingStatus {
    self.status_rx.borrow().clone()
}

Why watch not broadcast: Status is “latest value wins” — the UI only cares about current progress, not every intermediate update. watch keeps exactly one value; broadcast would queue messages the UI never reads.
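Semantically, `watch` behaves like a shared cell holding only the latest value (plus change notification, omitted here). A std-only sketch of that "latest value wins" property, using an `RwLock` as the stand-in for the watch slot:

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug, PartialEq)]
enum IndexingStatus {
    Idle,
    Indexing { completed: u32, total: u32 },
}

fn main() {
    // Stand-in for watch::channel: one shared slot, no message queue.
    let status = Arc::new(RwLock::new(IndexingStatus::Idle));
    let writer = status.clone();

    // The worker overwrites intermediate updates; nothing accumulates.
    *writer.write().unwrap() = IndexingStatus::Indexing { completed: 10, total: 100 };
    *writer.write().unwrap() = IndexingStatus::Indexing { completed: 42, total: 100 };

    // A reader only ever observes the most recent value, matching
    // status_rx.borrow() semantics; the 10/100 update is gone.
    let seen = status.read().unwrap().clone();
    assert_eq!(seen, IndexingStatus::Indexing { completed: 42, total: 100 });
}
```

A `broadcast` channel would instead have buffered both updates per subscriber, which is wasted memory when the UI only renders the current progress.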

Frontend integration: A React hook polls get_embedding_status every 3s while the command palette is open, showing “Improving search…” during indexing. Polling stops when status returns to Idle.

Prevention

Checklist for New Background Workers

  • Channel is bounded with named capacity constant
  • Producer uses try_send() with explicit Full/Closed handling
  • Rapid triggers are debounced (if applicable)
  • Duplicate work is deduplicated (HashSet or similar)
  • CPU-intensive work uses spawn_blocking()
  • Dependencies are Arc-wrapped for cheap cloning into spawn_blocking
  • Graceful shutdown via explicit method + Drop safety net
  • JoinHandle stored for shutdown_and_wait()
  • Progress observable via watch channel (if UI needs it)
  • Errors logged but don’t crash the worker

Warning Signs

  • Memory growth — check if channel is unbounded or debounce HashSet grows without drain
  • UI freezes — check if CPU work runs on async runtime instead of spawn_blocking
  • Zombie workers — check if Drop fires shutdown signal
  • Duplicate processing — check if dedup is in place before debounce fires
