Skip to content

Concurrency primitives for TypeScript and JavaScript.

License

Notifications You must be signed in to change notification settings

joeycumines/ts-chan

Repository files navigation

ts-chan

NPM Package GitHub Repo Code Style: Google

Concurrency primitives for TypeScript and JavaScript.

Introduction

Concurrency in JavaScript, frankly, sucks.

This module is an effort to provide concurrency primitives for TypeScript/JavaScript that capture as much of the semantics of Go's channels as possible, while remaining idiomatic to the language.

I'll be iterating on this for a few weeks, in my spare time, with the goal of a production-ready module, which can be used any JS environment, including browsers.

Why ts-chan?

There are many applications for concurrent programming, and (IMO) it's something that anyone writing software should have a basic understanding of. Concurrency is rarely trivial, it's often difficult to get right. By providing a simple API that mitigates common pitfalls, ts-chan aims to make concurrent programming more accessible to JavaScript developers.

Patterns are a broad topic, but I intend to document several, with a side-by-side comparison to vanilla JS. Additionally, you'll find that many of Go's concurrency patterns apply directly, excepting (for example) those that leverage multiple CPUs.

See also:

  1. /docs/pattern-fan-in.md

Usage

Installation

Install or import the NPM package ts-chan. Supported platforms include Node.js, Deno, and browsers.

The microtask queue: a footgun

This module takes steps to mitigate the risk of microtask cycles. They remain, however, a real concern for any JavaScript program, that involves communication between concurrent operations. Somewhat more insidious than plain call cycles, as they are not visible in the call stack, it's important to know that promises and async/await operate on the microtask queue, unless they wait on something that operates on the macrotask queue (e.g. IO, timers).

While queueMicrotask is not used by this module, MDN's Using microtasks in JavaScript with queueMicrotask() guide is both informative and relevant.

The mitigation strategy used by this module is for high-level async methods (including Chan.send, Chan.recv, and Select.wait) to use getYieldGeneration and yieldToMacrotaskQueue like:

const exampleHighLevelAsyncMethod = async () => {
  // ...
  const yieldGeneration = getYieldGeneration();
  const yieldPromise = yieldToMacrotaskQueue();
  try {
    // ...
    return await result;
  } finally {
    if (getYieldGeneration() === yieldGeneration) {
      await yieldPromise;
    }
  }
};

The above is a simple (albeit unoptimised) pattern which ensures that, so long as one side calls one of these methods, the event loop will not block. This solution does have some performance impact, and does not completely mitigate the risk, but seems a reasonable compromise.

Architecture

Protocol

To facilitate arbitrary implementations, this module defines a protocol for implementation of channels, modelled as Sender and Receiver. This protocol is styled after JavaScript's iteration protocols, though it differs in that the outer types, Sendable and Receivable (analogues to Iterable), are not intended to support statefulness independently of the channel (no return analogue).

This documentation is a work in progress, so, for now, it may be easiest to peruse src/protocol.ts.

Chan class

The Chan class is a reference implementation of the channel protocol. It has full support for the protocol, Go-like channel close semantics, and supports buffered channel semantics.

Unlike Go's channel, all sends and receives are processed in FIFO order, allowing it to function as a queue, if desired. Also provided are a number of convenience methods and properties, that are not part of the core protocol, but are useful for common use cases.

See the API documentation for more details.

Select class

The absence of an analogue to Go's select statement would limit the usefulness of channels, as the select statement is Go's key to implementing "reactive" software. The Select class provided by this module is intended to fill that gap, and is modelled after Go's select statement, particularly regarding the semantics of receiving and sending. This class utilizes the "channel protocol" (as defined by this module). AbortSignal is fully supported, and (can) function equivalently to including a case <-ctx.Done():, in Go. Promises are also supported, though they have no analogue, in Go.

Comparison to Go's select statement

Similarities
  1. Random Selection: Just as Go's select statement picks one communicative operation using a uniform pseudo-random selection (if more than one is immediately available), the Select class does so too.
  2. Blocking Behavior: In Go, if no communication can proceed and there's no default case, the select statement blocks. Similarly, the Select class's wait method will also block until a case is ready.
  3. Default Case Equivalence: The poll method in the Select class serves a similar purpose as the default case in Go's select statement. If no case is ready, poll will return undefined, offering a non-blocking alternative.
  4. Case Evaluation Order: (Optional, see below) The SelectFactory class may be used to evaluate senders, receivers, and values (to send), in source order, and is intended to be used within loops and similar. Use of this class may avoid unnecessary recreation of select cases, on each iteration, with the caveat that it does not (currently) support promises.
