From 8929cf11c37a9a2ddf8b96f844b2a13ffcaa1857 Mon Sep 17 00:00:00 2001 From: Michael van Rooijen Date: Sun, 9 Jun 2024 11:38:54 +0200 Subject: [PATCH] Introduce HireFire::Macro::Resque::Cache This change adds a caching mechanism to the Resque macro. Since we have to scan the entire delayed set in Redis to acquire the sizes of the requested queues containing jobs scheduled to run now, we can instead process the size of every queue associated with the scanned jobs, whether requested or not, and cache it for a certain amount of time. This approach allows us to avoid scanning the entire delayed set on subsequent requests within the cache duration. Instead, we can access and aggregate the cached data. This will significantly improve throughput for applications that have a large number of scheduled jobs and/or a large number of Resque process types that need to be monitored. --- lib/hirefire/macro/resque.rb | 66 +++++++++++++++++++++++++++--- test/hirefire/macro/test_resque.rb | 23 +++++++---- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/lib/hirefire/macro/resque.rb b/lib/hirefire/macro/resque.rb index bf11a5d..94371b8 100644 --- a/lib/hirefire/macro/resque.rb +++ b/lib/hirefire/macro/resque.rb @@ -67,9 +67,13 @@ def working_size(queues) end def scheduled_size(queues) + cached_result = cache.fetch(queues) + return cached_result if cached_result + cursor = 0 batch = 1000 total_size = 0 + sizes = Hash.new(0) current_time = Time.now.to_i loop do @@ -101,8 +105,9 @@ def scheduled_size(queues) break if encoded_jobs.empty? - total_size += encoded_jobs.count do |encoded_job| - queues.include?(::Resque.decode(encoded_job)["queue"]) + encoded_jobs.each do |encoded_job| + queue = ::Resque.decode(encoded_job)["queue"] + sizes[queue] += 1 end break if encoded_jobs.size < batch @@ -117,14 +122,65 @@ def scheduled_size(queues) cursor += batch end - total_size + if queues.empty? + total_size + else + cache.store(sizes) + cache.fetch(queues) + end end - private - def registered_queues ::Resque.redis.keys("queue:*").map { |key| key[6..] }.to_set end + + class Cache + EXPIRY_TIME = 5 # seconds + + def initialize + @sizes = Hash.new(0) + @cached_at = expired_time + end + + def fetch(queues) + return nil if expired? + + if queues.empty? + sizes.values.sum + else + sizes.values_at(*queues).sum + end + end + + def store(sizes) + @sizes = Hash.new(0).merge(sizes) + @cached_at = current_time + end + + def expire! + @cached_at = expired_time + end + + private + + attr_reader :sizes, :cached_at + + def current_time + Time.now.to_i + end + + def expired_time + current_time - EXPIRY_TIME + end + + def expired? + current_time - cached_at >= EXPIRY_TIME + end + end + + def cache + @cache ||= Cache.new + end end end end diff --git a/test/hirefire/macro/test_resque.rb b/test/hirefire/macro/test_resque.rb index ac80732..72225b6 100644 --- a/test/hirefire/macro/test_resque.rb +++ b/test/hirefire/macro/test_resque.rb @@ -4,6 +4,7 @@ class HireFire::Macro::ResqueTest < Minitest::Test def setup + expire_cache! Resque.redis = Redis.new(db: 15).tap(&:flushdb) end @@ -11,6 +12,10 @@ def teardown Resque.redis.close end + def expire_cache! + HireFire::Macro::Resque.send(:cache).expire! + end + def test_job_queue_latency_unsupported assert_raises(HireFire::Errors::JobQueueLatencyUnsupportedError) do HireFire::Macro::Resque.job_queue_latency @@ -41,20 +46,20 @@ def test_job_queue_size_with_scheduled_jobs Resque.enqueue_in_with_queue(:default, 300, BasicJob) Resque.enqueue_in_with_queue(:mailer, 300, BasicJob) - assert_equal 0, HireFire::Macro::Resque.job_queue_size + assert_equal 0, HireFire::Macro::Resque.job_queue_size # uncached Timecop.freeze(Time.now + 200) do - assert_equal 1, HireFire::Macro::Resque.job_queue_size - assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default) - assert_equal 0, HireFire::Macro::Resque.job_queue_size(:mailer) - assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default, :mailer) + assert_equal 1, HireFire::Macro::Resque.job_queue_size # uncached + assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default) # uncached + assert_equal 0, HireFire::Macro::Resque.job_queue_size(:mailer) # cached + assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default, :mailer) # cached end Timecop.freeze(Time.now + 400) do - assert_equal 3, HireFire::Macro::Resque.job_queue_size - assert_equal 2, HireFire::Macro::Resque.job_queue_size(:default) - assert_equal 1, HireFire::Macro::Resque.job_queue_size(:mailer) - assert_equal 3, HireFire::Macro::Resque.job_queue_size(:default, :mailer) + assert_equal 2, HireFire::Macro::Resque.job_queue_size(:default) # expired + assert_equal 1, HireFire::Macro::Resque.job_queue_size(:mailer) # cached + assert_equal 3, HireFire::Macro::Resque.job_queue_size(:default, :mailer) # cached + assert_equal 3, HireFire::Macro::Resque.job_queue_size # cached end end