diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 9f6faad245..36ab5619e1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2399,7 +2399,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.suspend { assert(maxFactor >= minFactor, "maxFactor should be greater or equal to minFactor") val random = new scala.util.Random(seed) - def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor + def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor + 1e-4 def nextSize(sourceSize: Int): Int = (factor * sourceSize).toInt @@ -2410,10 +2410,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, if (acc.size < size) go(acc ++ hd, size, tl) else if (acc.size == size) - Pull.output(acc) >> go(hd, size, tl) + Pull.output(acc) >> go(hd, -1, tl) else { val (out, rem) = acc.splitAt(size - 1) - Pull.output(out) >> go(rem ++ hd, -1, tl) + Pull.output(out) >> go(rem, -1, s) } case None => Pull.output(acc) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index d1fb1ae82d..1d7c946256 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1031,6 +1031,13 @@ class StreamSuite extends Fs2Suite { assertEquals(countChunks(source), 2) } + test("rechunkRandomly sometimes emits everything in a single chunk") { + val fiveChunks = Stream(1) ++ Stream(2) ++ Stream(3) ++ Stream(4) ++ Stream(5) + + val source = fiveChunks.rechunkRandomlyWithSeed(0.1, 2.0)(5).chunks.toList + assert(source.forall(_.size <= 2), "Some Chunks have larger size than maxFactor * chunkSize") + } + group("Stream[F, Either[Throwable, O]]") { test(".evalMap(_.pure.rethrow).mask <-> .rethrow.mask") { forAllF { (stream: Stream[Pure, Int]) =>