Differences
  1. Return Value from wait and poll: The Select class's wait method returns a promise that resolves with the index of the next ready case. The poll method, on the other hand, returns the index directly or undefined. In contrast, Go's select statement does not return values in this manner. This is a mechanism used to provide type support.
  2. Operation to "receive" value: Once a receive case is ready, in the Select class, the result must be explicitly retrieved using the recv method, which must be provided with the case which is ready. This contrasts with Go, where the received value is directly assigned in the case clause. Again, this is a part of the mechanism used to provide type support.
  3. Limited default value support: Nil channels have not analogue in TS/JS. Additionally, while receiving a "default value" (on receive from a closed channel) is a supported part of the channel protocol, it's not required, and has no (type-based) mechanism to describe whether the channel supports it, or not.
  4. Case Evaluation Order: This is an interesting topic, and I found that Go's behavior was not exactly what I expected. For simplicity, this functionality was omitted from Select, and is provided by SelectFactory, instead. For reference, from the Go spec:

    For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement. The result is a set of channels to receive from or send to, and the corresponding values to send. Any side effects in that evaluation will occur irrespective of which (if any) communication operation is selected to proceed. Expressions on the left-hand side of a RecvStmt with a short variable declaration or assignment are not yet evaluated.

API

Table of Contents

Chan

Provides a communication mechanism between two or more concurrent operations.

In addition to various utility methods, it implements:

Parameters

  • capacity (optional, default 0)
  • newDefaultValue function (): T?

unsafe

If set to true, the channel will skip the microtask cycle mitigation mechanism, described by The microtask queue: a footgun, in the project README.

Defaults to false.

See also .setUnsafe.

Type: boolean

capacity

Returns the maximum number of items the channel can buffer.

Type: number

Returns number

length

Returns the number of items in the channel buffer.

Type: number

Returns number

concurrency

Returns an integer representing the number of blocking operations. Positive values indicate senders, while negative values indicate receivers.

Type: number

Returns number

setUnsafe

Sets the .unsafe property, and returns this.

Parameters

Returns this

trySend

Performs a synchronous send operation on the channel, returning true if it succeeds, or false if there are no waiting receivers, and the channel is full.

Will throw SendOnClosedChannelError if the channel is closed.

Parameters
  • value T

Returns boolean

send

Sends a value to the channel, returning a promise that resolves when it has been received, and rejects on error, or on abort signal.

Parameters
  • value T
  • abort AbortSignal?

Returns Promise<void>

tryRecv

Like trySend, this performs a synchronous recv operation on the channel, returning undefined if no value is available, or an iterator result, which models the received value, and whether the channel is open.

Returns (IteratorResult<T, (T | undefined)> | undefined)

recv

Receives a value from the channel, returning a promise that resolves with an iterator (the value OR indicator that the channel is closed, possibly with a default value), or rejects on error, or on abort signal.

Parameters
  • abort AbortSignal?

Returns Promise<IteratorResult<T, (T | undefined)>>

close

Closes the channel, preventing further sending of values.

See also Sender and Sender.close, which this implements.

  • Once a channel is closed, no more values can be sent to it.
  • If the channel is buffered and there are still values in the buffer when the channel is closed, receivers will continue to receive those values until the buffer is empty.
  • Attempting to send to a closed channel will result in an error and unblock any senders.
  • If the channel is already closed, calling close again will throw a CloseOfClosedChannelError.
  • This method should be used to signal the end of data transmission or prevent potential deadlocks.
  • Throws CloseOfClosedChannelError When attempting to close a channel that is already closed.
  • Throws Error When an error occurs while closing the channel, and no other specific error is thrown.

Returns void

ChanIterator

Iterates on all available values. May alternate between returning done and not done, unless ChanIterator.return or ChanIterator.throw are called.

Only the type is exported - may be initialized only performing an iteration on a Chan instance, or by calling chan[Symbol.iterator]().

Parameters

iterator

Returns this.

