diff --git a/README.md b/README.md index 659855b0..cf9e5b75 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ multiple concurrent HTTP requests without blocking. * [ServerRequest](#serverrequest) * [ResponseException](#responseexception) * [React\Http\Middleware](#reacthttpmiddleware) + * [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware) * [StreamingRequestMiddleware](#streamingrequestmiddleware) * [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware) * [RequestBodyBufferMiddleware](#requestbodybuffermiddleware) @@ -2630,6 +2631,22 @@ access its underlying response object. ### React\Http\Middleware +#### InactiveConnectionTimeoutMiddleware + +The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the +`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly open. + +The following example configures the `HttpServer` to close any inactive connections after one and a half second: + +```php +$http = new React\Http\HttpServer( + new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5), + $handler +); +``` +> Internally, this class is used as a "value object" to override the default timeout of one minute. + As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + #### StreamingRequestMiddleware The `React\Http\Middleware\StreamingRequestMiddleware` can be used to diff --git a/src/HttpServer.php b/src/HttpServer.php index f2334733..22945309 100644 --- a/src/HttpServer.php +++ b/src/HttpServer.php @@ -5,9 +5,12 @@ use Evenement\EventEmitter; use React\EventLoop\Loop; use React\EventLoop\LoopInterface; +use React\Http\Io\Clock; use React\Http\Io\IniUtil; use React\Http\Io\MiddlewareRunner; +use React\Http\Io\RequestHeaderParser; use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\LimitConcurrentRequestsMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Http\Middleware\RequestBodyBufferMiddleware; @@ -219,10 +222,13 @@ public function __construct($requestHandlerOrLoop) } $streaming = false; + $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT; foreach ((array) $requestHandlers as $handler) { if ($handler instanceof StreamingRequestMiddleware) { $streaming = true; - break; + } + if ($handler instanceof InactiveConnectionTimeoutMiddleware) { + $idleConnectTimeout = $handler->getTimeout(); } } @@ -252,10 +258,11 @@ public function __construct($requestHandlerOrLoop) * doing anything with the request. */ $middleware = \array_filter($middleware, function ($handler) { - return !($handler instanceof StreamingRequestMiddleware); + return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware); }); - $this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware)); + $clock = new Clock($loop); + $this->streamingServer = new StreamingServer(new MiddlewareRunner($middleware), new RequestHeaderParser($loop, $clock, $idleConnectTimeout), $clock); $that = $this; $this->streamingServer->on('error', function ($error) use ($that) { diff --git a/src/Io/RequestHeaderParser.php b/src/Io/RequestHeaderParser.php index b8336f5b..c73e7883 100644 --- a/src/Io/RequestHeaderParser.php +++ b/src/Io/RequestHeaderParser.php @@ -4,6 +4,7 @@ use Evenement\EventEmitter; use Psr\Http\Message\ServerRequestInterface; +use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; use React\Socket\ConnectionInterface; @@ -24,23 +25,53 @@ class RequestHeaderParser extends EventEmitter { private $maxSize = 8192; + /** + * @var LoopInterface + */ + private $loop; + /** @var Clock */ private $clock; + /** + * @var float + */ + private $idleConnectionTimeout; + /** @var array> */ private $connectionParams = array(); - public function __construct(Clock $clock) + /** + * @param LoopInterface $loop + * @param float $idleConnectionTimeout + */ + public function __construct(LoopInterface $loop, Clock $clock, $idleConnectionTimeout) { + $this->loop = $loop; $this->clock = $clock; + $this->idleConnectionTimeout = $idleConnectionTimeout; } public function handle(ConnectionInterface $conn) { + $loop = $this->loop; + $idleConnectionTimeout = $this->idleConnectionTimeout; + $that = $this; + $idleConnectionTimeoutHandler = function () use ($that, $conn) { + $that->emit('error', array( + new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT), + $conn + )); + $conn->close(); + }; + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); + $conn->on('close', function () use ($loop, &$timer) { + $loop->cancelTimer($timer); + }); $buffer = ''; $maxSize = $this->maxSize; - $that = $this; - $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) { + $conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that, $loop, &$timer, $idleConnectionTimeout, $idleConnectionTimeoutHandler) { + $loop->cancelTimer($timer); // append chunk of data to buffer and look for end of request headers $buffer .= $data; $endOfHeader = \strpos($buffer, "\r\n\r\n"); @@ -59,6 +90,7 @@ public function handle(ConnectionInterface $conn) // ignore incomplete requests if ($endOfHeader === false) { + $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler); return; } diff --git a/src/Io/StreamingServer.php b/src/Io/StreamingServer.php index 13f0b0c4..84e228e3 100644 --- a/src/Io/StreamingServer.php +++ b/src/Io/StreamingServer.php @@ -5,7 +5,6 @@ use Evenement\EventEmitter; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestInterface; -use React\EventLoop\LoopInterface; use React\Http\Message\Response; use React\Http\Message\ServerRequest; use React\Promise; @@ -28,7 +27,7 @@ * object in return: * * ```php - * $server = new StreamingServer($loop, function (ServerRequestInterface $request) { + * $server = new StreamingServer(function (ServerRequestInterface $request) { * return new Response( * Response::STATUS_OK, * array( @@ -53,7 +52,7 @@ * in order to start a plaintext HTTP server like this: * * ```php - * $server = new StreamingServer($loop, $handler); + * $server = new StreamingServer($handler); * * $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop); * $server->listen($socket); @@ -86,6 +85,8 @@ final class StreamingServer extends EventEmitter /** @var Clock */ private $clock; + private $loop; + private $idleConnectionTimeout; /** * Creates an HTTP server that invokes the given callback for each incoming HTTP request @@ -95,19 +96,19 @@ final class StreamingServer extends EventEmitter * connections in order to then parse incoming data as HTTP. * See also [listen()](#listen) for more details. * - * @param LoopInterface $loop * @param callable $requestHandler + * @param float $idleConnectTimeout * @see self::listen() */ - public function __construct(LoopInterface $loop, $requestHandler) + public function __construct($requestHandler, RequestHeaderParser $parser, Clock $clock) { if (!\is_callable($requestHandler)) { throw new \InvalidArgumentException('Invalid request handler given'); } $this->callback = $requestHandler; - $this->clock = new Clock($loop); - $this->parser = new RequestHeaderParser($this->clock); + $this->clock = $clock; + $this->parser = $parser; $that = $this; $this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) { diff --git a/src/Middleware/InactiveConnectionTimeoutMiddleware.php b/src/Middleware/InactiveConnectionTimeoutMiddleware.php new file mode 100644 index 00000000..0ca4c759 --- /dev/null +++ b/src/Middleware/InactiveConnectionTimeoutMiddleware.php @@ -0,0 +1,62 @@ + Internally, this class is used as a "value object" to override the default timeout of one minute. + * As such it doesn't have any behavior internally, that is all in the internal "StreamingServer". + */ +final class InactiveConnectionTimeoutMiddleware +{ + /** + * @internal + */ + const DEFAULT_TIMEOUT = 60; + + /** + * @var float + */ + private $timeout; + + /** + * @param float $timeout + */ + public function __construct($timeout = self::DEFAULT_TIMEOUT) + { + $this->timeout = $timeout; + } + + public function __invoke(ServerRequestInterface $request, $next) + { + return $next($request); + } + + /** + * @return float + * @internal + */ + public function getTimeout() + { + return $this->timeout; + } +} diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 72d48468..90b89dde 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -6,6 +6,8 @@ use React\EventLoop\Loop; use React\Http\HttpServer; use React\Http\Io\IniUtil; +use React\Http\Io\StreamingServer; +use React\Http\Middleware\InactiveConnectionTimeoutMiddleware; use React\Http\Middleware\StreamingRequestMiddleware; use React\Promise; use React\Promise\Deferred; @@ -60,10 +62,18 @@ public function testConstructWithoutLoopAssignsLoopAutomatically() $ref->setAccessible(true); $clock = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($streamingServer, 'parser'); + $ref->setAccessible(true); + $parser = $ref->getValue($streamingServer); + $ref = new \ReflectionProperty($clock, 'loop'); $ref->setAccessible(true); $loop = $ref->getValue($clock); + $ref = new \ReflectionProperty($parser, 'loop'); + $ref->setAccessible(true); + $loop = $ref->getValue($parser); + $this->assertInstanceOf('React\EventLoop\LoopInterface', $loop); } @@ -257,6 +267,18 @@ function (ServerRequestInterface $request) use (&$streaming) { $this->assertEquals(true, $streaming); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever()); + + $http->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + Loop::run(); + } + public function testForwardErrors() { $exception = new \Exception(); @@ -439,7 +461,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency() public function testConstructFiltersOutConfigurationMiddlewareBefore() { - $http = new HttpServer(new StreamingRequestMiddleware(), function () { }); + $http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { }); $ref = new \ReflectionProperty($http, 'streamingServer'); $ref->setAccessible(true); diff --git a/tests/Io/RequestHeaderParserTest.php b/tests/Io/RequestHeaderParserTest.php index def3a631..cfb1c77c 100644 --- a/tests/Io/RequestHeaderParserTest.php +++ b/tests/Io/RequestHeaderParserTest.php @@ -3,6 +3,8 @@ namespace React\Tests\Http\Io; use Psr\Http\Message\ServerRequestInterface; +use React\EventLoop\Loop; +use React\EventLoop\LoopInterface; use React\Http\Io\Clock; use React\Http\Io\RequestHeaderParser; use React\Tests\Http\TestCase; @@ -814,7 +816,7 @@ public function testServerParamsWillBeReusedForMultipleRequestsFromSameConnectio $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); $clock->expects($this->exactly(2))->method('now')->willReturn(1652972091.3958); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock(); $connection->expects($this->once())->method('getLocalAddress')->willReturn('tcp://127.1.1.1:8000'); @@ -849,7 +851,7 @@ public function testServerParamsWillBeRememberedUntilConnectionIsClosed() { $clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock(); - $parser = new RequestHeaderParser($clock); + $parser = $this->createRequestHeaderParser($clock); $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock(); @@ -888,6 +890,49 @@ public function testQueryParametersWillBeSet() $this->assertEquals('this', $queryParams['test']); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $callback = null; + $caughtError = null; + $timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock(); + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $loop->expects($this->exactly(2))->method('addTimer')->with(0.1, $this->callback(function ($cb) use (&$tick, &$callback) { + $callback = $cb; + return true; + }))->willReturn($timer); + $loop->expects($this->any())->method('cancelTimer')->with($timer); + + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->once())->method('close'); + + $parser = $this->createRequestHeaderParser(new Clock($loop), 0.1, $loop); + $parser->on('error', function ($error) use (&$caughtError) { + $caughtError = $error; + }); + + $parser->handle($connection); + + $connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/")); + + self::assertTrue(is_callable($callback)); + $callback(); + + self::assertInstanceOf('\RuntimeException', $caughtError); + self::assertSame('Request timed out', $caughtError->getMessage()); + } + + public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest() + { + $connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock(); + $connection->expects($this->never())->method('close'); + + $parser = $this->createRequestHeaderParser(new Clock($this->getMockBuilder('React\EventLoop\LoopInterface')->getMock()), 0.1); + + $parser->handle($connection); + + $connection->emit('data', array($this->createGetRequest())); + } + private function createGetRequest() { $data = "GET / HTTP/1.1\r\n"; @@ -909,8 +954,8 @@ private function createAdvancedPostRequest() return $data; } - private function createRequestHeaderParser(Clock $clock) + private function createRequestHeaderParser(Clock $clock, $timeout = 1, LoopInterface $loop = null) { - return new RequestHeaderParser($clock); + return new RequestHeaderParser($loop instanceof LoopInterface ? $loop : Loop::get(), $clock, $timeout); } } diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index 8413c53e..8a073d8b 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -4,6 +4,8 @@ use Psr\Http\Message\ServerRequestInterface; use React\EventLoop\Loop; +use React\Http\Io\Clock; +use React\Http\Io\RequestHeaderParser; use React\Http\Io\StreamingServer; use React\Http\Message\Response; use React\Http\Message\ServerRequest; @@ -61,6 +63,21 @@ public function testRequestEventWillNotBeEmittedForIncompleteHeaders() $this->connection->emit('data', array($data)); } + public function testIdleConnectionWillBeClosedAfterConfiguredTimeout() + { + $this->connection->expects($this->once())->method('close'); + + $server = $this->createStreamingServer($this->expectCallableNever(), 0.1); + + $server->listen($this->socket); + $this->socket->emit('connection', array($this->connection)); + + Loop::addTimer(0.2, function () { + Loop::stop(); + }); + Loop::run(); + } + public function testRequestEventIsEmitted() { $server = $this->createStreamingServer($this->expectCallableOnce()); @@ -802,6 +819,8 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamEndingWillSendEmptyBodyChunkedEncoded() @@ -839,6 +858,8 @@ function ($data) use (&$buffer) { $this->assertStringStartsWith("HTTP/1.1 200 OK\r\n", $buffer); $this->assertStringEndsWith("\r\n\r\n0\r\n\r\n", $buffer); + + Loop::run(); } public function testResponseBodyStreamAlreadyClosedWillSendEmptyBodyPlainHttp10() @@ -987,6 +1008,8 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nUpgrade: demo\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeWishInRequestCanBeIgnoredByReturningNormalResponse() @@ -1022,6 +1045,8 @@ function ($data) use (&$buffer) { $this->connection->emit('data', array($data)); $this->assertEquals("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nfoo", $buffer); + + Loop::run(); } public function testResponseUpgradeSwitchingProtocolIncludesConnectionUpgradeHeaderWithoutContentLength() @@ -1193,6 +1218,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsSameRequestProtocolVersionAndRawBodyForHttp10() @@ -1260,6 +1287,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForHeadRequestWithStreamingResponse() @@ -1295,6 +1324,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatus() @@ -1328,6 +1359,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyAndNoContentLengthForNoContentStatusResponseWithStreamingBody() @@ -1363,6 +1396,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 204 No Content\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoContentLengthHeaderForNotModifiedStatus() @@ -1395,6 +1430,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertNotContainsString("\r\nContent-Length: 0\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForNotModifiedStatus() @@ -1427,6 +1464,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsExplicitContentLengthHeaderForHeadRequests() @@ -1459,6 +1498,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 200 OK\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatus() @@ -1492,6 +1533,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); $this->assertNotContainsString("bye", $buffer); + + Loop::run(); } public function testResponseContainsNoResponseBodyForNotModifiedStatusWithStreamingBody() @@ -1527,6 +1570,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("HTTP/1.1 304 Not Modified\r\n", $buffer); $this->assertContainsString("\r\nContent-Length: 3\r\n", $buffer); + + Loop::run(); } public function testRequestInvalidHttpProtocolVersionWillEmitErrorAndSendErrorResponse() @@ -2218,6 +2263,8 @@ function ($data) use (&$buffer) { $this->assertContainsString("Transfer-Encoding: chunked", $buffer); $this->assertNotContainsString("Content-Length:", $buffer); $this->assertContainsString("body", $buffer); + + Loop::run(); } public function testResponseContainsResponseBodyWithPlainBodyWithUnknownSizeForLegacyHttp10() @@ -3255,8 +3302,8 @@ private function createGetRequest() return $data; } - private function createStreamingServer($requestHandler) + private function createStreamingServer($requestHandler, $requestTimeout = 1) { - return new StreamingServer(Loop::get(), $requestHandler); + return new StreamingServer($requestHandler, new RequestHeaderParser(Loop::get(), ($clock = new Clock(Loop::get())), $requestTimeout), $clock); } }