From fe1a4129375500e9a7602e382f4eb9d5f6f5ded4 Mon Sep 17 00:00:00 2001 From: yyy1000 <992364620@qq.com> Date: Wed, 12 Apr 2023 21:16:10 +0800 Subject: [PATCH 1/5] fix: chunk get together when random rechunk --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- core/shared/src/test/scala/fs2/StreamSuite.scala | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 9f6faad245..7de3c25879 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2413,7 +2413,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Pull.output(acc) >> go(hd, size, tl) else { val (out, rem) = acc.splitAt(size - 1) - Pull.output(out) >> go(rem ++ hd, -1, tl) + Pull.output(out) >> go(rem, -1, Pull.output(hd) >> tl) } case None => Pull.output(acc) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index d1fb1ae82d..dd043c9848 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1031,6 +1031,13 @@ class StreamSuite extends Fs2Suite { assertEquals(countChunks(source), 2) } + test("rechunkRandomly sometimes emits everything in a single chunk") { + val fiveChunks = Stream(1) ++ Stream(2) ++ Stream(3) ++ Stream(4) ++ Stream(5) + + val source = fiveChunks.rechunkRandomlyWithSeed(0.1, 2.0)(5).chunks.toList + assert(source.forall(_.size <= 2), "Some Chunks have larger size than maxFactor") + } + group("Stream[F, Either[Throwable, O]]") { test(".evalMap(_.pure.rethrow).mask <-> .rethrow.mask") { forAllF { (stream: Stream[Pure, Int]) => From 6103c9bba4f4d31315bf83f5a41ba4400c083a79 Mon Sep 17 00:00:00 2001 From: yyy1000 <992364620@qq.com> Date: Wed, 12 Apr 2023 21:58:35 +0800 Subject: [PATCH 2/5] fix: another mistake and comment --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- core/shared/src/test/scala/fs2/StreamSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7de3c25879..709b143f4d 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2410,10 +2410,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, if (acc.size < size) go(acc ++ hd, size, tl) else if (acc.size == size) - Pull.output(acc) >> go(hd, size, tl) + Pull.output(acc) >> go(hd, -1, tl) else { val (out, rem) = acc.splitAt(size - 1) - Pull.output(out) >> go(rem, -1, Pull.output(hd) >> tl) + Pull.output(out) >> go(rem, -1, s) } case None => Pull.output(acc) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index dd043c9848..1d7c946256 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -1035,7 +1035,7 @@ class StreamSuite extends Fs2Suite { val fiveChunks = Stream(1) ++ Stream(2) ++ Stream(3) ++ Stream(4) ++ Stream(5) val source = fiveChunks.rechunkRandomlyWithSeed(0.1, 2.0)(5).chunks.toList - assert(source.forall(_.size <= 2), "Some Chunks have larger size than maxFactor") + assert(source.forall(_.size <= 2), "Some Chunks have larger size than maxFactor * chunkSize") } group("Stream[F, Either[Throwable, O]]") { From eb8c472e05778e16960b53f610beccb806bbf07d Mon Sep 17 00:00:00 2001 From: yyy1000 <992364620@qq.com> Date: Tue, 18 Apr 2023 10:29:57 +0800 Subject: [PATCH 3/5] fix: add small difference --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 709b143f4d..eef759ea87 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2399,7 +2399,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.suspend { assert(maxFactor >= minFactor, "maxFactor should be greater or equal to minFactor") val random = new scala.util.Random(seed) - def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor + def factor: Double = random.between(minFactor, maxFactor+ (1e-4)) def nextSize(sourceSize: Int): Int = (factor * sourceSize).toInt From 7dab44d6523c8331cbf65b73edf9617c3ec7501e Mon Sep 17 00:00:00 2001 From: yyy1000 <992364620@qq.com> Date: Tue, 18 Apr 2023 10:34:24 +0800 Subject: [PATCH 4/5] fix: scala-fmt --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index eef759ea87..abc32d29ed 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2399,7 +2399,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.suspend { assert(maxFactor >= minFactor, "maxFactor should be greater or equal to minFactor") val random = new scala.util.Random(seed) - def factor: Double = random.between(minFactor, maxFactor+ (1e-4)) + def factor: Double = random.between(minFactor, maxFactor + 1e-4) def nextSize(sourceSize: Int): Int = (factor * sourceSize).toInt From d429186c255b131b7adb4e1337c102edc1a01de2 Mon Sep 17 00:00:00 2001 From: yyy1000 <992364620@qq.com> Date: Tue, 18 Apr 2023 10:40:50 +0800 Subject: [PATCH 5/5] fix: low version --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index abc32d29ed..36ab5619e1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2399,7 +2399,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.suspend { assert(maxFactor >= minFactor, "maxFactor should be greater or equal to minFactor") val random = new scala.util.Random(seed) - def factor: Double = random.between(minFactor, maxFactor + 1e-4) + def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor + 1e-4 def nextSize(sourceSize: Int): Int = (factor * sourceSize).toInt