qbp offers a lot of options and featues around asychronous processing.
Reach out on Twitter or GitHub. Let me know if you like it or have questions.
- qbp - queue, batch, process
await qbp(items, (item) => each(item));
This is the core of qbp's functionality. It will loop through items
and will concurrently pass every item to your each
function and await its completion.
qbp returns an object with a few attributes:
queue
object (which is explained below)completed
which is an array of successfully completed itemserrors
which is an array of objects with anerror
attribute and anitem
attribute.- These are items that threw an error in the
each
function. Theerror
attribute is the error that was thrown.
- These are items that threw an error in the
var { queue, completed, errors } = await qbp(items, (item) => each(item));
You can obviously forego await
for then()
.
qbp(items, (item) => each(item))
.then(({ queue, completed, errors }) => nextStep());
The constructor breaks down like this:
qbp([array,] [function,] [options])
Pro Tip: The way I'm parsing parameters allows you to enter these in any order. It just looks for an array, a function, and an object. You can also choose not to add any of the parameters.
The queue object is returned by qbp as well as passed to all of the functions that qbp calls. The queue lets you interact with the process as it is running. Here are the attributes and functions of the queue object.
You can set a name
option when calling qbp
. If you set the name
option, this attribute will have that value. This can be useful when you have multiple queues you've created and maybe some of them are using the same progress
function, for example.
You can call this function if you want to remove all items yet to be processed from the queue.
qbp((...args) => each(...args), {
error: (...args) => handleError(...args)
})
function handleError(err, item, { queue, setStatus }) => {
// Maybe you want to end all processing when an error occurs.
queue.empty()
}
Call this function to halt processing without removing items from the queue like queue.empty()
does.
Start processing the queue again after being paused.
Pass this function one or an array of items to add them to the queue. You can await
this function to wait for the items to finish processing before continuing. Keep in mind that the function expects an array passed in to be an array of items to be processed. If the item you want processed is an array, you would want to wrap it in another array before adding it.
var { queue } = qbp((...args) => each(...args))
async function processNewItems(newItems) {
var { queue, completed, errors } = await queue.add(newItems)
for (var i = 0; i < completed.length; i++) {
var successfulItem = completed[i]
// The completed property returned has an array of items that were successfully processed.
}
for (var i = 0; i < errors.length; i++) {
var { error, item } = errors[i]
// The errors property returned has an array of objects with an error property and an item property. These items threw errors somewhere in the each function.
}
}
You can pass or even change the each
function for the queue at any time by calling queue.each()
.
var { queue } = qbp(items)
var { completed, errors } = await queue.each((item) => {
// Do something with the item
})
Pass this function an integer to set how many threads you want the queue to process simultaneously.
You can tell the queue to start rate limiting at any time by calling this function and passing in your rate limit options. See Rate Limiting.
Call this to stop rate limiting.
This is an object containing attributes: items
for a total count of items added to the queue, complete
for total number of items completed, threads
for now many threads are currently running simultaenously, queued
for how many items have yet to be processed, and batch
which is the current batch size.
You can setup a progress
function to get called to inform your users how the processing is going.
function start () {
await qbp(items, (...args) => each(...args), {
progress: (...args) => progressUpdate(...args)
})
}
function progressUpdate (update) {
var {
total, // Total number of items that have been added to the queue
complete, // Number of items have completed the queue.
queued, // Number of items currently in the queue.
percent, // Percentage of completion.
threads, // Number of threads currently running.
itemsPerSecond, // Average rate of items per second since the last time a progress update was sent.
secondsRemaining, // An estimate of how many seconds are left based on itemsPerSecond. This will be -1 if there is no current estimate.
batch, // The current batch size. (See Batching section below)
dateTime, // Date object for when this was sent out.
statuses, // The statuses of items being processed (see Item Statuses section below)
queue // The queue object
} = update
}
With this configuration the progress
function will be called basically everytime there is a change. But if you would rather just get updates on a regular interval, you can set the progressInterval
option.
await qbp(items, (...args) => each(...args), {
progress: (...args) => progressUpdate(...args),
progressInterval: 5000
})
This will have the progress function called every 5 seconds. It will also be called when the queue is empty regardless of the interval.
Click here for live example.There are a few options for limiting how many items can be processed simultaenously. The simpliest is by limiting the number of threads that can run concurrently.
await qbp(items, (...args) => each(...args), {
threads: 2
})
By default, qbp will process all of your items concurrently. But if that's too much all at once, setting the threads
option will limit how many items are processed concurrently, in this case only 2 items will be processed at the same time.
If you need to limit processing your items by a number of items within a timeframe then this should help. Rate limiting adjusts the number of threads running simultaneously and makes sure each item takes a minimum amount of time to process in order to meet limit expectations.
In the options object you pass to qbp
you can set:
rateLimit
which is the maximum number of items allowed to process within the a time frame.rateLimitSeconds
which is that timeframe in seconds.rateUpdate
which is a function that can get called with updates every time the rate limit check runs.rateFidelity
which is how often you want the rate limiting check to run withinrateLimitSeconds
.- So a
rateFidelity
of 16 (which is the default value) withrateLimitSeconds
set to 3600 would check and adjust the rate limiting every 225 seconds, running 16 times in 3600 seconds.
- So a
At any time you can programmatically add rate limiting to a queue by calling queue.rateLimit()
and passing in the rateLimit
, rateLimitSeconds
, and rateFidelity
values.
Also, at any time you can stop rate limiting by calling queue.stopRateLimit()
.
Important note! Rate limiting hijacks your
threads
setting and automatically adjusts it to get the most items processed while remaining within your limits.
// Create a queue with an each function, the maximum number of threads you want to run at a time, and an empty function.
var { queue } = qbp((item) => each(item), {
rateLimit: 500, // Need rateLimit and rateLimitSeconds for rate limiting to work, all other options are not necessary.
rateLimitSeconds: 100,
rateFidelity: 20,
rateLimitOnEmpty: true, // If you're expecting items to constantly be getting added to your queue and have it sporadically be empty then this is a good option to have on for accuracy.
rateUpdate: (update) => rateUpdate(upate),
empty: () => itemsEmpty() });
async function processNewItems(newItems) {
// Add the new items to be processed. It'll restart the queue if it's already been empty.
queue.add(newItems);
}
async function each(item) {
// Do something with the item.
}
function rateUpdate(update) {
var {
queue, // The queue object.
projectedCount, // The number of items we're projecting to process at the current rate by next update.
projectedRate, // The items per second rate we're projecting by next process
minimumThreadTime, // The minimum amount of time a thread is allowed to run to meet rate limiting expectations, in milliseconds.
currentThreads, // The current number of threads running.
targetThreads, // The number of threads we're adjusting to.
currentRatePerSecond, // The number of items we processed per second since the last update.
currentThreadRate // The number of items per second each thread is averaging.
} = update
}
function itemsEmpty() {
// This gets called every time the queue runs out of items.
}
Need to get a few items at a time? Use the batch
option.
async function start() {
var { queue } = await qbp(items, (batch) => each(batch), { batch: 5 }); // Set your batch size in the options.
}
// Instead of a single item, you recieve an array of items.
async function each(batch) {
var ids = [];
for (var i = 0; i < batch.length; i++) {
ids.push(batch[i].id);
}
await httpSvc.getDetails(ids);
}
This is meant to be a system for easily creating a queued process that runs in the background but will provide updates as it progresses. Think of a resource intensive process, like crawling a website. You want to limit how many of these scans can happen simultaneously, and maybe you want to display a queue of the user's requests that updates as their requests progress.
var { queue } = qbp((...args) => handleItem(...args), {
threads: 5,
progress: (...args) => progressUpdates(...args)
})
async function processItem (newItem) {
await queue.add(newItem)
}
async function handleItem (item, { setStatus }) {
// You can give this item a status and make
// the status anything you want, object or string.
setStatus({
name: item.name,
process: 'Preparing',
percent: 0.33,
time: new Date()
})
await _db.insert(item)
if (checkSomething) {
await migrateItem(item, setStatus)
} else {
await doSomethingElse(item, setStatus)
}
}
async function migrateItem (item, setStatus) {
// You can change that status throughout the process
setStatus({
name: item.name,
process: 'Migrating',
percent: 0.66,
time: new Date()
})
await _db.move(item)
setStatus({
name: item.name,
process: 'Complete',
percent: 1,
time: new Date()
})
}
// If you haven't defined a progressInterval option, calling setStatus will trigger a progress update.
function progressUpdate ({ statuses }) {
console.log('Current Item Statuses:')
for (let i = 0; i < statuses.length; i++) {
const {
stage, // stage is set by qbp and will be 'queued', 'processing', 'complete', or 'error'.
status, // status is the status you set above. 'queued' items will not have a status set yet.
item // item is the actual item in the queue we're referring to.
} = statuses[i]
if (!status) continue
const {
name,
process,
percent,
time
} = status
}
}
I found myself nesting queues whenever I needed to loop through multiple arrays. So I added a mix
function to help with this. Here's what I was doing.
// Honestly, I'm not sure how you would effeciently cap threads on this.
await qbp(teachers, (teacher) => {
await qbp(classRooms, (classRoom) => {
await qbp(students, (student) => {
await addStudent(teacher, classRoom, student);
});
});
});
async function addStudent(teacher, classRoom, student) {
// No one likes to nest stuff.
}
So instead, now we can use the qbp.mix()
function.
// Now we can definitely cap the threads if we want.
await qbp.mix([teachers, classRooms, students], (...args) => addStudent(...args), { threads: 5 });
async function addStudent(teacher, classRoom, student, { queue }) {
// The parameters mirror the same order you gave them to qbp.
}
One thing to keep in mind. You only have one
queue
object using this. Thequeue.add()
function won't perform the mixing functionality that you get when you pass it in to themix()
function. But if you don't need to add any more items while processing, then this works perfect.
You can provide an error
function in qbp's options to be called if any items throw an error during their processing.
await qbp(items, (item) => each(item), {
error: (...errInfo) => onError(...errInfo) });
functon onError(error, item, { queue, setStatus }) {
// If you're using qbp.mix() then you'll get arguments spread out, such as (error, item1, item2, item3, { queue, setStatus })
}
Whenever you add items to the queue, whether it is when creating the queue or adding them with queue.add()
you will recieve an array of items that completed succesfully and an array of items that had errors.
var { queue, completed, errors } = await qbp(items, (...args) => each(...args))
for (var i = 0; i < completed.length; i++) {
var successfulItem = completed[i]
// The completed property returned has an array of items that were successfully processed.
}
for (var i = 0; i < errors.length; i++) {
var { error, item } = errors[i]
// The errors property returned has an array of objects with an error property and an item property. These items threw errors somewhere in the each function.
}
You can set an empty
option as a function when calling qbp and it will get called whenever the queue has finished processing all of the items it has been given.
var { queue } = await qbp(items, (...args) => each(...args), {
empty: (...args) => onEmpty(...args)
})
function onEmpty (queue) {
// Do something when the queue is empty.
}
Run unit testing with npm run test
.