Skip to content

Commit

Permalink
Implement Stringable in Payload
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Sep 3, 2023
1 parent 408a3b4 commit 3a4a0fa
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 49 deletions.
13 changes: 12 additions & 1 deletion src/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
* @implements \IteratorAggregate<int, string>
*/
final class Payload implements ReadableStream, \IteratorAggregate
final class Payload implements ReadableStream, \IteratorAggregate, \Stringable
{
use ReadableStreamIteratorAggregate;
use ForbidCloning;
Expand Down Expand Up @@ -128,4 +128,15 @@ public function onClose(\Closure $onClose): void
{
$this->onClose->getFuture()->finally($onClose);
}

/**
* Buffers entire stream before returning. Use {@see self::buffer()} to optionally provide a{@see Cancellation}
* and/or length limit.
*
* @throws BufferException|StreamException
*/
public function __toString(): string
{
return $this->buffer();
}
}
86 changes: 38 additions & 48 deletions test/PayloadTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,83 +7,73 @@
use Amp\Pipeline\Queue;
use Revolt\EventLoop;
use function Amp\async;
use function Amp\delay;

final class PayloadTest extends AsyncTestCase
{
public function testBufferingAll(): void
/**
* @param \Closure(string $expected, Payload $stream):void $test
*/
private function usingStream(\Closure $test, ?float $delay = null): void
{
$values = ["abc", "def", "ghi"];

$queue = new Queue;
$stream = new Payload(new ReadableIterableStream($queue->pipe()));

$future = async($test, \implode($values), $stream);

foreach ($values as $value) {
$queue->pushAsync($value)->ignore();
$queue->push($value);
}

if ($delay > 0) {
delay($delay);
}

$queue->complete();

self::assertSame(\implode($values), $stream->buffer());
$future->await();
}

public function testFullStreamConsumption(): void
{
$values = ["abc", "def", "ghi"];

$queue = new Queue;
$stream = new Payload(new ReadableIterableStream($queue->pipe()));

foreach ($values as $value) {
$queue->pushAsync($value)->ignore();
}

EventLoop::delay(0.005, function () use ($queue) {
$queue->complete();
});

$buffer = "";
while (($chunk = $stream->read()) !== null) {
$buffer .= $chunk;
}
$this->usingStream(function (string $expected, Payload $stream): void {
$emitted = [];
while (($chunk = $stream->read()) !== null) {
$emitted[] = $chunk;
}

self::assertSame(\implode($values), $buffer);
self::assertSame($expected, \implode($emitted));
}, delay: 0.1);
}

public function testFastResolvingStream(): void
{
$values = ["abc", "def", "ghi"];

$queue = new Queue;
$stream = new Payload(new ReadableIterableStream($queue->pipe()));

foreach ($values as $value) {
$queue->pushAsync($value)->ignore();
}

$queue->complete();

$emitted = [];
while (($chunk = $stream->read()) !== null) {
$emitted[] = $chunk;
}
$this->usingStream(function (string $expected, Payload $stream): void {
$emitted = [];
while (($chunk = $stream->read()) !== null) {
$emitted[] = $chunk;
}

self::assertSame($values, $emitted);
self::assertSame($expected, \implode($emitted));
});
}

public function testFastResolvingStreamBufferingOnly(): void
{
$values = ["abc", "def", "ghi"];

$queue = new Queue;
$stream = new Payload(new ReadableIterableStream($queue->pipe()));

foreach ($values as $value) {
$queue->pushAsync($value)->ignore();
}

$queue->complete();
$this->usingStream(
fn (string $expected, Payload $stream) => self::assertSame($expected, $stream->buffer()),
delay: 0.1,
);
}

self::assertSame(\implode($values), $stream->buffer());
public function testStringCast(): void
{
$this->usingStream(
fn (string $expected, Payload $stream) => self::assertSame($expected, (string) $stream),
delay: 0.1,
);
}

public function testPartialStreamConsumption(): void
Expand Down

0 comments on commit 3a4a0fa

Please sign in to comment.