Consider using GuzzleHttp\Pool in flush #367

Open
xPaw opened this issue Feb 28, 2023 · 5 comments
Comments

@xPaw

xPaw commented Feb 28, 2023

See https://docs.guzzlephp.org/en/stable/quickstart.html#concurrent-requests

The current design fires off up to batchSize requests at once, so large batch sizes are impractical: the requests start failing. The Guzzle pool, by contrast, has a concurrency limit.

I think their pool design should be better than the current batching mechanism, because it keeps requests going continuously instead of waiting for each batch to complete. In particular, the prepare method can be changed to yield request objects.
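For reference, here is a minimal sketch of the Pool pattern from the Guzzle docs linked above, assuming Guzzle 7 is installed via Composer. The `makeRequests` generator, the `example.test` URLs, and the mocked 201 responses are placeholders for illustration and not part of this library; a `MockHandler` stands in for the network so the snippet runs offline.

```php
<?php

declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // assumption: guzzlehttp/guzzle ^7 installed via Composer

use GuzzleHttp\Client;
use GuzzleHttp\Handler\MockHandler;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Pool;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Psr7\Response;
use Psr\Http\Message\ResponseInterface;

// Hypothetical request source: yields requests lazily instead of building a full batch.
function makeRequests(int $total): Generator
{
    for ($i = 0; $i < $total; $i++) {
        yield new Request('POST', "https://example.test/push/{$i}");
    }
}

// Mocked responses, so no real network traffic happens.
$mock = new MockHandler(array_fill(0, 5, new Response(201)));
$client = new Client(['handler' => HandlerStack::create($mock)]);

$statuses = [];
$failures = [];

$pool = new Pool($client, makeRequests(5), [
    // At most 2 requests in flight; the next one starts as soon as one completes.
    'concurrency' => 2,
    'fulfilled' => function (ResponseInterface $response, int $index) use (&$statuses): void {
        $statuses[$index] = $response->getStatusCode();
    },
    'rejected' => function ($reason, int $index) use (&$failures): void {
        $failures[$index] = $reason;
    },
]);

// Nothing is sent until the pool's promise is waited on.
$pool->promise()->wait();
```

The `concurrency` option is what separates this from fixed batches: it caps the number of in-flight requests rather than the size of a group that must finish together.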

I briefly looked at the code, and the only problem I see is whether it is possible to yield from the flush method for each response, because the Guzzle pool reports each response through a fulfilled callback.
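The difficulty can be shown without Guzzle. In PHP, a closure that contains `yield` is itself a generator function, so calling it as a callback only creates a Generator object that is never iterated; nothing is yielded from the enclosing method. A small illustration, where `fakePool` is a hypothetical stand-in that simply invokes the callback once per item:

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a pool: invokes $fulfilled once per item.
function fakePool(array $items, callable $fulfilled): void
{
    foreach ($items as $index => $item) {
        $fulfilled($item, $index);
    }
}

function flushWithYieldInCallback(array $items): Generator
{
    fakePool($items, function (string $item, int $index) {
        // This `yield` turns the closure into a generator function.
        // Calling it returns an unstarted Generator, which is discarded.
        yield $index => $item;
    });

    // Makes the outer function a generator; it yields nothing itself.
    yield from [];
}

$reports = iterator_to_array(flushWithYieldInCallback(['a', 'b', 'c']));
var_dump(count($reports)); // int(0)
```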

@Minishlink
Member

Hello, I think the best course of action, if we make a change like this, is to use the PSR-7 interface. That way people can use whatever client and options they want. I don't have the time to do this currently, but now that we have a working CI, I can review pull requests with much higher confidence, and they are now accepted quite quickly, if you want to contribute :)

@Gugu7264

I'm building a webpush service using this incredible library and need to maximize performance. I'll run some tests with a Pool, which seems like it could be faster to me.
If everything works correctly, I'll even make a pull request! (Will it be accepted quickly? ;))

@Minishlink
Member

Thanks @Gugu7264, yes sure, I just ran the tests on master and all is good so your PR should be accepted quite quickly :)

@Gugu7264

I can confirm using a Pool is much faster.
However, I cannot return a generator of MessageSentReport objects using this method, which is quite unfortunate.
If you have any ideas on how to do it, please let me know. Here's what I made:
https://gist.github.com/Gugu7264/ef42551e18d7f55b0d07db38d051c584

Hopefully this can be looked into and made into a working generator. I'll try more things too.
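One possible way to keep a generator return type, sketched here without Guzzle: let the callback append to a buffer, then yield the buffered reports once the pool has finished. This gives up streaming, since nothing is yielded until every request has completed. `runFakePool` and the string reports are hypothetical stand-ins for `Pool` and `MessageSentReport`.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for GuzzleHttp\Pool: completes items out of order.
function runFakePool(array $items, callable $fulfilled): void
{
    foreach (array_reverse($items, true) as $index => $item) {
        $fulfilled($item, $index);
    }
}

function flushBuffered(array $items): Generator
{
    $reports = [];

    runFakePool($items, function (string $item, int $index) use (&$reports): void {
        // A real implementation would build a MessageSentReport here.
        $reports[$index] = "sent:{$item}";
    });

    // Restore queue order; completion order is not guaranteed.
    ksort($reports);

    yield from $reports;
}

foreach (flushBuffered(['a', 'b']) as $index => $report) {
    echo $index, ' => ', $report, PHP_EOL;
}
```

Note that, as with any generator, the work only starts when the caller begins iterating the result.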

@xPaw
Author

xPaw commented Aug 25, 2023

I think an easy way around this could be adding a callback param to flush (or making a separate method for this), that is called with each MessageSentReport instead of returning an iterator.

I tried something like this:

<?php

declare(strict_types=1);

use Minishlink\WebPush\WebPush;
use Minishlink\WebPush\MessageSentReport;
use GuzzleHttp\Pool;
use GuzzleHttp\Exception\RequestException;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\RequestInterface;

class PooledWebPush extends WebPush
{
    /**
     * Flush notifications. Triggers the requests.
     *
     * @param callable(MessageSentReport): void $callback Called once with the report for each notification
     * @param null|int $batchSize Used as the pool concurrency. Defaults to the value defined in defaultOptions during instantiation (which defaults to 1000).
     */
    public function flushPooled(callable $callback, ?int $batchSize = null): void
    {
        if (empty($this->notifications)) {
            return;
        }

        if (null === $batchSize) {
            $batchSize = $this->defaultOptions['batchSize'];
        }

        $batch = $this->prepare($this->notifications);

        // reset queue
        $this->notifications = [];

        $pool = new Pool($this->client, $batch, [
            'concurrency' => $batchSize,
            'fulfilled' => function (ResponseInterface $response, int $index) use ($callback, $batch) {
                /** @var RequestInterface $request */
                $request = $batch[$index];
                $callback(new MessageSentReport($request, $response));
            },
            'rejected' => function ($reason, int $index) use ($callback, $batch) {
                // Only a RequestException can carry a response; connection errors have none.
                $response = $reason instanceof RequestException ? $reason->getResponse() : null;
                // Look the request up by index so this does not rely on $reason exposing getRequest().
                $callback(new MessageSentReport($batch[$index], $response, false, $reason->getMessage()));
            },
        ]);

        $promise = $pool->promise();
        $promise->wait();

        if ($this->reuseVAPIDHeaders) {
            $this->vapidHeaders = [];
        }
    }
}

The $iterator didn't seem to work for me, so I made it simpler by keeping the array of requests.

3 participants