Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion lib/amigo/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ def initialize(msg=nil)
# Raise this class, or a subclass of it, to:
# - Use +Retry+ exception semantics while the current attempt is <= +attempts+, or
# - Use +Die+ exception semantics if the current attempt is > +attempts+.
#
# Callers can provide a subclass with two methods that are looked for:
#
# If on_retry is defined, it is called with (worker instance, job hash).
# If on_retry returns +:skip+, do NOT retry (do not send to the retry set).
#
# If on_die is defined, it is called with (worker instance, job hash).
# If on_die returns +:skip+, do NOT send to the dead set.
class OrDie < Error
attr_reader :attempts, :interval_or_timestamp, :wrapped

Expand Down Expand Up @@ -99,13 +107,27 @@ def call(worker, job, _queue)
end

def handle_retry(worker, job, e)
if e.respond_to?(:on_retry)
callback_result = e.on_retry(worker, job)
if callback_result == :skip
Sidekiq.logger.warn("skipping_retryset_schedule")
return
end
end
Sidekiq.logger.info("scheduling_retry")
job["error_class"] = e.class.to_s
job["error_message"] = e.to_s
self.amigo_retry_in(worker.class, job, e.interval_or_timestamp)
end

def handle_die(_worker, job, e)
def handle_die(worker, job, e)
if e.respond_to?(:on_die)
callback_result = e.on_die(worker, job)
if callback_result == :skip
Sidekiq.logger.warn("skipping_deadset_send")
return
end
end
Sidekiq.logger.warn("sending_to_deadset")
job["error_class"] = e.class.to_s
job["error_message"] = e.to_s
Expand Down
10 changes: 9 additions & 1 deletion spec/amigo/amigo_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,19 @@
end.to_not publish("some-event")
end.to raise_error(
RSpec::Expectations::ExpectationNotMetError,
"expected a 'some-event' event not to be fired but one was: [{\"x\" => 1}].",
match(/expected a 'some-event' event not to be fired but one was: \[\{"x"\s?=>\s?1}\]\./),
)
end

it "can handle publish with no event name" do
expect do
expect do
described_class.publish("some-event", {x: 1})
end.to_not publish
end.to raise_error(
RSpec::Expectations::ExpectationNotMetError,
match(/expected a 'some-event' event not to be fired but one was: \[\{"x"\s?=>\s?1}\]\./),
)
end
end

Expand Down
199 changes: 133 additions & 66 deletions spec/amigo/retry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,85 +36,152 @@ def self.to_s
cls
end

it "catches retry exceptions and reschedules with the given interval" do
kls = create_job_class(ex: Amigo::Retry::Retry.new(30, "trying"))
kls.perform_async(1)
describe "raising Retry" do
it "catches retry exceptions and reschedules with the given interval" do
kls = create_job_class(ex: Amigo::Retry::Retry.new(30, "trying"))
kls.perform_async(1)

expect(all_sidekiq_jobs(Sidekiq::Queue.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::Queue.new)

expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to contain_exactly(
have_attributes(
score: be_within(5).of(Time.now.to_f + 30),
item: include("retry_count" => 1),
),
)

# Continue to retry
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new)
sched2 = all_sidekiq_jobs(Sidekiq::ScheduledSet.new)
expect(sched2).to have_attributes(length: 1)
expect(sched2.first).to have_attributes(
score: be_within(5).of(Time.now.to_f + 30),
item: include("retry_count" => 2, "error_class" => "Amigo::Retry::Retry", "error_message" => "trying"),
)
end

expect(all_sidekiq_jobs(Sidekiq::Queue.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::Queue.new)
it "retries on the correct queue" do
kls = create_job_class(ex: Amigo::Retry::Retry.new(30)) do |cls|
cls.sidekiq_options queue: "otherq"
end
kls.perform_async(1)

expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to contain_exactly(
have_attributes(
score: be_within(5).of(Time.now.to_f + 30),
item: include("retry_count" => 1),
),
)

# Continue to retry
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new)
sched2 = all_sidekiq_jobs(Sidekiq::ScheduledSet.new)
expect(sched2).to have_attributes(length: 1)
expect(sched2.first).to have_attributes(
score: be_within(5).of(Time.now.to_f + 30),
item: include("retry_count" => 2, "error_class" => "Amigo::Retry::Retry", "error_message" => "trying"),
)
jobs = all_sidekiq_jobs(Sidekiq::Queue.new("otherq"))
expect(jobs).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::Queue.new("otherq"))

