Skip to content

Commit ff45de1

Browse files
authored
feat: LSN tracker persistence through replication slot confirmed flush position (#3462)
Closes #3448. This is part of a larger piece of work described in #3442. The initial idea was to have a separate, periodic persister of the last processed LSN, but as I was working through the code I realised that we might not even need that. Since consumers are now idempotent in the transactions they handle (see #3452), even if our global last processed LSN is slightly behind any given shape, that is OK. Instead of persisting it, we actually recover it from the replication slot's confirmed flush LSN: - If the slot is new, we reset the last processed LSN to the new slot's LSN since all shapes will be invalidated. - If the slot is an existing one, we initialise it to the confirmed flush LSN. - If the reconnection to the slot happens at runtime (e.g. because of a network partition), we preserve the current last processed LSN. - If the reconnection happens at startup, we initialise the LSN from the slot. This value might be _slightly_ behind if shapes processed transactions that they did not manage to acknowledge back, but we can tolerate this. This way, shape recovery does not have to read all shapes' last processed LSN during startup. If we need stronger guarantees of the global processed LSN being the max, we can also attempt to set it every time we lazily recover an existing shape (if the shape's max LSN is larger than the current max processed LSN). Happy to include this in this PR or a next one.
1 parent a36bf4c commit ff45de1

File tree

16 files changed

+169
-112
lines changed

16 files changed

+169
-112
lines changed

.changeset/polite-moons-compete.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Replace max LSN recovery from on-disk shapes with direct read from replication slot flushed LSN.

packages/sync-service/lib/electric/lsn_tracker.ex

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,47 @@ defmodule Electric.LsnTracker do
22
alias Electric.Postgres.Lsn
33

44
# this function is idempotent to avoid problems in tests
5-
def create_table(stack_id) do
5+
@spec initialize(Electric.stack_id()) :: :ok
6+
def initialize(stack_id) do
67
table = table(stack_id)
78

89
case :ets.info(table, :id) do
910
:undefined ->
1011
:ets.new(table, [:public, :named_table])
12+
:ok
1113

1214
ref when is_reference(ref) ->
1315
:ok
1416
end
1517
end
1618

17-
@spec set_last_processed_lsn(Lsn.t() | non_neg_integer(), String.t()) :: :ok
18-
def set_last_processed_lsn(lsn, stack_id) when is_struct(lsn, Lsn) do
19+
@spec set_last_processed_lsn(Electric.stack_id(), Lsn.t() | non_neg_integer()) :: :ok
20+
def set_last_processed_lsn(stack_id, lsn) when is_struct(lsn, Lsn) do
1921
stack_id
2022
|> table()
2123
|> :ets.insert({:last_processed_lsn, lsn})
2224

2325
:ok
2426
end
2527

26-
def set_last_processed_lsn(lsn, stack_id) when is_integer(lsn) do
27-
set_last_processed_lsn(Lsn.from_integer(lsn), stack_id)
28+
def set_last_processed_lsn(stack_id, lsn) when is_integer(lsn) do
29+
set_last_processed_lsn(stack_id, Lsn.from_integer(lsn))
30+
end
31+
32+
@spec initialize_last_processed_lsn(Electric.stack_id(), Lsn.t()) :: :ok
33+
def initialize_last_processed_lsn(stack_id, lsn) when is_struct(lsn, Lsn) do
34+
stack_id
35+
|> table()
36+
|> :ets.insert_new({:last_processed_lsn, lsn})
37+
38+
:ok
39+
end
40+
41+
def initialize_last_processed_lsn(stack_id, lsn) when is_integer(lsn) do
42+
initialize_last_processed_lsn(stack_id, Lsn.from_integer(lsn))
2843
end
2944

30-
@spec get_last_processed_lsn(String.t()) :: Lsn.t()
45+
@spec get_last_processed_lsn(Electric.stack_id()) :: Lsn.t()
3146
def get_last_processed_lsn(stack_id) do
3247
[last_processed_lsn: lsn] =
3348
stack_id

packages/sync-service/lib/electric/postgres/replication_client.ex

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ defmodule Electric.Postgres.ReplicationClient do
2424
| :check_if_publication_exists
2525
| :drop_slot
2626
| :create_slot
27+
| :query_slot_flushed_lsn
2728
| :set_display_setting
2829
| :ready_to_stream
2930
| :start_streaming
@@ -246,8 +247,15 @@ defmodule Electric.Postgres.ReplicationClient do
246247
end
247248
end
248249

249-
if current_step == :create_slot and extra_info == :created_new_slot,
250-
do: notify_created_new_slot(state)
250+
# for new slots, always reset the last processed LSN
251+
if current_step == :create_slot and extra_info == :created_new_slot do
252+
Electric.LsnTracker.set_last_processed_lsn(state.stack_id, state.flushed_wal)
253+
notify_created_new_slot(state)
254+
end
255+
256+
# for existing slots, populate the last processed LSN if not present
257+
if current_step == :query_slot_flushed_lsn,
258+
do: Electric.LsnTracker.initialize_last_processed_lsn(state.stack_id, state.flushed_wal)
251259

252260
if next_step == :ready_to_stream,
253261
do: notify_ready_to_stream(state)

packages/sync-service/lib/electric/postgres/replication_client/connection_setup.ex

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
99
"""
1010
alias Electric.Utils
1111
alias Electric.Postgres.ReplicationClient.State
12+
alias Electric.Postgres.Lsn
1213

1314
require Logger
1415

@@ -263,8 +264,9 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
263264
} = result
264265

265266
Logger.debug("Created new slot at lsn=#{lsn_str}")
267+
lsn = lsn_str |> Lsn.from_string() |> Lsn.to_integer()
266268

267-
{:created_new_slot, state}
269+
{:created_new_slot, %{state | flushed_wal: lsn}}
268270
end
269271

270272
defp create_slot_result(%Postgrex.Error{} = error, state) do
@@ -284,6 +286,32 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
284286

285287
###
286288

289+
defp query_slot_flushed_lsn_query(%State{slot_name: slot_name} = state) do
290+
Logger.debug("ReplicationClient step: query_slot_flushed_lsn")
291+
292+
query = """
293+
SELECT confirmed_flush_lsn
294+
FROM pg_replication_slots
295+
WHERE slot_name = #{Utils.quote_string(slot_name)}
296+
"""
297+
298+
{:query, query, state}
299+
end
300+
301+
defp query_slot_flushed_lsn_result([%Postgrex.Result{} = result], state) do
302+
%{rows: [[lsn_str]]} = result
303+
Logger.debug("Queried existing slot flushed lsn=#{lsn_str}")
304+
lsn = lsn_str |> Lsn.from_string() |> Lsn.to_integer()
305+
%{state | flushed_wal: lsn}
306+
end
307+
308+
defp query_slot_flushed_lsn_result(%Postgrex.Error{} = error, _state) do
309+
# Unexpected error, fail loudly.
310+
raise error
311+
end
312+
313+
###
314+
287315
defp set_display_setting_query(%{display_settings: [query | rest]} = state) do
288316
Logger.debug("ReplicationClient step: set_display_setting")
289317
{:query, query, %{state | display_settings: rest}}
@@ -363,9 +391,15 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
363391
defp next_step(%{step: :drop_slot}),
364392
do: :create_slot
365393

394+
defp next_step(%{step: :create_slot, flushed_wal: 0}),
395+
do: :query_slot_flushed_lsn
396+
366397
defp next_step(%{step: :create_slot}),
367398
do: :set_display_setting
368399

400+
defp next_step(%{step: :query_slot_flushed_lsn}),
401+
do: :set_display_setting
402+
369403
defp next_step(%{step: :set_display_setting, display_settings: queries}) when queries != [],
370404
do: :set_display_setting
371405

@@ -392,6 +426,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
392426

393427
defp query_for_step(:drop_slot, state), do: drop_slot_query(state)
394428
defp query_for_step(:create_slot, state), do: create_slot_query(state)
429+
defp query_for_step(:query_slot_flushed_lsn, state), do: query_slot_flushed_lsn_query(state)
395430
defp query_for_step(:set_display_setting, state), do: set_display_setting_query(state)
396431
defp query_for_step(:ready_to_stream, state), do: ready_to_stream(state)
397432
defp query_for_step(:start_streaming, state), do: start_replication_slot_query(state)
@@ -424,6 +459,9 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
424459
defp dispatch_query_result(:create_slot, result, state),
425460
do: create_slot_result(result, state)
426461

462+
defp dispatch_query_result(:query_slot_flushed_lsn, result, state),
463+
do: query_slot_flushed_lsn_result(result, state)
464+
427465
defp dispatch_query_result(:set_display_setting, result, state),
428466
do: set_display_setting_result(result, state)
429467
end

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ defmodule Electric.Replication.ShapeLogCollector do
4747
Electric.ProcessRegistry.name(stack_id, __MODULE__)
4848
end
4949

50-
def set_last_processed_lsn(server_ref, last_processed_lsn) do
51-
GenServer.call(server(server_ref), {:set_last_processed_lsn, last_processed_lsn})
50+
def mark_as_ready(server_ref) do
51+
GenServer.call(server(server_ref), :mark_as_ready)
5252
end
5353

5454
# use `GenServer.call/2` here to make the event processing synchronous.
@@ -212,8 +212,8 @@ defmodule Electric.Replication.ShapeLogCollector do
212212
)
213213
end
214214

215-
def handle_call({:set_last_processed_lsn, lsn}, _from, state) do
216-
LsnTracker.set_last_processed_lsn(lsn, state.stack_id)
215+
def handle_call(:mark_as_ready, _from, state) do
216+
lsn = LsnTracker.get_last_processed_lsn(state.stack_id)
217217
Electric.StatusMonitor.mark_shape_log_collector_ready(state.stack_id, self())
218218
{:reply, :ok, Map.put(state, :last_processed_lsn, lsn)}
219219
end
@@ -377,7 +377,7 @@ defmodule Electric.Replication.ShapeLogCollector do
377377

378378
OpenTelemetry.start_interval("shape_log_collector.set_last_processed_lsn")
379379

380-
LsnTracker.set_last_processed_lsn(state.last_processed_lsn, state.stack_id)
380+
LsnTracker.set_last_processed_lsn(state.stack_id, state.last_processed_lsn)
381381

382382
flush_tracker =
383383
if is_struct(event, Transaction) do

packages/sync-service/lib/electric/shape_cache.ex

Lines changed: 7 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@ end
2727
defmodule Electric.ShapeCache do
2828
use GenServer
2929

30-
alias Electric.Postgres.Lsn
3130
alias Electric.Replication.LogOffset
3231
alias Electric.Replication.ShapeLogCollector
3332
alias Electric.ShapeCache.ShapeStatus
34-
alias Electric.ShapeCache
3533
alias Electric.Shapes
3634
alias Electric.ShapeCache.ShapeCleaner
3735
alias Electric.Shapes.Shape
@@ -195,29 +193,30 @@ defmodule Electric.ShapeCache do
195193
subscription: nil
196194
}
197195

198-
{:ok, state, {:continue, :recover_shapes}}
196+
{:ok, state, {:continue, :wait_for_restore}}
199197
end
200198

201199
@impl GenServer
202-
def handle_continue(:recover_shapes, state) do
200+
def handle_continue(:wait_for_restore, state) do
203201
start_time = System.monotonic_time()
204-
{last_processed_lsn, total_recovered, total_failed_to_recover} = recover_shapes(state)
202+
203+
total_recovered = ShapeStatus.count_shapes(state.stack_id)
205204

206205
Electric.Replication.PublicationManager.wait_for_restore(state.stack_id)
207206

208207
# Let ShapeLogCollector know that it can start processing after finishing this function so that
209208
# we're subscribed to the producer before it starts forwarding its demand.
210-
ShapeLogCollector.set_last_processed_lsn(state.stack_id, last_processed_lsn)
209+
ShapeLogCollector.mark_as_ready(state.stack_id)
211210

212211
duration = System.monotonic_time() - start_time
213212

214213
Logger.notice(
215-
"Consumers ready in #{System.convert_time_unit(duration, :native, :millisecond)}ms (#{total_recovered} shapes, #{total_failed_to_recover} failed to recover)"
214+
"Consumers ready in #{System.convert_time_unit(duration, :native, :millisecond)}ms (#{total_recovered} shapes)"
216215
)
217216

218217
Electric.Telemetry.OpenTelemetry.execute(
219218
[:electric, :connection, :consumers_ready],
220-
%{duration: duration, total: total_recovered, failed_to_recover: total_failed_to_recover},
219+
%{duration: duration, total: total_recovered},
221220
%{stack_id: state.stack_id}
222221
)
223222

@@ -255,55 +254,6 @@ defmodule Electric.ShapeCache do
255254
end
256255
end
257256

258-
defp recover_shapes(%{stack_id: stack_id} = _state) do
259-
import Electric.Postgres.Lsn, only: [is_larger: 2]
260-
261-
start_time = System.monotonic_time()
262-
storage = ShapeCache.Storage.for_stack(stack_id)
263-
all_handles_and_shapes = ShapeStatus.list_shapes(stack_id)
264-
265-
{max_lsn, total_recovered} =
266-
all_handles_and_shapes
267-
|> Task.async_stream(
268-
fn {shape_handle, shape} ->
269-
shape_storage = ShapeCache.Storage.for_shape(shape_handle, storage)
270-
271-
case ShapeCache.Storage.get_current_position(shape_storage) do
272-
{:ok, latest_offset, _pg_snapshot} ->
273-
{shape_handle, LogOffset.extract_lsn(latest_offset)}
274-
275-
{:error, reason} ->
276-
Logger.error([
277-
"shape #{inspect(shape)} (#{inspect(shape_handle)})",
278-
" returned error from get_current_position: #{inspect(reason)}"
279-
])
280-
281-
ShapeCleaner.remove_shape(stack_id, shape_handle)
282-
283-
{shape_handle, :error}
284-
end
285-
end,
286-
ordered: false
287-
)
288-
|> Enum.reduce({Lsn.from_integer(0), 0}, fn
289-
{:ok, {_handle, :error}}, acc -> acc
290-
{:ok, {_handle, lsn}}, {max, recovered} when is_larger(lsn, max) -> {lsn, recovered + 1}
291-
_, {max, recovered} -> {max, recovered + 1}
292-
end)
293-
294-
total_failed_to_recover = length(all_handles_and_shapes) - total_recovered
295-
296-
duration = System.monotonic_time() - start_time
297-
298-
Logger.info([
299-
"Restored LSN position #{max_lsn} in",
300-
" #{System.convert_time_unit(duration, :native, :millisecond)}ms",
301-
" (#{total_recovered} shapes, #{total_failed_to_recover} failed to recover)"
302-
])
303-
304-
{max_lsn, total_recovered, total_failed_to_recover}
305-
end
306-
307257
defp maybe_create_shape(shape, otel_ctx, %{stack_id: stack_id} = state) do
308258
if shape_state = ShapeStatus.get_existing_shape(stack_id, shape) do
309259
shape_state

packages/sync-service/lib/electric/shape_cache/shape_status_owner.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ defmodule Electric.ShapeCache.ShapeStatusOwner do
4141
Electric.Telemetry.Sentry.set_tags_context(stack_id: stack_id)
4242

4343
:ok = ShapeStatus.initialize_from_storage(stack_id, config.storage)
44-
45-
Electric.LsnTracker.create_table(stack_id)
44+
:ok = Electric.LsnTracker.initialize(stack_id)
4645

4746
{:ok, %{stack_id: stack_id, backup_dir: ShapeStatus.backup_dir(config.storage)}}
4847
end

packages/sync-service/test/electric/lsn_tracker_test.exs

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,68 @@ defmodule Electric.LsnTrackerTest do
99

1010
describe "get_last_processed_lsn/1" do
1111
setup ctx do
12-
LsnTracker.create_table(ctx.stack_id)
12+
LsnTracker.initialize(ctx.stack_id)
1313
:ok
1414
end
1515

1616
test "returns inital lsn", %{stack_id: stack_id} do
1717
lsn = Lsn.from_integer(7)
18-
LsnTracker.set_last_processed_lsn(lsn, stack_id)
18+
LsnTracker.set_last_processed_lsn(stack_id, lsn)
1919

2020
assert LsnTracker.get_last_processed_lsn(stack_id) == lsn
2121
end
2222

2323
test "returns last set lsn", %{stack_id: stack_id} do
2424
lsn = Lsn.from_integer(7)
25-
LsnTracker.set_last_processed_lsn(lsn, stack_id)
25+
LsnTracker.set_last_processed_lsn(stack_id, lsn)
2626

2727
lsn = Lsn.from_integer(77)
28-
LsnTracker.set_last_processed_lsn(lsn, stack_id)
28+
LsnTracker.set_last_processed_lsn(stack_id, lsn)
2929

3030
lsn = Lsn.from_integer(111)
31-
LsnTracker.set_last_processed_lsn(lsn, stack_id)
31+
LsnTracker.set_last_processed_lsn(stack_id, lsn)
3232

3333
assert LsnTracker.get_last_processed_lsn(stack_id) == lsn
3434
end
3535
end
36+
37+
describe "initialize_last_processed_lsn/2" do
38+
setup ctx do
39+
LsnTracker.initialize(ctx.stack_id)
40+
:ok
41+
end
42+
43+
test "sets lsn when not previously set", %{stack_id: stack_id} do
44+
lsn = Lsn.from_integer(42)
45+
assert :ok = LsnTracker.initialize_last_processed_lsn(stack_id, lsn)
46+
assert LsnTracker.get_last_processed_lsn(stack_id) == lsn
47+
end
48+
49+
test "accepts integer and converts to Lsn", %{stack_id: stack_id} do
50+
assert :ok = LsnTracker.initialize_last_processed_lsn(stack_id, 100)
51+
assert LsnTracker.get_last_processed_lsn(stack_id) == Lsn.from_integer(100)
52+
end
53+
54+
test "does not overwrite existing lsn", %{stack_id: stack_id} do
55+
initial_lsn = Lsn.from_integer(50)
56+
LsnTracker.set_last_processed_lsn(stack_id, initial_lsn)
57+
58+
new_lsn = Lsn.from_integer(100)
59+
assert :ok = LsnTracker.initialize_last_processed_lsn(stack_id, new_lsn)
60+
61+
# Should still be the initial LSN, not the new one
62+
assert LsnTracker.get_last_processed_lsn(stack_id) == initial_lsn
63+
end
64+
65+
test "is idempotent - can be called multiple times", %{stack_id: stack_id} do
66+
lsn1 = Lsn.from_integer(10)
67+
lsn2 = Lsn.from_integer(20)
68+
69+
assert :ok = LsnTracker.initialize_last_processed_lsn(stack_id, lsn1)
70+
assert :ok = LsnTracker.initialize_last_processed_lsn(stack_id, lsn2)
71+
72+
# First call wins
73+
assert LsnTracker.get_last_processed_lsn(stack_id) == lsn1
74+
end
75+
end
3676
end

0 commit comments

Comments
 (0)