`workers` is a small Tokio worker pool for bounded parallelism with a fixed
number of long-lived workers. Each worker owns its own clone of your `Work`
implementation and accepts one job at a time.
The pool supports two execution styles:
- fire-and-forget with `WorkerHandle::send()`
- request-response with `WorkerHandle::run()`
It also has explicit shutdown control:
- `drain()` stops accepting new work
- `wait()` waits for workers to exit
- `close()` does both for graceful shutdown
Add the crate to your project:
```toml
[dependencies]
workers = { path = "../tokio-worker-pool" }
tokio = { version = "1", features = ["rt", "sync", "macros"] }
```

If you are using it from this repository directly, the main entrypoints are:

```sh
make build
make test
make check
make version
make ci
```

This crate is useful when you want a fixed amount of async concurrency without spawning an unbounded number of tasks for every input item. Typical cases:
- per-worker connection state
- per-worker caches or counters
- backpressure via a limited worker count
- predictable shutdown of in-flight work
A worker pool is a way to say "only let this many pieces of work run at the same time." Instead of spawning a fresh task for every job and letting the runtime sort it out, you create a fixed set of workers and feed jobs to them.
That gives you two practical benefits:
- concurrency stays bounded
- worker state can live longer than a single job
Spawning per job is often fine, but sometimes you want stronger structure:
- you want to cap resource usage
- you want each worker to reuse expensive state
- you want shutdown to mean "finish what started, then stop"
This crate is aimed at those cases.
This is the most important mental model in the crate.
Your `Work` value is cloned once per worker. That means:

- fields inside `self` are worker-local
- values passed in `Input` can be shared across workers
Use worker-local state for things like:
- a connection owned by one worker
- a small cache private to one worker
- a counter used only for that worker's own behavior
Use shared input state for things like:
- a common accumulator
- a shared queue or map
- data that must be seen consistently by all workers
If you need shared mutable state, pass something like `Arc<Mutex<T>>` in the
input rather than assuming `&mut self` is shared across the whole pool.
Backpressure just means the system has a natural place where it says "not yet."
Here, the pressure point is `get().await`. If all workers are busy, you wait
until one becomes available. That prevents the caller from piling up unlimited
active work.
Use `send()` when the caller only cares that the job was accepted by a worker.
Use `run()` when the caller needs a result back.
Conceptually:

- `send()` is "please do this"
- `run()` is "please do this and tell me what happened"
Graceful shutdown means the pool stops taking new work but does not abandon work that already started.
In this crate:
- `drain()` closes the front door
- existing jobs continue to completion
- `wait()` waits for all workers to leave
- `close()` is the full graceful shutdown path
This matters in real systems because "drop everything immediately" is often the wrong behavior for jobs that touch files, sockets, or external services.
The pool does not push jobs directly into a shared work queue. Instead, idle workers advertise availability back to the pool.
Flow:

- Each worker creates a fresh `oneshot` channel.
- The worker sends the `oneshot::Sender` into the pool's `mpsc` queue.
- `WorkerPool::get()` receives one of those senders and returns a `WorkerHandle`.
- The caller uses that handle to send either:
  - an input only, for `send()`
  - an input plus a result channel, for `run()`
- The worker executes the job, optionally sends back the result, and then advertises itself again.
That means the `mpsc` queue is a queue of available workers, not a queue of
jobs.
```rust
use std::future::Future;

use workers::{Work, WorkerPool};

#[derive(Clone)]
struct Doubler;

impl Work for Doubler {
    type Input = u64;
    type Output = u64;

    fn run(&mut self, input: Self::Input) -> impl Future<Output = Self::Output> + Send {
        async move { input * 2 }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut pool = WorkerPool::new(Doubler, 4);

    let worker = pool.get().await.expect("worker available");
    let output = worker.run(21).await.expect("job completed");
    assert_eq!(output, 42);

    pool.close().await;
}
```

Pool size is a policy decision, not a cosmetic parameter.
Reasonable starting points:
- CPU-heavy work: start near the number of CPU cores
- IO-heavy work: start somewhat higher, then measure
- expensive per-worker resources: keep the pool as small as those resources require
Questions to ask:
- how many jobs should run concurrently at most?
- does each worker hold scarce state, like a socket or file handle?
- is the bottleneck CPU, remote latency, or some external rate limit?
Practical advice:
- start small
- measure throughput and latency
- only increase size when there is evidence that more concurrency helps
A larger pool is not automatically better. It can increase contention, memory use, and pressure on downstream services.
Worker-local state is the best fit for the crate.
Examples:
- one DB connection per worker
- one parser cache per worker
- one client object per worker
Put that state inside the `Work` implementation so each worker owns its own
copy.
Sometimes workers do independent work but update a common result. In that case,
pass shared state through `Input`.
Examples:

- `Arc<Mutex<Vec<_>>>`
- `Arc<Mutex<HashMap<_, _>>>`
- `Arc<AtomicUsize>`
Keep the shared critical section small. If every job spends most of its time waiting on the same mutex, the pool size stops mattering much.
Use `run()` when the caller needs the result of each job.
This is a good fit for:
- transformations
- RPC-like units of work
- computations where the caller needs the output immediately
Use `send()` when the caller wants bounded concurrency but does not need a per-job
return value.
This is a good fit for:
- background writes
- event handling
- bounded side effects
The `Work` value is not shared across the pool; each worker has its own clone.
If you want truly shared mutable state, pass it through `Input` inside an `Arc<_>`.
Dropping the pool is allowed, but it is not a blocking shutdown path. If you
need deterministic cleanup, call `close().await`.
If the bottleneck is an external service, a larger pool can just create a larger traffic spike. Size the pool according to the downstream limit, not only local machine capacity.
Wrapping every job body in a single shared mutex defeats the purpose of the pool: if every job serializes on one mutex, the effective concurrency becomes one.
`get().await` returning `None` means the pool is drained or all workers are
gone. That is a lifecycle signal, not a transient inconvenience.
```rust
pool.close().await;
```

