A reactive streams library for Go in the spirit of Reactive Extensions implemented
with generic functions. The library was born as I wanted a Reactive Extensions library
for Go that is type safe (e.g. no interface{}
, otherwise I would've used RxGo)
and could not yet find one.
The library is still in its infancy and thus sparse in terms of documentation and features. The general concepts from RxGo and from the ReactiveX do apply, so if you're new to reactive streams, you may want to browse these for an introduction to the concept.
The stream package provides the Observable interface and the core functions.
The Observable in this take on reactive streams is defined as:
type Observable[T any] interface {
Observe(ctx context.Context, next(func T) error) error
}
The next
function is called on each element. To retain good stack traces the next
function is called
from the goroutine that called Observe
. A non-nil error
returned by next
it will close the stream and
this error is returned by Observe
.
The call to Observe
is blocking and returns if:
ctx
provided by the observer is closednext
returns an error- the observable encounters an error
The core functions that operate on Observable[T]
are divided into:
- sources that create Observables
- operators that transform Observables
- sinks that terminate the Observable
Since Go's generics does not yet allow new type parameters in methods, all of these are implemented as top-level functions rather than methods in the Observable interface (e.g. as it is with RxGo and usual implementations).
As a first example, we'll implement a simple source Observable
that emits a single integer:
type singleIntegerObservable int
func (num singleIntegerObservable) Observe(ctx context.Context, next func(int) error) error {
if ctx.Err() != nil {
// Context already cancelled, stop before emitting items.
return ctx.Err()
}
return next(int(num))
}
We can now try it out with the Map
operator:
func main() {
var ten stream.Observable[int] = singleIntegerObservable(10)
twenty := stream.Map(ten, func(x int) int) { return x * 2 })
twenty.Observe(context.Background(), func(x int) error {
fmt.Printf("%d\n", x)
return nil
})
}
Instead of defining a new type every time we want to implement Observe
, we can use the FuncObservable
wrapper:
func singleInt(x int) stream.Observable[int] {
return stream.FuncObservable(func(ctx context.Context, next func(int) error) error {
if ctx.Err() != nil {
// Context already cancelled, stop before emitting items.
return ctx.Err()
}
return next(x)
})
}
Sources provide different ways of creating Observable
s without
implementing Observe
by hand:
Just(10) // emits 10
Error(errors.New("oh no")) // returns error without emitting items
Empty() // returns nil error without emitting items
FromSlice([]int{1,2,3}) // emits 1,2,3 and completes
FromChannel(in) // emits items from the given channel
Interval(time.Second) // emits items from sequence 0,1,... once a second
Range(0,3) // emits 0,1,2 and completes
Operators transform streams in different ways:
// Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
Map(src, apply) // applies function 'apply' to each item.
// ParallelMap[A, B any](src Observable[A], par int, apply func(A) B) Observable[B]
ParallelMap(src, 4, apply) // applies function 'apply' to each item using 4 parallel workers.
// FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
FlatMap(src, apply) // applies function 'apply' to each item. 'apply' returns an observable
// that is then flattened (Observable[Observable[B]] => Observable[B]).
// Filter[T any](src Observable[T], filter func(T) bool) Observable[T]
Filter(src, filter) // applies function 'filter' to each item. If 'filter' returns false the
// item is dropped.
// Reduce[T, Result any](src Observable[T], init Result, reduce func(T, Result) Result) Observable[Result]
Reduce(src, 0, reduce) // applies function 'reduce' to each item to "reduce" the stream into a single value.
Reduce(Range(0, 10), 0, func(x, result int) int { return x + result })
// Concat[T any](srcs ...Observable[T]) Observable[T]
Concat(Range(0, 10), Range(10, 20)) == Range(0,20)
// Multicast[T any](bufSize int, src Observable[T]) (Observable[T], func(context.Context) error)
// Creates an observable that multicasts each element from the source to all observers
// of the returned observable.
src, connect := Multicast(16, Range(0,20))
src.Observe(...) // 0...20
src.Observe(...) // 0...20
connect(ctx)
src.Observe(...) // 0...20? may miss items when subscribing after connect.
// Merge[T any](srcs ...Observable[T]) Observable[T]
Merge(Range(0, 10), Range(10, 20)) // values 0 to 19 in undefined order
// Delay[T any](src Observable[T], interval time.Duration) Observable[T]
Delay(src, time.Second) // emit at most one value per second
// OnNext[T any](src Observable[T], fn func(T)) Observable[T]
OnNext(src, fn) // insert the function 'fn' to be called before each item
// Retry[T any](src Observable[T], shouldRetry func(err error) bool) Observable[T]
Retry(src, func(err error) bool { return true }) // always retry if 'src' completes with error
// Take[T any](n int, src Observable[T]) Observable[T]
Take(10, src) // take 10 items from 'src' and then complete it.
// Buffer[T any](src Observable[T], bufSize int, strategy BackpressureStrategy) Observable[T]
Buffer(src, 16, BackpressureDrop) // buffer up to 16 items from 'src' and drop items if buffer is full
// SplitHead[T any](src Observable[T]) (head Observable[T], tail Observable[T])
SplitHead(Range(0,10)) => (Just(0), Range(1,10))
Sinks consume streams:
// First[T any](ctx context.Context, src Observable[T]) (item T, err error)
// Takes the first item from the observable and then cancels it.
item, err := First(ctx, src)
// ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
// Converts the observable into a slice.
items, err := ToSlice(ctx, src)
// ToChannels[T any](ctx context.Context, src Observable[T]) (<-chan T, <-chan error)
// Converts the observable into items channel and errors channel.
items, errs := ToChannels(ctx, src)
// ToChannel[T any](ctx context.Context, errs <-chan error, src Observable[T]) <-chan T
// Like ToChannels, but with a provided error channel. Useful when demuxing multiple
// streams.
items1 := ToChannel(ctx, errs, src1)
items2 := ToChannel(ctx, errs, src2)
for {
select {
case err := <-errs:
return err
case item1 := <-items1:
// ...
case item2 := <-items2:
// ...
}
}
// Discard[T any](ctx context.Context, src Observable[T]) error
err := Discard(ctx, src)
Included in this repository are also some additional sources:
- sources/http - A wrapper for the Go net/http client
- sources/k8s - A wrapper for a Kubernetes informer