diff --git a/lib/amigo/retry.rb b/lib/amigo/retry.rb index 0fa7330..73d1693 100644 --- a/lib/amigo/retry.rb +++ b/lib/amigo/retry.rb @@ -59,6 +59,14 @@ def initialize(msg=nil) # Raise this class, or a subclass of it, to: # - Use +Retry+ exception semantics while the current attempt is <= +attempts+, or # - Use +Die+ exception semantics if the current attempt is > +attempts+. + # + # Callers can provide a subclass with two methods that are looked for: + # + # If on_retry is defined, it is called with (worker instance, job hash). + # If on_retry returns +:skip+, do NOT retry (do not send to the retry set). + # + # If on_die is defined, it is called with (worker instance, job hash). + # If on_die returns +:skip+, do NOT send to the dead set. class OrDie < Error attr_reader :attempts, :interval_or_timestamp, :wrapped @@ -99,13 +107,27 @@ def call(worker, job, _queue) end def handle_retry(worker, job, e) + if e.respond_to?(:on_retry) + callback_result = e.on_retry(worker, job) + if callback_result == :skip + Sidekiq.logger.warn("skipping_retryset_schedule") + return + end + end Sidekiq.logger.info("scheduling_retry") job["error_class"] = e.class.to_s job["error_message"] = e.to_s self.amigo_retry_in(worker.class, job, e.interval_or_timestamp) end - def handle_die(_worker, job, e) + def handle_die(worker, job, e) + if e.respond_to?(:on_die) + callback_result = e.on_die(worker, job) + if callback_result == :skip + Sidekiq.logger.warn("skipping_deadset_send") + return + end + end Sidekiq.logger.warn("sending_to_deadset") job["error_class"] = e.class.to_s job["error_message"] = e.to_s diff --git a/spec/amigo/amigo_spec.rb b/spec/amigo/amigo_spec.rb index c0c44ef..ae19104 100644 --- a/spec/amigo/amigo_spec.rb +++ b/spec/amigo/amigo_spec.rb @@ -147,11 +147,19 @@ end.to_not publish("some-event") end.to raise_error( RSpec::Expectations::ExpectationNotMetError, - "expected a 'some-event' event not to be fired but one was: [{\"x\" => 1}].", + match(/expected a 'some-event' event not to be fired but one was: \[\{"x"\s?=>\s?1}\]\./), ) end it "can handle publish with no event name" do + expect do + expect do + described_class.publish("some-event", {x: 1}) + end.to_not publish + end.to raise_error( + RSpec::Expectations::ExpectationNotMetError, + match(/expected a 'some-event' event not to be fired but one was: \[\{"x"\s?=>\s?1}\]\./), + ) end end diff --git a/spec/amigo/retry_spec.rb b/spec/amigo/retry_spec.rb index 2da24c1..a62e9de 100644 --- a/spec/amigo/retry_spec.rb +++ b/spec/amigo/retry_spec.rb @@ -36,85 +36,152 @@ def self.to_s cls end - it "catches retry exceptions and reschedules with the given interval" do - kls = create_job_class(ex: Amigo::Retry::Retry.new(30, "trying")) - kls.perform_async(1) + describe "raising Retry" do + it "catches retry exceptions and reschedules with the given interval" do + kls = create_job_class(ex: Amigo::Retry::Retry.new(30, "trying")) + kls.perform_async(1) + + expect(all_sidekiq_jobs(Sidekiq::Queue.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::Queue.new) + + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to contain_exactly( + have_attributes( + score: be_within(5).of(Time.now.to_f + 30), + item: include("retry_count" => 1), + ), + ) + + # Continue to retry + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) + sched2 = all_sidekiq_jobs(Sidekiq::ScheduledSet.new) + expect(sched2).to have_attributes(length: 1) + expect(sched2.first).to have_attributes( + score: be_within(5).of(Time.now.to_f + 30), + item: include("retry_count" => 2, "error_class" => "Amigo::Retry::Retry", "error_message" => "trying"), + ) + end - expect(all_sidekiq_jobs(Sidekiq::Queue.new)).to have_attributes(length: 1) - drain_sidekiq_jobs(Sidekiq::Queue.new) + it "retries on the correct queue" do + kls = create_job_class(ex: Amigo::Retry::Retry.new(30)) do |cls| + cls.sidekiq_options queue: "otherq" + end + kls.perform_async(1) - expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to contain_exactly( - have_attributes( - score: be_within(5).of(Time.now.to_f + 30), - item: include("retry_count" => 1), - ), - ) - - # Continue to retry - drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) - sched2 = all_sidekiq_jobs(Sidekiq::ScheduledSet.new) - expect(sched2).to have_attributes(length: 1) - expect(sched2.first).to have_attributes( - score: be_within(5).of(Time.now.to_f + 30), - item: include("retry_count" => 2, "error_class" => "Amigo::Retry::Retry", "error_message" => "trying"), - ) + jobs = all_sidekiq_jobs(Sidekiq::Queue.new("otherq")) + expect(jobs).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::Queue.new("otherq")) + + # Should have moved to retry set + sched = all_sidekiq_jobs(Sidekiq::ScheduledSet.new) + expect(sched).to have_attributes(length: 1) + expect(sched.first).to have_attributes(queue: "otherq") + end end - it "retries on the correct queue" do - kls = create_job_class(ex: Amigo::Retry::Retry.new(30)) do |cls| - cls.sidekiq_options queue: "otherq" + describe "raising Die" do + it "catches die exceptions and sends to the dead set" do + kls = create_job_class(ex: Amigo::Retry::Die.new("im dead")) + kls.perform_async(1) + + drain_sidekiq_jobs(Sidekiq::Queue.new) + + # Ends up in dead set, not scheduled set + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty + dead = all_sidekiq_jobs(Sidekiq::DeadSet.new) + expect(dead).to have_attributes(length: 1) + expect(dead.first).to have_attributes( + klass: kls.name, + args: [1], + item: include("error_class" => "Amigo::Retry::Die", "error_message" => "im dead"), + ) end - kls.perform_async(1) + end - jobs = all_sidekiq_jobs(Sidekiq::Queue.new("otherq")) - expect(jobs).to have_attributes(length: 1) - drain_sidekiq_jobs(Sidekiq::Queue.new("otherq")) + describe "raising OrDie" do + it "conditionally retries or dies depending on the retry count" do + kls = create_job_class(ex: Amigo::Retry::OrDie.new(2, 30)) + kls.perform_async(1) + + drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry moves to the dead set + expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to have_attributes(length: 1) + end - # Should have moved to retry set - sched = all_sidekiq_jobs(Sidekiq::ScheduledSet.new) - expect(sched).to have_attributes(length: 1) - expect(sched.first).to have_attributes(queue: "otherq") - end + it "will call the exception's on_retry callback on retry and on_die on die" do + retries = [] + deaths = [] + kls = nil + excls = Class.new(Amigo::Retry::OrDie) do + define_method(:on_retry) do |worker, job| + raise "wrong worker: #{worker}" unless worker.is_a?(kls) + raise "wrong job: #{job}" unless job["class"] == "Retry::TestJob" + retries << 1 + end + + define_method(:on_die) do |worker, job| + raise "wrong worker: #{worker}" unless worker.is_a?(kls) + raise "wrong job: #{job}" unless job["class"] == "Retry::TestJob" + deaths << 1 + end + end + kls = create_job_class(ex: excls.new(2, 30)) + kls.perform_async(1) + + drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry moves to the dead set + expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to have_attributes(length: 1) + expect(retries).to eq([1, 1, 1]) + expect(deaths).to eq([1]) + end - it "catches die exceptions and sends to the dead set" do - kls = create_job_class(ex: Amigo::Retry::Die.new("im dead")) - kls.perform_async(1) - - drain_sidekiq_jobs(Sidekiq::Queue.new) - - # Ends up in dead set, not scheduled set - expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty - dead = all_sidekiq_jobs(Sidekiq::DeadSet.new) - expect(dead).to have_attributes(length: 1) - expect(dead.first).to have_attributes( - klass: kls.name, - args: [1], - item: include("error_class" => "Amigo::Retry::Die", "error_message" => "im dead"), - ) - end + it "ignores default retry behavior if callback returns :skip" do + excls = Class.new(Amigo::Retry::OrDie) { define_method(:on_retry) { |*_| :skip } } + kls = create_job_class(ex: excls.new(2, 30)) + kls.perform_async(1) + + drain_sidekiq_jobs(Sidekiq::Queue.new) + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty + expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty + end - it "can conditionally retry or die depending on the retry count" do - kls = create_job_class(ex: Amigo::Retry::OrDie.new(2, 30)) - kls.perform_async(1) - - drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried - expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) - drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once - expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) - drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice - expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) - drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry moves to the dead set - expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to have_attributes(length: 1) + it "ignores default die behavior if callback returns :skip" do + excls = Class.new(Amigo::Retry::OrDie) do + define_method(:on_die) { |*_| :skip } + end + kls = create_job_class(ex: excls.new(2, 30)) + kls.perform_async(1) + + drain_sidekiq_jobs(Sidekiq::Queue.new) # will go to be retried + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry once + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # retry twice + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to have_attributes(length: 1) + drain_sidekiq_jobs(Sidekiq::ScheduledSet.new) # the third retry noops due to retry + expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty + end end - it "catches quit exceptions and ends the job" do - kls = create_job_class(ex: Amigo::Retry::Quit.new("gone")) - kls.perform_async(1) + describe "raising Quit" do + it "catches quit exceptions and ends the job" do + kls = create_job_class(ex: Amigo::Retry::Quit.new("gone")) + kls.perform_async(1) - drain_sidekiq_jobs(Sidekiq::Queue.new) + drain_sidekiq_jobs(Sidekiq::Queue.new) - expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty - expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty + expect(all_sidekiq_jobs(Sidekiq::ScheduledSet.new)).to be_empty + expect(all_sidekiq_jobs(Sidekiq::DeadSet.new)).to be_empty + end end describe "Retry" do