I'm aware that this library is likely EOL (#18) but wanted to share this in case others are seeing the same issue.
The dispatch_events function contains a bug when the demand reaches zero.

    defp dispatch_events(state, 0, events) do
      {:noreply, Enum.reverse(events), state}
    end
    defp dispatch_events(state, demand, events) do
      case :queue.out(state.queue) do
        {{:value, event}, queue} ->
          dispatch_events(%{state | queue: queue}, demand - 1, [event | events])
        {:empty, _} ->
          {:noreply, Enum.reverse(events), %{state | demand: demand}}
      end
    end

The dispatch_events(state, demand, events) clause does not update state.demand as it recurses, and the zero-demand clause returns the state unchanged. As a result, the internal demand counter can remain at 1 even though the demand has been met. This, in turn, can cause single events to be dropped as they are received from RabbitMQ, which prints a warning:
GenStage producer :my_rabbitmq_producer has discarded 1 events from buffer
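
To make the stale counter easier to see, here is a minimal sketch that copies the two clauses into a stand-alone module. The module name DemandSketch, the plain-map state, and the use of def instead of defp are assumptions made for illustration. The sketch also assumes that state.demand equals the demand argument on entry, which is not shown in the excerpt above.

```elixir
defmodule DemandSketch do
  # Hypothetical stand-alone copy of the two clauses quoted in this issue,
  # made public so they can be called directly. Not the library module.
  def dispatch_events(state, 0, events) do
    # Returns the state untouched, so state.demand keeps its old value.
    {:noreply, Enum.reverse(events), state}
  end

  def dispatch_events(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        # Only the queue is updated here; state.demand is left as it was.
        dispatch_events(%{state | queue: queue}, demand - 1, [event | events])

      {:empty, _} ->
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end
end

# One queued event, one unit of demand (assumed starting state).
state = %{queue: :queue.from_list([:a]), demand: 1}
{:noreply, events, new_state} = DemandSketch.dispatch_events(state, 1, [])

IO.inspect(events)            # [:a]
IO.inspect(new_state.demand)  # 1, although the single demanded event was dispatched
```

The demand is only written back in the {:empty, _} branch, so any run that ends in the zero-demand clause leaves the counter at its previous value.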
This patch fixes the bug:

    diff --git a/lib/wabbit/gen_stage.ex b/lib/wabbit/gen_stage.ex
    index e230132..402e0a7 100644
    --- a/lib/wabbit/gen_stage.ex
    +++ b/lib/wabbit/gen_stage.ex
    @@ -401,7 +401,8 @@ defmodule Wabbit.GenStage do
       defp dispatch_events(state, demand, events) do
         case :queue.out(state.queue) do
           {{:value, event}, queue} ->
    -        dispatch_events(%{state | queue: queue}, demand - 1, [event | events])
    +        new_demand = demand - 1
    +        dispatch_events(%{state | queue: queue, demand: new_demand}, new_demand, [event | events])
           {:empty, _} ->
             {:noreply, Enum.reverse(events), %{state | demand: demand}}
         end
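
As a rough check of the patched clause, the sketch below applies the same change to a stand-alone module. The module name PatchedDemandSketch, the plain-map state, and the public def are assumptions for illustration only, and the starting state is hypothetical.

```elixir
defmodule PatchedDemandSketch do
  # Hypothetical stand-alone copy of dispatch_events with the patch applied.
  def dispatch_events(state, 0, events) do
    {:noreply, Enum.reverse(events), state}
  end

  def dispatch_events(state, demand, events) do
    case :queue.out(state.queue) do
      {{:value, event}, queue} ->
        # Keep state.demand in step with the remaining demand on every event.
        new_demand = demand - 1
        dispatch_events(%{state | queue: queue, demand: new_demand}, new_demand, [event | events])

      {:empty, _} ->
        {:noreply, Enum.reverse(events), %{state | demand: demand}}
    end
  end
end

# Two queued events, one unit of demand (assumed starting state).
state = %{queue: :queue.from_list([:a, :b]), demand: 1}
{:noreply, events, new_state} = PatchedDemandSketch.dispatch_events(state, 1, [])

IO.inspect(events)                           # [:a]
IO.inspect(new_state.demand)                 # 0
IO.inspect(:queue.to_list(new_state.queue))  # [:b]
```

With the counter written back on each recursion, the zero-demand clause returns a state whose demand is already 0, and the undelivered event stays in the queue.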
The dispatch_events code quoted above is from wabbit/lib/wabbit/gen_stage.ex, lines 398 to 408 in 26cf5d2.