@thi.ng/csp


Note

This is one of 199 standalone projects, maintained as part of the @thi.ng/umbrella monorepo and anti-framework.

🚀 Please help me to work full-time on these projects by sponsoring me on GitHub. Thank you! ❤️

About

Primitives & operators for Communicating Sequential Processes based on async/await and async iterables.

Important

This package was temporarily deprecated (throughout most of 2023) but has since been revived as a complete rewrite, using a simpler and more modern approach made possible by contemporary ES language features (and widespread support for them).

This new/current implementation is in most cases NOT compatible with earlier versions.

What is CSP?

References:

The key construct of this package is a read/write channel primitive which can be customized with different buffer implementations to control blocking behavior and backpressure handling (i.e. what happens when values are written to a channel faster than they are read, which is essentially a memory management issue). Unbuffered CSP channels block on both the reader and the writer side.
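To make the blocking behavior of an unbuffered channel more concrete, here is a minimal, self-contained sketch built on plain promises. The `RendezvousChannel` class and its internals are hypothetical and written only for illustration; this is not the implementation used by @thi.ng/csp, and it omits buffers entirely.

```typescript
// Hypothetical illustration only, NOT the @thi.ng/csp implementation.
// An unbuffered channel: a write only completes once a reader takes the value,
// and a read only completes once a value (or channel closure) is available.
class RendezvousChannel<T> {
    private readers: ((value: T | undefined) => void)[] = [];
    private writers: { value: T; done: () => void }[] = [];
    private closed = false;

    write(value: T): Promise<boolean> {
        // writes to a closed channel are rejected immediately
        if (this.closed) return Promise.resolve(false);
        const reader = this.readers.shift();
        if (reader) {
            // a reader is already waiting: hand over the value directly
            reader(value);
            return Promise.resolve(true);
        }
        // otherwise park the writer until a reader arrives
        return new Promise<boolean>((resolve) => {
            this.writers.push({ value, done: () => resolve(true) });
        });
    }

    read(): Promise<T | undefined> {
        const writer = this.writers.shift();
        if (writer) {
            // a parked (in-flight) write exists: take its value, release the writer
            writer.done();
            return Promise.resolve(writer.value);
        }
        // closed and nothing left to read: deliver `undefined`
        if (this.closed) return Promise.resolve(undefined);
        // otherwise park the reader until a writer arrives
        return new Promise<T | undefined>((resolve) => {
            this.readers.push(resolve);
        });
    }

    close(): void {
        this.closed = true;
        // release all parked readers; parked writes remain readable
        for (const r of this.readers.splice(0)) r(undefined);
    }
}

const log: string[] = [];
const ch = new RendezvousChannel<number>();

const main = (async () => {
    // no reader yet, so this write stays pending
    const pendingWrite = ch.write(1).then(() => log.push("write done"));
    log.push("before read");
    const x = await ch.read();
    log.push(`read ${x}`);
    await pendingWrite;
    ch.close();
    log.push(`after close: ${await ch.read()}`);
    console.log(log);
})();
```

The sketch queues parked readers and writers separately, so whichever side arrives first waits for the other. Adding a buffer between the two sides is what relaxes this strict hand-over.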

Buffering behaviors

The following channel buffer types/behaviors are included (from the thi.ng/buffers package), all accepting a max. capacity and all implementing the IReadWriteBuffer interface required by the channel:

  • fifo: First in, first out ring buffer. Writes to the channel start blocking once the buffer's capacity is reached and otherwise complete immediately. Likewise, channel reads are non-blocking while buffered values are available and only block if the buffer is empty.
  • lifo: Last in, first out. Write behavior is the same as for fifo, but reads happen in reverse order: the last value written is the first value read (stack behavior).
  • sliding: Sliding window ring buffer. Writes to the channel are never blocking! Whilst the buffer is at full capacity, new writes will first expunge the oldest buffered value (similar to LRU cache behavior). Read behavior is the same as for fifo.
  • dropping: Dropping value ring buffer. Writes to the channel are never blocking! Whilst the buffer is at full capacity, new writes will be silently ignored. Read behavior is the same as for fifo.
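The differences between the four behaviors are easiest to see side by side. The following sketch models them with a single hypothetical `SketchBuffer` class backed by a plain array; it does not use the thi.ng/buffers package or its `IReadWriteBuffer` interface, and a `false` return value from `write()` merely stands in for "the writer would have to block".

```typescript
// Hypothetical illustration only, NOT the thi.ng/buffers implementation.
type Policy = "fifo" | "lifo" | "sliding" | "dropping";

class SketchBuffer<T> {
    private items: T[] = [];
    private policy: Policy;
    private capacity: number;

    constructor(policy: Policy, capacity: number) {
        this.policy = policy;
        this.capacity = capacity;
    }

    /** Returns false if the writer would have to block (buffer full). */
    write(x: T): boolean {
        if (this.items.length < this.capacity) {
            this.items.push(x);
            return true;
        }
        switch (this.policy) {
            case "sliding":
                // expunge the oldest value to make room for the new one
                this.items.shift();
                this.items.push(x);
                return true;
            case "dropping":
                // silently ignore the new value, but never block
                return true;
            default:
                // fifo & lifo: the writer must wait for a read
                return false;
        }
    }

    read(): T | undefined {
        // lifo reads the newest value first, all others the oldest
        return this.policy === "lifo" ? this.items.pop() : this.items.shift();
    }
}

// write 1, 2, 3 into a buffer of capacity 2, then drain it
function demo(policy: Policy): { accepted: boolean[]; drained: number[] } {
    const buf = new SketchBuffer<number>(policy, 2);
    const accepted = [1, 2, 3].map((x) => buf.write(x));
    const drained: number[] = [];
    let x: number | undefined;
    while ((x = buf.read()) !== undefined) drained.push(x);
    return { accepted, drained };
}

const policies: Policy[] = ["fifo", "lifo", "sliding", "dropping"];
for (const p of policies) {
    console.log(p, JSON.stringify(demo(p)));
}
// fifo {"accepted":[true,true,false],"drained":[1,2]}
// lifo {"accepted":[true,true,false],"drained":[2,1]}
// sliding {"accepted":[true,true,true],"drained":[2,3]}
// dropping {"accepted":[true,true,true],"drained":[1,2]}
```

Note that with fifo and lifo the third write is not lost in a real channel: it simply waits until a read frees up space, which is how backpressure reaches the writer.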

Channels

As mentioned previously, channels and their read, write and close operations are the key building blocks for CSP.

Other channel types

  • Mult for channel multiplexing (aka one-to-many splitting) and dynamic add/removal of subscribers
  • PubSub for topic-based subscriptions, each topic implemented as a Mult
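As a rough mental model of how these two types relate, the sketch below implements one-to-many delivery and topic routing with plain callbacks. The names `SketchMult` and `SketchPubSub` are hypothetical, and the use of callbacks instead of subscription channels is a deliberate simplification; the package's actual `Mult` and `PubSub` are channel-based (see the PubSub example further below).

```typescript
// Hypothetical illustration only, NOT the @thi.ng/csp Mult/PubSub classes.
type Tap<T> = (x: T) => void;

// one-to-many splitting with dynamic add/removal of subscribers
class SketchMult<T> {
    private taps = new Set<Tap<T>>();

    /** Adds a subscriber and returns a function which removes it again. */
    subscribe(tap: Tap<T>): () => void {
        this.taps.add(tap);
        return () => {
            this.taps.delete(tap);
        };
    }

    publish(x: T): void {
        for (const tap of this.taps) tap(x);
    }
}

// topic-based subscriptions, each topic backed by its own mult
class SketchPubSub<T> {
    private topics = new Map<string, SketchMult<T>>();
    private topicFn: (x: T) => string;

    constructor(topicFn: (x: T) => string) {
        this.topicFn = topicFn;
    }

    subscribeTopic(topic: string, tap: Tap<T>): () => void {
        let mult = this.topics.get(topic);
        if (!mult) {
            mult = new SketchMult<T>();
            this.topics.set(topic, mult);
        }
        return mult.subscribe(tap);
    }

    publish(x: T): void {
        // values for topics without any subscription are dropped
        this.topics.get(this.topicFn(x))?.publish(x);
    }
}

// topic is the first character of each string
const pub = new SketchPubSub<string>((x) => x[0]);
const seen: string[] = [];

pub.subscribeTopic("a", (x) => seen.push(`a:${x}`));
const unsubB = pub.subscribeTopic("b", (x) => seen.push(`b:${x}`));

pub.publish("alice");
pub.publish("bert");
// remove the "b" subscriber; later "b" values are no longer delivered
unsubB();
pub.publish("bella");
pub.publish("charlie");
pub.publish("arthur");

console.log(seen);
// [ 'a:alice', 'b:bert', 'a:arthur' ]
```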

Channel operators

Status

BETA - possibly breaking changes forthcoming

Search or submit any issues for this package

Related packages

  • @thi.ng/fibers - Process hierarchies & operators for cooperative multitasking
  • @thi.ng/rstream - Reactive streams & subscription primitives for constructing dataflow graphs / pipelines
  • @thi.ng/transducers-async - Async versions of various highly composable transducers, reducers and iterators

Installation

yarn add @thi.ng/csp

ESM import:

import * as csp from "@thi.ng/csp";

Browser ESM import:

<script type="module" src="https://esm.run/@thi.ng/csp"></script>

jsDelivr documentation

For Node.js REPL:

const csp = await import("@thi.ng/csp");

Package sizes (brotli'd, pre-treeshake): ESM: 1.78 KB

Dependencies

Note: @thi.ng/api is in most cases a type-only import (not used at runtime)

Usage examples

One project in this repo's /examples directory is using this package:

| Screenshot | Description | Live demo | Source |
|---|---|---|---|
| | CSP channel-based event handling, async transducers & reactive UI components | Demo | Source |

API

Generated API docs

Ping pong

import { channel } from "@thi.ng/csp";

// create CSP channel for bi-directional communication
const chan = channel<number>();

// create first async process (ping)
(async () => {
    while (true) {
        // this op will block until a value becomes available in the channel
        const x = await chan.read();
        // if the channel was closed meanwhile, read() will deliver `undefined`
        if (x === undefined || x > 5) {
            console.log("stopping...");
            // calling close() is idempotent
            // any in-flight writes will still be readable
            chan.close();
            break;
        }
        console.log("ping", x);
        // this op will also block until the other side is reading the value
        await chan.write(x + 1);
    }
    console.log("ping done");
})();

// create second async process (pong, almost identical to ping)
(async () => {
    while (true) {
        // wait until value can be read (or channel closed)
        const x = await chan.read();
        // exit loop if channel closed
        if (x === undefined) break;
        console.log("pong", x);
        // write next value & wait until other side read it
        await chan.write(x + 1);
    }
    console.log("pong done");
})();

// kickoff
chan.write(0);

// ping 0
// pong 1
// ping 2
// pong 3
// ping 4
// pong 5
// stopping...
// ping done
// pong done

PubSub

import { channel, consumeWith, into, pubsub } from "@thi.ng/csp";

// input channel (optional)
const src = channel<string>({ id: "users" });

// publisher with a topic function
// (topic here is the first character of each received string)
const pub = pubsub<string>(src, (x) => x[0]);

// create topic subscriptions (channel & debug consumer)
// under the hood each topic is a Mult (multiplexed channel)
// subscription channels are automatically named:
// `<src-id>-<topic>-tap<tapid>` (see below)
for (let i of "abc") {
    consumeWith(pub.subscribeTopic(i), (x, ch) => console.log(ch.id, x));
}

// start processing by feeding an iterable of names
await into(src, ["alice", "bert", "bella", "charlie", "arthur"]);

// users-a-tap0 alice
// users-b-tap1 bert
// users-b-tap1 bella
// users-c-tap2 charlie
// users-a-tap0 arthur

// pubsubs & mults are closed recursively once we close the input channel
src.close();

Authors

If this project contributes to an academic publication, please cite it as:

@misc{thing-csp,
  title = "@thi.ng/csp",
  author = "Karsten Schmidt",
  note = "https://thi.ng/csp",
  year = 2016
}

License

© 2016 - 2024 Karsten Schmidt // Apache License 2.0