diff --git a/lib/hirefire/macro/resque.rb b/lib/hirefire/macro/resque.rb index bf11a5d..94371b8 100644 --- a/lib/hirefire/macro/resque.rb +++ b/lib/hirefire/macro/resque.rb @@ -67,9 +67,13 @@ def working_size(queues) end def scheduled_size(queues) + cached_result = cache.fetch(queues) + return cached_result if cached_result + cursor = 0 batch = 1000 total_size = 0 + sizes = Hash.new(0) current_time = Time.now.to_i loop do @@ -101,8 +105,9 @@ def scheduled_size(queues) break if encoded_jobs.empty? - total_size += encoded_jobs.count do |encoded_job| - queues.include?(::Resque.decode(encoded_job)["queue"]) + encoded_jobs.each do |encoded_job| + queue = ::Resque.decode(encoded_job)["queue"] + sizes[queue] += 1 end break if encoded_jobs.size < batch @@ -117,14 +122,65 @@ def scheduled_size(queues) cursor += batch end - total_size + if queues.empty? + total_size + else + cache.store(sizes) + cache.fetch(queues) + end end - private - def registered_queues ::Resque.redis.keys("queue:*").map { |key| key[6..] }.to_set end + + class Cache + EXPIRY_TIME = 5 # seconds + + def initialize + @sizes = Hash.new(0) + @cached_at = expired_time + end + + def fetch(queues) + return nil if expired? + + if queues.empty? + sizes.values.sum + else + sizes.values_at(*queues).sum + end + end + + def store(sizes) + @sizes = Hash.new(0).merge(sizes) + @cached_at = current_time + end + + def expire! + @cached_at = expired_time + end + + private + + attr_reader :sizes, :cached_at + + def current_time + Time.now.to_i + end + + def expired_time + current_time - EXPIRY_TIME + end + + def expired? + current_time - cached_at >= EXPIRY_TIME + end + end + + def cache + @cache ||= Cache.new + end end end end diff --git a/test/hirefire/macro/test_resque.rb b/test/hirefire/macro/test_resque.rb index ac80732..72225b6 100644 --- a/test/hirefire/macro/test_resque.rb +++ b/test/hirefire/macro/test_resque.rb @@ -4,6 +4,7 @@ class HireFire::Macro::ResqueTest < Minitest::Test def setup + expire_cache! Resque.redis = Redis.new(db: 15).tap(&:flushdb) end @@ -11,6 +12,10 @@ def teardown Resque.redis.close end + def expire_cache! + HireFire::Macro::Resque.send(:cache).expire! + end + def test_job_queue_latency_unsupported assert_raises(HireFire::Errors::JobQueueLatencyUnsupportedError) do HireFire::Macro::Resque.job_queue_latency @@ -41,20 +46,20 @@ def test_job_queue_size_with_scheduled_jobs Resque.enqueue_in_with_queue(:default, 300, BasicJob) Resque.enqueue_in_with_queue(:mailer, 300, BasicJob) - assert_equal 0, HireFire::Macro::Resque.job_queue_size + assert_equal 0, HireFire::Macro::Resque.job_queue_size # uncached Timecop.freeze(Time.now + 200) do - assert_equal 1, HireFire::Macro::Resque.job_queue_size - assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default) - assert_equal 0, HireFire::Macro::Resque.job_queue_size(:mailer) - assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default, :mailer) + assert_equal 1, HireFire::Macro::Resque.job_queue_size # uncached + assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default) # uncached + assert_equal 0, HireFire::Macro::Resque.job_queue_size(:mailer) # cached + assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default, :mailer) # cached end Timecop.freeze(Time.now + 400) do - assert_equal 3, HireFire::Macro::Resque.job_queue_size - assert_equal 2, HireFire::Macro::Resque.job_queue_size(:default) - assert_equal 1, HireFire::Macro::Resque.job_queue_size(:mailer) - assert_equal 3, HireFire::Macro::Resque.job_queue_size(:default, :mailer) + assert_equal 2, HireFire::Macro::Resque.job_queue_size(:default) # expired + assert_equal 1, HireFire::Macro::Resque.job_queue_size(:mailer) # cached + assert_equal 3, HireFire::Macro::Resque.job_queue_size(:default, :mailer) # cached + assert_equal 3, HireFire::Macro::Resque.job_queue_size # cached end end