Returns Iterator<T>

next

Next iteration.

Returns IteratorResult<T>

return

Ends the iterator, which is an idempotent operation.

Returns IteratorResult<T>

throw

Ends the iterator with an error, which is an idempotent operation.

Parameters
  • e any?

Returns IteratorResult<T>

ChanAsyncIterator

Iterates by receiving values from the channel, until it is closed, or the ChanAsyncIterator.return or ChanAsyncIterator.throw methods are called.

Only the type is exported - may be initialized only performing an async iteration on a Chan instance, or by calling chan[Symbol.asyncIterator]().

Parameters

asyncIterator

Returns this.

Returns AsyncIterator<T>

next

Next iteration.

Returns Promise<IteratorResult<T>>

return

Ends the iterator, which is an idempotent operation.

Returns Promise<IteratorResult<T>>

throw

Ends the iterator with an error, which is an idempotent operation.

Parameters
  • e any?

Returns Promise<IteratorResult<T>>

Receiver

Receiver allows callers to receive values. It uses a one-off callback that models what is going to receive the value.

Unlike Iterator, it is not intended to support statefulness - a Receivable should return equivalent (but not necessarily identical) Receiver instances on each call to getReceiver.

The addReceiver and removeReceiver methods are low-level constructs, and, in most scenarios, should not be called directly. When using these methods, consider the impact of cycles, particularly microtask cycles, and ways to mitigate them. See also getYieldGeneration and yieldToMacrotaskQueue.

Type: {addReceiver: function (callback: ReceiverCallback<T>): boolean, removeReceiver: function (callback: ReceiverCallback<T>): void}

Properties

addReceiver

Add a receiver callback to a list of receivers, or call it immediately if there is an available sender. Returns true if the receiver was called added to the receiver list. Returns false if the receiver was called immediately.

Type: function (callback: ReceiverCallback<T>): boolean

removeReceiver

Immediately removes the receiver from the receiver list, if it is there.

To facilitate "attempting synchronous receive", this method MUST only remove the last matching occurrence of the callback, if it exists.

Type: function (callback: ReceiverCallback<T>): void

ReceiverCallback

ReceiverCallback is a callback that receives a value from a sender and true, or a default value (or undefined if unsupported), and false, if the channel is closed.

Type: function (...([T, true] | [(T | undefined), false])): void

Receivable

Receivable is a value that can be converted to a Receiver.

Type: {getReceiver: function (): Receiver<T>}

Properties

getReceiver

See Receivable.

Sender

Sender allows callers to send values. It uses a one-off callback that models what is going to send the value.

Unlike Iterator, it is not intended to support statefulness - a Sendable should return equivalent (but not necessarily identical) Sender instances on each call to getSender.

See also SendOnClosedChannelError, which SHOULD be raised on addSender (if closed on add) or passed into send callbacks (otherwise), when attempting to send on a closed channel.

The addSender and removeSender methods are low-level constructs, and, in most scenarios, should not be called directly. When using these methods, consider the impact of cycles, particularly microtask cycles, and ways to mitigate them. See also getYieldGeneration and yieldToMacrotaskQueue.

Type: {addSender: function (callback: SenderCallback<T>): boolean, removeSender: function (callback: SenderCallback<T>): void, close: function (): void?}

Properties

addSender

Add a sender callback to a list of senders, or call it immediately if there is an available receiver. Returns true if the sender was added to the sender list. Returns false if the sender was called immediately. If the channel is closed, SHOULD throw SendOnClosedChannelError. If the channel is closed while the sender is waiting to be called, the sender SHOULD be called with SendOnClosedChannelError.

Type: function (callback: SenderCallback<T>): boolean

removeSender

Immediately removes the sender from the sender list, if it is there.

To facilitate "attempting synchronous send", this method MUST only remove the last matching occurrence of the callback, if it exists.

Type: function (callback: SenderCallback<T>): void

close

