Skip to content

Commit

Permalink
Restart TwitterStreamAgent::Worker every hour to prevent stalling
Browse files Browse the repository at this point in the history
  • Loading branch information
dsander committed Sep 7, 2015
1 parent 08028aa commit 44ac1ed
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 9 deletions.
12 changes: 11 additions & 1 deletion app/concerns/long_runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def setup_worker
end

class Worker
attr_reader :thread, :id, :agent, :config, :mutex
attr_reader :thread, :id, :agent, :config, :mutex, :scheduler

def initialize(options = {})
@id = options[:id]
Expand Down Expand Up @@ -92,6 +92,12 @@ def stop!
end
end

def restart!
stop!
setup!(scheduler, mutex)
run!
end

def every(*args, &blk)
schedule(:every, args, &blk)
end
Expand All @@ -100,6 +106,10 @@ def cron(*args, &blk)
schedule(:cron, args, &blk)
end

def schedule_in(*args, &blk)
schedule(:schedule_in, args, &blk)
end

def boolify(value)
agent.send(:boolify, value)
end
Expand Down
11 changes: 8 additions & 3 deletions app/models/agents/twitter_stream_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,18 @@ def self.setup_worker
end

class Worker < LongRunnable::Worker
RELOAD_TIMEOUT = 10.minutes
RELOAD_TIMEOUT = 60.minutes
DUPLICATE_DETECTION_LENGTH = 1000
SEPARATOR = /[^\w_\-]+/

def setup
require 'twitter/json_stream'
@filter_to_agent_map = @config[:filter_to_agent_map]

schedule_in RELOAD_TIMEOUT do
puts "--> Restarting TwitterStream #{id}"
restart!
end
end

def run
Expand Down Expand Up @@ -199,13 +204,13 @@ def stream!(filters, agent, &block)

stream.on_no_data do |message|
STDERR.puts " --> Got no data for awhile; trying to reconnect."
stop
restart!
end

stream.on_max_reconnects do |timeout, retries|
STDERR.puts " --> Oops, tried too many times! <--"
sleep 60
stop
restart!
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/agent_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def initialize(options = {})
@mutex = Mutex.new
@scheduler = Rufus::Scheduler.new(frequency: ENV['SCHEDULER_FREQUENCY'].presence || 0.3)

@scheduler.every 1 do
@scheduler.every 5 do
restart_dead_workers if @running
end

Expand Down
28 changes: 27 additions & 1 deletion spec/concerns/long_runnable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def default_options
describe LongRunnable::Worker do
before(:each) do
@agent = Object.new
@worker = LongRunnable::Worker.new(agent: @agent)
@worker = LongRunnable::Worker.new(agent: @agent, id: 'test1234')
@worker.setup!(Rufus::Scheduler.new, Mutex.new)
end

Expand Down Expand Up @@ -84,5 +84,31 @@ def default_options
@worker.stop!
end
end

context "#restart!" do
it "stops, setups and starts the worker" do
mock(@worker).stop!
mock(@worker).setup!(@worker.scheduler, @worker.mutex)
mock(@worker).run!
@worker.restart!
end
end

context "scheduling" do
it "schedules tasks once" do
mock(@worker.scheduler).send(:schedule_in, 1.hour, tag: 'test1234')
@worker.schedule_in 1.hour do noop; end
end

it "schedules repeating tasks" do
mock(@worker.scheduler).send(:every, 1.hour, tag: 'test1234')
@worker.every 1.hour do noop; end
end

it "allows the cron syntax" do
mock(@worker.scheduler).send(:cron, '0 * * * *', tag: 'test1234')
@worker.cron '0 * * * *' do noop; end
end
end
end
end
7 changes: 4 additions & 3 deletions spec/models/agents/twitter_stream_agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@
@config = {agent: @agent, config: {filter_to_agent_map: {'agent' => [@mock_agent]}}}
@worker = Agents::TwitterStreamAgent::Worker.new(@config)
@worker.instance_variable_set(:@recent_tweets, [])
@worker.setup
mock(@worker).schedule_in(Agents::TwitterStreamAgent::Worker::RELOAD_TIMEOUT)
@worker.setup!(nil, Mutex.new)
end

context "#run" do
Expand Down Expand Up @@ -226,7 +227,7 @@ def stub_without(method = nil)

it "stop when no data was received"do
stub_without(:on_no_data).on_no_data.yields
mock(@worker).stop
mock(@worker).restart!
mock(STDERR).puts(" --> Got no data for awhile; trying to reconnect.")
@worker.send(:stream!, ['agent'], @agent)
end
Expand All @@ -235,7 +236,7 @@ def stub_without(method = nil)
stub_without(:on_max_reconnects).on_max_reconnects.yields
mock(STDERR).puts(" --> Oops, tried too many times! <--")
mock(@worker).sleep(60)
mock(@worker).stop
mock(@worker).restart!
@worker.send(:stream!, ['agent'], @agent)
end

Expand Down

0 comments on commit 44ac1ed

Please sign in to comment.