Skip to content

Commit

Permalink
docs(fibers): update/extend readme
Browse files Browse the repository at this point in the history
  • Loading branch information
postspectacular committed Aug 9, 2023
1 parent d8fa8ce commit 63ac0d7
Show file tree
Hide file tree
Showing 2 changed files with 305 additions and 1 deletion.
157 changes: 156 additions & 1 deletion packages/fibers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ This project is part of the
- [Basic usage](#basic-usage)
- [Fiber operators](#fiber-operators)
- [Composition via transducers](#composition-via-transducers)
- [CSP primitives (Communicating Sequential Processes)](#csp-primitives-communicating-sequential-processes)
- [Buffering behaviors](#buffering-behaviors)
- [Channels](#channels)
- [CSP ping/pong example](#csp-pingpong-example)
- [Status](#status)
- [Installation](#installation)
- [Dependencies](#dependencies)
Expand Down Expand Up @@ -115,6 +119,7 @@ The following operators act as basic composition helpers to construct more elabo
- [`forkAll`](https://docs.thi.ng/umbrella/fibers/classes/Fiber.html#forkAll): create & attach multiple child processes
- [`join`](https://docs.thi.ng/umbrella/fibers/classes/Fiber.html#join): wait for all child processes to complete
- [`sequence`](https://docs.thi.ng/umbrella/fibers/functions/sequence.html): execute fibers in sequence
- [`shuffle`](https://docs.thi.ng/umbrella/fibers/functions/shuffle.html): execute fibers in constantly randomized order
- [`timeSlice`](https://docs.thi.ng/umbrella/fibers/functions/timeSlice.html): execute fiber in batches of N milliseconds
- [`until`](https://docs.thi.ng/umbrella/fibers/functions/until.html): wait until predicate is truthy
- [`untilEvent`](https://docs.thi.ng/umbrella/fibers/functions/untilEvent.html): wait until event occurs
Expand Down Expand Up @@ -174,6 +179,154 @@ sequence(
// part 7
```

### CSP primitives (Communicating Sequential Processes)

Reference: [Communicating Sequential
Processes](https://en.wikipedia.org/wiki/Communicating_sequential_processes)

In addition to the operators above, the basic fiber implementation can also be
used to construct other types of primitives, like these required for
channel-based communication between processes. The package includes a
fiber-based read/write channel primitive which can be customized with different
buffer behaviors to control blocking behaviors and backpressure handling (aka
attempting to write faster to a channel than values are being read, essentially
a memory management issue).

#### Buffering behaviors

The following channel buffer types are included, all accepting a max. capacity
and all implementing the required
[IReadWriteBuffer](https://docs.thi.ng/umbrella/fibers/interfaces/IReadWriteBuffer.html)
interface:

- [`fifo`](https://docs.thi.ng/umbrella/fibers/functions/fifo.html): First in,
first out. Writes to the channel will start blocking once the buffer's
capacity is reached, otherwise complete immediately. Likewise, channel reads
are non-blocking whilst there're more buffered values available. Reads will
only block if the buffer is empty.
- [`lifo`](https://docs.thi.ng/umbrella/fibers/functions/lifo.html): First in,
last out. Read/write behavior is mostly the same as with `fifo`, with the
important difference, that (as the name indicates), the last value written
will be the first value read (i.e. stack behavior).
- [`sliding`](https://docs.thi.ng/umbrella/fibers/functions/sliding.html):
Sliding window buffer. Writes to the channel are **never** blocking! Once the
buffer's capacity is reached, a new write will first expunge the oldest
buffered value (similar to LRU cache behavior). Read behavior is the same as
for `fifo`.
- [`dropping`](https://docs.thi.ng/umbrella/fibers/functions/dropping.html):
Dropping buffer. Writes to the channel are **never** blocking! Whilst the
buffer's capacity is reached, new writes will be silently ignored. Read
behavior is the same as for `fifo`.

#### Channels

As mentioned previously,
[channels](https://docs.thi.ng/umbrella/fibers/functions/channel.html) and their
[read](https://docs.thi.ng/umbrella/fibers/classes/Channel.html#read),
[write](https://docs.thi.ng/umbrella/fibers/classes/Channel.html#write) and
[close](https://docs.thi.ng/umbrella/fibers/classes/Channel.html#close)
operations are the key building blocks for CSP. In this fiber-based
implementation, all channel operations are executed in individual fibers to deal
with the potential blocking behaviors. This is demonstrated in the simple
example below.

Channels can be created like so:

```ts
// create unbuffered channel with single value capacity
const chan1 = channel();

// create channel with a FIFO buffer, capacity: 2 values
const chan2 = channel(2);

// create channel with a sliding window buffer and custom ID & logger
const chan3 = channel(
sliding(3),
{ id: "main", logger: new ConsoleLogger("chan") }
);
```

#### CSP ping/pong example

```ts tangle:export/pingpong.ts
import { channel, fiber, wait } from "@thi.ng/fibers";
import { ConsoleLogger } from "@thi.ng/logger";

// create idle main fiber with custom options
const app = fiber(null, {
id: "main",
logger: new ConsoleLogger("app"),
terminate: true,
});

// create CSP channels (w/ default config)
const ping = channel<number>();
const pong = channel<number>();

// attach ping/pong child processes
app.forkAll(
// ping
function* () {
let x: number | undefined;
while (ping.readable()) {
// blocking read op
x = yield* ping.read();
// check if channel was closed meanwhile
if (x === undefined) break;
console.log("PING", x);
// blocking write op to other channel
yield* pong.write(x);
// slowdown
yield* wait(100);
}
},
// pong (very similar)
function* () {
let x: number | undefined;
while (pong.readable()) {
x = yield* pong.read();
if (x === undefined) break;
console.log("PONG", x);
yield* ping.write(x + 1);
}
},
// channel managment
function* () {
// kickoff ping/pong
yield* ping.write(0);
yield* wait(1000);
// wait for both channels to close
yield* ping.close();
yield* pong.close();
}
);
app.run();

// [DEBUG] app: forking fib-0
// [DEBUG] app: forking fib-1
// [DEBUG] app: forking fib-2
// [DEBUG] app: running main...
// [DEBUG] app: init main
// [DEBUG] app: init fib-0
// [DEBUG] app: init fib-1
// [DEBUG] app: init fib-2
// PING 0
// PONG 0
// PING 1
// PONG 1
// ...
// PING 9
// PONG 9
// [DEBUG] app: done fib-2 undefined
// [DEBUG] app: deinit fib-2
// [DEBUG] app: done fib-1 undefined
// [DEBUG] app: deinit fib-1
// [DEBUG] app: done fib-0 undefined
// [DEBUG] app: deinit fib-0
// [DEBUG] app: cancel main
// [DEBUG] app: deinit main
```

## Status

**ALPHA** - bleeding edge / work-in-progress
Expand All @@ -200,16 +353,18 @@ For Node.js REPL:
const fibers = await import("@thi.ng/fibers");
```

Package sizes (brotli'd, pre-treeshake): ESM: 1.53 KB
Package sizes (brotli'd, pre-treeshake): ESM: 2.12 KB

## Dependencies

- [@thi.ng/api](https://github.com/thi-ng/umbrella/tree/develop/packages/api)
- [@thi.ng/arrays](https://github.com/thi-ng/umbrella/tree/develop/packages/arrays)
- [@thi.ng/bench](https://github.com/thi-ng/umbrella/tree/develop/packages/bench)
- [@thi.ng/checks](https://github.com/thi-ng/umbrella/tree/develop/packages/checks)
- [@thi.ng/errors](https://github.com/thi-ng/umbrella/tree/develop/packages/errors)
- [@thi.ng/idgen](https://github.com/thi-ng/umbrella/tree/develop/packages/idgen)
- [@thi.ng/logger](https://github.com/thi-ng/umbrella/tree/develop/packages/logger)
- [@thi.ng/random](https://github.com/thi-ng/umbrella/tree/develop/packages/random)

## Usage examples

Expand Down
149 changes: 149 additions & 0 deletions packages/fibers/tpl.readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ The following operators act as basic composition helpers to construct more elabo
- [`forkAll`](https://docs.thi.ng/umbrella/fibers/classes/Fiber.html#forkAll): create & attach multiple child processes
- [`join`](https://docs.thi.ng/umbrella/fibers/classes/Fiber.html#join): wait for all child processes to complete
- [`sequence`](https://docs.thi.ng/umbrella/fibers/functions/sequence.html): execute fibers in sequence
- [`shuffle`](https://docs.thi.ng/umbrella/fibers/functions/shuffle.html): execute fibers in constantly randomized order
- [`timeSlice`](https://docs.thi.ng/umbrella/fibers/functions/timeSlice.html): execute fiber in batches of N milliseconds
- [`until`](https://docs.thi.ng/umbrella/fibers/functions/until.html): wait until predicate is truthy
- [`untilEvent`](https://docs.thi.ng/umbrella/fibers/functions/untilEvent.html): wait until event occurs
Expand Down Expand Up @@ -154,6 +155,154 @@ sequence(
// part 7
```

### CSP primitives (Communicating Sequential Processes)

Reference: [Communicating Sequential
Processes](https://en.wikipedia.org/wiki/Communicating_sequential_processes)

In addition to the operators above, the basic fiber implementation can also be
used to construct other types of primitives, like these required for
channel-based communication between processes. The package includes a
fiber-based read/write channel primitive which can be customized with different
buffer behaviors to control blocking behaviors and backpressure handling (aka
attempting to write faster to a channel than values are being read, essentially
a memory management issue).

#### Buffering behaviors

The following channel buffer types are included, all accepting a max. capacity
and all implementing the required
[IReadWriteBuffer](https://docs.thi.ng/umbrella/fibers/interfaces/IReadWriteBuffer.html)
interface:

- [`fifo`](https://docs.thi.ng/umbrella/fibers/functions/fifo.html): First in,
first out. Writes to the channel will start blocking once the buffer's
capacity is reached, otherwise complete immediately. Likewise, channel reads
are non-blocking whilst there're more buffered values available. Reads will
only block if the buffer is empty.
- [`lifo`](https://docs.thi.ng/umbrella/fibers/functions/lifo.html): First in,
last out. Read/write behavior is mostly the same as with `fifo`, with the
important difference, that (as the name indicates), the last value written
will be the first value read (i.e. stack behavior).
- [`sliding`](https://docs.thi.ng/umbrella/fibers/functions/sliding.html):
Sliding window buffer. Writes to the channel are **never** blocking! Once the
buffer's capacity is reached, a new write will first expunge the oldest
buffered value (similar to LRU cache behavior). Read behavior is the same as
for `fifo`.
- [`dropping`](https://docs.thi.ng/umbrella/fibers/functions/dropping.html):
Dropping buffer. Writes to the channel are **never** blocking! Whilst the
buffer's capacity is reached, new writes will be silently ignored. Read
behavior is the same as for `fifo`.

#### Channels

As mentioned previously,
[channels](https://docs.thi.ng/umbrella/fibers/functions/channel.html) and their
[read](https://docs.thi.ng/umbrella/fibers/classes/Channel.html#read),
[write](https://docs.thi.ng/umbrella/fibers/classes/Channel.html#write) and
[close](https://docs.thi.ng/umbrella/fibers/classes/Channel.html#close)
operations are the key building blocks for CSP. In this fiber-based
implementation, all channel operations are executed in individual fibers to deal
with the potential blocking behaviors. This is demonstrated in the simple
example below.

Channels can be created like so:

```ts
// create unbuffered channel with single value capacity
const chan1 = channel();

// create channel with a FIFO buffer, capacity: 2 values
const chan2 = channel(2);

// create channel with a sliding window buffer and custom ID & logger
const chan3 = channel(
sliding(3),
{ id: "main", logger: new ConsoleLogger("chan") }
);
```

#### CSP ping/pong example

```ts tangle:export/pingpong.ts
import { channel, fiber, wait } from "@thi.ng/fibers";
import { ConsoleLogger } from "@thi.ng/logger";

// create idle main fiber with custom options
const app = fiber(null, {
id: "main",
logger: new ConsoleLogger("app"),
terminate: true,
});

// create CSP channels (w/ default config)
const ping = channel<number>();
const pong = channel<number>();

// attach ping/pong child processes
app.forkAll(
// ping
function* () {
let x: number | undefined;
while (ping.readable()) {
// blocking read op
x = yield* ping.read();
// check if channel was closed meanwhile
if (x === undefined) break;
console.log("PING", x);
// blocking write op to other channel
yield* pong.write(x);
// slowdown
yield* wait(100);
}
},
// pong (very similar)
function* () {
let x: number | undefined;
while (pong.readable()) {
x = yield* pong.read();
if (x === undefined) break;
console.log("PONG", x);
yield* ping.write(x + 1);
}
},
// channel managment
function* () {
// kickoff ping/pong
yield* ping.write(0);
yield* wait(1000);
// wait for both channels to close
yield* ping.close();
yield* pong.close();
}
);
app.run();

// [DEBUG] app: forking fib-0
// [DEBUG] app: forking fib-1
// [DEBUG] app: forking fib-2
// [DEBUG] app: running main...
// [DEBUG] app: init main
// [DEBUG] app: init fib-0
// [DEBUG] app: init fib-1
// [DEBUG] app: init fib-2
// PING 0
// PONG 0
// PING 1
// PONG 1
// ...
// PING 9
// PONG 9
// [DEBUG] app: done fib-2 undefined
// [DEBUG] app: deinit fib-2
// [DEBUG] app: done fib-1 undefined
// [DEBUG] app: deinit fib-1
// [DEBUG] app: done fib-0 undefined
// [DEBUG] app: deinit fib-0
// [DEBUG] app: cancel main
// [DEBUG] app: deinit main
```

{{meta.status}}

{{repo.supportPackages}}
Expand Down

0 comments on commit 63ac0d7

Please sign in to comment.