Skip to content

Commit

Permalink
Close inactive requests
Browse files Browse the repository at this point in the history
This builds on top of reactphp#405 and further builds out reactphp#423 by also
close connections with inactive requests.
  • Loading branch information
WyriHaximus committed Aug 17, 2022
1 parent 0423cf7 commit 4f47478
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 307 deletions.
5 changes: 4 additions & 1 deletion src/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
use Evenement\EventEmitter;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Http\Io\Clock;
use React\Http\Io\IniUtil;
use React\Http\Io\MiddlewareRunner;
use React\Http\Io\RequestHeaderParser;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
Expand Down Expand Up @@ -259,7 +261,8 @@ public function __construct($requestHandlerOrLoop)
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
});

$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectTimeout);
$clock = new Clock($loop);
$this->streamingServer = new StreamingServer(new MiddlewareRunner($middleware), new RequestHeaderParser($loop, $clock, $idleConnectTimeout), $clock);

$that = $this;
$this->streamingServer->on('error', function ($error) use ($that) {
Expand Down
35 changes: 33 additions & 2 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Socket\ConnectionInterface;
Expand All @@ -24,20 +25,48 @@ class RequestHeaderParser extends EventEmitter
{
private $maxSize = 8192;

/**
* @var LoopInterface
*/
private $loop;

/** @var Clock */
private $clock;

public function __construct(Clock $clock)
/**
* @var float
*/
private $idleConnectionTimeout;

/**
* @param LoopInterface $loop
* @param float $idleConnectionTimeout
*/
public function __construct(LoopInterface $loop, Clock $clock, $idleConnectionTimeout)
{
$this->loop = $loop;
$this->clock = $clock;
$this->idleConnectionTimeout = $idleConnectionTimeout;
}

public function handle(ConnectionInterface $conn)
{
$loop = $this->loop;
$idleConnectionTimeout = $this->idleConnectionTimeout;
$timer = $loop->addTimer($idleConnectionTimeout, function () use ($conn) {
$conn->close();
});
$conn->on('close', function () use ($loop, &$timer) {
$loop->cancelTimer($timer);
});
$buffer = '';
$maxSize = $this->maxSize;
$that = $this;
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that, $loop, &$timer, $idleConnectionTimeout) {
$loop->cancelTimer($timer);
$timer = $loop->addTimer($idleConnectionTimeout, function () use ($conn) {
$conn->close();
});
// append chunk of data to buffer and look for end of request headers
$buffer .= $data;
$endOfHeader = \strpos($buffer, "\r\n\r\n");
Expand All @@ -51,6 +80,7 @@ public function handle(ConnectionInterface $conn)
new \OverflowException("Maximum header size of {$maxSize} exceeded.", Response::STATUS_REQUEST_HEADER_FIELDS_TOO_LARGE),
$conn
));
$loop->cancelTimer($timer);
return;
}

Expand All @@ -60,6 +90,7 @@ public function handle(ConnectionInterface $conn)
}

// request headers received => try to parse request
$loop->cancelTimer($timer);
$conn->removeListener('data', $fn);
$fn = null;

Expand Down
46 changes: 10 additions & 36 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
use Evenement\EventEmitter;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Message\Response;
use React\Http\Message\ServerRequest;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Promise;
use React\Promise\CancellablePromiseInterface;
use React\Promise\PromiseInterface;
Expand All @@ -30,7 +28,7 @@
* object in return:
*
* ```php
* $server = new StreamingServer($loop, function (ServerRequestInterface $request) {
* $server = new StreamingServer(function (ServerRequestInterface $request) {
* return new Response(
* Response::STATUS_OK,
* array(
Expand All @@ -55,7 +53,7 @@
* in order to start a plaintext HTTP server like this:
*
* ```php
* $server = new StreamingServer($loop, $handler);
* $server = new StreamingServer($handler);
*
* $socket = new React\Socket\SocketServer('0.0.0.0:8080', array(), $loop);
* $server->listen($socket);
Expand Down Expand Up @@ -99,23 +97,19 @@ final class StreamingServer extends EventEmitter
* connections in order to then parse incoming data as HTTP.
* See also [listen()](#listen) for more details.
*
* @param LoopInterface $loop
* @param callable $requestHandler
* @param float $idleConnectTimeout
* @see self::listen()
*/
public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT)
public function __construct($requestHandler, RequestHeaderParser $parser, Clock $clock)
{
if (!\is_callable($requestHandler)) {
throw new \InvalidArgumentException('Invalid request handler given');
}

$this->loop = $loop;
$this->idleConnectionTimeout = $idleConnectTimeout;

$this->callback = $requestHandler;
$this->clock = new Clock($loop);
$this->parser = new RequestHeaderParser($this->clock);
$this->clock = $clock;
$this->parser = $parser;

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
Expand All @@ -142,27 +136,7 @@ public function __construct(LoopInterface $loop, $requestHandler, $idleConnectTi
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this, 'handle'));
}

/** @internal */
public function handle(ConnectionInterface $conn)
{
$timer = $this->loop->addTimer($this->idleConnectionTimeout, function () use ($conn) {
$conn->close();
});
$loop = $this->loop;
$conn->once('data', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});
$conn->on('end', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});
$conn->on('close', function () use ($loop, $timer) {
$loop->cancelTimer($timer);
});

$this->parser->handle($conn);
$socket->on('connection', array($this->parser, 'handle'));
}

/** @internal */
Expand Down Expand Up @@ -380,7 +354,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->handle($connection);
$this->parser->handle($connection);
} else {
$connection->end();
}
Expand All @@ -401,10 +375,10 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$that = $this;
$body->on('end', function () use ($connection, $that, $body) {
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$connection->removeListener('close', array($body, 'close'));
$that->handle($connection);
$parser->handle($connection);
});
} else {
$body->pipe($connection);
Expand Down
3 changes: 3 additions & 0 deletions src/Middleware/InactiveConnectionTimeoutMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
*/
final class InactiveConnectionTimeoutMiddleware
{
/**
* @internal
*/
const DEFAULT_TIMEOUT = 60;

/**
Expand Down
31 changes: 19 additions & 12 deletions tests/HttpServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,18 @@ public function testConstructWithoutLoopAssignsLoopAutomatically()
$ref->setAccessible(true);
$clock = $ref->getValue($streamingServer);

$ref = new \ReflectionProperty($streamingServer, 'parser');
$ref->setAccessible(true);
$parser = $ref->getValue($streamingServer);

$ref = new \ReflectionProperty($clock, 'loop');
$ref->setAccessible(true);
$loop = $ref->getValue($clock);

$ref = new \ReflectionProperty($parser, 'loop');
$ref->setAccessible(true);
$loop = $ref->getValue($parser);

$this->assertInstanceOf('React\EventLoop\LoopInterface', $loop);
}

Expand Down Expand Up @@ -259,18 +267,17 @@ function (ServerRequestInterface $request) use (&$streaming) {
$this->assertEquals(true, $streaming);
}

public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
{
$this->connection->expects($this->once())->method('close');

$loop = Factory::create();
$http = new HttpServer($loop, new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());

$http->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

$loop->run();
}
// public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
// {
// $this->connection->expects($this->once())->method('close');
//
// $http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());
//
// $http->listen($this->socket);
// $this->socket->emit('connection', array($this->connection));
//
// Loop::run();
// }

public function testForwardErrors()
{
Expand Down
Loading

0 comments on commit 4f47478

Please sign in to comment.