diff --git a/README.md b/README.md index 18089464..491889e4 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,7 @@ multiple concurrent HTTP requests without blocking. * [Uri](#uri) * [ResponseException](#responseexception) * [React\Http\Middleware](#reacthttpmiddleware) + * [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware) * [StreamingRequestMiddleware](#streamingrequestmiddleware) * [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware) * [RequestBodyBufferMiddleware](#requestbodybuffermiddleware) @@ -2692,6 +2693,25 @@ access its underlying response object. ### React\Http\Middleware +#### InactiveConnectionTimeoutMiddleware + +The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the +`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly +open. The default is `60` seconds of inactivity and should only be changed if you know what you are doing. + +The following example configures the `HttpServer` to close any inactive connections after one and a half second: + +```php +$http = new React\Http\HttpServer( + new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5), + $handler +); +``` +> Internally, this class is used as a "value object" to override the default timeout of one minute. + As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + This timeout is only in effect if we expect data from the client, not when we are writing data to + the client. + #### StreamingRequestMiddleware The `React\Http\Middleware\StreamingRequestMiddleware` can be used to diff --git a/src/HttpServer.php b/src/HttpServer.php index f2334733..2dc8218c 100644 --- a/src/HttpServer.php +++ b/src/HttpServer.php @@ -8,6 +8,7 @@ use React\Http\Io\IniUtil; use React\Http\Io\MiddlewareRunner; use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Http\Middleware\RequestBodyBufferMiddleware; @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop) } $streaming = false; + $idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT; foreach ((array) $requestHandlers as $handler) { if ($handler instanceof StreamingRequestMiddleware) { $streaming = true; - break; + } + if ($handler instanceof InactiveConnectionTimeoutMiddleware) { + $idleConnectionTimeout = $handler->getTimeout(); } } @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop) * doing anything with the request. */ $middleware = \array_filter($middleware, function ($handler) { - return !($handler instanceof StreamingRequestMiddleware); + return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware); }); - $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware)); + $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectionTimeout); $that = $this; $this->streamingServer->on('error', function ($error) use ($that) { diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 790c8cc1..d51592bd 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter /** @var Clock */ private $clock; + /** @var LoopInterface */ + private $loop; + + /** @var int */ + private $idleConnectionTimeout; + /** * Creates an HTTP server that invokes the given callback for each incoming HTTP request * @@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter * connections in order to then parse incoming data as HTTP. * See also [listen()](#listen) for more details. * - * @param LoopInterface $loop * @param callable $requestHandler + * @param int $idleConnectionTimeout * @see self::listen() */ - public function __construct(LoopInterface $loop, $requestHandler) + public function __construct(LoopInterface $loop, $requestHandler, $idleConnectionTimeout) { if (!\is_callable($requestHandler)) { throw new \InvalidArgumentException('Invalid request handler given'); } + $this->loop = $loop; $this->callback = $requestHandler; $this->clock = new Clock($loop); $this->parser = new RequestHeaderParser($this->clock); + $this->idleConnectionTimeout = $idleConnectionTimeout; $that = $this; $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) { @@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler) */ public function listen(ServerInterface $socket) { - $socket->on('connection', array($this->parser, 'handle')); + $socket->on('connection', array($this, 'handleConnection')); + } + + /** @internal */ + public function handleConnection(ConnectionInterface $connection) + { + $idleConnectionTimeout = $this->idleConnectionTimeout; + $loop = $this->loop; + $idleConnectionTimeoutHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler) { + $connection->removeListener('close', $closeEventHandler); + $connection->removeListener('data', $dataEventHandler); + + $connection->close(); + }; + $timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $closeEventHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler, $loop, &$timer) { + $connection->removeListener('close', $closeEventHandler); + $connection->removeListener('data', $dataEventHandler); + + $loop->cancelTimer($timer); + }; + $dataEventHandler = function () use ($loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler, &$timer) { + $loop->cancelTimer($timer); + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + }; + $connection->on('close', $closeEventHandler); + $connection->on('data', $dataEventHandler); + + $this->parseRequest($connection); } /** @internal */ @@ -359,7 +395,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // either wait for next request over persistent connection or end connection if ($persist) { - $this->parser->handle($connection); + $this->parseRequest($connection); } else { $connection->end(); } @@ -380,13 +416,46 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt // write streaming body and then wait for next request over persistent connection if ($persist) { $body->pipe($connection, array('end' => false)); - $parser = $this->parser; - $body->on('end', function () use ($connection, $parser, $body) { + $that = $this; + $body->on('end', function () use ($connection, $body, &$that) { $connection->removeListener('close', array($body, 'close')); - $parser->handle($connection); + $that->parseRequest($connection); }); } else { $body->pipe($connection); } } + + /** + * @internal + */ + public function parseRequest(ConnectionInterface $connection) + { + $idleConnectionTimeout = $this->idleConnectionTimeout; + $loop = $this->loop; + $parser = $this->parser; + $idleConnectionTimeoutHandler = function () use ($connection, $parser, &$removeTimerHandler) { + $parser->removeListener('headers', $removeTimerHandler); + $parser->removeListener('error', $removeTimerHandler); + + $parser->emit('error', array( + new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT), + $connection + )); + }; + $timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $removeTimerHandler = function ($_, $conn) use ($loop, $timer, $parser, $connection, &$removeTimerHandler) { + if ($conn !== $connection) { + return; + } + + $loop->cancelTimer($timer); + $parser->removeListener('headers', $removeTimerHandler); + $parser->removeListener('error', $removeTimerHandler); + }; + $this->parser->on('headers', $removeTimerHandler); + $this->parser->on('error', $removeTimerHandler); + + $this->parser->handle($connection); + } } diff --git a/src/Middleware/InactiveConnectionTimeoutMiddleware.php b/src/Middleware/InactiveConnectionTimeoutMiddleware.php new file mode 100644 index 00000000..0ca4c759 --- /dev/null +++ b/src/Middleware/InactiveConnectionTimeoutMiddleware.php @@ -0,0 +1,62 @@ + Internally, this class is used as a "value object" to override the default timeout of one minute. + * As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + */ +final class InactiveConnectionTimeoutMiddleware +{ + /** + * @internal + */ + const DEFAULT_TIMEOUT = 60; + + /** + * @var float + */ + private $timeout; + + /** + * @param float $timeout + */ + public function __construct($timeout = self::DEFAULT_TIMEOUT) + { + $this->timeout = $timeout; + } + + public function __invoke(ServerRequestInterface $request, $next) + { + return $next($request); + } + + /** + * @return float + * @internal + */ + public function getTimeout() + { + return $this->timeout; + } +} diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 72d48468..518af923 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -6,6 +6,8 @@ use React\EventLoop\Loop; use React\Http\HttpServer; use React\Http\Io\IniUtil; +use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Promise; use React\Promise\Deferred; @@ -60,6 +62,10 @@ public function testConstructWithoutLoopAssignsLoopAutomatically() $ref->setAccessible(true); $clock = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($streamingServer, 'parser'); + $ref->setAccessible(true); + $parser = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($clock, 'loop'); $ref->setAccessible(true); $loop = $ref->getValue($clock); @@ -257,6 +263,18 @@ function (ServerRequestInterface $request) use (&$streaming) { $this->assertEquals(true, $streaming); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); + + $http->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + Loop::run(); + } + public function testForwardErrors() { $exception = new \Exception(); @@ -439,7 +457,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency() public function testConstructFiltersOutConfigurationMiddlewareBefore() { - $http = new HttpServer(new StreamingRequestMiddleware(), function () { }); + $http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { }); $ref = new \ReflectionProperty($http, 'streamingServer'); $ref->setAccessible(true); diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index afab371e..6da40c70 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -60,7 +60,7 @@ private function mockConnection(array $additionalMethods = null) public function testRequestEventWillNotBeEmittedForIncompleteHeaders() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -70,9 +70,21 @@ public function testRequestEventWillNotBeEmittedForIncompleteHeaders() $this->connection->emit('data', array($data)); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $server = $this->createStreamingServer($this->expectCallableNever(), 0.1); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + Loop::run(); + } + public function testRequestEventIsEmitted() { - $server = new StreamingServer(Loop::get(), $this->expectCallableOnce()); + $server = $this->createStreamingServer($this->expectCallableOnce()); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -87,7 +99,7 @@ public function testRequestEventIsEmitted() public function testRequestEventIsEmittedForArrayCallable() { $this->called = null; - $server = new StreamingServer(Loop::get(), array($this, 'helperCallableOnce')); + $server = $this->createStreamingServer(array($this, 'helperCallableOnce')); $server->listen($this->socket); $this->socket->emit('connection', array($this->connection)); @@ -107,7 +119,7 @@ public function testRequestEvent() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; }); @@ -140,7 +152,7 @@ public function testRequestEventWithSingleRequestHandlerArray() { $i = 0; $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$i, &$requestAssertion) { $i++; $requestAssertion = $request; }); @@ -172,7 +184,7 @@ public function testRequestEventWithSingleRequestHandlerArray() public function testRequestGetWithHostAndCustomPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -194,7 +206,7 @@ public function testRequestGetWithHostAndCustomPort() public function testRequestGetWithHostAndHttpsPort() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -216,7 +228,7 @@ public function testRequestGetWithHostAndHttpsPort() public function testRequestGetWithHostAndDefaultPortWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -238,7 +250,7 @@ public function testRequestGetWithHostAndDefaultPortWillBeIgnored() public function testRequestGetHttp10WithoutHostWillBeIgnored() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -260,7 +272,7 @@ public function testRequestGetHttp10WithoutHostWillBeIgnored() public function testRequestGetHttp11WithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), 'var_dump'); + $server = $this->createStreamingServer('var_dump'); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -273,7 +285,7 @@ public function testRequestGetHttp11WithoutHostWillReject() public function testRequestOptionsAsterisk() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -293,7 +305,7 @@ public function testRequestOptionsAsterisk() public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -306,7 +318,7 @@ public function testRequestNonOptionsWithAsteriskRequestTargetWillReject() public function testRequestConnectAuthorityForm() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -328,7 +340,7 @@ public function testRequestConnectAuthorityForm() public function testRequestConnectWithoutHostWillBePassesAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -350,7 +362,7 @@ public function testRequestConnectWithoutHostWillBePassesAsIs() public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -372,7 +384,7 @@ public function testRequestConnectAuthorityFormWithDefaultPortWillBePassedAsIs() public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -393,7 +405,7 @@ public function testRequestConnectAuthorityFormNonMatchingHostWillBePassedAsIs() public function testRequestConnectOriginFormRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -405,7 +417,7 @@ public function testRequestConnectOriginFormRequestTargetWillReject() public function testRequestNonConnectWithAuthorityRequestTargetWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -419,7 +431,7 @@ public function testRequestWithoutHostEventUsesSocketAddress() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -445,7 +457,7 @@ public function testRequestAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -467,7 +479,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -487,7 +499,7 @@ public function testRequestAbsoluteNonMatchingHostWillBePassedAsIs() public function testRequestAbsoluteWithoutHostWillReject() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', $this->expectCallableOnce()); $server->listen($this->socket); @@ -501,7 +513,7 @@ public function testRequestOptionsAsteriskEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -523,7 +535,7 @@ public function testRequestOptionsAbsoluteEvent() { $requestAssertion = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestAssertion) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestAssertion) { $requestAssertion = $request; }); @@ -543,7 +555,7 @@ public function testRequestOptionsAbsoluteEvent() public function testRequestPauseWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->pause(); }); @@ -563,7 +575,7 @@ public function testRequestPauseWillBeForwardedToConnection() public function testRequestResumeWillBeForwardedToConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->resume(); }); @@ -583,7 +595,7 @@ public function testRequestResumeWillBeForwardedToConnection() public function testRequestCloseWillNotCloseConnection() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); }); @@ -598,7 +610,7 @@ public function testRequestCloseWillNotCloseConnection() public function testRequestPauseAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->pause(); }); @@ -615,7 +627,7 @@ public function testRequestPauseAfterCloseWillNotBeForwarded() public function testRequestResumeAfterCloseWillNotBeForwarded() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { $request->getBody()->close(); $request->getBody()->resume(); }); @@ -634,7 +646,7 @@ public function testRequestEventWithoutBodyWillNotEmitData() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($never) { $request->getBody()->on('data', $never); }); @@ -649,7 +661,7 @@ public function testRequestEventWithSecondDataEventWillEmitBodyData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); }); @@ -669,7 +681,7 @@ public function testRequestEventWithPartialBodyWillEmitData() { $once = $this->expectCallableOnceWith('incomplete'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { $request->getBody()->on('data', $once); }); @@ -690,7 +702,7 @@ public function testRequestEventWithPartialBodyWillEmitData() public function testResponseContainsServerHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -720,7 +732,7 @@ public function testResponsePendingPromiseWillNotSendAnything() { $never = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($never) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($never) { return new Promise(function () { }, $never); }); @@ -750,7 +762,7 @@ public function testResponsePendingPromiseWillBeCancelledIfConnectionCloses() { $once = $this->expectCallableOnce(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($once) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($once) { return new Promise(function () { }, $once); }); @@ -782,7 +794,7 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyChunkedEncod $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -811,13 +823,15 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamEndingWillSendEmptyBodyChunkedEncoded() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -848,6 +862,8 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10() @@ -855,7 +871,7 @@ public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10( $stream = new ThroughStream(); $stream->close(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -891,7 +907,7 @@ public function testResponseStreamWillBeClosedIfConnectionIsAlreadyClosed() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -946,7 +962,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -964,7 +980,7 @@ public function testResponseBodyStreamWillBeClosedIfConnectionEmitsCloseEvent() public function testResponseUpgradeInResponseCanBeUsedToAdvertisePossibleUpgrade() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -996,11 +1012,13 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -1031,11 +1049,13 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 101, array( @@ -1075,7 +1095,7 @@ public function testResponseUpgradeSwitchingProtocolWithStreamWillPipeDataToConn { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 101, array( @@ -1116,7 +1136,7 @@ public function testResponseConnectMethodStreamWillPipeDataToConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -1154,7 +1174,7 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -1173,7 +1193,7 @@ public function testResponseConnectMethodStreamWillPipeDataFromConnection() public function testResponseContainsSameRequestProtocolVersionAndChunkedBodyForHttp11() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1202,11 +1222,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsSameRequestProtocolVersionAndRawBodyForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1240,7 +1262,7 @@ function ($data) use (&$buffer) { public function testResponseContainsNoResponseBodyForHeadRequest() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array(), @@ -1269,6 +1291,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingResponse() @@ -1276,7 +1300,7 @@ public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingRes $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array('Content-Length' => '3'), @@ -1304,11 +1328,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 204, array(), @@ -1337,6 +1363,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatusResponseWithStreamingBody() @@ -1344,7 +1372,7 @@ public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContent $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 204, array('Content-Length' => '3'), @@ -1372,11 +1400,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), @@ -1404,11 +1434,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 0\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array('Content-Length' => 3), @@ -1436,11 +1468,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForHeadRequests() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Content-Length' => 3), @@ -1468,11 +1502,13 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatus() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 304, array(), @@ -1501,6 +1537,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStreamingBody() @@ -1508,7 +1546,7 @@ public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStream $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 304, array('Content-Length' => '3'), @@ -1536,12 +1574,14 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testRequestInvalidHttpProtocolVersionWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1575,7 +1615,7 @@ function ($data) use (&$buffer) { public function testRequestOverflowWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1609,7 +1649,7 @@ function ($data) use (&$buffer) { public function testRequestInvalidWillEmitErrorAndSendErrorResponse() { $error = null; - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $server->on('error', function ($message) use (&$error) { $error = $message; }); @@ -1646,7 +1686,7 @@ public function testRequestContentLengthBodyDataWillEmitDataEventOnRequestStream $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1674,7 +1714,7 @@ public function testRequestChunkedTransferEncodingRequestWillEmitDecodedDataEven $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1705,7 +1745,7 @@ public function testRequestChunkedTransferEncodingWithAdditionalDataWontBeEmitte $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1734,7 +1774,7 @@ public function testRequestChunkedTransferEncodingEmpty() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1762,7 +1802,7 @@ public function testRequestChunkedTransferEncodingHeaderCanBeUpperCase() $errorEvent = $this->expectCallableNever(); $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent, &$requestValidation) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1792,7 +1832,7 @@ public function testRequestChunkedTransferEncodingCanBeMixedUpperAndLowerCase() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1819,7 +1859,7 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1850,7 +1890,7 @@ public function testRequestContentLengthWillEmitDataEventAndEndEventAndAdditiona $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1882,7 +1922,7 @@ public function testRequestZeroContentLengthWillEmitEndEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1908,7 +1948,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1935,7 +1975,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -1961,7 +2001,7 @@ public function testRequestZeroContentLengthWillEmitEndAndAdditionalDataWillBeIg public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); return \React\Promise\resolve(new Response()); }); @@ -1986,7 +2026,7 @@ public function testRequestInvalidChunkHeaderTooLongWillEmitErrorOnRequestStream public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2008,7 +2048,7 @@ public function testRequestInvalidChunkBodyTooLongWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2031,7 +2071,7 @@ public function testRequestUnexpectedEndOfRequestWithChunkedTransferConnectionWi public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2053,7 +2093,7 @@ public function testRequestInvalidChunkHeaderWillEmitErrorOnRequestStream() public function testRequestUnexpectedEndOfRequestWithContentLengthWillEmitErrorOnRequestStream() { $errorEvent = $this->expectCallableOnceWith($this->isInstanceOf('Exception')); - $server = new StreamingServer(Loop::get(), function ($request) use ($errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($errorEvent){ $request->getBody()->on('error', $errorEvent); }); @@ -2080,7 +2120,7 @@ public function testRequestWithoutBodyWillEmitEndOnRequestStream() $endEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ + $server = $this->createStreamingServer(function ($request) use ($dataEvent, $closeEvent, $endEvent, $errorEvent){ $request->getBody()->on('data', $dataEvent); $request->getBody()->on('close', $closeEvent); $request->getBody()->on('end', $endEvent); @@ -2104,7 +2144,7 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() $closeEvent = $this->expectCallableOnce(); $errorEvent = $this->expectCallableNever(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($dataEvent, $endEvent, $closeEvent, $errorEvent) { $request->getBody()->on('data', $dataEvent); $request->getBody()->on('end', $endEvent); $request->getBody()->on('close', $closeEvent); @@ -2123,7 +2163,7 @@ public function testRequestWithoutDefinedLengthWillIgnoreDataEvent() public function testResponseWithBodyStreamWillUseChunkedTransferEncodingByDefault() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array(), @@ -2157,7 +2197,7 @@ function ($data) use (&$buffer) { public function testResponseWithBodyStringWillOverwriteExplicitContentLengthAndTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2198,7 +2238,7 @@ public function testResponseContainsResponseBodyWithTransferEncodingChunkedForBo $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), @@ -2227,6 +2267,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("Transfer-Encoding: chunked", $buffer); $this->assertNotContainsString("Content-Length:", $buffer); $this->assertContainsString("body", $buffer); + + Loop::run(); } public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForLegacyHttp10() @@ -2235,7 +2277,7 @@ public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForL $body->expects($this->once())->method('getSize')->willReturn(null); $body->expects($this->once())->method('__toString')->willReturn('body'); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($body) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($body) { return new Response( 200, array(), @@ -2269,7 +2311,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomTransferEncodingWillBeIgnoredAndUseChunkedTransferEncodingInstead() { $stream = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($stream) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($stream) { return new Response( 200, array( @@ -2306,7 +2348,7 @@ function ($data) use (&$buffer) { public function testResponseWithoutExplicitDateHeaderWillAddCurrentDateFromClock() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2344,7 +2386,7 @@ function ($data) use (&$buffer) { public function testResponseWithCustomDateHeaderOverwritesDefault() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array("Date" => "Tue, 15 Nov 1994 08:12:31 GMT") @@ -2377,7 +2419,7 @@ function ($data) use (&$buffer) { public function testResponseWithEmptyDateHeaderRemovesDateHeader() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Date' => '') @@ -2410,7 +2452,7 @@ function ($data) use (&$buffer) { public function testResponseCanContainMultipleCookieHeaders() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array( @@ -2448,7 +2490,7 @@ function ($data) use (&$buffer) { public function testReponseWithExpectContinueRequestContainsContinueWithLaterResponse() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2480,7 +2522,7 @@ function ($data) use (&$buffer) { public function testResponseWithExpectContinueRequestWontSendContinueForHttp10() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2511,14 +2553,14 @@ function ($data) use (&$buffer) { public function testInvalidCallbackFunctionLeadsToException() { $this->setExpectedException('InvalidArgumentException'); - $server = new StreamingServer(Loop::get(), 'invalid'); + $server = $this->createStreamingServer('invalid'); } public function testResponseBodyStreamWillStreamDataWithChunkedTransferEncoding() { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array(), @@ -2557,7 +2599,7 @@ public function testResponseBodyStreamWithContentLengthWillStreamTillLengthWitho { $input = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use ($input) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use ($input) { return new Response( 200, array('Content-Length' => 5), @@ -2595,7 +2637,7 @@ function ($data) use (&$buffer) { public function testResponseWithResponsePromise() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve(new Response()); }); @@ -2623,7 +2665,7 @@ function ($data) use (&$buffer) { public function testResponseReturnInvalidTypeWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return "invalid"; }); @@ -2657,7 +2699,7 @@ function ($data) use (&$buffer) { public function testResponseResolveWrongTypeInPromiseWillResultInError() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return \React\Promise\resolve("invalid"); }); @@ -2685,7 +2727,7 @@ function ($data) use (&$buffer) { public function testResponseRejectedPromiseWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject(new \Exception()); }); @@ -2716,7 +2758,7 @@ function ($data) use (&$buffer) { public function testResponseExceptionInCallbackWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { throw new \Exception('Bad call'); }); @@ -2747,7 +2789,7 @@ function ($data) use (&$buffer) { public function testResponseWithContentLengthHeaderForStringBodyOverwritesTransferEncoding() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response( 200, array('Transfer-Encoding' => 'chunked'), @@ -2783,7 +2825,7 @@ function ($data) use (&$buffer) { public function testResponseWillBeHandled() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Response(); }); @@ -2811,7 +2853,7 @@ function ($data) use (&$buffer) { public function testResponseExceptionThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { throw new \Exception('hello'); }); @@ -2849,7 +2891,7 @@ function ($data) use (&$buffer) { */ public function testResponseThrowableThrowInCallBackFunctionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { throw new \Error('hello'); }); @@ -2892,7 +2934,7 @@ function ($data) use (&$buffer) { public function testResponseRejectOfNonExceptionWillResultInErrorMessage() { - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) { return new Promise(function ($resolve, $reject) { $reject('Invalid type'); }); @@ -2929,7 +2971,7 @@ function ($data) use (&$buffer) { public function testRequestServerRequestParams() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -2963,7 +3005,7 @@ public function testRequestServerRequestParams() public function testRequestQueryParametersWillBeAddedToRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -2983,7 +3025,7 @@ public function testRequestQueryParametersWillBeAddedToRequest() public function testRequestCookieWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3004,7 +3046,7 @@ public function testRequestCookieWillBeAddedToServerRequest() public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3025,7 +3067,7 @@ public function testRequestInvalidMultipleCookiesWontBeAddedToServerRequest() public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3045,7 +3087,7 @@ public function testRequestCookieWithSeparatorWillBeAddedToServerRequest() public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() { $requestValidation = null; - $server = new StreamingServer(Loop::get(), function (ServerRequestInterface $request) use (&$requestValidation) { + $server = $this->createStreamingServer(function (ServerRequestInterface $request) use (&$requestValidation) { $requestValidation = $request; }); @@ -3064,7 +3106,7 @@ public function testRequestCookieWithCommaValueWillBeAddedToServerRequest() public function testNewConnectionWillInvokeParserOnce() { - $server = new StreamingServer(Loop::get(), $this->expectCallableNever()); + $server = $this->createStreamingServer($this->expectCallableNever()); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3081,7 +3123,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array(), '', '1.0'); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = $this->createStreamingServer($this->expectCallableOnceWith($request)); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3104,7 +3146,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'close')); - $server = new StreamingServer(Loop::get(), $this->expectCallableOnceWith($request)); + $server = $this->createStreamingServer($this->expectCallableOnceWith($request)); $parser = $this->getMockBuilder('React\Http\Io\RequestHeaderParser')->disableOriginalConstructor()->getMock(); $parser->expects($this->once())->method('handle'); @@ -3127,7 +3169,7 @@ public function testNewConnectionWillInvokeParserOnceAndInvokeRequestHandlerWhen { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(200, array('Connection' => 'close')); }); @@ -3152,7 +3194,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(); }); @@ -3177,7 +3219,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle { $request = new ServerRequest('GET', 'http://localhost/', array('Connection' => 'keep-alive'), '', '1.0'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return new Response(); }); @@ -3203,7 +3245,7 @@ public function testNewConnectionWillInvokeParserOnceAfterInvokingRequestHandler $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = $this->createStreamingServer(function () use ($body) { return new Response(200, array(), $body); }); @@ -3229,7 +3271,7 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle $request = new ServerRequest('GET', 'http://localhost/'); $body = new ThroughStream(); - $server = new StreamingServer(Loop::get(), function () use ($body) { + $server = $this->createStreamingServer(function () use ($body) { return new Response(200, array(), $body); }); @@ -3249,9 +3291,9 @@ public function testNewConnectionWillInvokeParserTwiceAfterInvokingRequestHandle // pretend parser just finished parsing $server->handleRequest($this->connection, $request); - $this->assertCount(2, $this->connection->listeners('close')); + $this->assertCount(3, $this->connection->listeners('close')); $body->end(); - $this->assertCount(1, $this->connection->listeners('close')); + $this->assertCount(2, $this->connection->listeners('close')); } public function testCompletingARequestWillRemoveConnectionOnCloseListener() @@ -3260,7 +3302,7 @@ public function testCompletingARequestWillRemoveConnectionOnCloseListener() $request = new ServerRequest('GET', 'http://localhost/'); - $server = new StreamingServer(Loop::get(), function () { + $server = $this->createStreamingServer(function () { return \React\Promise\resolve(new Response()); }); @@ -3273,6 +3315,49 @@ public function testCompletingARequestWillRemoveConnectionOnCloseListener() $server->handleRequest($connection, $request); } +// public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() +// { +// $callback = null; +// $caughtError = null; +// $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); +// $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); +// $loop->expects($this->exactly(2))->method('addTimer')->with(0.1, $this->callback(function ($cb) use (&$tick, &$callback) { +// $callback = $cb; +// return true; +// }))->willReturn($timer); +// $loop->expects($this->any())->method('cancelTimer')->with($timer); +// +// $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); +// $connection->expects($this->once())->method('close'); +// +// $parser = $this->createRequestHeaderParser(new Clock($loop), 0.1, $loop); +// $parser->on('error', function ($error) use (&$caughtError) { +// $caughtError = $error; +// }); +// +// $parser->handle($connection); +// +// $connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/")); +// +// self::assertTrue(is_callable($callback)); +// $callback(); +// +// self::assertInstanceOf('\RuntimeException', $caughtError); +// self::assertSame('Request timed out', $caughtError->getMessage()); +// } +// +// public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest() +// { +// $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); +// $connection->expects($this->never())->method('close'); +// +// $parser = $this->createRequestHeaderParser(new Clock($this->getMockBuilder('React\EventLoop\LoopInterface')->getMock()), 0.1); +// +// $parser->handle($connection); +// +// $connection->emit('data', array($this->createGetRequest())); +// } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; @@ -3282,4 +3367,9 @@ private function createGetRequest() return $data; } + + private function createStreamingServer($requestHandler, $requestTimeout = 1) + { + return new StreamingServer(Loop::get(), $requestHandler, $requestTimeout); + } }