-
Notifications
You must be signed in to change notification settings - Fork 136
A simple API, and a Rich API
LazyFutureStream has all the functionality in this page.
SimpleReactStream is a simpler, more focused API with only the SimpleReact Core API features (and some extensions).
The core SimpleReact API remains very simple. Although it has expanded slightly since the initial release it is today :-
- with
- then
- doOnEach
- retry
- onFail
- capture
- block
- allOf
- anyOf
- run
- toQueue
- flatMap
- peek
- filter
- merge
These are the concurrent non-blocking operations (except for block!) that represent the core of the API.
#java.util.stream.Stream
With SimpleReact v0.3 we have also added all the methods of the Stream api to this 👍
- filter(Predicate<? super T>)
- map(Function<? super T, ? extends R>)
- mapToInt(ToIntFunction<? super T>)
- mapToLong(ToLongFunction<? super T>)
- mapToDouble(ToDoubleFunction<? super T>)
- flatMap(Function<? super T, ? extends Stream<? extends R>>)
- flatMapToInt(Function<? super T, ? extends IntStream>)
- flatMapToLong(Function<? super T, ? extends LongStream>)
- flatMapToDouble(Function<? super T, ? extends DoubleStream>)
- distinct()
- sorted()
- sorted(Comparator<? super T>)
- peek(Consumer<? super T>)
- limit(long)
- skip(long)
- forEach(Consumer<? super T>)
- forEachOrdered(Consumer<? super T>)
- toArray()
- toArray(IntFunction<A[]>)
- reduce(T, BinaryOperator)
- reduce(BinaryOperator)
- reduce(U, BiFunction<U, ? super T, U>, BinaryOperator)
- collect(Supplier, BiConsumer<R, ? super T>, BiConsumer<R, R>)
- collect(Collector<? super T, A, R>)
- min(Comparator<? super T>)
- max(Comparator<? super T>)
- count()
- anyMatch(Predicate<? super T>)
- allMatch(Predicate<? super T>)
- noneMatch(Predicate<? super T>)
- findFirst()
- findAny()
- builder()
- empty()
- of(T)
- of(T...)
- iterate(T, UnaryOperator)
- generate(Supplier)
- concat(Stream<? extends T>, Stream<? extends T>)
#org.jooq.lambda.Seq
And we have also implemented Seq, which adds the following functions
- stream()
- concat(Stream)
- concat(T)
- concat(T...)
- cycle()
- zip(Seq)
- zip(Seq, BiFunction<T, U, R>)
- zipWithIndex()
- foldLeft(U, BiFunction<U, ? super T, U>)
- foldRight(U, BiFunction<? super T, U, U>)
- scanLeft(U, BiFunction<U, ? super T, U>)
- scanRight(U, BiFunction<? super T, U, U>)
- reverse()
- shuffle()
- shuffle(Random)
- skipWhile(Predicate<? super T>)
- skipUntil(Predicate<? super T>)
- limitWhile(Predicate<? super T>)
- limitUntil(Predicate<? super T>)
- intersperse(T)
- duplicate()
- partition(Predicate<? super T>)
- splitAt(long)
- splitAtHead()
- slice(long, long)
- toCollection(Supplier)
- toList()
- toSet()
- toMap(Function<T, K>, Function<T, V>)
- toString(String)
- minBy(Function<T, U>)
- minBy(Function<T, U>, Comparator<? super U>)
- maxBy(Function<T, U>)
- maxBy(Function<T, U>, Comparator<? super U>)
- ofType(Class)
- cast(Class)
- groupBy(Function<? super T, ? extends K>)
- groupBy(Function<? super T, ? extends K>, Collector<? super T, A, D>)
- groupBy(Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
- join()
- join(CharSequence)
- join(CharSequence, CharSequence, CharSequence)
- of(T)
- of(T...)
- empty()
- iterate(T, UnaryOperator)
- generate()
- generate(T)
- generate(Supplier)
- seq(Stream)
- seq(Iterable)
- seq(Iterator)
- seq(Map<K, V>)
- seq(Optional)
- cycle(Stream)
- unzip(Stream<Tuple2<T1, T2>>)
- unzip(Stream<Tuple2<T1, T2>>, Function<T1, U1>, Function<T2, U2>)
- unzip(Stream<Tuple2<T1, T2>>, Function<Tuple2<T1, T2>, Tuple2<U1, U2>>)
- unzip(Stream<Tuple2<T1, T2>>, BiFunction<T1, T2, Tuple2<U1, U2>>)
- zip(Stream, Stream)
- zip(Stream, Stream, BiFunction<T1, T2, R>)
- zipWithIndex(Stream)
- foldLeft(Stream, U, BiFunction<U, ? super T, U>)
- foldRight(Stream, U, BiFunction<? super T, U, U>)
- scanLeft(Stream, U, BiFunction<U, ? super T, U>)
- scanRight(Stream, U, BiFunction<? super T, U, U>)
- unfold(U, Function<U, Optional<Tuple2<T, U>>>)
- reverse(Stream)
- shuffle(Stream)
- shuffle(Stream, Random)
- concat(Stream...)
- duplicate(Stream)
- toString(Stream<?>)
- toString(Stream<?>, String)
- toCollection(Stream, Supplier)
- toList(Stream)
- toSet(Stream)
- toMap(Stream<Tuple2<K, V>>)
- toMap(Stream, Function<T, K>, Function<T, V>)
- slice(Stream, long, long)
- skip(Stream, long)
- skipWhile(Stream, Predicate<? super T>)
- skipUntil(Stream, Predicate<? super T>)
- limit(Stream, long)
- limitWhile(Stream, Predicate<? super T>)
- limitUntil(Stream, Predicate<? super T>)
- intersperse(Stream, T)
- partition(Stream, Predicate<? super T>)
- splitAt(Stream, long)
- splitAtHead(Stream)
- ofType(Stream, Class)
- cast(Stream, Class)
- groupBy(Stream, Function<? super T, ? extends K>)
- groupBy(Stream, Function<? super T, ? extends K>, Collector<? super T, A, D>)
- groupBy(Stream, Function<? super T, ? extends K>, Supplier, Collector<? super T, A, D>)
- join(Stream<?>)
- join(Stream<?>, CharSequence)
- join(Stream<?>, CharSequence, CharSequence, CharSequence)
- filter(Predicate<? super T>)
- map(Function<? super T, ? extends R>)
- mapToInt(ToIntFunction<? super T>)
- mapToLong(ToLongFunction<? super T>)
- mapToDouble(ToDoubleFunction<? super T>)
- flatMap(Function<? super T, ? extends Stream<? extends R>>)
- flatMapToInt(Function<? super T, ? extends IntStream>)
- flatMapToLong(Function<? super T, ? extends LongStream>)
- flatMapToDouble(Function<? super T, ? extends DoubleStream>)
- distinct()
- sorted()
- sorted(Comparator<? super T>)
- peek(Consumer<? super T>)
- limit(long)
- skip(long)
- onClose(Runnable)
- close()
- sequential()
- parallel()
- unordered()
- spliterator()
- forEach(Consumer<? super T>)
#com.aol.cyclops.sequence.ReactiveSeq
* flatten()
* Optional<List<T>> toOptional();
* CompletableFuture<List<T>> toCompletableFuture();
* cycle(int times)
* cycle()
* cycle(Monoid<T> m, int times) ;
* <R> ReactiveSeq<R> cycle(Class<R> monadC, int times);
* ReactiveSeq<T> cycleWhile(Predicate<? super T> predicate);
* ReactiveSeq<T> cycleUntil(Predicate<? super T> predicate);
* <U> ReactiveSeq<Tuple2<T, U>> zipStream(Stream<U> other);
* <S,U> ReactiveSeq<Tuple3<T,S,U>> zip3(Stream<? extends S> second,Stream<? extends U> third);
* <T2,T3,T4> ReactiveSeq<Tuple4<T,T2,T3,T4>> zip4(Stream<T2> second,Stream<T3> third,Stream<T4> fourth);
* <S, R> ReactiveSeq<R> zipSequence(ReactiveSeq<? extends S> second,
* BiFunction<? super T, ? super S, ? extends R> zipper)
* <S, R> ReactiveSeq<R> zipAnyM(AnyM<? extends S> second,
* BiFunction<? super T, ? super S, ? extends R> zipper) ;
* <S, R> ReactiveSeq<R> zipStream(BaseStream<? extends S,? extends BaseStream<? extends S,?>> second,
* BiFunction<? super T, ? super S, ? extends R> zipper);
* Tuple2<ReactiveSeq<T>,ReactiveSeq<T>> duplicateSequence();
* Tuple3<ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>> triplicate();
* Tuple4<ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>,ReactiveSeq<T>> quadruplicate();
* Tuple2<Optional<T>,ReactiveSeq<T>> splitSequenceAtHead();
* Tuple2<ReactiveSeq<T>,ReactiveSeq<T>> splitBy(Predicate<T> splitter);
* ReactiveSeq<List<T>> sliding(int windowSize);
* ReactiveSeq<List<T>> sliding(int windowSize,int increment);
* ReactiveSeq<List<T>> grouped(int groupSize);
* ReactiveSeq<T> scanLeft(Monoid<T> monoid);
* ReactiveSeq<T> scanRight(Monoid<T> monoid);
* boolean xMatch(int num, Predicate<? super T> c);
* HeadAndTail<T> headAndTail();
* Optional<HeadAndTail<T>> headAndTailOptional();
* <R> R mapReduce(Monoid<R> reducer);
* <R> R mapReduce(Function<? super T,? extends R> mapper, Monoid<R> reducer);
* List collectStream(Stream<Collector> collectors);
* <R> List<R> collectIterable(Iterable<Collector> collectors);
* T reduce(Monoid<T> reducer);
* List<T> reduce(Stream<Monoid<T>> reducers);
* List<T> reduce(Iterable<Monoid<T>> reducers);
* T foldLeft(Monoid<T> reducer);
* <T> T foldLeftMapToType(Monoid<T> reducer);
* T foldRight(Monoid<T> reducer);
* public <T> T foldRightMapToType(Monoid<T> reducer);
* Streamable<T> toStreamable();
* <T> Stream<T> toStream();
* startsWith(Iterable<T> iterable);
* startsWith(Iterator<T> iterator);
* AnyM<T> anyM();
* flatMapAnyM(Function<? super T,AnyM<? extends R>> fn);
* flatMapCollection(Function<? super T,Collection<? extends R>> fn);
* flatMapStream(Function<? super T,BaseStream<? extends R,?>> fn);
* flatMapOptional(Function<? super T,Optional<? extends R>> fn) ;
* flatMapCompletableFuture(Function<? super T,CompletableFuture<? extends R>> fn);
* flatMapCharSequence(Function<? super T,CharSequence> fn);
* flatMapFile(Function<? super T,File> fn);
* flatMapURL(Function<? super T, URL> fn) ;
* flatMapBufferedReader(Function<? super T,BufferedReader> fn);
* Collection<T> toLazyCollection();
* Collection<T> toConcurrentLazyCollection();
* Streamable<T> toConcurrentLazyStreamable();
* ReactiveSeq<T> appendStream(Stream<T> stream);
* ReactiveSeq<T> prependStream(Stream<T> stream);
* ReactiveSeq<T> append(T... values);
* ReactiveSeq<T> prepend(T... values) ;
* ReactiveSeq<T> insertAt(int pos, T... values);
* ReactiveSeq<T> deleteBetween(int start,int end);
* ReactiveSeq<T> insertStreamAt(int pos, Stream<T> stream);
* FutureOperations<T> futureOperations(Executor exec);
* boolean endsWith(Iterable<T> iterable);
* boolean endsWith(Stream<T> stream);
* ReactiveSeq<T> skip(long time, final TimeUnit unit);
* ReactiveSeq<T> limit(long time, final TimeUnit unit);
* ReactiveSeq<T> skipLast(int num);
* ReactiveSeq<T> limitLast(int num);
* HotStream<T> hotStream(Executor e);
* T firstValue();
* T single()
* Optional<T> elementAt(long index)
* Tuple2<T,ReactiveSeq<T>> get(long index)
* ReactiveSeq<Tuple2<T,Long>> elapsed()
* ReactiveSeq<Tuple2<T,Long>> timestamp()
* <T> CyclopsSubscriber<T> subscriber()
* ReactiveSeq<T> xPer(int x, long time, TimeUnit t);
* ReactiveSeq<T> onePer(long time, TimeUnit t);
* ReactiveSeq<T> fixedDelay(long l, TimeUnit unit);
* ReactiveSeq<T> jitter(long maxJitterPeriodInNanos);
* ReactiveSeq<T> debounce(long time, TimeUnit t);
* ReactiveSeq<List<T>> batchBySizeAndTime(int size, long time, TimeUnit t);
* <C extends Collection<T>> ReactiveSeq<C> batchBySizeAndTime(int size,long time, TimeUnit unit, Supplier<C> factory);
* ReactiveSeq<List<T>> batchByTime(long time, TimeUnit t);
* <C extends Collection<T>> ReactiveSeq<C> batchByTime(long time, TimeUnit unit, Supplier<C> factory);
* ReactiveSeq<List<T>> batchBySize(int size);
* <C extends Collection<T>>ReactiveSeq<C> batchBySize(int size, Supplier<C> supplier);
* ReactiveSeq<Streamable<T>> windowBySizeAndTime(int maxSize, long maxTime, TimeUnit maxTimeUnit);
* ReactiveSeq<Streamable<T>> windowWhile(Predicate<T> predicate);
* ReactiveSeq<Streamable<T>> windowUntil(Predicate<T> predicate);
* ReactiveSeq<Streamable<T>> windowStatefullyWhile(BiPredicate<Streamable<T>,T> predicate);
* ReactiveSeq<Streamable<T>> windowByTime(long time, TimeUnit t);
* ReactiveSeq<List<T>> batchUntil(Predicate<T> predicate);
* ReactiveSeq<List<T>> batchWhile(Predicate<T> predicate);
* <C extends Collection<T>> ReactiveSeq<C> batchWhile(Predicate<T> predicate, Supplier<C> factory);
* <C extends Collection<T>> ReactiveSeq<C> batchUntil(Predicate<T> predicate, Supplier<C> factory);
* ReactiveSeq<T> recover(final Function<Throwable, T> fn);
* <EX extends Throwable> ReactiveSeq<T> recover(Class<EX> exceptionClass, final Function<EX, T> fn);
* <R> ReactiveSeq<R> retry(Function<T,R> fn)
(many of these are also available on ReactiveSeq also)
zipping operators
- combineLatest
- withLatest
sharding operators :
- shard (map, fn)
Control operators -
- debounce
- onePer
- xPer
- control (fn)
- skipUntil (stream)
- takeUntil (stream)
- jitter
- fixedDelay
Batching operators
- batchBySize
- batchByTime
- batch (fn)
Chunking operators
- chunkSinceLastRead
- chunkSinceLastReadIterator
Futures operators
-
limitFutures
-
skipFutures
-
sliceFutures
-
duplicateFutures
-
partitionFutures
-
splitAtFutures
-
zipFutures
-
zipFuturesWithIndex
-
firstOf
batchBySizeAndTime : batches results until a time or size limit is reached
e.g. batch in 10's or whatever has returned within 5 seconds
lazyReact.from(urls)
.map(this::load)
.batchBySizeAndTime(10,5,TimeUnit.SECONDS)
.toList();
switchOnNextValue : creates a new stream that takes the lasted value from a number of streams
LazyFutureStream<Integer> fast = ... // [1,2,3,4,5,6,7..]
LazyFutureStream<Integer> slow = ... // [100,200,300,400,500,600..]
LazyFutureStream<Integer> merged = fast.switchOnNextValue(Stream.of(slow));
//[1,2,3,4,5,6,7,8,100,9,10,11,12,13,14,15,16,200..]
copy : copies a Stream the specified number of times
LazyFutureStream.of(1,2,3,4,5,6)
.map(i->i+2)
.copy(5)
.forEach(s -> System.out.println(s.toList()));
toLazyCollection : creates a Collection placeholder but doesn't block. EagerFutureStreams and SimpleReactStreams can populate the Collection asynchronously immediately and LazyFutureStreams won't populate it until a method is invoked
Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
.map(i->i+2)
.toLazyCollection();
toConcurrentLazyCollection : creates a lazy collection that can be shared across threads
Collection<Integer> col = LazyFutureStream.of(1,2,3,4,5,6)
.map(i->i+2)
.toConcurrentLazyCollection();
firstValue : return the first value in a stream, must be present - no optional
int first = LazyFutureStream.of(1,2,3,4)
.firstValue();
//first is 1
single : return a single entry, exception if no entries or multiple
int num = LazyFutureStream.of(1)
.single();
//num is 1
futureOperations() & futureOperations(executor) - access terminal operations that return a future and execute asyncrhonously
CompletableFuture<Integer> sum = LazyFutureStream.of(1,2,3,4,5)
.map(it -> it*100)
.futureOperations()
.reduce( 50,(acc,next) -> acc+next);
//sum is CompletableFuture[1550]
sliding(size) : creates a sliding window over the data in the stream
//futureStream
oops - my bad