Beatserver, a periodic task scheduler for Django Channels | beta software
Follow the Django Channels documentation to install Channels, then install beatserver:
pip install -U beatserver
Add beatserver to INSTALLED_APPS in settings.py:
INSTALLED_APPS = [
    'beatserver',
    'channels',
    '...'
]
beatconfig.py
from datetime import timedelta

BEAT_SCHEDULE = {
    'testing-print': [
        {
            # will call test_print method of PrintConsumer
            'type': 'test.print',
            # message to pass to the consumer
            'message': {'testing': 'one'},
            # Every 5 seconds
            'schedule': timedelta(seconds=5)
        },
        {
            'type': 'test.print',
            'message': {'testing': 'two'},
            # Precisely at 3AM on Monday
            'schedule': '0 3 * * 1'
        },
    ]
}
Schedules can be given either as timedeltas, to run a task at a fixed interval, or as cron-syntax strings, to run it at exact times.
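To make the two schedule forms concrete, here is a minimal standard-library sketch that classifies a 'schedule' value and names the five cron fields. The helper describe_schedule and the CRON_FIELDS tuple are hypothetical names used only for illustration; this is not how beatserver parses schedules internally.

```python
from datetime import timedelta

# Standard five-field cron layout, in order.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_schedule(schedule):
    """Hypothetical helper: classify a 'schedule' value from BEAT_SCHEDULE."""
    if isinstance(schedule, timedelta):
        # Interval schedule: the task repeats every `seconds` seconds.
        return {"kind": "interval", "seconds": schedule.total_seconds()}
    if isinstance(schedule, str):
        parts = schedule.split()
        if len(parts) != len(CRON_FIELDS):
            raise ValueError(f"expected 5 cron fields, got {len(parts)}")
        # Cron schedule: pair each field with its name.
        return {"kind": "cron", **dict(zip(CRON_FIELDS, parts))}
    raise TypeError(f"unsupported schedule type: {type(schedule).__name__}")


if __name__ == "__main__":
    print(describe_schedule(timedelta(seconds=5)))
    print(describe_schedule("0 3 * * 1"))
```

Read this way, '0 3 * * 1' means minute 0, hour 3, any day of the month, any month, day of week 1 (Monday).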
routing.py
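from channels.routing import ChannelNameRouter, ProtocolTypeRouter

from .consumers import PrintConsumer  # adjust to your project layout

application = ProtocolTypeRouter({
    "channel": ChannelNameRouter({
        # On Channels 3 and later, pass PrintConsumer.as_asgi() instead
        "testing-print": PrintConsumer,
    }),
})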
consumers.py
from channels.consumer import SyncConsumer


class PrintConsumer(SyncConsumer):
    def test_print(self, message):
        print(message)
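The link between the 'type' value 'test.print' and the test_print method follows the Channels convention of replacing dots in the message type with underscores. The sketch below imitates that dispatch in plain Python so it runs without Channels installed. The names handler_name, dispatch and FakePrintConsumer are hypothetical, and the message shape (a dict with 'type' and 'message' keys) is an assumption based on the schedule entries above.

```python
def handler_name(message_type):
    """Map a message type such as 'test.print' to a method name."""
    if message_type.startswith("_"):
        raise ValueError("message types must not start with an underscore")
    return message_type.replace(".", "_")


class FakePrintConsumer:
    """Plain-Python stand-in for PrintConsumer (hypothetical, no Channels)."""

    def __init__(self):
        self.received = []

    def test_print(self, message):
        # Record the message instead of printing, so it can be inspected.
        self.received.append(message)


def dispatch(consumer, message):
    """Call the consumer method that matches message['type']."""
    handler = getattr(consumer, handler_name(message["type"]), None)
    if handler is None:
        raise ValueError(f"no handler for message type {message['type']!r}")
    handler(message)


if __name__ == "__main__":
    consumer = FakePrintConsumer()
    dispatch(consumer, {"type": "test.print", "message": {"testing": "one"}})
    print(consumer.received)
```

A message whose type has no matching method raises an error here, so a typo in 'type' shows up immediately rather than being silently dropped.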
Start the scheduler:
python manage.py beatserver