Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker_priority not work #553

Open
notwhy opened this issue Jun 6, 2023 · 3 comments
Open

broker_priority not work #553

notwhy opened this issue Jun 6, 2023 · 3 comments

Comments

@notwhy
Copy link

notwhy commented Jun 6, 2023

Issues

I want to run tasks based on priority levels 。 but it not work

What OS are you using?

windows

What version of Dramatiq are you using?

Dramatiq 1.14.2 and i use Rabbitmq as Broker

What did you do?

i try two code to test the broker_priority ,but it does not work as i except
dramatiq --threads 2 --processes 2 _url -Q testfun

@dramatiq.actor(time_limit=3600 * 1000, max_retries=0, queue_name='testfun', priority=MEDIUM)
def testfun(priority):
    testfun.logger.info("priority:" + str(priority))
    time.sleep(priority)

first i run

    i = 30
    j = 1
    while i >= 0:
        if (i >= j):
            print(i)
            i -= 1
            testfun.send_with_options(args=(i,), broker_priority=i)
        else:
            break

after it end i run

    for i in range(1,30):
        testfun.send_with_options(args=(i,), broker_priority=i)

What did you expect would happen?

I want to run tasks based on priority levels

What happened?

It appears to be out of order at runtime and not arranged according to the priority order
Is it my misunderstanding that there is no such function ? and I did not see any information about the broker_priority document, please help me!!!

@davidt99
Copy link
Contributor

When you init the broker, you need to add max_priority parameter (10 is the recommended value) so dramatiq will know to create the queue with this setting. Note that this must be set during queue creation, so if you have an existing queue in RabbitMQ, you'll have to create a new one.

@notwhy
Copy link
Author

notwhy commented Sep 21, 2023

i did had max_priority=10 in my broke. but it not work.

broker_middleware = [
    AgeLimit, TimeLimit,
    ShutdownNotifications, Callbacks, Pipelines, Retries
]

broker_middleware = [m() for m in broker_middleware]
from dramatiq.brokers.rabbitmq import RabbitmqBroker

broker = RabbitmqBroker(url="amqp://@%s:31013/my_vhost?heartbeat=0" % raibitmq_ip, max_priority=10,
                        middleware=broker_middleware)

@davidt99
Copy link
Contributor

Take into account that the worker takes the next available message. If you produce the messages at the same rate as you consume them, then your queue always has one message.
Can you make the test where you first send new messages to the queue and start the worker afterwards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants