Skip to content

Commit

Permalink
add DelayAgent for buffering incoming Events
Browse files Browse the repository at this point in the history
  • Loading branch information
cantino committed Oct 3, 2015
1 parent 9c58474 commit 2d72e18
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 2 deletions.
65 changes: 65 additions & 0 deletions app/models/agents/delay_agent.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module Agents
class DelayAgent < Agent
default_schedule "every_12h"

description <<-MD
The DelayAgent stores received Events and emits copies of them on a schedule. Use this as a buffer or queue of Events.
`max_events` should be set to the maximum number of events that you'd like to hold in the buffer. When this number is
reached, new events will either be ignored, or will displace the oldest event already in the buffer, depending on
whether you set `keep` to `newest` or `oldest`.
`expected_receive_period_in_days` is used to determine if the Agent is working. Set it to the maximum number of days
that you anticipate passing without this Agent receiving an incoming Event.
MD

def default_options
{
'expected_receive_period_in_days' => "10",
'max_events' => "100",
'keep' => 'newest'
}
end

def validate_options
unless options['expected_receive_period_in_days'].present? && options['expected_receive_period_in_days'].to_i > 0
errors.add(:base, "Please provide 'expected_receive_period_in_days' to indicate how many days can pass before this Agent is considered to be not working")
end

unless options['keep'].present? && options['keep'].in?(%w[newest oldest])
errors.add(:base, "The 'keep' option is required and must be set to 'oldest' or 'newest'")
end

unless options['max_events'].present? && options['max_events'].to_i > 0
errors.add(:base, "The 'max_events' option is required and must be an integer greater than 0")
end
end

def working?
last_receive_at && last_receive_at > options['expected_receive_period_in_days'].to_i.days.ago && !recent_error_logs?
end

def receive(incoming_events)
incoming_events.each do |event|
memory['event_ids'] ||= []
memory['event_ids'] << event.id
if memory['event_ids'].length > interpolated['max_events'].to_i
if interpolated['keep'] == 'newest'
memory['event_ids'].shift
else
memory['event_ids'].pop
end
end
end
end

def check
if memory['event_ids'] && memory['event_ids'].length > 0
received_events.where(id: memory['event_ids']).reorder('events.id asc').each do |event|
create_event payload: event.payload
end
memory['event_ids'] = []
end
end
end
end
4 changes: 2 additions & 2 deletions config/initializers/delayed_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
Delayed::Worker.delay_jobs = !Rails.env.test?
Delayed::Worker.sleep_delay = (ENV['DELAYED_JOB_SLEEP_DELAY'].presence || 10).to_f

# Delayed::Worker.logger = Logger.new(Rails.root.join('log', 'delayed_job.log'))
# Delayed::Worker.logger.level = Logger::DEBUG
Delayed::Worker.logger = Logger.new(STDOUT)
Delayed::Worker.logger.level = Logger::WARN

class Delayed::Job
scope :pending, ->{ where("locked_at IS NULL AND attempts = 0") }
Expand Down
112 changes: 112 additions & 0 deletions spec/models/agents/delay_agent_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
require 'spec_helper'

describe Agents::DelayAgent do
let(:agent) do
_agent = Agents::DelayAgent.new(name: 'My DelayAgent')
_agent.options = _agent.default_options.merge('max_events' => 2)
_agent.user = users(:bob)
_agent.sources << agents(:bob_website_agent)
_agent.save!
_agent
end

def create_event
_event = Event.new(payload: { random: rand })
_event.agent = agents(:bob_website_agent)
_event.save!
_event
end

let(:first_event) { create_event }
let(:second_event) { create_event }
let(:third_event) { create_event }

describe "#working?" do
it "checks if events have been received within expected receive period" do
expect(agent).not_to be_working
Agents::DelayAgent.async_receive agent.id, [events(:bob_website_agent_event).id]
expect(agent.reload).to be_working
the_future = (agent.options[:expected_receive_period_in_days].to_i + 1).days.from_now
stub(Time).now { the_future }
expect(agent.reload).not_to be_working
end
end

describe "validation" do
before do
expect(agent).to be_valid
end

it "should validate max_events" do
agent.options.delete('max_events')
expect(agent).not_to be_valid
agent.options['max_events'] = ""
expect(agent).not_to be_valid
agent.options['max_events'] = "0"
expect(agent).not_to be_valid
agent.options['max_events'] = "10"
expect(agent).to be_valid
end

it "should validate presence of expected_receive_period_in_days" do
agent.options['expected_receive_period_in_days'] = ""
expect(agent).not_to be_valid
agent.options['expected_receive_period_in_days'] = 0
expect(agent).not_to be_valid
agent.options['expected_receive_period_in_days'] = -1
expect(agent).not_to be_valid
end

it "should validate keep" do
agent.options.delete('keep')
expect(agent).not_to be_valid
agent.options['keep'] = ""
expect(agent).not_to be_valid
agent.options['keep'] = 'wrong'
expect(agent).not_to be_valid
agent.options['keep'] = 'newest'
expect(agent).to be_valid
agent.options['keep'] = 'oldest'
expect(agent).to be_valid
end
end

describe "#receive" do
it "records Events" do
expect(agent.memory).to be_empty
agent.receive([first_event])
expect(agent.memory).not_to be_empty
agent.receive([second_event])
expect(agent.memory['event_ids']).to eq [first_event.id, second_event.id]
end

it "keeps the newest when 'keep' is set to 'newest'" do
expect(agent.options['keep']).to eq 'newest'
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]
end

it "keeps the oldest when 'keep' is set to 'oldest'" do
agent.options['keep'] = 'oldest'
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [first_event.id, second_event.id]
end
end

describe "#check" do
it "re-emits Events and clears the memory" do
agent.receive([first_event, second_event, third_event])
expect(agent.memory['event_ids']).to eq [second_event.id, third_event.id]

expect {
agent.check
}.to change { agent.events.count }.by(2)

events = agent.events.reorder('events.id desc')
expect(events.first.payload).to eq third_event.payload
expect(events.second.payload).to eq second_event.payload

expect(agent.memory['event_ids']).to eq []
end
end
end

0 comments on commit 2d72e18

Please sign in to comment.