From ef740f1a9762a987221613f21ce84a229a2bf650 Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 10:14:28 +0100 Subject: [PATCH 1/8] add priority to the queue --- docs/commands.md | 5 + docs/configuration.md | 34 ++++ docs/running_queues.md | 39 +++++ phpstan.neon.dist | 9 +- src/Commands/QueuePublish.php | 12 ++ src/Commands/QueueWork.php | 20 ++- src/Config/Queue.php | 37 +++++ .../2023-11-05-064053_AddPriorityField.php | 36 +++++ src/Entities/QueueJob.php | 1 + src/Entities/QueueJobFailed.php | 1 + src/Exceptions/QueueException.php | 15 ++ src/Handlers/BaseHandler.php | 148 ++++++++++++++++++ src/Handlers/DatabaseHandler.php | 101 +----------- src/Interfaces/QueueInterface.php | 2 +- src/Language/en/Queue.php | 7 +- src/Models/QueueJobFailedModel.php | 2 +- src/Models/QueueJobModel.php | 38 ++++- tests/DatabaseHandlerTest.php | 64 +++++++- tests/_support/Config/Queue.php | 16 ++ .../Database/Seeds/TestQueueSeeder.php | 3 + 20 files changed, 472 insertions(+), 118 deletions(-) create mode 100644 src/Database/Migrations/2023-11-05-064053_AddPriorityField.php create mode 100644 src/Handlers/BaseHandler.php diff --git a/docs/commands.md b/docs/commands.md index 8d5a5b7..b539f6d 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -60,6 +60,7 @@ Allows you to consume jobs from a specific queue. * `-max-jobs` - The maximum number of jobs to handle before worker should exit. Disabled by default. * `-max-time` - The maximum number of seconds worker should run. Disabled by default. * `-memory` - The maximum memory in MB that worker can take. Default value: `128`. +* `-priority` - The priority for the jobs from the queue (comma separated). If not provided explicit, will follow the priorities defined in the config via `$queuePriorities` for the given queue. Disabled by default. * `-tries` - The number of attempts after which the job will be considered as failed. Overrides settings from the Job class. Disabled by default. * `-retry-after` - The number of seconds after which the job is to be restarted in case of failure. Overrides settings from the Job class. Disabled by default. * `--stop-when-empty` - Stop when the queue is empty. @@ -70,6 +71,10 @@ Allows you to consume jobs from a specific queue. It will listen for 5 jobs from the `emails` queue and then stop. + php spark queue:work emails -max-jobs 5 -priority low,high + +It will work the same as the previous command but will first consume jobs from the `emails` queue that were added with the `low` priority. + ### queue:stop Allows you to stop a specific queue in a safe way. It does this as soon as the job that is running in the queue is completed. diff --git a/docs/configuration.md b/docs/configuration.md index d5c5279..c872654 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -17,6 +17,8 @@ Available options: - [$database](#database) - [$keepDoneJobs](#keepDoneJobs) - [$keepFailedJobs](#keepFailedJobs) +- [$queueDefaultPriority](#queueDefaultPriority) +- [$queuePriorities](#queuePriorities) - [$jobHandlers](#jobHandlers) ### $defaultHandler @@ -44,6 +46,38 @@ If the job failed, should we move it to the failed jobs table? Default value: `t This is very useful when you want to be able to see which tasks are failing and why. +### $queueDefaultPriority + +The default priority for the `queue` if non default `queuePriorities` are set. Not set by default. + +This is needed only if you have defined non default priorities for the queue and the default priority should be different from the `default` value. + +Example: + +```php +public array $queueDefaultPriority = [ + 'emails' => 'low', +]; +``` + +This means that all the jobs added to the `emails` queue will have the default priority set to `low`. + +### $queuePriorities + +The valid priorities for the `queue` in the order they will be consumed first. Not set by default. + +By default, the priority is set to `['default']`. If you want to have multiple priorities in the queue, you can define them here. + +Example: + +```php +public array $queuePriorities = [ + 'emails' => ['high', 'low'], +]; +``` + +This means that the jobs added to the `emails` queue can have either `high` or `low` priority. + ### $jobHandlers An array of available jobs as key-value. Every job that you want to use with the queue has to be defined here. diff --git a/docs/running_queues.md b/docs/running_queues.md index e8b3266..10d4139 100644 --- a/docs/running_queues.md +++ b/docs/running_queues.md @@ -24,6 +24,45 @@ So choosing the right command is not so obvious. We have to estimate how many jo You might use CodeIgniter [Tasks](https://github.com/codeigniter4/tasks) library to schedule queue worker instead of working directly with CRON. +### Working with priorities + +By default, every job in the queue has the same priority. However, we can send the jobs to the queue with different priorities. This way some jobs may be handled earlier. + +As an example, we will define priorities for the `emails` queue: + +```php +// app/Config/Queue.php + +public array $queueDefaultPriority = [ + 'emails' => 'low', +]; + +public array $queuePriorities = [ + 'emails' => ['high', 'low'], +]; +``` + +With this configuration, we can now add new jobs to the queue like this: + +```php +// This job will have low priority: +service('queue')->push('emails', 'email', ['message' => 'Email message with low priority']); +// But this one will have high priority +service('queue')->setPriority('high')->push('emails', 'email', ['message' => 'Email message with high priority']); +``` + +Now, if we run the worker: + + php spark queue:work emails + +It will consume the jobs from the queue based on priority set in the config: `$queuePriorities`. So, first `high` priority and then `low` priority. + +But we can also run the worker like this: + + php spark queue:work emails -priority low,high + +This way, worker will consume jobs with the `low` priority and then with `high`. The order set in the config file is override. + ### Running many instances of the same queue As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases except `SQLite3` since it doesn't guarantee that the job will be selected only by one process. diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 7237eff..d1d87e5 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -16,19 +16,24 @@ parameters: message: '#Variable \$config on left side of \?\?\= always exists and is not nullable.#' paths: - src/Config/Services.php + - + message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Handlers\\BaseHandler::push\(\).#' + paths: + - src/Handlers/BaseHandler.php - message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#' paths: + - src/Handlers/BaseHandler.php - src/Handlers/DatabaseHandler.php - src/Models/QueueJobModel.php - message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#' paths: - - src/Handlers/DatabaseHandler.php + - src/Handlers/BaseHandler.php - message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::truncate\(\).#' paths: - - src/Handlers/DatabaseHandler.php + - src/Handlers/BaseHandler.php - message: '#Parameter \#3 \$tries of method Michalsn\\CodeIgniterQueue\\Commands\\QueueWork::handleWork\(\) expects int\|null, string\|true\|null given.#' paths: diff --git a/src/Commands/QueuePublish.php b/src/Commands/QueuePublish.php index 6f76e22..606cb60 100644 --- a/src/Commands/QueuePublish.php +++ b/src/Commands/QueuePublish.php @@ -59,6 +59,18 @@ public function resolveJobClass(string $name): string return $this->jobHandlers[$name]; } + + /** + * Stringify queue priorities. + */ + public function getQueuePriorities(string $name): ?string + { + if (! isset($this->queuePriorities[$name])) { + return null; + } + + return implode(',', $this->queuePriorities[$name]); + } EOT; $contents = str_replace($method, '', $contents); file_put_contents($file, $contents); diff --git a/src/Commands/QueueWork.php b/src/Commands/QueueWork.php index 74c2b4e..5053993 100644 --- a/src/Commands/QueueWork.php +++ b/src/Commands/QueueWork.php @@ -59,6 +59,7 @@ class QueueWork extends BaseCommand '-max-jobs' => 'The maximum number of jobs to handle before worker should exit. Disabled by default.', '-max-time' => 'The maximum number of seconds worker should run. Disabled by default.', '-memory' => 'The maximum memory in MB that worker can take. Default value: 128', + '-priority' => 'The priority for the jobs from the queue (comma separated). If not provided explicit, will follow the priorities defined in the config via $queuePriorities for the given queue. Disabled by default.', '-tries' => 'The number of attempts after which the job will be considered as failed. Overrides settings from the Job class. Disabled by default.', '-retry-after' => 'The number of seconds after which the job is to be restarted in case of failure. Overrides settings from the Job class. Disabled by default.', '--stop-when-empty' => 'Stop when the queue is empty.', @@ -71,6 +72,8 @@ class QueueWork extends BaseCommand */ public function run(array $params) { + set_time_limit(0); + /** @var QueueConfig $config */ $config = config('Queue'); $stopWhenEmpty = false; @@ -89,6 +92,7 @@ public function run(array $params) $maxJobs = $params['max-jobs'] ?? CLI::getOption('max-jobs') ?? 0; $maxTime = $params['max-time'] ?? CLI::getOption('max-time') ?? 0; $memory = $params['memory'] ?? CLI::getOption('memory') ?? 128; + $priority = $params['priority'] ?? CLI::getOption('priority') ?? $config->getQueuePriorities($queue) ?? 'default'; $tries = $params['tries'] ?? CLI::getOption('tries'); $retryAfter = $params['retry-after'] ?? CLI::getOption('retry-after'); $countJobs = 0; @@ -99,10 +103,18 @@ public function run(array $params) $startTime = microtime(true); - CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan') . PHP_EOL, 'cyan'); + CLI::write('Listening for the jobs with the queue: ' . CLI::color($queue, 'light_cyan'), 'cyan'); + + if ($priority !== 'default') { + CLI::write('Jobs will be consumed according to priority: ' . CLI::color($priority, 'light_cyan'), 'cyan'); + } + + CLI::write(PHP_EOL); + + $priority = array_map('trim', explode(',', $priority)); while (true) { - $work = service('queue')->pop($queue); + $work = service('queue')->pop($queue, $priority); if ($work === null) { if ($stopWhenEmpty) { @@ -216,7 +228,7 @@ private function maxTimeCheck(int $maxTime, float $startTime): bool private function checkMemory(int $memory): bool { - if (memory_get_peak_usage() > $memory * 1024 * 1024) { + if (memory_get_usage(true) > $memory * 1024 * 1024) { CLI::write(sprintf('The memory limit of %s MB was reached. Stopping.', $memory), 'yellow'); return true; @@ -234,7 +246,7 @@ private function checkStop(string $queue, float $startTime): bool } if ($startTime < (float) $time) { - CLI::write('This worker has been scheduled to end. Stopping.', 'yellow'); + CLI::write('The termination of this worker has been planned. Stopping.', 'yellow'); return true; } diff --git a/src/Config/Queue.php b/src/Config/Queue.php index edb5f78..8cec0d1 100644 --- a/src/Config/Queue.php +++ b/src/Config/Queue.php @@ -5,6 +5,7 @@ use CodeIgniter\Config\BaseConfig; use Michalsn\CodeIgniterQueue\Exceptions\QueueException; use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler; +use Michalsn\CodeIgniterQueue\Handlers\RedisHandler; class Queue extends BaseConfig { @@ -18,6 +19,7 @@ class Queue extends BaseConfig */ public array $handlers = [ 'database' => DatabaseHandler::class, + 'redis' => RedisHandler::class, ]; /** @@ -28,6 +30,17 @@ class Queue extends BaseConfig 'getShared' => true, ]; + /** + * Redis and Predis handler config. + */ + public array $redis = [ + 'host' => '127.0.0.1', + 'password' => null, + 'port' => 6379, + 'timeout' => 0, + 'database' => 0, + ]; + /** * Whether to keep the DONE jobs in the queue. */ @@ -38,6 +51,18 @@ class Queue extends BaseConfig */ public bool $keepFailedJobs = true; + /** + * Default priorities for the queue + * if different from the "default". + */ + public array $queueDefaultPriority = []; + + /** + * Valid priorities in the order for the queue, + * if different from the "default". + */ + public array $queuePriorities = []; + /** * Your jobs handlers. */ @@ -63,4 +88,16 @@ public function resolveJobClass(string $name): string return $this->jobHandlers[$name]; } + + /** + * Stringify queue priorities. + */ + public function getQueuePriorities(string $name): ?string + { + if (! isset($this->queuePriorities[$name])) { + return null; + } + + return implode(',', $this->queuePriorities[$name]); + } } diff --git a/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php new file mode 100644 index 0000000..3209b65 --- /dev/null +++ b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php @@ -0,0 +1,36 @@ + [ + 'type' => 'varchar', + 'constraint' => 64, + 'null' => false, + 'default' => 'default', + 'after' => 'payload', + ], + ]; + + $this->forge->addColumn('queue_jobs', $fields); + $this->forge->addColumn('queue_jobs_failed', $fields); + + $this->forge->dropKey('queue_jobs', 'queue_status_available_at'); + $this->forge->addKey(['queue', 'priority', 'status', 'available_at']); + } + + public function down() + { + $this->forge->dropKey('queue_jobs', 'queue_priority_status_available_at'); + $this->forge->addKey(['queue', 'status', 'available_at']); + + $this->forge->dropColumn('queue_jobs', 'priority'); + $this->forge->dropColumn('queue_jobs_failed', 'priority'); + } +} diff --git a/src/Entities/QueueJob.php b/src/Entities/QueueJob.php index ae49c53..59beb3a 100644 --- a/src/Entities/QueueJob.php +++ b/src/Entities/QueueJob.php @@ -11,6 +11,7 @@ class QueueJob extends Entity 'id' => 'integer', 'queue' => 'string', 'payload' => 'json-array', + 'priority' => 'string', 'status' => 'integer', 'attempts' => 'integer', ]; diff --git a/src/Entities/QueueJobFailed.php b/src/Entities/QueueJobFailed.php index b8dc0ed..06c1a97 100644 --- a/src/Entities/QueueJobFailed.php +++ b/src/Entities/QueueJobFailed.php @@ -12,6 +12,7 @@ class QueueJobFailed extends Entity 'connection' => 'string', 'queue' => 'string', 'payload' => 'json-array', + 'priority' => 'string', 'exceptions' => 'string', ]; } diff --git a/src/Exceptions/QueueException.php b/src/Exceptions/QueueException.php index 0da0cab..93b240c 100644 --- a/src/Exceptions/QueueException.php +++ b/src/Exceptions/QueueException.php @@ -15,4 +15,19 @@ public static function forIncorrectJobHandler(): static { return new self(lang('Queue.incorrectJobHandler')); } + + public static function forIncorrectPriorityFormat(): static + { + return new self(lang('Queue.incorrectPriorityFormat')); + } + + public static function forTooLongPriorityName(): static + { + return new self(lang('Queue.tooLongPriorityName')); + } + + public static function forIncorrectQueuePriority(string $priority, string $queue): static + { + return new self(lang('Queue.incorrectQueuePriority', [$priority, $queue])); + } } diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php new file mode 100644 index 0000000..4ffac17 --- /dev/null +++ b/src/Handlers/BaseHandler.php @@ -0,0 +1,148 @@ + 64) { + throw QueueException::forTooLongPriorityName(); + } + + $this->priority = $priority; + + return $this; + } + + /** + * Retry failed job. + * + * @throws ReflectionException + */ + public function retry(?int $id, ?string $queue): int + { + $jobs = model(QueueJobFailedModel::class) + ->when( + $id !== null, + static fn ($query) => $query->where('id', $id) + ) + ->when( + $queue !== null, + static fn ($query) => $query->where('queue', $queue) + ) + ->findAll(); + + foreach ($jobs as $job) { + $this->setPriority($job->priority)->push($job->queue, $job->payload['job'], $job->payload['data']); + $this->forget($job->id); + } + + return count($jobs); + } + + /** + * Delete failed job by ID. + */ + public function forget(int $id): bool + { + if (model(QueueJobFailedModel::class)->delete($id)) { + return model(QueueJobFailedModel::class)->affectedRows() > 0; + } + + return false; + } + + /** + * Delete many failed jobs at once. + */ + public function flush(?int $hours, ?string $queue): bool + { + if ($hours === null && $queue === null) { + return model(QueueJobFailedModel::class)->truncate(); + } + + return model(QueueJobFailedModel::class) + ->when( + $hours !== null, + static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp) + ) + ->when( + $queue !== null, + static fn ($query) => $query->where('queue', $queue) + ) + ->delete(); + } + + /** + * List failed queue jobs. + */ + public function listFailed(?string $queue): array + { + return model(QueueJobFailedModel::class) + ->when( + $queue !== null, + static fn ($query) => $query->where('queue', $queue) + ) + ->orderBy('failed_at', 'desc') + ->findAll(); + } + + /** + * Log failed job. + * + * @throws ReflectionException + */ + protected function logFailed(QueueJob $queueJob, Throwable $err): bool + { + $exception = "Exception: {$err->getCode()} - {$err->getMessage()}" . PHP_EOL . + "file: {$err->getFile()}:{$err->getLine()}"; + + $queueJobFailed = new QueueJobFailed([ + 'connection' => 'database', + 'queue' => $queueJob->queue, + 'payload' => $queueJob->payload, + 'priority' => $queueJob->priority, + 'exception' => $exception, + ]); + + return model(QueueJobFailedModel::class)->insert($queueJobFailed, false); + } + + /** + * Validate job and priority. + */ + protected function validateJobAndPriority(string $queue, string $job): void + { + // Validate jobHandler. + if (! in_array($job, array_keys($this->config->jobHandlers), true)) { + throw QueueException::forIncorrectJobHandler(); + } + + if ($this->priority === null) { + $this->setPriority($this->config->queueDefaultPriority[$queue] ?? 'default'); + } + + // Validate non-standard priority. + if (! in_array($this->priority, $this->config->queuePriorities[$queue] ?? ['default'], true)) { + throw QueueException::forIncorrectQueuePriority($this->priority, $queue); + } + } +} diff --git a/src/Handlers/DatabaseHandler.php b/src/Handlers/DatabaseHandler.php index 94d357b..bf74dc5 100644 --- a/src/Handlers/DatabaseHandler.php +++ b/src/Handlers/DatabaseHandler.php @@ -5,17 +5,14 @@ use CodeIgniter\I18n\Time; use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig; use Michalsn\CodeIgniterQueue\Entities\QueueJob; -use Michalsn\CodeIgniterQueue\Entities\QueueJobFailed; use Michalsn\CodeIgniterQueue\Enums\Status; -use Michalsn\CodeIgniterQueue\Exceptions\QueueException; use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface; -use Michalsn\CodeIgniterQueue\Models\QueueJobFailedModel; use Michalsn\CodeIgniterQueue\Models\QueueJobModel; use Michalsn\CodeIgniterQueue\Payload; use ReflectionException; use Throwable; -class DatabaseHandler implements QueueInterface +class DatabaseHandler extends BaseHandler implements QueueInterface { private readonly QueueJobModel $jobModel; @@ -32,18 +29,19 @@ public function __construct(protected QueueConfig $config) */ public function push(string $queue, string $job, array $data): bool { - if (! in_array($job, array_keys($this->config->jobHandlers), true)) { - throw QueueException::forIncorrectJobHandler(); - } + $this->validateJobAndPriority($queue, $job); $queueJob = new QueueJob([ 'queue' => $queue, 'payload' => new Payload($job, $data), + 'priority' => $this->priority, 'status' => Status::PENDING->value, 'attempts' => 0, 'available_at' => Time::now()->timestamp, ]); + $this->priority = null; + return $this->jobModel->insert($queueJob, false); } @@ -52,9 +50,9 @@ public function push(string $queue, string $job, array $data): bool * * @throws ReflectionException */ - public function pop(string $queue): ?QueueJob + public function pop(string $queue, array $priorities): ?QueueJob { - $queueJob = $this->jobModel->getFromQueue($queue); + $queueJob = $this->jobModel->getFromQueue($queue, $priorities); if ($queueJob === null) { return null; @@ -88,16 +86,7 @@ public function later(QueueJob $queueJob, int $seconds): bool public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool { if ($keepJob) { - $exception = "Exception: {$err->getCode()} - {$err->getMessage()}" . PHP_EOL . - "file: {$err->getFile()}:{$err->getLine()}"; - - $queueJobFailed = new QueueJobFailed([ - 'connection' => 'database', - 'queue' => $queueJob->queue, - 'payload' => $queueJob->payload, - 'exception' => $exception, - ]); - model(QueueJobFailedModel::class)->insert($queueJobFailed, false); + $this->logFailed($queueJob, $err); } return $this->jobModel->delete($queueJob->id); @@ -128,78 +117,4 @@ public function clear(?string $queue = null): bool return $this->jobModel->delete(); } - - /** - * Retry failed job. - * ∂ - * - * @throws ReflectionException - */ - public function retry(?int $id, ?string $queue): int - { - $jobs = model(QueueJobFailedModel::class) - ->when( - $id !== null, - static fn ($query) => $query->where('id', $id) - ) - ->when( - $queue !== null, - static fn ($query) => $query->where('queue', $queue) - ) - ->findAll(); - - foreach ($jobs as $job) { - $this->push($job->queue, $job->payload['job'], $job->payload['data']); - $this->forget($job->id); - } - - return count($jobs); - } - - /** - * Delete failed job by ID. - */ - public function forget(int $id): bool - { - if (model(QueueJobFailedModel::class)->delete($id)) { - return model(QueueJobFailedModel::class)->affectedRows() > 0; - } - - return false; - } - - /** - * Delete many failed jobs at once. - */ - public function flush(?int $hours, ?string $queue): bool - { - if ($hours === null && $queue === null) { - return model(QueueJobFailedModel::class)->truncate(); - } - - return model(QueueJobFailedModel::class) - ->when( - $hours !== null, - static fn ($query) => $query->where('failed_at <=', Time::now()->subHours($hours)->timestamp) - ) - ->when( - $queue !== null, - static fn ($query) => $query->where('queue', $queue) - ) - ->delete(); - } - - /** - * List failed queue jobs. - */ - public function listFailed(?string $queue) - { - return model(QueueJobFailedModel::class) - ->when( - $queue !== null, - static fn ($query) => $query->where('queue', $queue) - ) - ->orderBy('failed_at', 'desc') - ->findAll(); - } } diff --git a/src/Interfaces/QueueInterface.php b/src/Interfaces/QueueInterface.php index aef9125..2f6be86 100644 --- a/src/Interfaces/QueueInterface.php +++ b/src/Interfaces/QueueInterface.php @@ -9,7 +9,7 @@ interface QueueInterface { public function push(string $queue, string $job, array $data); - public function pop(string $queue); + public function pop(string $queue, array $priorities); public function later(QueueJob $queueJob, int $seconds); diff --git a/src/Language/en/Queue.php b/src/Language/en/Queue.php index ab54ff9..d7c1a09 100644 --- a/src/Language/en/Queue.php +++ b/src/Language/en/Queue.php @@ -6,6 +6,9 @@ 'job' => 'Job class name', ], ], - 'incorrectHandler' => 'This queue handler is incorrect.', - 'incorrectJobHandler' => 'This job name is not defined in the $jobHandlers array.', + 'incorrectHandler' => 'This queue handler is incorrect.', + 'incorrectJobHandler' => 'This job name is not defined in the $jobHandlers array.', + 'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.', + 'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.', + 'incorrectQueuePriority' => 'This queue has incorrectly defined priority: "{0}" for the queue: "{1}".', ]; diff --git a/src/Models/QueueJobFailedModel.php b/src/Models/QueueJobFailedModel.php index 9a51c5f..25932d2 100644 --- a/src/Models/QueueJobFailedModel.php +++ b/src/Models/QueueJobFailedModel.php @@ -13,7 +13,7 @@ class QueueJobFailedModel extends Model protected $returnType = QueueJobFailed::class; protected $useSoftDeletes = false; protected $protectFields = true; - protected $allowedFields = ['connection', 'queue', 'payload', 'exception']; + protected $allowedFields = ['connection', 'queue', 'payload', 'priority', 'exception']; // Dates protected $useTimestamps = true; diff --git a/src/Models/QueueJobModel.php b/src/Models/QueueJobModel.php index 0bf5a0d..58d2fb0 100644 --- a/src/Models/QueueJobModel.php +++ b/src/Models/QueueJobModel.php @@ -2,6 +2,8 @@ namespace Michalsn\CodeIgniterQueue\Models; +use CodeIgniter\Database\BaseBuilder; +use CodeIgniter\Database\RawSql; use CodeIgniter\I18n\Time; use CodeIgniter\Model; use Michalsn\CodeIgniterQueue\Entities\QueueJob; @@ -16,7 +18,7 @@ class QueueJobModel extends Model protected $returnType = QueueJob::class; protected $useSoftDeletes = false; protected $protectFields = true; - protected $allowedFields = ['queue', 'payload', 'status', 'attempts', 'available_at']; + protected $allowedFields = ['queue', 'payload', 'priority', 'status', 'attempts', 'available_at']; // Dates protected $useTimestamps = true; @@ -35,7 +37,7 @@ class QueueJobModel extends Model * * @throws ReflectionException */ - public function getFromQueue(string $name): ?QueueJob + public function getFromQueue(string $name, array $priority): ?QueueJob { // For SQLite3 memory database this will cause problems // so check if we're not in the testing environment first. @@ -47,14 +49,14 @@ public function getFromQueue(string $name): ?QueueJob $this->db->transStart(); // Prepare SQL - $sql = $this->builder() + $builder = $this->builder() ->where('queue', $name) ->where('status', Status::PENDING->value) ->where('available_at <=', Time::now()->timestamp) - ->orderBy('available_at', 'asc') - ->orderBy('id', 'asc') - ->limit(1) - ->getCompiledSelect(); + ->limit(1); + + $builder = $this->setPriority($builder, $priority); + $sql = $builder->getCompiledSelect(); $query = $this->db->query($this->skipLocked($sql)); if ($query === false) { @@ -90,4 +92,26 @@ private function skipLocked(string $sql): string return $sql .= ' FOR UPDATE SKIP LOCKED'; } + + /** + * Handle priority of the queue. + */ + private function setPriority(BaseBuilder $builder, array $priority): BaseBuilder + { + $builder->whereIn('priority', $priority); + + if ($priority !== ['default']) { + if ($this->db->DBDriver === 'SQLite3') { + $builder->orderBy(new RawSql('CASE priority ' . implode(' ', array_map(static fn ($value, $key) => "WHEN '{$value}' THEN {$key}", $priority, array_keys($priority))) . ' END')); + } else { + $builder->orderBy(new RawSql('FIELD(priority, ' . implode(',', array_map(static fn ($value) => "'{$value}'", $priority)) . ')')); + } + } + + $builder + ->orderBy('available_at', 'asc') + ->orderBy('id', 'asc'); + + return $builder; + } } diff --git a/tests/DatabaseHandlerTest.php b/tests/DatabaseHandlerTest.php index 1932ca6..cd1f1ec 100644 --- a/tests/DatabaseHandlerTest.php +++ b/tests/DatabaseHandlerTest.php @@ -2,6 +2,7 @@ namespace Tests; +use CodeIgniter\Test\ReflectionHelper; use Exception; use Michalsn\CodeIgniterQueue\Entities\QueueJob; use Michalsn\CodeIgniterQueue\Enums\Status; @@ -18,6 +19,8 @@ */ final class DatabaseHandlerTest extends TestCase { + use ReflectionHelper; + protected $seed = TestQueueSeeder::class; private QueueConfig $config; @@ -34,6 +37,23 @@ public function testDatabaseHandler() $this->assertInstanceOf(DatabaseHandler::class, $handler); } + public function testPriority() + { + $handler = new DatabaseHandler($this->config); + $handler->setPriority('high'); + + $this->assertSame('high', self::getPrivateProperty($handler, 'priority')); + } + + public function testPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name should consists only lowercase letters.'); + + $handler = new DatabaseHandler($this->config); + $handler->setPriority('high_:'); + } + /** * @throws ReflectionException */ @@ -49,6 +69,22 @@ public function testPush() ]); } + /** + * @throws ReflectionException + */ + public function testPushWithPriority() + { + $handler = new DatabaseHandler($this->config); + $result = $handler->setPriority('high')->push('queue', 'success', ['key' => 'value']); + + $this->assertTrue($result); + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue', + 'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value']]), + 'priority' => 'high', + ]); + } + /** * @throws ReflectionException */ @@ -61,13 +97,25 @@ public function testPushException() $handler->push('queue', 'not-exists', ['key' => 'value']); } + /** + * @throws ReflectionException + */ + public function testPushWithPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('This queue has incorrectly defined priority: "invalid" for the queue: "queue".'); + + $handler = new DatabaseHandler($this->config); + $handler->setPriority('invalid')->push('queue', 'success', ['key' => 'value']); + } + /** * @throws ReflectionException */ public function testPop() { $handler = new DatabaseHandler($this->config); - $result = $handler->pop('queue1'); + $result = $handler->pop('queue1', ['default']); $this->assertInstanceOf(QueueJob::class, $result); $this->seeInDatabase('queue_jobs', [ @@ -82,7 +130,7 @@ public function testPop() public function testPopEmpty() { $handler = new DatabaseHandler($this->config); - $result = $handler->pop('queue123'); + $result = $handler->pop('queue123', ['default']); $this->assertNull($result); } @@ -93,7 +141,7 @@ public function testPopEmpty() public function testLater() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $this->seeInDatabase('queue_jobs', [ 'id' => 2, @@ -115,7 +163,7 @@ public function testLater() public function testFailedAndKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $err = new Exception('Sample exception'); $result = $handler->failed($queueJob, $err, true); @@ -134,7 +182,7 @@ public function testFailedAndKeepJob() public function testFailedAndDontKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $err = new Exception('Sample exception'); $result = $handler->failed($queueJob, $err, false); @@ -156,7 +204,7 @@ public function testFailedAndDontKeepJob() public function testDoneAndKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $result = $handler->done($queueJob, true); @@ -173,7 +221,7 @@ public function testDoneAndKeepJob() public function testDoneAndDontKeepJob() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $result = $handler->done($queueJob, false); @@ -233,7 +281,7 @@ public function testForget() public function testFlush() { $handler = new DatabaseHandler($this->config); - $queueJob = $handler->pop('queue1'); + $queueJob = $handler->pop('queue1', ['default']); $err = new Exception('Sample exception here'); $result = $handler->failed($queueJob, $err, true); diff --git a/tests/_support/Config/Queue.php b/tests/_support/Config/Queue.php index 508a8e0..57f7256 100644 --- a/tests/_support/Config/Queue.php +++ b/tests/_support/Config/Queue.php @@ -39,6 +39,22 @@ class Queue extends BaseQueue */ public bool $keepFailedJobs = true; + /** + * Default priorities for the queue + * if different from the "default". + */ + public array $queueDefaultPriority = [ + 'queue' => 'low', + ]; + + /** + * Valid priorities for the queue, + * if different from the "default". + */ + public array $queuePriorities = [ + 'queue' => ['high', 'low'], + ]; + /** * Your jobs handlers. */ diff --git a/tests/_support/Database/Seeds/TestQueueSeeder.php b/tests/_support/Database/Seeds/TestQueueSeeder.php index 9dff812..5d3b315 100644 --- a/tests/_support/Database/Seeds/TestQueueSeeder.php +++ b/tests/_support/Database/Seeds/TestQueueSeeder.php @@ -16,6 +16,7 @@ public function run(): void model(QueueJobModel::class)->insert(new QueueJob([ 'queue' => 'queue1', 'payload' => ['job' => 'success', 'data' => []], + 'priority' => 'default', 'status' => Status::RESERVED->value, 'attempts' => 0, 'available_at' => 1_697_269_864, @@ -24,6 +25,7 @@ public function run(): void model(QueueJobModel::class)->insert(new QueueJob([ 'queue' => 'queue1', 'payload' => ['job' => 'failure', 'data' => []], + 'priority' => 'default', 'status' => Status::PENDING->value, 'attempts' => 0, 'available_at' => 1_697_269_860, @@ -33,6 +35,7 @@ public function run(): void 'connection' => 'database', 'queue' => 'queue1', 'payload' => ['job' => 'failure', 'data' => []], + 'priority' => 'default', 'exception' => 'Exception info', ])); } From 0d4d31968143188585b606b7f72a170b9a4787e9 Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 10:18:54 +0100 Subject: [PATCH 2/8] cleanup --- src/Config/Queue.php | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/Config/Queue.php b/src/Config/Queue.php index 8cec0d1..fc8a37b 100644 --- a/src/Config/Queue.php +++ b/src/Config/Queue.php @@ -5,7 +5,6 @@ use CodeIgniter\Config\BaseConfig; use Michalsn\CodeIgniterQueue\Exceptions\QueueException; use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler; -use Michalsn\CodeIgniterQueue\Handlers\RedisHandler; class Queue extends BaseConfig { @@ -19,7 +18,6 @@ class Queue extends BaseConfig */ public array $handlers = [ 'database' => DatabaseHandler::class, - 'redis' => RedisHandler::class, ]; /** @@ -30,17 +28,6 @@ class Queue extends BaseConfig 'getShared' => true, ]; - /** - * Redis and Predis handler config. - */ - public array $redis = [ - 'host' => '127.0.0.1', - 'password' => null, - 'port' => 6379, - 'timeout' => 0, - 'database' => 0, - ]; - /** * Whether to keep the DONE jobs in the queue. */ From d4d31963441a31a1eb965c8b61c137f4db1c68b2 Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 11:51:11 +0100 Subject: [PATCH 3/8] fix migration --- .../2023-11-05-064053_AddPriorityField.php | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php index 3209b65..703cd51 100644 --- a/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php +++ b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php @@ -21,14 +21,33 @@ public function up() $this->forge->addColumn('queue_jobs', $fields); $this->forge->addColumn('queue_jobs_failed', $fields); - $this->forge->dropKey('queue_jobs', 'queue_status_available_at'); - $this->forge->addKey(['queue', 'priority', 'status', 'available_at']); + // Ugly fix for dropping the correct index + // since it had no name given + $keys = $this->db->getIndexData('queue_jobs'); + foreach ($keys as $key) { + if ($key->fields === ['queue', 'status', 'available_at']) { + $this->forge->dropKey('queue_jobs', $key->name, false); + break; + } + } + + $this->forge->addKey(['queue', 'priority', 'status', 'available_at'], 'queue_priority_status_available_at'); + $this->forge->processIndexes('queue_jobs'); } public function down() { - $this->forge->dropKey('queue_jobs', 'queue_priority_status_available_at'); + // Ugly fix for dropping the correct index + $keys = $this->db->getIndexData('queue_jobs'); + foreach ($keys as $key) { + if ($key->fields === ['queue', 'priority', 'status', 'available_at']) { + $this->forge->dropKey('queue_jobs', $key->name, false); + break; + } + } + $this->forge->addKey(['queue', 'status', 'available_at']); + $this->forge->processIndexes('queue_jobs'); $this->forge->dropColumn('queue_jobs', 'priority'); $this->forge->dropColumn('queue_jobs_failed', 'priority'); From 62b3776ffc8371d03f0e9d620f4dbae686018481 Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 11:51:41 +0100 Subject: [PATCH 4/8] fix phpstan error --- src/Handlers/BaseHandler.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index 4ffac17..19b61a1 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -3,6 +3,7 @@ namespace Michalsn\CodeIgniterQueue\Handlers; use CodeIgniter\I18n\Time; +use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig; use Michalsn\CodeIgniterQueue\Entities\QueueJob; use Michalsn\CodeIgniterQueue\Entities\QueueJobFailed; use Michalsn\CodeIgniterQueue\Exceptions\QueueException; @@ -10,6 +11,9 @@ use ReflectionException; use Throwable; +/** + * @property-read QueueConfig $config + */ abstract class BaseHandler { protected ?string $priority = null; From f63de8a610b81c712a9a324064c4bc9b27f0f420 Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 11:58:07 +0100 Subject: [PATCH 5/8] fixes --- .../Migrations/2023-11-05-064053_AddPriorityField.php | 8 +++++++- src/Handlers/BaseHandler.php | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php index 703cd51..1f9fe82 100644 --- a/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php +++ b/src/Database/Migrations/2023-11-05-064053_AddPriorityField.php @@ -2,8 +2,12 @@ namespace Michalsn\CodeIgniterQueue\Database\Migrations; +use CodeIgniter\Database\BaseConnection; use CodeIgniter\Database\Migration; +/** + * @property BaseConnection $db + */ class AddPriorityField extends Migration { public function up() @@ -24,6 +28,7 @@ public function up() // Ugly fix for dropping the correct index // since it had no name given $keys = $this->db->getIndexData('queue_jobs'); + foreach ($keys as $key) { if ($key->fields === ['queue', 'status', 'available_at']) { $this->forge->dropKey('queue_jobs', $key->name, false); @@ -31,7 +36,7 @@ public function up() } } - $this->forge->addKey(['queue', 'priority', 'status', 'available_at'], 'queue_priority_status_available_at'); + $this->forge->addKey(['queue', 'priority', 'status', 'available_at'], false, false, 'queue_priority_status_available_at'); $this->forge->processIndexes('queue_jobs'); } @@ -39,6 +44,7 @@ public function down() { // Ugly fix for dropping the correct index $keys = $this->db->getIndexData('queue_jobs'); + foreach ($keys as $key) { if ($key->fields === ['queue', 'priority', 'status', 'available_at']) { $this->forge->dropKey('queue_jobs', $key->name, false); diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index 19b61a1..d5afe96 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -12,7 +12,7 @@ use Throwable; /** - * @property-read QueueConfig $config + * @property QueueConfig $config */ abstract class BaseHandler { From 4c5c464606ed736ea23854f8a11cf9dad0cf89fe Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 12:04:07 +0100 Subject: [PATCH 6/8] fix --- src/Handlers/BaseHandler.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index d5afe96..aad22b4 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -16,6 +16,7 @@ */ abstract class BaseHandler { + protected QueueConfig $config; protected ?string $priority = null; /** From 49dfa35e520988f2c5417943b8f85172baf0b6e6 Mon Sep 17 00:00:00 2001 From: michalsn Date: Sun, 5 Nov 2023 12:17:24 +0100 Subject: [PATCH 7/8] add test --- tests/DatabaseHandlerTest.php | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/DatabaseHandlerTest.php b/tests/DatabaseHandlerTest.php index cd1f1ec..26eb20a 100644 --- a/tests/DatabaseHandlerTest.php +++ b/tests/DatabaseHandlerTest.php @@ -45,7 +45,7 @@ public function testPriority() $this->assertSame('high', self::getPrivateProperty($handler, 'priority')); } - public function testPriorityException() + public function testPriorityNameException() { $this->expectException(QueueException::class); $this->expectExceptionMessage('The priority name should consists only lowercase letters.'); @@ -54,6 +54,15 @@ public function testPriorityException() $handler->setPriority('high_:'); } + public function testPriorityNameLengthException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name is too long. It should be no longer than 64 letters.'); + + $handler = new DatabaseHandler($this->config); + $handler->setPriority(str_repeat('a', 65)); + } + /** * @throws ReflectionException */ From 9a85afea6628f6b5f461b282ef0fa196ed21dff2 Mon Sep 17 00:00:00 2001 From: michalsn Date: Thu, 9 Nov 2023 19:22:20 +0100 Subject: [PATCH 8/8] add service return type - fixes #3 --- src/Config/Services.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Config/Services.php b/src/Config/Services.php index 99502ca..fe5e158 100644 --- a/src/Config/Services.php +++ b/src/Config/Services.php @@ -4,11 +4,12 @@ use CodeIgniter\Config\BaseService; use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig; +use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface; use Michalsn\CodeIgniterQueue\Queue; class Services extends BaseService { - public static function queue(?QueueConfig $config = null, $getShared = true) + public static function queue(?QueueConfig $config = null, $getShared = true): QueueInterface { if ($getShared) { return static::getSharedInstance('queue', $config);