Closes the channel, adhering to the following semantics similar to Go's channels:

  • Once a channel is closed, no more values can be sent to it.
  • If a channel is buffered, and there are still values in the buffer when the channel is closed, the receivers will continue to receive those values until the buffer is empty.
  • It's the responsibility of the sender to close the channel, signaling to the receiver that no more data will be sent.
  • Attempting to send to a closed channel MUST result in an error, and MUST un-block any such senders as part of said close.
  • The error thrown when attempting to send on a closed channel SHOULD be SendOnClosedChannelError, but MAY be another error.
  • Unless explicitly documented as idempotent, close SHOULD throw CloseOfClosedChannelError on subsequent calls, but MAY throw other errors.
  • Channels should be closed to prevent potential deadlocks or to signal the end of data transmission. This ensures that receivers waiting on the channel don't do so indefinitely.

Note: This method is optional. Some Sendable implementations may specify their own rules and semantics for closing channels. Always refer to the specific implementation's documentation to ensure correct usage and to prevent potential memory leaks or unexpected behaviors.

See also SendOnClosedChannelError and CloseOfClosedChannelError.

Type: function (): void

SenderCallback

SenderCallback is called as a value is received, or when an error or some other event occurs, which prevents the value from being received. It accepts two parameters, an error (if any), and the boolean ok, indicating if the value has been (will be, after return) received. It MUST return the value (or throw) if ok is true, and SHOULD throw err if ok is false.

The ok parameter being true guarantees that a value (once returned) has been received, though does not guarantee that anything will be done with it.

If the ok parameter is false, the first parameter will contain any error, and no value (regardless of what is returned) will be received.

Note: The sender callback is not called on removeSender.

WARNING: If the same value (===) as err (when ok is false) is thrown, that thrown error will not be bubbled - a mechanism used to avoid breaking the typing of the return value.

Type: function (...([undefined, true] | [any, false])): T

Sendable

Sendable is a value that can be converted to a Sender.

Type: {getSender: function (): Sender<T>}

Properties

  • getSender function (): Sender<T>

getSender

See Sendable.

SendOnClosedChannelError

Extends Error

Provided as a convenience, that SHOULD be used by Sender implementations, to indicate that a channel is closed. Should be raised as a result of send attempts on a closed channel, where the send operation is not allowed to proceed.

Parameters

  • args ...ConstructorParameters<any>

CloseOfClosedChannelError

Extends Error

Provided as a convenience, that SHOULD be used by Sender implementations, in the event that a channel close is attempted more than once.

Parameters

  • args ...ConstructorParameters<any>

SelectCase

SelectCase models the state of a single case in a Select.

WARNING: The selectState symbol is deliberately not exported, as the value of SelectCase[selectState] is not part of the API contract, and is simply a mechanism to support typing.

Type: (SelectCaseSender<T> | SelectCaseReceiver<T> | SelectCasePromise<T>)

SelectCaseSender

Sender select case. See also .send.

Type: {type: "Sender", selectState: CaseStateSender<T>}

Properties

  • type "Sender"
  • selectState CaseStateSender<T>

type

Type is provided to support type guards, and reflection-style logic.

Type: "Sender"

SelectCaseReceiver

Receiver select case. See also .recv.

Type: {type: "Receiver", selectState: CaseStateReceiver<T>}

Properties

  • type "Receiver"
  • selectState CaseStateReceiver<T>

type

Type is provided to support type guards, and reflection-style logic.

Type: "Receiver"

SelectCasePromise

Promise (or PromiseLike) select case. See also .wait.

Type: {type: "Promise", selectState: CaseStatePromise<T>}

Properties

  • type "Promise"
  • selectState CaseStatePromise<T>

type

Type is provided to support type guards, and reflection-style logic.

Type: "Promise"

recv

Prepares a SelectCaseReceiver case, to be used in a Select.

WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.

Parameters

Returns SelectCaseReceiver<T>

send

Prepares a SelectCaseSender case, to be used in a Select.

WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.

Parameters

  • to (Sendable<T> | Sender<T>) Target Sendable or Sender.
  • expr function (): T Expression to evaluate when sending. WARNING: Unlike Go, this is only evaluated when the case is selected, and only for the selected case. See the project README for more details.

Returns SelectCaseSender<T>

wait

Prepares a SelectCasePromise case, to be used in a Select.

WARNING: Cases may only be used in a single select instance, though select instances are intended to be reused, e.g. when implementing control loops.

Parameters

  • value (PromiseLike<T> | T)

Returns SelectCasePromise<Awaited<T>>

Select

Select implements the functionality of Go's select statement, with support for support cases comprised of Sender, Receiver, or PromiseLike, which are treated as a single-value never-closed channel.