Use this when you are done submitting work and want the pool to finish in-flight jobs cleanly.
```rust
pool.drain();
// do other shutdown coordination here
pool.wait().await;
```

Use this when shutdown has multiple phases and the pool is only one part of it.
```rust
while let Some(worker) = pool.get().await {
    worker.send(job_source.next().await?).unwrap();
}
```

This style makes the pool lifecycle explicit: once the pool is drained, the producer naturally stops.
The worker exits, the pool's live worker count is decremented, and waiters are notified. The pool can continue operating with the remaining workers.
If every worker dies, `get().await` eventually returns `None`.
Dropping a `WorkerHandle` without calling `send()` or `run()` is allowed. The
worker notices the dropped `oneshot` and advertises itself again.
This crate deliberately does not provide:
- automatic job retries
- dynamic worker resizing
- cancellation of jobs already running
- job prioritization
- a multi-producer shared job queue API
Those can be built around the pool if needed, but they are not part of its core contract.
Implement `Work` to define what a worker does:

```rust
pub trait Work: Send + Sync + Clone + 'static {
    type Input: Send + 'static;
    type Output: Send + 'static;

    fn run(&mut self, input: Self::Input) -> impl Future<Output = Self::Output> + Send;
}
```

Important detail: each worker receives its own clone of the `Work`
implementation. Mutating `&mut self` changes that worker-local clone only.
Shared state should be passed through `Input`, usually inside `Arc<_>`.
`WorkerPool`:

- `WorkerPool::new(work, size)` creates `size` workers
- `get().await` returns `Option<WorkerHandle<W>>`
- `worker_count()` reports how many workers are still alive
- `is_drained()` reports whether the pool still accepts work
- `drain()` closes the pool to new work
- `wait().await` waits for all workers to exit
- `close().await` performs graceful shutdown

`WorkerHandle`:

- `send(job)` queues work and returns immediately
- `run(job).await` queues work and waits for the output
Handles are single-use. Dropping a handle without sending work is valid; the worker detects that and re-registers itself with the pool.
- Workers live for the lifetime of the pool
- Each worker processes at most one job at a time
- After finishing a job, the worker becomes available again
`drain()` means:

- no new `get()` calls can succeed after buffered worker slots are exhausted
- workers currently running a job are allowed to finish
- idle workers exit once they observe shutdown

After `drain()`, `get().await` returns `None`.
`wait()` only waits for worker exit. It does not itself initiate shutdown. In
practice you usually call:

```rust
pool.drain();
pool.wait().await;
```

or just:

```rust
pool.close().await;
```

Dropping `WorkerPool` triggers `drain()` but does not block waiting for workers.
If you need deterministic shutdown, call `close()`.
- `mpsc::Receiver<WorkerSlot<W>>` in the pool: receives availability signals from idle workers
- `WorkerSlot<W> = oneshot::Sender<Task<W>>`: a single-use channel to send one task to one worker
- `Task<W>`: the input plus an optional result sender
- `AtomicUsize`: tracks how many workers are still alive
- `Notify`: wakes waiters when workers exit
The queue holds idle worker slots. At most `size` workers can be idle at once,
so `mpsc::channel(size)` is the natural bound. A smaller buffer can create
artificial contention among workers trying to announce availability.
Once a worker advertises availability, it waits for either:

- a task to arrive on its private `oneshot` receiver
- the pool receiver to close

This is implemented with:

```rust
tokio::select! {
    biased;
    result = rx => result,
    _ = self.sender.closed() => break,
}
```

This matters for shutdown. If the pool closes while a worker is waiting for a
job, `sender.closed()` lets that worker break out instead of waiting forever.
The `biased` branch order preserves in-flight work when both events become ready
at the same time.
The current shutdown logic addresses three distinct failure modes:

- Buffered worker slots retained by the `mpsc` queue could keep `oneshot` senders alive and leave workers blocked forever. `drain()` fixes this by calling `close()` on the receiver and then `try_recv()` in a loop to drop any buffered worker slots.
- A waiter could miss the final worker-exit notification if it checked the atomic count before registering for wakeup. `wait()` fixes this by enabling the `Notify` waiter before checking `worker_count`.
- A worker could race with `drain()` and manage to enqueue its worker slot after the receiver was closed and after the pool finished draining buffered slots. Waiting on `sender.closed()` inside the worker breaks that cycle.
Each worker task owns a `WorkerGuard`. If the worker panics, the guard still
drops during unwinding, which:

- decrements the worker count
- notifies any task waiting in `wait()`

That keeps pool shutdown and `worker_count()` accurate even when workers die.
The project is driven by the Makefile:

```sh
make build
make test
make test-release
make check
make stress
```

Target summary:

- `make build` builds the crate
- `make test` runs the test suite
- `make test-release` runs tests in release mode
- `make check` runs the main verification set used in this repo
- `make bench-build` compiles benches without starting Criterion
- `make stress` runs the release stress example
- `make version` prints the current release tag
- `make ci` runs the local equivalent of the GitHub Actions pipeline
- `make release` runs checks, commits release metadata, and creates the git tag
- `make clean` removes build artifacts
The repo also includes:

- `examples/stress.rs` for repeated lifecycle stress testing
- `benches/pool_bench.rs` for Criterion benchmarks
GitHub Actions runs on every push to main and on every pull request.
The workflow currently enforces:
- formatting with `make fmt-check`
- linting with `make clippy`
- verification with `make check`

For local parity before pushing, run:

```sh
make ci
```

See LICENSE.