# Should have moved to retry set
sched = all_sidekiq_jobs(Sidekiq::ScheduledSet.new)
expect(sched).to have_attributes(length: 1)
expect(sched.first).to have_attributes(queue: "otherq")
end
end

it "retries on the correct queue" do
kls = create_job_class(ex: Amigo::Retry::Retry.new(30)) do |cls|
cls.sidekiq_options queue: "otherq"
describe "raising Die" do
it "catches die exceptions and sends to the dead set" do
kls = create_job_class(ex: Amigo::Retry::Die.new("im dead"))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new)

# Ends up in dead set, not scheduled set
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty
dead = all_sidekiq_jobs(Sidekiq::DeadSet.new)
expect(dead).to have_attributes(length: 1)
expect(dead.first).to have_attributes(
klass: kls.name,
args: [1],
item: include("error_class" => "Amigo::Retry::Die", "error_message" => "im dead"),
)
end
kls.perform_async(1)
end

jobs = all_sidekiq_jobs(Sidekiq::Queue.new("otherq"))
expect(jobs).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::Queue.new("otherq"))
describe "raising OrDie" do
it "conditionally retries or dies depending on the retry count" do
kls = create_job_class(ex: Amigo::Retry::OrDie.new(2, 30))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry moves to the dead set
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to have_attributes(length: 1)
end

# Should have moved to retry set
sched = all_sidekiq_jobs(Sidekiq::ScheduledSet.new)
expect(sched).to have_attributes(length: 1)
expect(sched.first).to have_attributes(queue: "otherq")
end
it "will call the exception's on_retry callback on retry and on_die on die" do
retries = []
deaths = []
kls = nil
excls = Class.new(Amigo::Retry::OrDie) do
define_method(:on_retry) do |worker, job|
raise "wrong worker: #{worker}" unless worker.is_a?(kls)
raise "wrong job: #{job}" unless job["class"] == "Retry::TestJob"
retries << 1
end

define_method(:on_die) do |worker, job|
raise "wrong worker: #{worker}" unless worker.is_a?(kls)
raise "wrong job: #{job}" unless job["class"] == "Retry::TestJob"
deaths << 1
end
end
kls = create_job_class(ex: excls.new(2, 30))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry moves to the dead set
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to have_attributes(length: 1)
expect(retries).to eq([1, 1, 1])
expect(deaths).to eq([1])
end

it "catches die exceptions and sends to the dead set" do
kls = create_job_class(ex: Amigo::Retry::Die.new("im dead"))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new)

# Ends up in dead set, not scheduled set
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty
dead = all_sidekiq_jobs(Sidekiq::DeadSet.new)
expect(dead).to have_attributes(length: 1)
expect(dead.first).to have_attributes(
klass: kls.name,
args: [1],
item: include("error_class" => "Amigo::Retry::Die", "error_message" => "im dead"),
)
end
it "ignores default retry behavior if callback returns :skip" do
excls = Class.new(Amigo::Retry::OrDie) { define_method(:on_retry) { |*_| :skip } }
kls = create_job_class(ex: excls.new(2, 30))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new)
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty
end

it "can conditionally retry or die depending on the retry count" do
kls = create_job_class(ex: Amigo::Retry::OrDie.new(2, 30))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry moves to the dead set
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to have_attributes(length: 1)
it "ignores default die behavior if callback returns :skip" do
excls = Class.new(Amigo::Retry::OrDie) do
define_method(:on_die) { |*_| :skip }
end
kls = create_job_class(ex: excls.new(2, 30))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1)
drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry noops due to retry
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty
end
end

it "catches quit exceptions and ends the job" do
kls = create_job_class(ex: Amigo::Retry::Quit.new("gone"))
kls.perform_async(1)
describe "raising Quit" do
it "catches quit exceptions and ends the job" do
kls = create_job_class(ex: Amigo::Retry::Quit.new("gone"))
kls.perform_async(1)

drain_sidekiq_jobs(Sidekiq::Queue.new)
drain_sidekiq_jobs(Sidekiq::Queue.new)

expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty
expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty
expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty
end
end

describe "Retry" do
Expand Down