Promise utilities designed for handling asynchronous operations and controlling throughput in JavaScript/TypeScript applications.
npm install prom-utils
rateLimit limits the concurrency of promises. This can be used to control how many requests are made to a server or API at once.
Note: Exceptions will be swallowed internally to prevent UnhandledPromiseRejection errors when promises reject before the limit is reached. Handle exceptions on a per-promise basis.
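One way to follow the note above is to attach a handler to each promise before handing it to the limiter. The sketch below is self-contained and does not call prom-utils; `track` and `failures` are hypothetical names used only for illustration.

```typescript
// Hypothetical collection of rejection reasons (not part of prom-utils).
const failures: unknown[] = []

// Hypothetical helper: records the rejection reason and resolves to undefined,
// so the promise handed to the limiter never rejects.
function track<T>(prom: Promise<T>): Promise<T | undefined> {
  return prom.catch((err: unknown) => {
    failures.push(err)
    return undefined
  })
}

// With prom-utils this would presumably be used as:
//   await limiter.add(track(fetch(url)))
```

After `finish()` resolves, `failures` holds one entry per rejected promise, which can then be logged or retried.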
- concurrency: number - Maximum number of concurrent promises (set to Infinity to disable)
- ...rateLimiters: RateLimiter[] - One or more rate limiters (optional)
Each rate limiter is an object that extends ThroughputLimiterOptions:
interface RateLimiter extends ThroughputLimiterOptions {
/**
* Maximum throughput allowed (items/period)
*/
maxItemsPerPeriod: number
}
Since rateLimit internally uses throughputLimiter, each rate limiter accepts all options from ThroughputLimiterOptions.
Below are the options for ThroughputLimiterOptions with the defaults used for rateLimit.
interface ThroughputLimiterOptions {
/**
* The period of time in ms to track the rate. Set to 60000 for 1 minute.
* Defaults to 1000, which is units/sec.
*/
period?: number
/**
* The minimum number of throttle invocations prior to checking the rate.
* Use this to allow for short bursts without throttling.
* Should be 1 or more. Defaults to 1.
*/
minWindowLength?: number
/**
* The maximum number of throttle invocations to hold in memory.
* Should be 1 or more. Defaults to maxItemsPerPeriod.
*/
maxWindowLength?: number
/**
* Expire throttle invocations after this many ms.
* Defaults to the period.
*/
expireAfter?: number
/**
* The timeframe to use for calculating the rate.
* Defaults to getTimeframeUsingPeriod.
*/
getTimeframe?: GetTimeframe
}
{
/**
* Add a promise. Waits for one promise to resolve if limit is met or for
* throughput to drop below threshold if any rate limiters are configured.
* Optionally, set `bypass` to true to bypass async waiting.
*/
add: (prom: Promise<T>, options?: AddOptions) => Promise<void>
/**
* Wait for all promises to resolve
*/
finish: () => Promise<void>
/**
* Number of pending promises.
*/
length: number
/**
* Get current rate statistics for all rate limiters
*/
getStats: () => {
itemsPerPeriod: number[]
}
}
Single rate limiter:
const limiter = rateLimit(5, { maxItemsPerPeriod: 75, period: 60000 }) // 5 concurrent, max 75 per minute
for (const url of urls) {
// Will wait for one promise to finish if limit is reached
await limiter.add(fetch(url))
}
// Wait for unresolved promises to resolve
await limiter.finish()
Multiple rate limiters:
// Limit to 5 concurrent AND max 100 per second AND max 1000 per minute
const limiter = rateLimit(
5,
{ maxItemsPerPeriod: 100, period: 1000 },
{ maxItemsPerPeriod: 1000, period: 60000 }
)
for (const url of urls) {
// Will wait until all rate limiter constraints are satisfied
await limiter.add(fetch(url))
}
await limiter.finish()
// Get stats for all rate limiters
const stats = limiter.getStats()
console.log('Rates:', stats.itemsPerPeriod) // Array of rates for each limiter
No rate limiters (concurrency only):
const limiter = rateLimit(10) // Only limit concurrency to 10
for (const url of urls) {
await limiter.add(fetch(url))
}
await limiter.finish()
batchQueue batches calls via a local queue. This can be used to accumulate values before writing to a database or making API calls.
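To make the batching idea concrete, here is a minimal synchronous sketch of a size-triggered queue. It is not the prom-utils implementation: `batchSketch` is a hypothetical name, and it ignores the timeout, byte-size, and throughput options.

```typescript
// Hypothetical, simplified batching queue: flushes when batchSize is reached.
function batchSketch<A>(fn: (items: A[]) => void, batchSize = 500) {
  let queue: A[] = []

  // Hand the queued items to fn and start a fresh queue.
  const flush = () => {
    if (queue.length === 0) return
    const items = queue
    queue = []
    fn(items)
  }

  // Add an item and flush as soon as the batch is full.
  const enqueue = (item: A) => {
    queue.push(item)
    if (queue.length >= batchSize) flush()
  }

  return {
    enqueue,
    flush,
    get length() {
      return queue.length
    },
  }
}
```

A final `flush()` is still needed after the last `enqueue`, since a partially filled batch never reaches the size trigger on its own.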
- fn: (arr: A[]) => B - Function to call with batched items
- options: QueueOptions - Configuration options
interface QueueOptions {
/**
* Wait for the batch to reach this number of elements before flushing the queue.
* Defaults to 500.
*/
batchSize?: number
/**
* Wait for the batch to reach this size in bytes before flushing the queue.
*/
batchBytes?: number
/**
* Wait this long in ms before flushing the queue.
*/
timeout?: number
/**
* Maximum throughput allowed (items/sec).
* Defaults to Infinity.
*/
maxItemsPerSec?: number
/**
* Maximum throughput allowed (bytes/sec).
* Defaults to Infinity.
*/
maxBytesPerSec?: number
}
{
/**
* Call fn with the items in the queue.
*/
flush: () => Promise<void>
/**
* Add an item to the queue. When a queue condition is met flush will be called.
*/
enqueue: (item: A) => Promise<void>
/**
* The last result returned from calling fn.
*/
lastResult?: Awaited<B>
/**
* The cause for the last automatic queue flush. Will be one of:
* timeout, batchSize, or batchBytes.
*/
lastFlush?: LastFlush
/**
* Get the current throughput rates.
*/
getStats: () => QueueStats
/**
* Length of the queue.
*/
length: number
}
const writeToDatabase = async (records) => {
// database write logic here
return { success: true }
}
const queue = batchQueue(writeToDatabase, {
batchSize: 250,
timeout: 5000, // also flush after 5 seconds
maxItemsPerSec: 1000, // limit to 1000 items per second
})
for (const record of records) {
await queue.enqueue(record)
}
// Call fn with remaining queued items
await queue.flush()
// Check statistics
console.log(queue.getStats())
batchQueueParallel batches calls via a local queue. It is similar to batchQueue but designed to be safe for concurrent access. Use it to accumulate values before writing to a database or making API calls when the queue is called from multiple concurrent contexts.
Note: Unlike batchQueue, this function does not support timeout-based flushing or throughput limiting options. It only supports batchSize and batchBytes triggers.
- fn: (arr: A[]) => unknown - Function to call with batched items
- options: QueueOptionsParallel - Configuration options
interface QueueOptionsParallel {
/**
* Wait for the batch to reach this number of elements before flushing the queue.
* Defaults to 500.
*/
batchSize?: number
/**
* Wait for the batch to reach this size in bytes before flushing the queue.
*/
batchBytes?: number
}
{
/**
* Call fn with the items in the queue.
*/
flush: () => void
/**
* Add an item to the queue. When a queue condition is met flush will be called.
*/
enqueue: (item: A) => void
/**
* Length of the queue.
*/
length: number
}
const writeToDatabase = (records) => {
// database write logic here
console.log(`Writing ${records.length} records`)
}
const queue = batchQueueParallel(writeToDatabase, {
batchSize: 250,
batchBytes: 1024 * 1024, // 1MB
})
// Safe to call from multiple concurrent contexts
await Promise.all(
records.map(async (record) => {
// This is safe to call concurrently
queue.enqueue(record)
})
)
// Call fn with remaining queued items
queue.flush()
throughputLimiter limits throughput by sleeping until the rate (units/period) is less than the maximum limit. Units and period are intentionally abstract since they could represent requests/min, bytes/sec, etc.
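The arithmetic behind a sliding-window limiter can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `WindowEntry`, `currentRate`, and `sleepTimeMs` are hypothetical names, and the timeframe is assumed to be the time elapsed since the oldest entry in the window.

```typescript
// Hypothetical record of one append call.
interface WindowEntry {
  timestamp: number // ms
  numUnits: number
}

// Rate in units/period, assuming the timeframe is the time since the oldest entry.
function currentRate(entries: WindowEntry[], now: number, period = 1000): number {
  const first = entries[0]
  if (!first) return 0
  const totalUnits = entries.reduce((sum, e) => sum + e.numUnits, 0)
  const elapsed = Math.max(now - first.timestamp, 1) // avoid division by zero
  return (totalUnits / elapsed) * period
}

// Milliseconds to wait until the rate has fallen to maxUnitsPerPeriod.
function sleepTimeMs(
  entries: WindowEntry[],
  now: number,
  maxUnitsPerPeriod: number,
  period = 1000
): number {
  const first = entries[0]
  if (!first) return 0
  const totalUnits = entries.reduce((sum, e) => sum + e.numUnits, 0)
  // Elapsed time at which totalUnits would correspond to exactly the maximum rate
  const requiredElapsed = (totalUnits / maxUnitsPerPeriod) * period
  return Math.max(0, first.timestamp + requiredElapsed - now)
}
```

For example, 1000 units recorded over 500 ms is a rate of 2000 units/sec, so a limiter capped at 1000 units/sec would need to sleep a further 500 ms under these assumptions.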
- maxUnitsPerPeriod: number - Maximum units allowed per period
- options: ThroughputLimiterOptions - Configuration options
interface ThroughputLimiterOptions {
/**
* The period of time in ms to track the rate. Set to 60000 for 1 minute.
* Defaults to 1000, which is units/sec.
*/
period?: number
/**
* The minimum number of throttle invocations prior to checking the rate.
* Use this to allow for short bursts without throttling.
* Should be 1 or more. Defaults to 1.
*/
minWindowLength?: number
/**
* The maximum number of throttle invocations to hold in memory.
* Should be 1 or more. Defaults to 3.
*/
maxWindowLength?: number
/**
* Expire throttle invocations after this many ms.
* Defaults to Infinity.
*/
expireAfter?: number
/**
* The timeframe to use for calculating the rate.
* Two built-in options: getTimeframeUsingElapsed or getTimeframeUsingPeriod.
* Defaults to getTimeframeUsingElapsed.
*/
getTimeframe?: GetTimeframe
}
{
/**
* Get the current rate (units/period).
*/
getCurrentRate: () => number
/**
* Sleep until the rate is below the maximum.
*/
throttle: () => Promise<void>
/**
* Add units to the sliding window.
*/
append: (numUnits: number) => void
/**
* Throttle first, then append.
*/
throttleAndAppend: (numUnits: number) => Promise<void>
/**
* Append first, then throttle.
*/
appendAndThrottle: (numUnits: number) => Promise<void>
}
// Limit to at most 1000 items/sec
const limiter = throughputLimiter(1000)
for (const batch of batches) {
// Will wait until the rate is < maxUnitsPerPeriod
await limiter.throttleAndAppend(batch.length)
console.log('Current rate: %d items/sec', limiter.getCurrentRate())
}
pausable creates a mechanism to pause and resume a loop. After pause is called, maybeBlock returns a promise that resolves when resume is called.
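A mechanism like this can be built from a single stored promise. The following is a rough sketch of the idea and not the library's implementation; `pausableSketch` is a hypothetical name.

```typescript
// Hypothetical sketch: a pause/resume gate backed by one pending promise.
function pausableSketch(timeout?: number) {
  let blocked: Promise<void> | undefined
  let resolveBlocked: (() => void) | undefined
  let timer: ReturnType<typeof setTimeout> | undefined

  const resume = () => {
    if (timer !== undefined) clearTimeout(timer)
    timer = undefined
    resolveBlocked?.() // release anyone awaiting maybeBlock()
    resolveBlocked = undefined
    blocked = undefined
  }

  const pause = () => {
    if (blocked) return // already paused
    blocked = new Promise<void>((resolve) => {
      resolveBlocked = resolve
    })
    // Optional auto-resume after the timeout
    if (timeout !== undefined) timer = setTimeout(resume, timeout)
  }

  return {
    pause,
    resume,
    // Returns undefined when not paused, so awaiting it does not block
    maybeBlock: () => blocked,
    get isPaused() {
      return blocked !== undefined
    },
  }
}
```

Because `await undefined` continues immediately, a loop that awaits `maybeBlock()` only stalls while the gate is paused.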
- timeout?: number - Optional timeout in ms to auto-resume
{
/**
* Pause execution when maybeBlock is called.
*/
pause: () => void
/**
* Resume execution.
*/
resume: () => void
/**
* Call in your loop to potentially block execution.
*/
maybeBlock: () => Promise<void> | undefined
/**
* Whether currently paused.
*/
isPaused: boolean
}
const shouldProcess = pausable()
// In some event handler or condition
onSomeCondition(() => shouldProcess.pause())
onSomeOtherCondition(() => shouldProcess.resume())
// In your processing loop
for (const record of records) {
await shouldProcess.maybeBlock()
await processRecord(record)
}
defer creates a deferred promise that resolves when done is called.
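The deferred pattern itself is small. A minimal sketch, assuming nothing about how prom-utils implements it (`deferSketch` is a hypothetical name), might look like this:

```typescript
// Hypothetical sketch of a deferred promise: expose the resolver alongside the promise.
function deferSketch() {
  let done: () => void = () => {}
  // The executor runs synchronously, so done is reassigned before we return.
  const promise = new Promise<void>((resolve) => {
    done = () => resolve()
  })
  return { done, promise }
}
```

The caller keeps `promise` to await and passes `done` to whatever code signals completion.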
{
/**
* Resolves the promise when called.
*/
done: () => void
/**
* Promise that resolves when done() is called.
*/
promise: Promise<void>
}
const delay = (milliseconds: number) => {
const deferred = defer()
setTimeout(deferred.done, milliseconds)
return deferred.promise
}
// Use the delay function
await delay(1000) // Wait 1 second
sleep waits for a specified time before resolving the promise.
- time?: number - Time to sleep in ms, defaults to 0
- Promise<void> - Resolves after the specified time
// Sleep for one second
await sleep(1000)
pacemaker calls a heartbeat function at regular intervals until a promise resolves or rejects.
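The behavior described here can be approximated with an interval that is cleared once the promise settles. This is a sketch under that assumption, not the library's code; `pacemakerSketch` is a hypothetical name.

```typescript
// Hypothetical sketch: run heartbeatFn on an interval until the promise settles.
async function pacemakerSketch<T>(
  heartbeatFn: () => void,
  promise: Promise<T>,
  interval = 1000
): Promise<T> {
  const timer = setInterval(heartbeatFn, interval)
  try {
    return await promise
  } finally {
    // Runs on both resolve and reject, so the heartbeat always stops
    clearInterval(timer)
  }
}
```

Putting `clearInterval` in `finally` means a rejected promise still propagates its error to the caller while the heartbeat is cleaned up.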
- heartbeatFn: () => void - Function to call at intervals
- promise: Promise<T> - Promise to wait for
- interval?: number - Interval in ms, defaults to 1000
- The value from the resolved promise
const heartbeatFn = () => {
console.log('Still processing...')
}
const result = await pacemaker(heartbeatFn, longRunningOperation())
waitUntil waits until the predicate returns a truthy value or the timeout expires.
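Conceptually this is a polling loop with a deadline. The sketch below illustrates that loop; it is not the prom-utils implementation, `waitUntilSketch` is a hypothetical name, and it throws a plain Error where the library documents a TimeoutError.

```typescript
// Hypothetical sketch: poll the predicate until it is truthy or the deadline passes.
async function waitUntilSketch<T>(
  pred: () => Promise<T> | T,
  timeout = 5000,
  checkFrequency = 50
): Promise<T> {
  const deadline = Date.now() + timeout
  while (true) {
    const value = await pred()
    if (value) return value
    if (Date.now() >= deadline) {
      throw new Error(`Predicate was not truthy within ${timeout} ms`)
    }
    // Wait before checking the predicate again
    await new Promise<void>((resolve) => setTimeout(resolve, checkFrequency))
  }
}
```

In this sketch the predicate is always checked at least once, even with a timeout of 0.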
- pred: () => Promise<T> | T - Predicate function that returns any value
- options: WaitOptions - Configuration options
interface WaitOptions {
/**
* Wait this long in ms before rejecting. Defaults to 5000 ms.
*/
timeout?: number
/**
* Check the predicate with this frequency. Defaults to 50 ms.
*/
checkFrequency?: number
}
- Promise<T> - Resolves with the truthy value returned by the predicate; rejects if the timeout expires
// Wait until a value is returned from Redis
const result = await waitUntil(() => redis.get('someKey'), { timeout: 5000 })
raceTimeout returns the value of a promise if it resolves before a timeout; otherwise it returns the exported TIMEOUT symbol.
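Racing a promise against a timer is commonly done with `Promise.race`. The following sketch shows that approach under the assumption that a sentinel symbol marks the timeout; `raceTimeoutSketch` and `TIMEOUT_SKETCH` are hypothetical names, not the library's exports.

```typescript
// Hypothetical sentinel standing in for the library's TIMEOUT symbol.
const TIMEOUT_SKETCH = Symbol('timeout')

// Hypothetical sketch: resolve with the promise's value or the sentinel, whichever comes first.
async function raceTimeoutSketch<A>(
  prom: Promise<A>,
  timeout: number
): Promise<A | typeof TIMEOUT_SKETCH> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const timedOut = new Promise<typeof TIMEOUT_SKETCH>((resolve) => {
    timer = setTimeout(() => resolve(TIMEOUT_SKETCH), timeout)
  })
  try {
    return await Promise.race([prom, timedOut])
  } finally {
    // Avoid leaving the timer running once the race is decided
    clearTimeout(timer)
  }
}
```

Returning a symbol rather than throwing lets the caller distinguish a timeout from a genuine rejection with a simple equality check. The original promise keeps running after a timeout; the race only stops waiting for it.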
- prom: Promise<A> - Promise to race
- timeout: number - Timeout in ms
- Promise<A | typeof TIMEOUT> - Either the promise result or the TIMEOUT symbol
const winner = await raceTimeout(someLongOperation(), 5000)
if (winner === TIMEOUT) {
console.log('Operation timed out')
} else {
console.log('Operation completed with result:', winner)
}
multiplex merges multiple async iterators into a single async iterator. The merged iterator yields values as they become available from the input iterators. The order in which the iterators are checked is randomized to avoid consistently favoring the first iterator when multiple values are available simultaneously. If any input iterator throws an error, the merged iterator throws as well. The merged iterator terminates when all of the input iterators have terminated.
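The core of such a merge can be sketched by keeping one pending `next()` call per input and racing them. This is a simplified illustration, not the library's implementation: `multiplexSketch` is a hypothetical name, it accepts only async iterables, and it omits the randomized ordering described above.

```typescript
// Hypothetical sketch: merge async iterables by racing one pending next() per input.
async function* multiplexSketch<T>(
  ...iters: Array<AsyncIterable<T>>
): AsyncGenerator<T> {
  const iterators = iters.map((it) => it[Symbol.asyncIterator]())
  // One in-flight next() per live iterator, tagged with the iterator's index
  const pending = new Map<
    number,
    Promise<{ index: number; result: IteratorResult<T> }>
  >()

  const advance = (index: number) => {
    const iterator = iterators[index]
    if (iterator) {
      pending.set(
        index,
        iterator.next().then((result) => ({ index, result }))
      )
    }
  }

  iterators.forEach((_, index) => advance(index))

  while (pending.size > 0) {
    // A rejection from any input propagates out of the merged generator here
    const { index, result } = await Promise.race(Array.from(pending.values()))
    if (result.done) {
      pending.delete(index) // this input is exhausted
    } else {
      advance(index) // request the next value from the same input
      yield result.value
    }
  }
}
```

Each input keeps at most one `next()` call in flight, so a fast producer cannot run ahead of the consumer by more than one value.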
- ...iters: Array<AsyncIter<T>> - The async iterators or async iterables to merge
- AsyncIterableIterator<T> - An async iterator that yields values from all input iterators
async function* numbers() {
yield 1
yield 2
yield 3
}
async function* letters() {
yield 'a'
yield 'b'
yield 'c'
}
// Merge multiple async iterators
for await (const value of multiplex(numbers(), letters())) {
console.log(value) // Will log values as they become available
}
// Can also be used with async iterables
const iterable = {
async *[Symbol.asyncIterator]() {
yield 'x'
yield 'y'
},
}
for await (const value of multiplex(numbers(), iterable)) {
console.log(value)
}
The library exports two error classes:
- OptionsError - Thrown when invalid options are provided
- TimeoutError - Thrown when an operation times out
Example:
import { TimeoutError, waitUntil } from 'prom-utils'
try {
await waitUntil(() => false, { timeout: 100 })
} catch (error) {
if (error instanceof TimeoutError) {
console.log('Timed out:', error.message)
} else {
throw error
}
}