Concurrency primitives for TypeScript and JavaScript.
Concurrency in JavaScript, frankly, sucks.
This module is an effort to provide concurrency primitives for TypeScript/JavaScript that capture as much of the semantics of Go's channels as possible, while remaining idiomatic to the language.
I'll be iterating on this for a few weeks, in my spare time, with the goal of a production-ready module, which can be used any JS environment, including browsers.
There are many applications for concurrent programming, and (IMO) it's
something that anyone writing software should have a basic understanding of.
Concurrency is rarely trivial, it's often difficult to get right. By providing
a simple API that mitigates common pitfalls, ts-chan
aims to make concurrent
programming more accessible to JavaScript developers.
Patterns are a broad topic, but I intend to document several, with a side-by-side comparison to vanilla JS. Additionally, you'll find that many of Go's concurrency patterns apply directly, excepting (for example) those that leverage multiple CPUs.
See also:
Install or import the NPM package ts-chan
.
Supported platforms include Node.js, Deno, and browsers.
This module takes steps to mitigate the risk of microtask cycles. They remain, however, a real concern for any JavaScript program, that involves communication between concurrent operations. Somewhat more insidious than plain call cycles, as they are not visible in the call stack, it's important to know that promises and async/await operate on the microtask queue, unless they wait on something that operates on the macrotask queue (e.g. IO, timers).
While queueMicrotask
is not used by this module, MDN's
Using microtasks in JavaScript with queueMicrotask()
guide is both informative and relevant.
The mitigation strategy used by this module is for high-level async methods (including Chan.send, Chan.recv, and Select.wait) to use getYieldGeneration and yieldToMacrotaskQueue like:
const exampleHighLevelAsyncMethod = async () => {
// ...
const yieldGeneration = getYieldGeneration();
const yieldPromise = yieldToMacrotaskQueue();
try {
// ...
return await result;
} finally {
if (getYieldGeneration() === yieldGeneration) {
await yieldPromise;
}
}
};
The above is a simple (albeit unoptimised) pattern which ensures that, so long as one side calls one of these methods, the event loop will not block. This solution does have some performance impact, and does not completely mitigate the risk, but seems a reasonable compromise.
To facilitate arbitrary implementations, this module defines a protocol for
implementation of channels, modelled as Sender and
Receiver. This protocol is styled after JavaScript's
iteration protocols,
though it differs in that the outer types, Sendable and
Receivable (analogues to Iterable
), are not intended to
support statefulness independently of the channel (no return
analogue).
This documentation is a work in progress, so, for now, it may be easiest to peruse src/protocol.ts.
The Chan class is a reference implementation of the channel protocol. It has full support for the protocol, Go-like channel close semantics, and supports buffered channel semantics.
Unlike Go's channel, all sends and receives are processed in FIFO order, allowing it to function as a queue, if desired. Also provided are a number of convenience methods and properties, that are not part of the core protocol, but are useful for common use cases.
See the API documentation for more details.
The absence of an analogue to Go's
select statement
would limit the usefulness of channels, as the select statement is Go's key to
implementing "reactive" software.
The Select class provided by this module is intended to fill that
gap, and is modelled after Go's select
statement, particularly regarding the
semantics of receiving and sending. This class utilizes the "channel protocol"
(as defined by this module). AbortSignal
is fully supported, and (can)
function equivalently to including a case <-ctx.Done():
, in Go. Promises are
also supported, though they have no analogue, in Go.
- Random Selection: Just as Go's
select
statement picks one communicative operation using a uniform pseudo-random selection (if more than one is immediately available), theSelect
class does so too. - Blocking Behavior: In Go, if no communication can proceed and there's
no default case, the
select
statement blocks. Similarly, theSelect
class'swait
method will also block until a case is ready. - Default Case Equivalence: The
poll
method in theSelect
class serves a similar purpose as thedefault
case in Go'sselect
statement. If no case is ready,poll
will returnundefined
, offering a non-blocking alternative. - Case Evaluation Order: (Optional, see below) The SelectFactory class may be used to evaluate senders, receivers, and values (to send), in source order, and is intended to be used within loops and similar. Use of this class may avoid unnecessary recreation of select cases, on each iteration, with the caveat that it does not (currently) support promises.
- Return Value from
wait
andpoll
: TheSelect
class'swait
method returns a promise that resolves with the index of the next ready case. Thepoll
method, on the other hand, returns the index directly orundefined
. In contrast, Go'sselect
statement does not return values in this manner. This is a mechanism used to provide type support. - Operation to "receive" value: Once a receive case is ready, in
the
Select
class, the result must be explicitly retrieved using therecv
method, which must be provided with the case which is ready. This contrasts with Go, where the received value is directly assigned in the case clause. Again, this is a part of the mechanism used to provide type support. - Limited default value support: Nil channels have not analogue in TS/JS. Additionally, while receiving a "default value" (on receive from a closed channel) is a supported part of the channel protocol, it's not required, and has no (type-based) mechanism to describe whether the channel supports it, or not.
- Case Evaluation Order: This is an interesting topic, and I found that
Go's behavior was not exactly what I expected. For simplicity, this
functionality was omitted from
Select
, and is provided bySelectFactory
, instead. For reference, from the Go spec:For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement. The result is a set of channels to receive from or send to, and the corresponding values to send. Any side effects in that evaluation will occur irrespective of which (if any) communication operation is selected to proceed. Expressions on the left-hand side of a RecvStmt with a short variable declaration or assignment are not yet evaluated.
- Chan
- ChanIterator
- ChanAsyncIterator
- Receiver
- ReceiverCallback
- Receivable
- getReceiver
- Sender
- SenderCallback
- Sendable
- getSender
- SendOnClosedChannelError
- CloseOfClosedChannelError
- SelectCase
- SelectCaseSender
- SelectCaseReceiver
- SelectCasePromise
- recv
- send
- wait
- Select
- SelectFactory
- getYieldGeneration
- yieldToMacrotaskQueue
Provides a communication mechanism between two or more concurrent operations.
In addition to various utility methods, it implements:
- Sendable and Sender (including Sender.close).
- Receivable and Receiver
- Iterable (see also ChanIterator)
- AsyncIterable (see also ChanAsyncIterator)
capacity
(optional, default0
)newDefaultValue
function (): T?
If set to true, the channel will skip the microtask cycle mitigation mechanism, described by The microtask queue: a footgun, in the project README.
Defaults to false.
See also .setUnsafe.
Type: boolean
Returns the maximum number of items the channel can buffer.
Type: number
Returns number
Returns the number of items in the channel buffer.
Type: number
Returns number
Returns an integer representing the number of blocking operations. Positive values indicate senders, while negative values indicate receivers.
Type: number
Returns number
Sets the .unsafe property, and returns this.
unsafe
boolean
Returns this
Performs a synchronous send operation on the channel, returning true if it succeeds, or false if there are no waiting receivers, and the channel is full.
Will throw SendOnClosedChannelError if the channel is closed.
value
T
Returns boolean
Sends a value to the channel, returning a promise that resolves when it has been received, and rejects on error, or on abort signal.
value
Tabort
AbortSignal?
Returns Promise<void>
Like trySend, this performs a synchronous recv operation on the channel, returning undefined if no value is available, or an iterator result, which models the received value, and whether the channel is open.
Returns (IteratorResult<T, (T | undefined)> | undefined)
Receives a value from the channel, returning a promise that resolves with an iterator (the value OR indicator that the channel is closed, possibly with a default value), or rejects on error, or on abort signal.
abort
AbortSignal?
Returns Promise<IteratorResult<T, (T | undefined)>>
Closes the channel, preventing further sending of values.
See also Sender and Sender.close, which this implements.
- Once a channel is closed, no more values can be sent to it.
- If the channel is buffered and there are still values in the buffer when the channel is closed, receivers will continue to receive those values until the buffer is empty.
- Attempting to send to a closed channel will result in an error and unblock any senders.
- If the channel is already closed, calling
close
again will throw a CloseOfClosedChannelError. - This method should be used to signal the end of data transmission or prevent potential deadlocks.
- Throws CloseOfClosedChannelError When attempting to close a channel that is already closed.
- Throws Error When an error occurs while closing the channel, and no other specific error is thrown.
Returns void
Iterates on all available values. May alternate between returning done and not done, unless ChanIterator.return or ChanIterator.throw are called.
Only the type is exported - may be initialized only performing an
iteration on a Chan instance, or by calling
chan[Symbol.iterator]()
.
chan
Chan<T>
Returns this.
Returns Iterator<T>
Next iteration.
Returns IteratorResult<T>
Ends the iterator, which is an idempotent operation.
Returns IteratorResult<T>
Ends the iterator with an error, which is an idempotent operation.
e
any?
Returns IteratorResult<T>
Iterates by receiving values from the channel, until it is closed, or the ChanAsyncIterator.return or ChanAsyncIterator.throw methods are called.
Only the type is exported - may be initialized only performing an async
iteration on a Chan instance, or by calling
chan[Symbol.asyncIterator]()
.
chan
Chan<T>
Returns this.
Returns AsyncIterator<T>
Next iteration.
Returns Promise<IteratorResult<T>>
Ends the iterator, which is an idempotent operation.
Returns Promise<IteratorResult<T>>
Ends the iterator with an error, which is an idempotent operation.
e
any?
Returns Promise<IteratorResult<T>>
Receiver allows callers to receive values. It uses a one-off callback that models what is going to receive the value.
Unlike Iterator, it is not intended to support statefulness - a Receivable should return equivalent (but not necessarily identical) Receiver instances on each call to getReceiver.
The addReceiver and removeReceiver methods are low-level constructs, and, in most scenarios, should not be called directly. When using these methods, consider the impact of cycles, particularly microtask cycles, and ways to mitigate them. See also getYieldGeneration and yieldToMacrotaskQueue.
Type: {addReceiver: function (callback: ReceiverCallback<T>): boolean, removeReceiver: function (callback: ReceiverCallback<T>): void}
addReceiver
function (callback: ReceiverCallback<T>): booleanremoveReceiver
function (callback: ReceiverCallback<T>): void
Add a receiver callback to a list of receivers, or call it immediately if there is an available sender. Returns true if the receiver was called added to the receiver list. Returns false if the receiver was called immediately.
Type: function (callback: ReceiverCallback<T>): boolean
Immediately removes the receiver from the receiver list, if it is there.
To facilitate "attempting synchronous receive", this method MUST only remove the last matching occurrence of the callback, if it exists.
Type: function (callback: ReceiverCallback<T>): void
ReceiverCallback is a callback that receives a value from a sender and true, or a default value (or undefined if unsupported), and false, if the channel is closed.
Type: function (...([T, true
] | [(T | undefined), false
])): void
Receivable is a value that can be converted to a Receiver.
Type: {getReceiver: function (): Receiver<T>}
getReceiver
function (): Receiver<T>
See Receivable.
Sender allows callers to send values. It uses a one-off callback that models what is going to send the value.
Unlike Iterator, it is not intended to support statefulness - a Sendable should return equivalent (but not necessarily identical) Sender instances on each call to getSender.
See also SendOnClosedChannelError, which SHOULD be raised on addSender (if closed on add) or passed into send callbacks (otherwise), when attempting to send on a closed channel.
The addSender and removeSender methods are low-level constructs, and, in most scenarios, should not be called directly. When using these methods, consider the impact of cycles, particularly microtask cycles, and ways to mitigate them. See also getYieldGeneration and yieldToMacrotaskQueue.
Type: {addSender: function (callback: SenderCallback<T>): boolean, removeSender: function (callback: SenderCallback<T>): void, close: function (): void?}
addSender
function (callback: SenderCallback<T>): booleanremoveSender
function (callback: SenderCallback<T>): voidclose
function (): void?
Add a sender callback to a list of senders, or call it immediately if there is an available receiver. Returns true if the sender was added to the sender list. Returns false if the sender was called immediately. If the channel is closed, SHOULD throw SendOnClosedChannelError. If the channel is closed while the sender is waiting to be called, the sender SHOULD be called with SendOnClosedChannelError.
Type: function (callback: SenderCallback<T>): boolean
Immediately removes the sender from the sender list, if it is there.
To facilitate "attempting synchronous send", this method MUST only remove the last matching occurrence of the callback, if it exists.
Type: function (callback: SenderCallback<T>): void
Closes the channel, adhering to the following semantics similar to Go's channels:
- Once a channel is closed, no more values can be sent to it.
- If a channel is buffered, and there are still values in the buffer when the channel is closed, the receivers will continue to receive those values until the buffer is empty.
- It's the responsibility of the sender to close the channel, signaling to the receiver that no more data will be sent.
- Attempting to send to a closed channel MUST result in an error, and MUST un-block any such senders as part of said close.
- The error thrown when attempting to send on a closed channel SHOULD be SendOnClosedChannelError, but MAY be another error.
- Unless explicitly documented as idempotent,
close
SHOULD throw CloseOfClosedChannelError on subsequent calls, but MAY throw other errors. - Channels should be closed to prevent potential deadlocks or to signal the end of data transmission. This ensures that receivers waiting on the channel don't do so indefinitely.
Note: This method is optional. Some Sendable implementations may specify their own rules and semantics for closing channels. Always refer to the specific implementation's documentation to ensure correct usage and to prevent potential memory leaks or unexpected behaviors.
See also SendOnClosedChannelError and CloseOfClosedChannelError.
Type: function (): void
SenderCallback is called as a value is received, or when an error or some
other event occurs, which prevents the value from being received.
It accepts two parameters, an error (if any), and the boolean ok
,
indicating if the value has been (will be, after return) received.
It MUST return the value (or throw) if ok
is true, and SHOULD throw
err
if ok
is false.
The ok
parameter being true guarantees that a value (once returned) has
been received, though does not guarantee that anything will be done with it.
If the ok
parameter is false, the first parameter will contain any error,
and no value (regardless of what is returned) will be received.
Note: The sender callback is not called on removeSender
.
WARNING: If the same value (===) as err (when ok is false) is thrown, that thrown error will not be bubbled - a mechanism used to avoid breaking the typing of the return value.
Type: function (...([undefined, true
] | [any, false
])): T
Sendable is a value that can be converted to a Sender.
Type: {getSender: function (): Sender<T>}
getSender
function (): Sender<T>
See Sendable.
Extends Error
Provided as a convenience, that SHOULD be used by Sender implementations, to indicate that a channel is closed. Should be raised as a result of send attempts on a closed channel, where the send operation is not allowed to proceed.
args
...ConstructorParameters<any>
Extends Error
Provided as a convenience, that SHOULD be used by Sender implementations, in the event that a channel close is attempted more than once.
args
...ConstructorParameters<any>
SelectCase models the state of a single case in a Select.
WARNING: The selectState symbol is deliberately not exported, as the
value of SelectCase[selectState]
is not part of the API contract, and
is simply a mechanism to support typing.
Type: (SelectCaseSender<T> | SelectCaseReceiver<T> | SelectCasePromise<T>)
Sender select case. See also .send.
Type: {type: "Sender"
, selectState: CaseStateSender<T>}
type
"Sender"
selectState
CaseStateSender<T>
Type is provided to support type guards, and reflection-style logic.
Type: "Sender"
Receiver select case. See also .recv.
Type: {type: "Receiver"
, selectState: CaseStateReceiver<T>}
type
"Receiver"
selectState
CaseStateReceiver<T>
Type is provided to support type guards, and reflection-style logic.
Type: "Receiver"
Promise (or PromiseLike) select case. See also .wait.
Type: {type: "Promise"
, selectState: CaseStatePromise<T>}
type
"Promise"
selectState
CaseStatePromise<T>
Type is provided to support type guards, and reflection-style logic.
Type: "Promise"
Prepares a SelectCaseReceiver case, to be used in a Select.
WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.
from
(Receivable<T> | Receiver<T>)
Returns SelectCaseReceiver<T>
Prepares a SelectCaseSender case, to be used in a Select.
WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.
to
(Sendable<T> | Sender<T>) Target Sendable or Sender.expr
function (): T Expression to evaluate when sending. WARNING: Unlike Go, this is only evaluated when the case is selected, and only for the selected case. See the project README for more details.
Returns SelectCaseSender<T>
Prepares a SelectCasePromise case, to be used in a Select.
WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.
value
(PromiseLike<T> | T)
Returns SelectCasePromise<Awaited<T>>
Select implements the functionality of Go's select statement, with support for support cases comprised of Sender, Receiver, or PromiseLike, which are treated as a single-value never-closed channel.
See also promises, which is a convenience method for creating a select instance with promise cases, or a mix of both promises and other cases.
cases
Array<(SelectCase | Promise | any)> The cases to select from, which must be initialized using .send, .recv, unless they are to be treated as a promise.
If set to true, the select will skip the microtask cycle mitigation mechanism, described by The microtask queue: a footgun, in the project README.
Defaults to false.
See also .setUnsafe.
Type: boolean
Retrieves the cases associated with this select instance.
Each case corresponds to an input case (including order). After selecting a case, via Select.poll or Select.wait, received values may be retrieved by calling Select.recv with the corresponding case.
Type: any
Accessing a (typed) received value:
import {recv, Chan, Select} from 'ts-chan';
const ch1 = new Chan<number>();
const ch2 = new Chan<string>();
void sendsToCh1ThenEventuallyClosesIt();
void sendsToCh2();
const select = new Select([recv(ch1), recv(ch2)]);
for (let running = true; running;) {
const i = await select.wait();
switch (i) {
case 0: {
const v = select.recv(select.cases[i]);
if (v.done) {
running = false;
break;
}
console.log(`rounded value: ${Math.round(v.value)}`);
break;
}
case 1: {
const v = select.recv(select.cases[i]);
if (v.done) {
throw new Error('ch2 unexpectedly closed');
}
console.log(`uppercase string value: ${v.value.toUpperCase()}`);
break;
}
default:
throw new Error('unreachable');
}
}
Returns any T
Retrieves the number of the cases that are currently pending.
Will return the length of cases, less the number of promise cases that have been resolved and received (or ignored).
Type: number
Returns number
Returns all the original values of all pending promise cases (cases that haven't been consumed or ignored), in case order.
Type: Array<any>
Returns Array<any>
Sets the .unsafe property, and returns this.
unsafe
boolean
Returns this
Poll returns the next case that is ready, or undefined if none are ready. It must not be called concurrently with Select.wait or Select.recv.
This is effectively a non-blocking version of Select.wait, and
fills the same role as the default
select case, in Go's select
statement.
Wait returns a promise that will resolve with the index of the next case that is ready, or reject with the first error.
abort
AbortSignal?
Consume the result of a ready case.
v
SelectCase<T>
Returns IteratorResult<T, (T | undefined)>
Promises is a convenience method for creating a select instance with promise cases, or a mix of both promises and other cases.
Note that the behavior is identical to passing the same array to the constructor. The constructor's typing is more strict, to simplify implementations which encapsulate or construct select instances.
cases
T
Returns Select<any>
A wrapper of Select that's intended for use within loops, that allows the contents of select cases (but not the structure, namely the direction/type of communication) to be updated, and evaluated as expressions, in code order.
With the caveat that it does not support promises, this is the closest analogue to Go's select statement, provided by this module.
Clears references to values to send, receives and senders, but not the select cases themselves. Use cases include avoiding retaining references between iterations of a loop, if such references are not needed, or may be problematic.
WARNING: Must not be called concurrently with Select.wait (on the underlying instance for this factory). Calling this method then calling either Select.wait or Select.poll (prior to another with) may result in an error.
With should be to configure and retrieve (or initialize) the underlying Select instance.
Must be called with the same number of cases each time, with each case having the same direction.
cases
T
Returns Select<any>
Returns the current yield generation. This value is incremented on each yieldToMacrotaskQueue, which is a self-conflating operation.
See The microtask queue: a footgun, in the project README, for details on the purpose of this mechanism.
Returns number
Returns a promise which will resolve on the next iteration of the event loop. Intended to be used in concert with getYieldGeneration, this mechanism allows implementers to reduce the risk of the "footgun" that the microtask queue represents.
Calls to this function are self-conflating, meaning that if this function is called multiple times before the next iteration of the event loop, the same promise will be returned.
See The microtask queue: a footgun, in the project README, for details on the purpose of this mechanism.