Skip to content

Commit

Permalink
Extend Closable in Channel
Browse files Browse the repository at this point in the history
Adds an onClose method to Channel.
  • Loading branch information
trowski committed Apr 3, 2022
1 parent 1f89deb commit f25b8d2
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ jobs:
- name: Run tests
run: vendor/bin/phpunit ${{ matrix.phpunit-flags }}

- name: Run static analysis
run: vendor/bin/psalm.phar

- name: Run style fixer
env:
PHP_CS_FIXER_IGNORE_ENV: 1
Expand Down
15 changes: 2 additions & 13 deletions src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Sync;

use Amp\Cancellation;
use Amp\Closable;
use Amp\Serialization\SerializationException;

/**
Expand All @@ -11,7 +12,7 @@
* @template TReceive
* @template TSend
*/
interface Channel
interface Channel extends Closable
{
/**
* @param Cancellation|null $cancellation Cancels waiting for the next value. Note the next value is not discarded
Expand All @@ -31,16 +32,4 @@ public function receive(?Cancellation $cancellation = null): mixed;
* @throws SerializationException If the underlying transport mechanism uses serialization and fails.
*/
public function send(mixed $data): void;

/**
* Closes the channel, preventing further sends or receives.
*/
public function close(): void;

/**
* Returns whether the channel has been closed.
*
* @return bool {@code true} if closed, otherwise {@code false}
*/
public function isClosed(): bool;
}
14 changes: 14 additions & 0 deletions src/Internal/ConcurrentIteratorChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Sync\Internal;

use Amp\Cancellation;
use Amp\DeferredFuture;
use Amp\Pipeline\ConcurrentIterator;
use Amp\Pipeline\Queue;
use Amp\Sync\Channel;
Expand All @@ -22,6 +23,8 @@
*/
final class ConcurrentIteratorChannel implements Channel
{
private readonly DeferredFuture $onClose;

/**
* @param ConcurrentIterator<TReceive> $receive
* @param Queue<TSend> $send
Expand All @@ -30,6 +33,7 @@ public function __construct(
private readonly ConcurrentIterator $receive,
private readonly Queue $send,
) {
$this->onClose = new DeferredFuture();
}

public function __destruct()
Expand All @@ -49,11 +53,21 @@ public function close(): void
}

$this->receive->dispose();

if ($this->onClose->isComplete()) {
$this->onClose->complete();
}
}

public function onClose(\Closure $onClose): void
{
$this->onClose->getFuture()->finally($onClose);
}

public function receive(?Cancellation $cancellation = null): mixed
{
if (!$this->receive->continue($cancellation)) {
$this->close();
throw new ChannelException("The channel closed while waiting to receive the next value");
}

Expand Down

0 comments on commit f25b8d2

Please sign in to comment.