diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 929332ee8..16681a9ed 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -76,6 +76,14 @@ jobs:
- name: Compute cache key
id: cache-key
run: echo "key=pgdog-bin-${{ runner.os }}-$(bash integration/ci/cache-key.sh)" >> "$GITHUB_OUTPUT"
+ # rust-cache must run before the binary restore: it lays down a
+ # stale target/ that can otherwise wipe target/release/pgdog when
+ # cargo reconciles fingerprints during plugin builds.
+ - name: Restore Rust cache for plugin builds
+ if: matrix.needs_rust_cache
+ uses: useblacksmith/rust-cache@v3
+ with:
+ prefix-key: build-v1
- name: Restore pgdog binaries
uses: useblacksmith/cache/restore@v5
with:
@@ -84,11 +92,6 @@ jobs:
target/release/pgdog
key: ${{ steps.cache-key.outputs.key }}
fail-on-cache-miss: true
- - name: Restore Rust cache for plugin builds
- if: matrix.needs_rust_cache
- uses: useblacksmith/rust-cache@v3
- with:
- prefix-key: build-v1
- name: Setup dependencies
run: bash integration/ci/setup.sh --with-toxi
- name: Run ${{ matrix.name }}
diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json
index 24c9d2acc..bdecdeafd 100644
--- a/.schema/pgdog.schema.json
+++ b/.schema/pgdog.schema.json
@@ -108,6 +108,10 @@
"tls_verify": "prefer",
"two_phase_commit": false,
"two_phase_commit_auto": null,
+ "two_phase_commit_wal_checkpoint_interval": 60,
+ "two_phase_commit_wal_dir": null,
+ "two_phase_commit_wal_fsync_interval": 2,
+ "two_phase_commit_wal_segment_size": 16777216,
"unique_id_function": "standard",
"unique_id_min": 0,
"workers": 2
@@ -1074,6 +1078,35 @@
],
"default": null
},
+ "two_phase_commit_wal_checkpoint_interval": {
+ "description": "How often, in seconds, to write a checkpoint record to the two-phase commit WAL and garbage-collect old segments.\n\n_Default:_ `60`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_checkpoint_interval",
+ "type": "integer",
+ "format": "uint64",
+ "default": 60,
+ "minimum": 0
+ },
+ "two_phase_commit_wal_dir": {
+ "description": "Directory where the two-phase commit write-ahead log is stored.\n\n**Note:** This setting cannot be changed at runtime. PgDog acquires an exclusive `flock` on `
/.lock` at startup. If the directory cannot be created or written to, or another PgDog process already holds the lock, the WAL is disabled and a warning is logged: 2PC will continue to function but will not be durable across restarts.\n\n_Default:_ `./pgdog_wal`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_dir",
+ "type": [
+ "string",
+ "null"
+ ],
+ "default": null
+ },
+ "two_phase_commit_wal_fsync_interval": {
+ "description": "How long, in milliseconds, the two-phase commit WAL writer waits to coalesce concurrent appends into a single fsync.\n\n**Note:** Setting this to `0` disables waiting for additional appends; records already queued in the channel when the writer wakes are still batched into one fsync. Higher values trade per-transaction commit latency for fewer fsyncs under load.\n\n_Default:_ `2`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_fsync_interval",
+ "type": "integer",
+ "format": "uint64",
+ "default": 2,
+ "minimum": 0
+ },
+ "two_phase_commit_wal_segment_size": {
+ "description": "Maximum size, in bytes, of a single two-phase commit WAL segment file before it is rotated.\n\n_Default:_ `16777216` (16 MiB)\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_segment_size",
+ "type": "integer",
+ "format": "uint64",
+ "default": 16777216,
+ "minimum": 0
+ },
"unique_id_function": {
"description": "Unique ID generation function.",
"$ref": "#/$defs/UniqueIdFunction",
diff --git a/Cargo.lock b/Cargo.lock
index 70e762f71..82797c972 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1148,6 +1148,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
+[[package]]
+name = "crc32c"
+version = "0.6.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
+dependencies = [
+ "rustc_version",
+]
+
[[package]]
name = "crc32fast"
version = "1.5.0"
@@ -3199,6 +3208,7 @@ dependencies = [
"cc",
"chrono",
"clap",
+ "crc32c",
"csv-core",
"dashmap",
"derive_builder",
@@ -5553,6 +5563,15 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
+[[package]]
+name = "two-pc-crash-safety-wal-helper"
+version = "0.0.0"
+dependencies = [
+ "bytes",
+ "pgdog",
+ "tokio",
+]
+
[[package]]
name = "typenum"
version = "1.19.0"
diff --git a/Cargo.toml b/Cargo.toml
index f8a6be62f..504bb5bde 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ exclude = ["fuzz"]
members = [
"examples/demo",
"integration/rust",
+ "integration/two_pc_crash_safety/wal_helper",
"pgdog",
"pgdog-config",
"pgdog-macros",
diff --git a/integration/two_pc_crash_safety/Gemfile b/integration/two_pc_crash_safety/Gemfile
new file mode 100644
index 000000000..1fce5e749
--- /dev/null
+++ b/integration/two_pc_crash_safety/Gemfile
@@ -0,0 +1,6 @@
+# frozen_string_literal: true
+
+source 'https://rubygems.org'
+gem 'pg'
+gem 'rspec', '~> 3.4'
+gem 'toxiproxy'
diff --git a/integration/two_pc_crash_safety/Gemfile.lock b/integration/two_pc_crash_safety/Gemfile.lock
new file mode 100644
index 000000000..c122eee29
--- /dev/null
+++ b/integration/two_pc_crash_safety/Gemfile.lock
@@ -0,0 +1,30 @@
+GEM
+ remote: https://rubygems.org/
+ specs:
+ diff-lcs (1.6.2)
+ pg (1.5.9)
+ rspec (3.13.2)
+ rspec-core (~> 3.13.0)
+ rspec-expectations (~> 3.13.0)
+ rspec-mocks (~> 3.13.0)
+ rspec-core (3.13.6)
+ rspec-support (~> 3.13.0)
+ rspec-expectations (3.13.5)
+ diff-lcs (>= 1.2.0, < 2.0)
+ rspec-support (~> 3.13.0)
+ rspec-mocks (3.13.8)
+ diff-lcs (>= 1.2.0, < 2.0)
+ rspec-support (~> 3.13.0)
+ rspec-support (3.13.7)
+ toxiproxy (2.0.2)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ pg
+ rspec (~> 3.4)
+ toxiproxy
+
+BUNDLED WITH
+ 1.17.2
diff --git a/integration/two_pc_crash_safety/crash_recovery_spec.rb b/integration/two_pc_crash_safety/crash_recovery_spec.rb
new file mode 100644
index 000000000..7a7255c0e
--- /dev/null
+++ b/integration/two_pc_crash_safety/crash_recovery_spec.rb
@@ -0,0 +1,172 @@
+# frozen_string_literal: true
+
+require_relative 'rspec_helper'
+
+CONFIG_DIR = __dir__
+
+describe '2pc crash safety' do
+ let(:wal_dir) { @wal_dir }
+
+ before(:each) do
+ cleanup_prepared_xacts!
+ @pid, @wal_dir = spawn_pgdog(CONFIG_DIR)
+ wait_for_pgdog
+ end
+
+ after(:each) do
+ stop_pgdog(@pid)
+ FileUtils.rm_rf(@wal_dir) if @wal_dir
+ end
+
+ it 'rolls back orphan prepared xacts after a kill mid-PREPARE' do
+ client = pgdog_conn
+ client.exec('BEGIN')
+ client.exec("INSERT INTO crash_safety_test (id, value) VALUES (0, 'a')")
+ client.exec("INSERT INTO crash_safety_test (id, value) VALUES (1, 'b')")
+
+ # Hold shard_1's responses so PREPARE TRANSACTION on shard_1 hangs
+ # waiting for the reply. Latency keeps the connection open so pgdog
+ # stays mid-2PC.
+ Toxiproxy[:crash_safety_shard_1].toxic(:latency, latency: 30_000).apply do
+ Thread.new do
+ begin
+ client.exec('COMMIT')
+ rescue PG::Error
+ # connection drops when we kill pgdog.
+ end
+ end
+ # Wait for pgdog to issue PREPARE on shard_0 and start blocking
+ # on shard_1's response before we kill it.
+ sleep 1.0
+ Process.kill('KILL', @pid)
+ Process.waitpid(@pid)
+ @pid = nil
+ end
+
+ # At least one shard must have an orphan prepared xact left behind,
+ # otherwise recovery has nothing to clean up.
+ leftover = SHARDS.flat_map { |s| prepared_xacts(s) }
+ expect(leftover).not_to be_empty,
+ 'no prepared xacts after kill; the kill landed before any PREPARE made it through'
+
+ # Restart pgdog with the same WAL directory. Recovery sees the
+ # Begin record and drives ROLLBACK PREPARED on every participant.
+ @pid, _ = spawn_pgdog(CONFIG_DIR, wal_dir: @wal_dir)
+ wait_for_pgdog
+
+ expect(metric('two_pc_recovered_total')).to be > 0,
+ 'restarted pgdog reports zero recovered txns; recovery did not run'
+
+ expect(wait_for_no_prepared_xacts).to be(true),
+ lambda {
+ leftover = SHARDS.flat_map { |s| prepared_xacts(s) }
+ "prepared xacts did not drain: #{leftover.inspect}"
+ }
+
+ SHARDS.each do |shard|
+ c = shard_conn(shard)
+ count = c.exec('SELECT COUNT(*) FROM crash_safety_test').to_a[0]['count'].to_i
+ c.close
+ expect(count).to eq(0), "expected no rows on #{shard[:db]}, found #{count}"
+ end
+ end
+
+ it 'commits orphan prepared xacts after a kill mid-COMMIT-PREPARED' do
+ # Synthesize the on-disk state pgdog would leave behind if it
+ # crashed mid-Phase2, then start pgdog and verify it commits the
+ # orphans. Catching pgdog reliably between "Committing record
+ # durable" and "all COMMIT PREPAREDs done" from outside isn't
+ # practical: toxiproxy is byte-level so any delay on shard_1 also
+ # blocks PREPARE, and Postgres has no hook in COMMIT PREPARED.
+ stop_pgdog(@pid)
+ @pid = nil
+ FileUtils.rm_rf(@wal_dir)
+ Dir.mkdir(@wal_dir)
+
+ gid = "__pgdog_2pc_#{rand(1..(1 << 62))}"
+ synthesize_phase2_wal(@wal_dir, gid, 'pgdog', 'pgdog')
+
+ # PgDog suffixes the gid with `_` per shard so the
+ # names don't collide on a single cluster (pgdog/src/backend/pool/
+ # connection/binding.rs). Mirror that scheme here.
+ SHARDS.each_with_index do |shard, idx|
+ c = shard_conn(shard)
+ c.exec('BEGIN')
+ c.exec("INSERT INTO crash_safety_test (id, value) VALUES (#{idx}, 'r')")
+ c.exec("PREPARE TRANSACTION '#{gid}_#{idx}'")
+ c.close
+ end
+
+ @pid, _ = spawn_pgdog(CONFIG_DIR, wal_dir: @wal_dir)
+ wait_for_pgdog
+
+ expect(metric('two_pc_recovered_total')).to be > 0,
+ 'restarted pgdog reports zero recovered txns; recovery did not run'
+
+ expect(wait_for_no_prepared_xacts).to be(true),
+ lambda {
+ leftover = SHARDS.flat_map { |s| prepared_xacts(s) }
+ "prepared xacts did not drain: #{leftover.inspect}"
+ }
+
+ SHARDS.each do |shard|
+ c = shard_conn(shard)
+ count = c.exec('SELECT COUNT(*) FROM crash_safety_test').to_a[0]['count'].to_i
+ c.close
+ expect(count).to eq(1), "expected 1 row on #{shard[:db]}, found #{count}"
+ end
+ end
+
+ it 'no recovery work after a clean shutdown' do
+ client = pgdog_conn
+ client.exec('BEGIN')
+ client.exec("INSERT INTO crash_safety_test (id, value) VALUES (0, 'a')")
+ client.exec("INSERT INTO crash_safety_test (id, value) VALUES (1, 'b')")
+ client.exec('COMMIT')
+ client.close
+
+ # SIGTERM lets Manager::shutdown drain and wal.append_end every
+ # finished txn. The WAL on disk should describe a complete cycle
+ # (Begin, Committing, End) for every txn so recovery has nothing
+ # to do on the next start.
+ stop_pgdog(@pid, signal: 'TERM')
+ @pid = nil
+
+ @pid, _ = spawn_pgdog(CONFIG_DIR, wal_dir: @wal_dir)
+ wait_for_pgdog
+
+ expect(metric('two_pc_recovered_total')).to eq(0),
+ 'clean-shutdown WAL should have left no in-flight txns to recover'
+ end
+
+ it 'second pgdog refuses to share the WAL directory' do
+ # First pgdog (started in before) holds the flock on @wal_dir/.lock.
+ # Spawning a second one against the same dir should fail to acquire
+ # the lock; enable_wal warns and continues without WAL durability,
+ # so we verify the failure via the log line.
+ log_path = File.join(CONFIG_DIR, 'pgdog-second.log')
+ second_pid = Process.spawn(
+ {
+ 'PGDOG_TWO_PHASE_COMMIT_WAL_DIR' => @wal_dir,
+ 'PGDOG_PORT' => '6433',
+ },
+ ENV['PGDOG_BIN'] || File.expand_path('../../target/release/pgdog', __dir__),
+ '--config', File.join(CONFIG_DIR, 'pgdog.toml'),
+ '--users', File.join(CONFIG_DIR, 'users.toml'),
+ out: log_path, err: %i[child out]
+ )
+
+ begin
+ # Give the second pgdog enough time to attempt enable_wal and log
+ # the failure.
+ sleep 2
+ stop_pgdog(second_pid)
+
+ log = File.read(log_path)
+ expect(log).to include('locked by another process'),
+ "expected DirLocked error in second pgdog log; got:\n#{log}"
+ ensure
+ FileUtils.rm_f(log_path)
+ end
+ end
+end
diff --git a/integration/two_pc_crash_safety/pgdog.toml b/integration/two_pc_crash_safety/pgdog.toml
new file mode 100644
index 000000000..7c032d57b
--- /dev/null
+++ b/integration/two_pc_crash_safety/pgdog.toml
@@ -0,0 +1,40 @@
+[general]
+two_phase_commit = true
+openmetrics_port = 9090
+
+[[databases]]
+name = "pgdog"
+host = "127.0.0.1"
+shard = 0
+database_name = "shard_0"
+
+# shard_1 is reached via the toxiproxy listener so the spec can hold
+# requests in flight (timeout toxic) and SIGKILL pgdog mid-2PC.
+[[databases]]
+name = "pgdog"
+host = "127.0.0.1"
+port = 5435
+shard = 1
+database_name = "shard_1"
+
+[[sharded_tables]]
+database = "pgdog"
+column = "id"
+data_type = "bigint"
+
+[[sharded_mappings]]
+database = "pgdog"
+column = "id"
+kind = "list"
+values = [0]
+shard = 0
+
+[[sharded_mappings]]
+database = "pgdog"
+column = "id"
+kind = "list"
+values = [1]
+shard = 1
+
+[admin]
+password = "pgdog"
diff --git a/integration/two_pc_crash_safety/rspec_helper.rb b/integration/two_pc_crash_safety/rspec_helper.rb
new file mode 100644
index 000000000..3801eb026
--- /dev/null
+++ b/integration/two_pc_crash_safety/rspec_helper.rb
@@ -0,0 +1,142 @@
+# frozen_string_literal: true
+
+require 'fileutils'
+require 'net/http'
+require 'pg'
+require 'tmpdir'
+require 'toxiproxy'
+
+OPENMETRICS_PORT = 9090
+
+PGDOG_HOST = '127.0.0.1'
+PGDOG_PORT = 6432
+SHARDS = [
+ { db: 'shard_0', port: 5432 },
+ { db: 'shard_1', port: 5432 } # we always look at the *real* port for state checks
+].freeze
+
+def pgdog_conn(database: 'pgdog')
+ PG.connect(host: PGDOG_HOST, port: PGDOG_PORT, user: 'pgdog',
+ password: 'pgdog', dbname: database)
+end
+
+def shard_conn(shard)
+ PG.connect(host: '127.0.0.1', port: shard[:port], user: 'pgdog',
+ password: 'pgdog', dbname: shard[:db])
+end
+
+def prepared_xacts(shard)
+ c = shard_conn(shard)
+ rows = c.exec('SELECT gid FROM pg_prepared_xacts').to_a.map { |r| r['gid'] }
+ c.close
+ rows
+end
+
+def cleanup_prepared_xacts!
+ SHARDS.each do |shard|
+ c = shard_conn(shard)
+ c.exec('SELECT gid FROM pg_prepared_xacts').to_a.each do |row|
+ gid = c.escape_string(row['gid'])
+ c.exec("ROLLBACK PREPARED '#{gid}'")
+ end
+ c.exec('TRUNCATE crash_safety_test')
+ c.close
+ end
+end
+
+# Spawns pgdog as a child process with our test config. If `wal_dir`
+# is given, the spawned pgdog uses it (so a restart resumes the same
+# durable log); otherwise a fresh tmpdir is created. Returns
+# [pid, wal_dir].
+def spawn_pgdog(config_dir, wal_dir: nil)
+ wal_dir ||= Dir.mktmpdir('pgdog_wal_')
+ binary = ENV['PGDOG_BIN'] || File.expand_path('../../target/release/pgdog', __dir__)
+ unless File.exist?(binary)
+ system('cargo', 'build', '--release', chdir: File.expand_path('../..', __dir__)) ||
+ raise('cargo build failed')
+ end
+ log_path = File.join(config_dir, 'pgdog.log')
+ pid = Process.spawn(
+ { 'PGDOG_TWO_PHASE_COMMIT_WAL_DIR' => wal_dir },
+ binary,
+ '--config', File.join(config_dir, 'pgdog.toml'),
+ '--users', File.join(config_dir, 'users.toml'),
+ out: log_path, err: [:child, :out]
+ )
+ [pid, wal_dir]
+end
+
+def wait_for_pgdog(timeout: 10)
+ deadline = Time.now + timeout
+ loop do
+ begin
+ c = pgdog_conn
+ c.exec('SELECT 1')
+ c.close
+ return
+ rescue PG::Error
+ raise "pgdog did not become ready within #{timeout}s" if Time.now > deadline
+
+ sleep 0.1
+ end
+ end
+end
+
+def stop_pgdog(pid, signal: 'TERM', timeout: 10)
+ return unless pid
+
+ begin
+ Process.kill(signal, pid)
+ rescue Errno::ESRCH
+ return
+ end
+ deadline = Time.now + timeout
+ loop do
+ _, status = Process.waitpid2(pid, Process::WNOHANG)
+ return if status
+
+ if Time.now > deadline
+ Process.kill('KILL', pid) rescue nil
+ Process.waitpid(pid) rescue nil
+ return
+ end
+ sleep 0.05
+ end
+end
+
+# Wait for both shards' pg_prepared_xacts to drain. Returns true if it
+# happened within the deadline, false otherwise.
+def wait_for_no_prepared_xacts(timeout: 30)
+ deadline = Time.now + timeout
+ loop do
+ return true if SHARDS.all? { |s| prepared_xacts(s).empty? }
+ return false if Time.now > deadline
+
+ sleep 0.2
+ end
+end
+
+Toxiproxy.host = 'http://127.0.0.1:8474'
+
+# Fetch /metrics from pgdog and return the integer value of the named
+# counter, or nil if it isn't present.
+def metric(name)
+ body = Net::HTTP.get(URI("http://127.0.0.1:#{OPENMETRICS_PORT}/metrics"))
+ prefix = "#{name} "
+ line = body.lines.find { |l| l.start_with?(prefix) }
+ line&.split&.last&.to_i
+end
+
+# Synthesize a WAL containing a Begin + Committing pair for `gid` so
+# pgdog's recovery sees an in-flight Phase2 txn and drives COMMIT
+# PREPARED on its participants.
+def synthesize_phase2_wal(wal_dir, gid, user, database)
+ workspace = File.expand_path('../..', __dir__)
+ ok = system(
+ 'cargo', 'run', '--quiet', '--release',
+ '-p', 'two-pc-crash-safety-wal-helper', '--',
+ wal_dir, gid, user, database,
+ chdir: workspace
+ )
+ raise 'wal_helper failed' unless ok
+end
diff --git a/integration/two_pc_crash_safety/run.sh b/integration/two_pc_crash_safety/run.sh
new file mode 100755
index 000000000..5e3641e4a
--- /dev/null
+++ b/integration/two_pc_crash_safety/run.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+set -e
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+
+bash "${SCRIPT_DIR}/setup.sh"
+
+pushd "${SCRIPT_DIR}" >/dev/null
+bundle install
+bundle exec rspec crash_recovery_spec.rb -fd
+popd >/dev/null
diff --git a/integration/two_pc_crash_safety/setup.sh b/integration/two_pc_crash_safety/setup.sh
new file mode 100755
index 000000000..2254d49ee
--- /dev/null
+++ b/integration/two_pc_crash_safety/setup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+set -e
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+CLI="${SCRIPT_DIR}/../toxiproxy-cli"
+
+# Create the per-shard table so cross-shard transactions have something
+# to write. Routed by id: 0 -> shard_0, 1 -> shard_1 (matches the
+# sharded_mappings in pgdog.toml).
+export PGPASSWORD=pgdog
+psql -h 127.0.0.1 -p 5432 -U pgdog -d shard_0 \
+ -c 'CREATE TABLE IF NOT EXISTS crash_safety_test (id BIGINT PRIMARY KEY, value TEXT)'
+psql -h 127.0.0.1 -p 5432 -U pgdog -d shard_1 \
+ -c 'CREATE TABLE IF NOT EXISTS crash_safety_test (id BIGINT PRIMARY KEY, value TEXT)'
+
+# Toxiproxy: shard_1's traffic flows through :5435 so the spec can
+# inject a timeout toxic and catch pgdog mid-2PC deterministically.
+killall toxiproxy-server > /dev/null 2>&1 || true
+"${SCRIPT_DIR}/../toxiproxy-server" > /dev/null &
+sleep 1
+
+"${CLI}" delete crash_safety_shard_1 > /dev/null 2>&1 || true
+"${CLI}" create --listen :5435 --upstream 127.0.0.1:5432 crash_safety_shard_1
diff --git a/integration/two_pc_crash_safety/users.toml b/integration/two_pc_crash_safety/users.toml
new file mode 100644
index 000000000..581cdb75b
--- /dev/null
+++ b/integration/two_pc_crash_safety/users.toml
@@ -0,0 +1,4 @@
+[[users]]
+name = "pgdog"
+database = "pgdog"
+password = "pgdog"
diff --git a/integration/two_pc_crash_safety/wal_helper/Cargo.toml b/integration/two_pc_crash_safety/wal_helper/Cargo.toml
new file mode 100644
index 000000000..57c73732a
--- /dev/null
+++ b/integration/two_pc_crash_safety/wal_helper/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "two-pc-crash-safety-wal-helper"
+version = "0.0.0"
+edition = "2024"
+publish = false
+
+[[bin]]
+name = "wal_helper"
+path = "src/main.rs"
+
+[dependencies]
+bytes = "1"
+pgdog = { path = "../../../pgdog" }
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/integration/two_pc_crash_safety/wal_helper/src/main.rs b/integration/two_pc_crash_safety/wal_helper/src/main.rs
new file mode 100644
index 000000000..2481fe219
--- /dev/null
+++ b/integration/two_pc_crash_safety/wal_helper/src/main.rs
@@ -0,0 +1,48 @@
+//! Synthesize a 2PC WAL containing a Begin + Committing pair so the
+//! crash-safety integration spec can simulate the Phase2 recovery
+//! scenario without having to crash pgdog at exactly the right
+//! microsecond.
+//!
+//! Usage: `wal_helper <wal_dir> <gid> <user> <database>`
+
+use std::path::PathBuf;
+use std::str::FromStr;
+
+use bytes::BytesMut;
+
+use pgdog::frontend::client::query_engine::two_pc::TwoPcTransaction;
+use pgdog::frontend::client::query_engine::two_pc::wal::{
+ BeginPayload, Record, Segment, TxnPayload,
+};
+
+#[tokio::main]
+async fn main() {
+ let args: Vec<String> = std::env::args().collect();
+ if args.len() != 5 {
+ eprintln!("usage: {} ", args[0]);
+ std::process::exit(2);
+ }
+ let dir = PathBuf::from(&args[1]);
+ let txn = TwoPcTransaction::from_str(&args[2])
+ .unwrap_or_else(|_| panic!("invalid gid {:?}", args[2]));
+ let user = args[3].clone();
+ let database = args[4].clone();
+
+ std::fs::create_dir_all(&dir).expect("create wal dir");
+
+ let mut segment = Segment::create(&dir, 0).await.expect("create segment");
+
+ let mut buf = BytesMut::new();
+ Record::Begin(BeginPayload {
+ txn,
+ user,
+ database,
+ })
+ .encode(&mut buf)
+ .expect("encode begin");
+ Record::Committing(TxnPayload { txn })
+ .encode(&mut buf)
+ .expect("encode committing");
+
+ segment.commit(&buf, 2).await.expect("commit batch");
+}
diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs
index bcfb8c1cd..8b2236e48 100644
--- a/pgdog-config/src/general.rs
+++ b/pgdog-config/src/general.rs
@@ -512,6 +512,42 @@ pub struct General {
#[serde(default)]
pub two_phase_commit_auto: Option,
+ /// Directory where the two-phase commit write-ahead log is stored.
+ ///
+ /// **Note:** This setting cannot be changed at runtime. PgDog acquires an exclusive `flock` on `<wal_dir>/.lock` at startup. If the directory cannot be created or written to, or another PgDog process already holds the lock, the WAL is disabled and a warning is logged: 2PC will continue to function but will not be durable across restarts.
+ ///
+ /// _Default:_ `./pgdog_wal`
+ ///
+ /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_dir
+ #[serde(default = "General::two_phase_commit_wal_dir")]
+ pub two_phase_commit_wal_dir: Option<PathBuf>,
+
+ /// Maximum size, in bytes, of a single two-phase commit WAL segment file before it is rotated.
+ ///
+ /// _Default:_ `16777216` (16 MiB)
+ ///
+ /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_segment_size
+ #[serde(default = "General::two_phase_commit_wal_segment_size")]
+ pub two_phase_commit_wal_segment_size: u64,
+
+ /// How long, in milliseconds, the two-phase commit WAL writer waits to coalesce concurrent appends into a single fsync.
+ ///
+ /// **Note:** Setting this to `0` disables waiting for additional appends; records already queued in the channel when the writer wakes are still batched into one fsync. Higher values trade per-transaction commit latency for fewer fsyncs under load.
+ ///
+ /// _Default:_ `2`
+ ///
+ /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_fsync_interval
+ #[serde(default = "General::two_phase_commit_wal_fsync_interval")]
+ pub two_phase_commit_wal_fsync_interval: u64,
+
+ /// How often, in seconds, to write a checkpoint record to the two-phase commit WAL and garbage-collect old segments.
+ ///
+ /// _Default:_ `60`
+ ///
+ /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_checkpoint_interval
+ #[serde(default = "General::two_phase_commit_wal_checkpoint_interval")]
+ pub two_phase_commit_wal_checkpoint_interval: u64,
+
/// Enable expanded (`\x`) output for `EXPLAIN` results returned by PgDog's built-in query plan aggregation.
#[serde(default = "General::expanded_explain")]
pub expanded_explain: bool,
@@ -744,6 +780,11 @@ impl Default for General {
log_dedup_threshold: 0,
two_phase_commit: bool::default(),
two_phase_commit_auto: None,
+ two_phase_commit_wal_dir: Self::two_phase_commit_wal_dir(),
+ two_phase_commit_wal_segment_size: Self::two_phase_commit_wal_segment_size(),
+ two_phase_commit_wal_fsync_interval: Self::two_phase_commit_wal_fsync_interval(),
+ two_phase_commit_wal_checkpoint_interval:
+ Self::two_phase_commit_wal_checkpoint_interval(),
expanded_explain: Self::expanded_explain(),
server_lifetime: Self::server_lifetime(),
stats_period: Self::stats_period(),
@@ -902,6 +943,22 @@ impl General {
Self::env_or_default("PGDOG_ROLLBACK_TIMEOUT", 5_000)
}
+ fn two_phase_commit_wal_dir() -> Option<PathBuf> {
+ Self::env_option_string("PGDOG_TWO_PHASE_COMMIT_WAL_DIR").map(PathBuf::from)
+ }
+
+ fn two_phase_commit_wal_segment_size() -> u64 {
+ Self::env_or_default("PGDOG_TWO_PHASE_COMMIT_WAL_SEGMENT_SIZE", 16 * 1024 * 1024)
+ }
+
+ fn two_phase_commit_wal_fsync_interval() -> u64 {
+ Self::env_or_default("PGDOG_TWO_PHASE_COMMIT_WAL_FSYNC_INTERVAL", 2)
+ }
+
+ fn two_phase_commit_wal_checkpoint_interval() -> u64 {
+ Self::env_or_default("PGDOG_TWO_PHASE_COMMIT_WAL_CHECKPOINT_INTERVAL", 60)
+ }
+
fn idle_timeout() -> u64 {
Self::env_or_default(
"PGDOG_IDLE_TIMEOUT",
diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml
index 9758fbd0a..d9aaf70f2 100644
--- a/pgdog/Cargo.toml
+++ b/pgdog/Cargo.toml
@@ -74,6 +74,7 @@ pgdog-stats = { path = "../pgdog-stats" }
pgdog-postgres-types = { path = "../pgdog-postgres-types"}
azure_identity = "0.34.0"
azure_core = "0.34.0"
+crc32c = "0.6.8"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/manager.rs b/pgdog/src/frontend/client/query_engine/two_pc/manager.rs
index 5d614fe91..2c1ff0611 100644
--- a/pgdog/src/frontend/client/query_engine/two_pc/manager.rs
+++ b/pgdog/src/frontend/client/query_engine/two_pc/manager.rs
@@ -1,9 +1,11 @@
//! Global two-phase commit transaction manager.
+use arc_swap::ArcSwapOption;
use fnv::FnvHashMap as HashMap;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::{
collections::VecDeque,
+ path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -11,16 +13,17 @@ use std::{
time::Duration,
};
use tokio::{select, spawn, sync::Notify, time::interval};
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
use crate::{
backend::{
databases::User,
pool::{Connection, Request},
},
+ config::config,
frontend::{
client::query_engine::{
- two_pc::{TwoPcGuard, TwoPcTransaction},
+ two_pc::{wal::Wal, TwoPcGuard, TwoPcStats, TwoPcTransaction},
TwoPcPhase,
},
router::{
@@ -40,6 +43,11 @@ static MAINTENANCE: Duration = Duration::from_millis(333);
pub struct Manager {
inner: Arc>,
notify: Arc,
+ /// Durable log handle. `None` until [`Self::enable_wal`] succeeds;
+ /// if WAL initialization fails or `enable_wal` is never called, the
+ /// manager continues to coordinate 2PC in memory only.
+ wal: Arc<ArcSwapOption<Wal>>,
+ stats: Arc<TwoPcStats>,
}
impl Manager {
@@ -56,6 +64,8 @@ impl Manager {
offline: AtomicBool::new(false),
done: Notify::new(),
}),
+ wal: Arc::new(ArcSwapOption::const_empty()),
+ stats: Arc::new(TwoPcStats::default()),
};
let monitor = manager.clone();
@@ -66,6 +76,62 @@ impl Manager {
manager
}
+ /// Open the WAL at the configured directory, replay any in-flight
+ /// transactions back into this manager, and start the writer +
+ /// checkpoint tasks. If WAL initialization fails (lock contention,
+ /// disk error, corrupt segment that can't be quarantined), the
+ /// manager keeps running without durability and a warning is logged
+ /// so operators can investigate.
+ pub async fn enable_wal(&self, wal_dir: &PathBuf) {
+ match Wal::open(self, wal_dir).await {
+ Ok(wal) => {
+ self.wal.store(Some(Arc::new(wal)));
+ info!("[2pc] wal enabled");
+ spawn(Self::checkpoint_loop());
+ }
+ Err(err) => {
+ warn!(
+ "[2pc] wal disabled: {}; 2pc will run without durability",
+ err
+ );
+ }
+ }
+ }
+
+ /// Periodically ask the WAL writer to emit a checkpoint record so
+ /// older segments can be GC'd. A zero interval disables the loop.
+ async fn checkpoint_loop() {
+ let interval_secs = config()
+ .config
+ .general
+ .two_phase_commit_wal_checkpoint_interval;
+ if interval_secs == 0 {
+ return;
+ }
+ let manager = Self::get();
+ let mut tick = tokio::time::interval(Duration::from_secs(interval_secs));
+ tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+ // First tick fires immediately; skip it so we don't checkpoint
+ // an empty WAL right at startup.
+ tick.tick().await;
+
+ loop {
+ tokio::select! {
+ _ = tick.tick() => {}
+ _ = manager.notify.done.notified() => return,
+ }
+ if manager.notify.offline.load(Ordering::Relaxed) {
+ return;
+ }
+ let Some(wal) = manager.wal.load_full() else {
+ continue;
+ };
+ if let Err(err) = wal.checkpoint().await {
+ warn!("[2pc] checkpoint failed: {}", err);
+ }
+ }
+ }
+
#[cfg(test)]
pub(super) fn transaction(&self, transaction: &TwoPcTransaction) -> Option<TransactionInfo> {
self.inner.lock().transactions.get(transaction).cloned()
@@ -76,6 +142,11 @@ impl Manager {
self.inner.lock().transactions.clone()
}
+ /// Process-level 2PC counters.
+ pub fn stats(&self) -> Arc<TwoPcStats> {
+ Arc::clone(&self.stats)
+ }
+
/// Two-pc transaction finished.
pub(super) async fn done(&self, transaction: &TwoPcTransaction) -> Result<(), Error> {
self.remove(transaction).await;
@@ -83,21 +154,65 @@ impl Manager {
Ok(())
}
- /// Sync transaction state.
+ /// Record a phase transition for a 2PC transaction. Updates the
+ /// in-memory state first, then writes the corresponding WAL record
+ /// (Begin for Phase1, Committing for Phase2). The inner-first
+ /// ordering means checkpoint snapshots always see in-memory state
+ /// that's at least as fresh as any WAL record they might be
+ /// ordered against.
+ ///
+ /// If the WAL append fails the in-memory mutation is rolled back
+ /// and the operation is refused: returning Ok here would cause the
+ /// caller to issue PREPARE / COMMIT PREPARED to backends without a
+ /// durable record, which is exactly the orphan-prepared-xact case
+ /// the WAL exists to prevent.
pub(super) async fn transaction_state(
&self,
transaction: &TwoPcTransaction,
identifier: &Arc<User>,
phase: TwoPcPhase,
) -> Result<TwoPcGuard, Error> {
- {
+ let prior = {
let mut guard = self.inner.lock();
+ let prior = guard.transactions.get(transaction).cloned();
let entry = guard.transactions.entry(*transaction).or_default();
entry.identifier = identifier.clone();
entry.phase = phase;
- }
+ prior
+ };
- // TODO: Sync to durable backend.
+ if let Some(wal) = self.wal.load_full() {
+ let result = match phase {
+ TwoPcPhase::Phase1 => {
+ wal.append_begin(
+ *transaction,
+ identifier.user.clone(),
+ identifier.database.clone(),
+ )
+ .await
+ }
+ TwoPcPhase::Phase2 => wal.append_committing(*transaction).await,
+ TwoPcPhase::Rollback => {
+ unreachable!("rollback is not a state transition; it's the cleanup direction")
+ }
+ };
+ if let Err(err) = result {
+ let mut guard = self.inner.lock();
+ match prior {
+ Some(prior) => {
+ guard.transactions.insert(*transaction, prior);
+ }
+ None => {
+ guard.transactions.remove(transaction);
+ }
+ }
+ warn!(
+ "[2pc] wal append failed for {} ({}): {}; refusing 2pc operation",
+ transaction, phase, err
+ );
+ return Err(Error::TwoPcWal(err));
+ }
+ }
Ok(TwoPcGuard {
transaction: *transaction,
@@ -105,6 +220,29 @@ impl Manager {
})
}
+ /// Restore an in-flight 2PC transaction discovered during WAL
+ /// recovery. Inserts it into the transaction table and pushes it onto
+ /// the cleanup queue so the monitor task drives it to a terminal
+ /// state via [`Self::cleanup_phase`].
+ pub(super) fn restore_transaction(
+ &self,
+ transaction: TwoPcTransaction,
+ user: String,
+ database: String,
+ phase: TwoPcPhase,
+ ) {
+ let identifier = Arc::new(User { user, database });
+ {
+ let mut guard = self.inner.lock();
+ guard
+ .transactions
+ .insert(transaction, TransactionInfo { phase, identifier });
+ guard.queue.push_back(transaction);
+ }
+ self.stats.incr_recovered();
+ self.notify.notify.notify_one();
+ }
+
pub(super) fn return_guard(&self, guard: &TwoPcGuard) {
let exists = self
.inner
@@ -162,7 +300,11 @@ impl Manager {
async fn remove(&self, transaction: &TwoPcTransaction) {
self.inner.lock().transactions.remove(transaction);
- // TODO: sync to durable stage manager here.
+ if let Some(wal) = self.wal.load_full() {
+ if let Err(err) = wal.append_end(*transaction).await {
+ warn!("[2pc] wal end record failed for {}: {}", transaction, err);
+ }
+ }
}
/// Reconnect to cluster if available and rollback the two-phase transaction.
@@ -206,6 +348,8 @@ impl Manager {
}
/// Shutdown manager and wait for all transactions to be cleaned up.
+ /// Once the monitor has drained the cleanup queue, the WAL is shut
+ /// down too so any final End records make it to disk before exit.
pub async fn shutdown(&self) {
let waiter = self.notify.done.notified();
self.notify.offline.store(true, Ordering::Relaxed);
@@ -214,6 +358,10 @@ impl Manager {
info!("cleaning up {} two-phase transactions", transactions);
waiter.await;
+
+ if let Some(wal) = self.wal.load_full() {
+ wal.shutdown().await;
+ }
}
}
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/mod.rs b/pgdog/src/frontend/client/query_engine/two_pc/mod.rs
index 8e6e2cfda..b55ac017e 100644
--- a/pgdog/src/frontend/client/query_engine/two_pc/mod.rs
+++ b/pgdog/src/frontend/client/query_engine/two_pc/mod.rs
@@ -9,12 +9,15 @@ pub mod guard;
pub mod manager;
pub mod phase;
pub mod server_transactions;
+pub mod stats;
pub mod transaction;
+pub mod wal;
pub use guard::TwoPcGuard;
pub use manager::Manager;
pub use phase::TwoPcPhase;
pub(crate) use server_transactions::TwoPcTransactions;
+pub use stats::TwoPcStats;
pub use transaction::TwoPcTransaction;
#[cfg(test)]
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/stats.rs b/pgdog/src/frontend/client/query_engine/two_pc/stats.rs
new file mode 100644
index 000000000..f775276f0
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/stats.rs
@@ -0,0 +1,23 @@
+//! Process-level 2PC counters surfaced via the OpenMetrics endpoint.
+//!
+//! Counters are bumped from the manager during recovery and read by
+//! `pgdog::stats::two_pc::TwoPc::load`.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+#[derive(Debug, Default)]
+pub struct TwoPcStats {
+ /// Total number of in-flight 2PC transactions restored from the
+ /// WAL during recovery since this pgdog process started.
+ recovered_total: AtomicU64,
+}
+
+impl TwoPcStats {
+ pub fn incr_recovered(&self) {
+ self.recovered_total.fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn recovered_total(&self) -> u64 {
+ self.recovered_total.load(Ordering::Relaxed)
+ }
+}
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs b/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs
index f579764ed..c774d6fe3 100644
--- a/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs
+++ b/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs
@@ -1,7 +1,8 @@
use rand::{rng, Rng};
+use serde::{Deserialize, Serialize};
use std::{fmt::Display, str::FromStr};
-#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq, Serialize, Deserialize)]
pub struct TwoPcTransaction(usize);
static PREFIX: &str = "__pgdog_2pc_";
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/error.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/error.rs
new file mode 100644
index 000000000..7d2fdfe83
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/error.rs
@@ -0,0 +1,84 @@
+//! Two-phase commit WAL errors.
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error("encode: {0}")]
+ Encode(#[from] rmp_serde::encode::Error),
+
+ #[error("decode: {0}")]
+ Decode(#[from] rmp_serde::decode::Error),
+
+ #[error("crc mismatch: expected {expected:#010x}, got {actual:#010x}")]
+ Crc { expected: u32, actual: u32 },
+
+ #[error("invalid record tag {0}")]
+ InvalidTag(u8),
+
+ #[error("record body length is zero")]
+ EmptyRecord,
+
+ #[error("record of {0} bytes exceeds u32 framing")]
+ RecordTooLarge(usize),
+
+ #[error("io: {0}")]
+ Io(#[from] std::io::Error),
+
+ #[error("segment header is missing or has wrong magic")]
+ BadSegmentHeader,
+
+ #[error("segment filename is not a valid LSN: {0}")]
+ BadSegmentName(String),
+
+ #[error("writer task is no longer running")]
+ WriterGone,
+
+ #[error("wal directory {dir} is not accessible: {source}")]
+ DirNotAccessible {
+ dir: std::path::PathBuf,
+ #[source]
+ source: std::io::Error,
+ },
+
+ #[error("wal directory {dir} is not writable: {source}")]
+ DirNotWritable {
+ dir: std::path::PathBuf,
+ #[source]
+ source: std::io::Error,
+ },
+
+ #[error("torn tail: {unconsumed} unconsumed bytes at offset {offset}")]
+ TornTail { offset: u64, unconsumed: usize },
+
+ #[error("wal directory {dir} is locked by another process:\n{holder}")]
+ DirLocked {
+ dir: std::path::PathBuf,
+ holder: String,
+ },
+
+ /// A batch's write or fsync failed and the rewind that should have
+ /// restored on-disk consistency also failed. The segment's true
+ /// state on disk is unknown; the writer must rotate to a fresh
+ /// segment before accepting more writes.
+ #[error("segment broken: write or sync failed and rewind could not restore on-disk state")]
+ SegmentBroken,
+}
+
+impl Error {
+ /// True for errors that signal on-disk data is malformed or partial:
+ /// CRC mismatch, unknown tag, decode failure, missing magic, bad
+ /// filename, torn tail. Recovery quarantines these and skips the
+ /// segment instead of failing the whole process.
+ pub fn is_corruption(&self) -> bool {
+ matches!(
+ self,
+ Error::BadSegmentName(_)
+ | Error::BadSegmentHeader
+ | Error::Crc { .. }
+ | Error::InvalidTag(_)
+ | Error::EmptyRecord
+ | Error::Decode(_)
+ | Error::TornTail { .. }
+ )
+ }
+}
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/mod.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/mod.rs
new file mode 100644
index 000000000..6ec8abca8
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/mod.rs
@@ -0,0 +1,14 @@
+//! Two-phase commit write-ahead log.
+//!
+//! See [`record`] for the on-disk record format.
+
+mod error;
+mod record;
+mod recovery;
+mod segment;
+mod writer;
+
+pub use error::Error;
+pub use record::{BeginPayload, Record, TxnPayload};
+pub use segment::Segment;
+pub use writer::Wal;
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/record.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/record.rs
new file mode 100644
index 000000000..ead80e1c9
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/record.rs
@@ -0,0 +1,329 @@
+//! Two-phase commit WAL record format.
+//!
+//! On-disk framing per record:
+//!
+//! ```text
+//! +---------+---------+-----+----------------+
+//! | u32 LE | u32 LE | u8 | rmp-serde body |
+//! | bodylen | crc32c | tag | payload |
+//! +---------+---------+-----+----------------+
+//! ```
+//!
+//! - `bodylen` covers `tag + payload` (everything after the crc).
+//! - `crc32c` is computed over `tag + payload` and catches torn writes
+//! and bit-rot.
+//! - `tag` is a stable [`u8`] discriminant defined by [`Tag`]. New record
+//! kinds receive new tag values; **tag values should never be reused**.
+//! - `payload` is a [`rmp_serde`] encoding of a per-variant payload struct,
+//! using field-name keys (`to_vec_named`) so fields can be added
+//! additively under a single tag.
+//!
+//! There are four record kinds (see [`Record`]). The two write-ahead
+//! invariants that the [`super::Wal`] writer must honour:
+//!
+//! 1. [`Record::Begin`] is fsynced before any `PREPARE TRANSACTION` reaches
+//! a shard. Otherwise a crash can leave prepared xacts with no record
+//! of them.
+//! 2. [`Record::Committing`] is fsynced before any `COMMIT PREPARED` reaches
+//! a shard. Otherwise a crash can leave a partial commit with no way to
+//! know the coordinator had decided to commit.
+//!
+//! [`Record::End`] lets recovery forget a finished txn and is not safety
+//! critical. [`Record::Checkpoint`] is a snapshot of all active txns so
+//! older segments can be garbage-collected.
+//!
+//! # Format evolution rules
+//!
+//! These rules keep the on-disk format stable across versions:
+//!
+//! - **Tag values are immutable.** Once assigned, a [`Tag`] discriminant
+//! should never be repurposed, even if the record kind is removed.
+//! - **Fields may be added.** New fields on an existing variant's payload
+//! struct must carry `#[serde(default)]` so older versions that wrote
+//! records without the field can still be deserialized.
+//! - **Fields may not be removed or renamed.** To change a variant's
+//! shape, introduce a new tag with a new payload struct. The old tag
+//! stays defined so historical records still decode during recovery.
+//! - **Type changes** on existing fields (e.g. `Vec<u32>` to `Vec<u64>`)
+//! also require a new tag.
+
+use bytes::{Buf, BufMut};
+use serde::{Deserialize, Serialize};
+
+use super::error::Error;
+use crate::frontend::client::query_engine::two_pc::TwoPcTransaction;
+
+/// On-disk record-kind discriminant.
+///
+/// Values are stable; see the format-evolution rules in the module docs.
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Tag {
+ /// Coordinator is about to issue `PREPARE TRANSACTION` to `shards`.
+ Begin = 1,
+ /// Coordinator has decided to commit; must drive all participants to
+ /// `COMMIT PREPARED`.
+ Committing = 2,
+ /// Coordinator is done with this txn; recovery may forget it.
+ End = 3,
+ /// Snapshot of all active txns for log GC.
+ Checkpoint = 4,
+}
+
+impl Tag {
+    fn from_u8(b: u8) -> Result<Self, u8> {
+ match b {
+ 1 => Ok(Self::Begin),
+ 2 => Ok(Self::Committing),
+ 3 => Ok(Self::End),
+ 4 => Ok(Self::Checkpoint),
+ other => Err(other),
+ }
+ }
+}
+
+/// A single WAL record.
+#[derive(Debug, Clone, PartialEq)]
+pub enum Record {
+ Begin(BeginPayload),
+ Committing(TxnPayload),
+ End(TxnPayload),
+ Checkpoint(CheckpointPayload),
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct BeginPayload {
+ pub txn: TwoPcTransaction,
+ pub user: String,
+ pub database: String,
+}
+
+/// Payload for records that carry only a transaction id
+/// ([`Record::Committing`] and [`Record::End`]).
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct TxnPayload {
+ pub txn: TwoPcTransaction,
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct CheckpointPayload {
+    pub active: Vec<CheckpointEntry>,
+}
+
+/// One row in a [`CheckpointPayload`].
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct CheckpointEntry {
+ pub txn: TwoPcTransaction,
+ pub user: String,
+ pub database: String,
+ /// `true` iff a [`Record::Committing`] had been fsynced for this txn.
+ pub decided: bool,
+}
+
+/// A successfully decoded record and the number of bytes it consumed.
+#[derive(Debug)]
+pub struct Decoded {
+ pub record: Record,
+ pub consumed: usize,
+}
+
+const HEADER_BYTES: usize = 4 + 4;
+
+impl Record {
+ /// The on-disk tag for this record.
+ pub fn tag(&self) -> Tag {
+ match self {
+ Record::Begin { .. } => Tag::Begin,
+ Record::Committing { .. } => Tag::Committing,
+ Record::End { .. } => Tag::End,
+ Record::Checkpoint { .. } => Tag::Checkpoint,
+ }
+ }
+
+ /// Append a framed copy of this record to `out`.
+    pub fn encode<B: BufMut>(&self, out: &mut B) -> Result<(), Error> {
+ let payload = match self {
+ Record::Begin(p) => rmp_serde::to_vec_named(p)?,
+ Record::Committing(p) => rmp_serde::to_vec_named(p)?,
+ Record::End(p) => rmp_serde::to_vec_named(p)?,
+ Record::Checkpoint(p) => rmp_serde::to_vec_named(p)?,
+ };
+
+ let tag = self.tag() as u8;
+ let body_len = u32::try_from(1 + payload.len())
+ .map_err(|_| Error::RecordTooLarge(1 + payload.len()))?;
+
+ let mut crc = crc32c::crc32c(&[tag]);
+ crc = crc32c::crc32c_append(crc, &payload);
+
+ out.put_u32_le(body_len);
+ out.put_u32_le(crc);
+ out.put_u8(tag);
+ out.put_slice(&payload);
+ Ok(())
+ }
+
+ /// Try to decode the next record from `buf`.
+ ///
+ /// - `Ok(Some(d))`: a complete record was decoded; advance the cursor
+ /// by `d.consumed`.
+ /// - `Ok(None)`: `buf` doesn't contain a complete record yet. Read
+ /// more bytes and call again. This is normal flow at end-of-segment.
+ /// - `Err(_)`: the record is corrupt (bad framing, CRC mismatch,
+ /// unknown tag, or undecodable payload). The WAL stream effectively
+ /// ends here.
+ pub fn decode(buf: &[u8]) -> Result