See also promises, which is a convenience method for creating a select instance with promise cases, or a mix of both promises and other cases.

Parameters

unsafe

If set to true, the select will skip the microtask cycle mitigation mechanism, described by The microtask queue: a footgun, in the project README.

Defaults to false.

See also .setUnsafe.

Type: boolean

cases

Retrieves the cases associated with this select instance.

Each case corresponds to an input case (including order). After selecting a case, via Select.poll or Select.wait, received values may be retrieved by calling Select.recv with the corresponding case.

Type: any

Examples

Accessing a (typed) received value:

import {recv, Chan, Select} from 'ts-chan';

const ch1 = new Chan<number>();
const ch2 = new Chan<string>();

void sendsToCh1ThenEventuallyClosesIt();
void sendsToCh2();

const select = new Select([recv(ch1), recv(ch2)]);
for (let running = true; running;) {
  const i = await select.wait();
  switch (i) {
  case 0: {
    const v = select.recv(select.cases[i]);
    if (v.done) {
      running = false;
      break;
    }
    console.log(`rounded value: ${Math.round(v.value)}`);
    break;
  }
  case 1: {
    const v = select.recv(select.cases[i]);
    if (v.done) {
      throw new Error('ch2 unexpectedly closed');
    }
    console.log(`uppercase string value: ${v.value.toUpperCase()}`);
    break;
  }
  default:
    throw new Error('unreachable');
  }
}

Returns any T

length

Retrieves the number of the cases that are currently pending.

Will return the length of cases, less the number of promise cases that have been resolved and received (or ignored).

Type: number

Returns number

pending

Returns all the original values of all pending promise cases (cases that haven't been consumed or ignored), in case order.

Type: Array<any>

Returns Array<any>

setUnsafe

Sets the .unsafe property, and returns this.

Parameters

Returns this

poll

Poll returns the next case that is ready, or undefined if none are ready. It must not be called concurrently with Select.wait or Select.recv.

This is effectively a non-blocking version of Select.wait, and fills the same role as the default select case, in Go's select statement.

Returns (number | undefined)

wait

Wait returns a promise that will resolve with the index of the next case that is ready, or reject with the first error.

Parameters
  • abort AbortSignal?

Returns Promise<number>

recv

Consume the result of a ready case.

Parameters

Returns IteratorResult<T, (T | undefined)>

promises

Promises is a convenience method for creating a select instance with promise cases, or a mix of both promises and other cases.

Note that the behavior is identical to passing the same array to the constructor. The constructor's typing is more strict, to simplify implementations which encapsulate or construct select instances.

Parameters
  • cases T

Returns Select<any>

SelectFactory

A wrapper of Select that's intended for use within loops, that allows the contents of select cases (but not the structure, namely the direction/type of communication) to be updated, and evaluated as expressions, in code order.

With the caveat that it does not support promises, this is the closest analogue to Go's select statement, provided by this module.

clear

Clears references to values to send, receives and senders, but not the select cases themselves. Use cases include avoiding retaining references between iterations of a loop, if such references are not needed, or may be problematic.

WARNING: Must not be called concurrently with Select.wait (on the underlying instance for this factory). Calling this method then calling either Select.wait or Select.poll (prior to another with) may result in an error.

with

With should be to configure and retrieve (or initialize) the underlying Select instance.

Must be called with the same number of cases each time, with each case having the same direction.

Parameters
  • cases T

Returns Select<any>

getYieldGeneration

Returns the current yield generation. This value is incremented on each yieldToMacrotaskQueue, which is a self-conflating operation.

See The microtask queue: a footgun, in the project README, for details on the purpose of this mechanism.

Returns number

yieldToMacrotaskQueue

Returns a promise which will resolve on the next iteration of the event loop. Intended to be used in concert with getYieldGeneration, this mechanism allows implementers to reduce the risk of the "footgun" that the microtask queue represents.

Calls to this function are self-conflating, meaning that if this function is called multiple times before the next iteration of the event loop, the same promise will be returned.

See The microtask queue: a footgun, in the project README, for details on the purpose of this mechanism.

Returns Promise<number>

About

Concurrency primitives for TypeScript and JavaScript.

Resources

License

Stars

Watchers

Forks

Packages

No packages published