diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 929332ee8..16681a9ed 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -76,6 +76,14 @@ jobs:
       - name: Compute cache key
         id: cache-key
         run: echo "key=pgdog-bin-${{ runner.os }}-$(bash integration/ci/cache-key.sh)" >> "$GITHUB_OUTPUT"
+      # rust-cache must run before the binary restore: it lays down a
+      # stale target/ that can otherwise wipe target/release/pgdog when
+      # cargo reconciles fingerprints during plugin builds.
+      - name: Restore Rust cache for plugin builds
+        if: matrix.needs_rust_cache
+        uses: useblacksmith/rust-cache@v3
+        with:
+          prefix-key: build-v1
       - name: Restore pgdog binaries
         uses: useblacksmith/cache/restore@v5
         with:
           path: |
             target/release/pgdog
           key: ${{ steps.cache-key.outputs.key }}
           fail-on-cache-miss: true
-      - name: Restore Rust cache for plugin builds
-        if: matrix.needs_rust_cache
-        uses: useblacksmith/rust-cache@v3
-        with:
-          prefix-key: build-v1
       - name: Setup dependencies
         run: bash integration/ci/setup.sh --with-toxi
       - name: Run ${{ matrix.name }}
diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json
index 24c9d2acc..bdecdeafd 100644
--- a/.schema/pgdog.schema.json
+++ b/.schema/pgdog.schema.json
@@ -108,6 +108,10 @@
       "tls_verify": "prefer",
       "two_phase_commit": false,
       "two_phase_commit_auto": null,
+      "two_phase_commit_wal_checkpoint_interval": 60,
+      "two_phase_commit_wal_dir": null,
+      "two_phase_commit_wal_fsync_interval": 2,
+      "two_phase_commit_wal_segment_size": 16777216,
       "unique_id_function": "standard",
       "unique_id_min": 0,
       "workers": 2
@@ -1074,6 +1078,35 @@
       ],
       "default": null
     },
+    "two_phase_commit_wal_checkpoint_interval": {
+      "description": "How often, in seconds, to write a checkpoint record to the two-phase commit WAL and garbage-collect old segments.\n\n_Default:_ `60`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_checkpoint_interval",
+      "type": "integer",
+      "format": "uint64",
+      "default": 60,
+      "minimum": 0
+    },
+    "two_phase_commit_wal_dir": {
+      "description": "Directory where the two-phase commit write-ahead log is stored.\n\n**Note:** This setting cannot be changed at runtime. PgDog acquires an exclusive `flock` on `<wal_dir>/.lock` at startup. If the directory cannot be created or written to, or another PgDog process already holds the lock, the WAL is disabled and a warning is logged: 2PC will continue to function but will not be durable across restarts.\n\n_Default:_ `./pgdog_wal`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_dir",
+      "type": [
+        "string",
+        "null"
+      ],
+      "default": null
+    },
+    "two_phase_commit_wal_fsync_interval": {
+      "description": "How long, in milliseconds, the two-phase commit WAL writer waits to coalesce concurrent appends into a single fsync.\n\n**Note:** Setting this to `0` disables waiting for additional appends; records already queued in the channel when the writer wakes are still batched into one fsync. Higher values trade per-transaction commit latency for fewer fsyncs under load.\n\n_Default:_ `2`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_fsync_interval",
+      "type": "integer",
+      "format": "uint64",
+      "default": 2,
+      "minimum": 0
+    },
+    "two_phase_commit_wal_segment_size": {
+      "description": "Maximum size, in bytes, of a single two-phase commit WAL segment file before it is rotated.\n\n_Default:_ `16777216` (16 MiB)\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_segment_size",
+      "type": "integer",
+      "format": "uint64",
+      "default": 16777216,
+      "minimum": 0
+    },
     "unique_id_function": {
       "description": "Unique ID generation function.",
       "$ref": "#/$defs/UniqueIdFunction",
diff --git a/Cargo.lock b/Cargo.lock
index 70e762f71..82797c972 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1148,6 +1148,15 @@ version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
 
+[[package]]
+name = "crc32c"
+version = "0.6.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
+dependencies = [
+ "rustc_version",
+]
+
 [[package]]
 name = "crc32fast"
 version = "1.5.0"
@@ -3199,6 +3208,7 @@ dependencies = [
  "cc",
  "chrono",
  "clap",
+ "crc32c",
  "csv-core",
  "dashmap",
  "derive_builder",
@@ -5553,6 +5563,15 @@ version = "0.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
 
+[[package]]
+name = "two-pc-crash-safety-wal-helper"
+version = "0.0.0"
+dependencies = [
+ "bytes",
+ "pgdog",
+ "tokio",
+]
+
 [[package]]
 name = "typenum"
 version = "1.19.0"
diff --git a/Cargo.toml b/Cargo.toml
index f8a6be62f..504bb5bde 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ exclude = ["fuzz"]
 members = [
     "examples/demo",
     "integration/rust",
+    "integration/two_pc_crash_safety/wal_helper",
     "pgdog",
     "pgdog-config",
     "pgdog-macros",
diff --git a/integration/two_pc_crash_safety/Gemfile b/integration/two_pc_crash_safety/Gemfile
new file mode 100644
index 000000000..1fce5e749
--- /dev/null
+++ b/integration/two_pc_crash_safety/Gemfile
@@ -0,0 +1,6 @@
+# frozen_string_literal: true
+
+source 'https://rubygems.org'
+gem 'pg'
+gem 'rspec', '~> 3.4'
+gem 'toxiproxy'
diff --git a/integration/two_pc_crash_safety/Gemfile.lock b/integration/two_pc_crash_safety/Gemfile.lock
new file mode 100644
index 000000000..c122eee29
--- /dev/null
+++ b/integration/two_pc_crash_safety/Gemfile.lock
@@ -0,0 +1,30 @@
+GEM
+  remote: https://rubygems.org/
+  specs:
+    diff-lcs (1.6.2)
+    pg (1.5.9)
+    rspec (3.13.2)
+      rspec-core (~> 3.13.0)
+      rspec-expectations (~> 3.13.0)
+      rspec-mocks (~> 3.13.0)
+    rspec-core (3.13.6)
+      rspec-support (~> 3.13.0)
+    rspec-expectations (3.13.5)
+      diff-lcs (>= 1.2.0, < 2.0)
+      rspec-support (~> 3.13.0)
+    rspec-mocks (3.13.8)
+      diff-lcs (>= 1.2.0, < 2.0)
+      rspec-support (~> 3.13.0)
+    rspec-support (3.13.7)
+    toxiproxy (2.0.2)
+
+PLATFORMS
+  ruby
+
+DEPENDENCIES
+  pg
+  rspec (~> 3.4)
+  toxiproxy
+
+BUNDLED WITH
+   1.17.2
diff --git a/integration/two_pc_crash_safety/crash_recovery_spec.rb b/integration/two_pc_crash_safety/crash_recovery_spec.rb
new file mode 100644
index 000000000..7a7255c0e
--- /dev/null
+++ b/integration/two_pc_crash_safety/crash_recovery_spec.rb
@@ -0,0 +1,172 @@
+# frozen_string_literal: true
+
+require_relative 'rspec_helper'
+
+CONFIG_DIR = __dir__
+
+describe '2pc crash safety' do
+  let(:wal_dir) { @wal_dir }
+
+  before(:each) do
+    cleanup_prepared_xacts!
+    @pid, @wal_dir = spawn_pgdog(CONFIG_DIR)
+    wait_for_pgdog
+  end
+
+  after(:each) do
+    stop_pgdog(@pid)
+    FileUtils.rm_rf(@wal_dir) if @wal_dir
+  end
+
+  it 'rolls back orphan prepared xacts after a kill mid-PREPARE' do
+    client = pgdog_conn
+    client.exec('BEGIN')
+    client.exec("INSERT INTO crash_safety_test (id, value) VALUES (0, 'a')")
+    client.exec("INSERT INTO crash_safety_test (id, value) VALUES (1, 'b')")
+
+    # Hold shard_1's responses so PREPARE TRANSACTION on shard_1 hangs
+    # waiting for the reply. Latency keeps the connection open so pgdog
+    # stays mid-2PC.
+    Toxiproxy[:crash_safety_shard_1].toxic(:latency, latency: 30_000).apply do
+      Thread.new do
+        begin
+          client.exec('COMMIT')
+        rescue PG::Error
+          # connection drops when we kill pgdog.
+        end
+      end
+      # Wait for pgdog to issue PREPARE on shard_0 and start blocking
+      # on shard_1's response before we kill it.
+      sleep 1.0
+      Process.kill('KILL', @pid)
+      Process.waitpid(@pid)
+      @pid = nil
+    end
+
+    # At least one shard must have an orphan prepared xact left behind,
+    # otherwise recovery has nothing to clean up.
+    leftover = SHARDS.flat_map { |s| prepared_xacts(s) }
+    expect(leftover).not_to be_empty,
+                            'no prepared xacts after kill; the kill landed before any PREPARE made it through'
+
+    # Restart pgdog with the same WAL directory. Recovery sees the
+    # Begin record and drives ROLLBACK PREPARED on every participant.
+    @pid, _ = spawn_pgdog(CONFIG_DIR, wal_dir: @wal_dir)
+    wait_for_pgdog
+
+    expect(metric('two_pc_recovered_total')).to be > 0,
+                                                'restarted pgdog reports zero recovered txns; recovery did not run'
+
+    expect(wait_for_no_prepared_xacts).to be(true),
+                                          lambda {
+                                            leftover = SHARDS.flat_map { |s| prepared_xacts(s) }
+                                            "prepared xacts did not drain: #{leftover.inspect}"
+                                          }
+
+    SHARDS.each do |shard|
+      c = shard_conn(shard)
+      count = c.exec('SELECT COUNT(*) FROM crash_safety_test').to_a[0]['count'].to_i
+      c.close
+      expect(count).to eq(0), "expected no rows on #{shard[:db]}, found #{count}"
+    end
+  end
+
+  it 'commits orphan prepared xacts after a kill mid-COMMIT-PREPARED' do
+    # Synthesize the on-disk state pgdog would leave behind if it
+    # crashed mid-Phase2, then start pgdog and verify it commits the
+    # orphans. Catching pgdog reliably between "Committing record
+    # durable" and "all COMMIT PREPAREDs done" from outside isn't
+    # practical: toxiproxy is byte-level so any delay on shard_1 also
+    # blocks PREPARE, and Postgres has no hook in COMMIT PREPARED.
+    stop_pgdog(@pid)
+    @pid = nil
+    FileUtils.rm_rf(@wal_dir)
+    Dir.mkdir(@wal_dir)
+
+    gid = "__pgdog_2pc_#{rand(1..(1 << 62))}"
+    synthesize_phase2_wal(@wal_dir, gid, 'pgdog', 'pgdog')
+
+    # PgDog suffixes the gid with `_<shard>` per shard so the
+    # names don't collide on a single cluster (pgdog/src/backend/pool/
+    # connection/binding.rs). Mirror that scheme here.
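+    # For example, a gid of "__pgdog_2pc_42" (value illustrative) is
+    # prepared as "__pgdog_2pc_42_0" on shard 0 and "__pgdog_2pc_42_1"
+    # on shard 1.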
+    SHARDS.each_with_index do |shard, idx|
+      c = shard_conn(shard)
+      c.exec('BEGIN')
+      c.exec("INSERT INTO crash_safety_test (id, value) VALUES (#{idx}, 'r')")
+      c.exec("PREPARE TRANSACTION '#{gid}_#{idx}'")
+      c.close
+    end
+
+    @pid, _ = spawn_pgdog(CONFIG_DIR, wal_dir: @wal_dir)
+    wait_for_pgdog
+
+    expect(metric('two_pc_recovered_total')).to be > 0,
+                                                'restarted pgdog reports zero recovered txns; recovery did not run'
+
+    expect(wait_for_no_prepared_xacts).to be(true),
+                                          lambda {
+                                            leftover = SHARDS.flat_map { |s| prepared_xacts(s) }
+                                            "prepared xacts did not drain: #{leftover.inspect}"
+                                          }
+
+    SHARDS.each do |shard|
+      c = shard_conn(shard)
+      count = c.exec('SELECT COUNT(*) FROM crash_safety_test').to_a[0]['count'].to_i
+      c.close
+      expect(count).to eq(1), "expected 1 row on #{shard[:db]}, found #{count}"
+    end
+  end
+
+  it 'no recovery work after a clean shutdown' do
+    client = pgdog_conn
+    client.exec('BEGIN')
+    client.exec("INSERT INTO crash_safety_test (id, value) VALUES (0, 'a')")
+    client.exec("INSERT INTO crash_safety_test (id, value) VALUES (1, 'b')")
+    client.exec('COMMIT')
+    client.close
+
+    # SIGTERM lets Manager::shutdown drain and wal.append_end every
+    # finished txn. The WAL on disk should describe a complete cycle
+    # (Begin, Committing, End) for every txn so recovery has nothing
+    # to do on the next start.
+    stop_pgdog(@pid, signal: 'TERM')
+    @pid = nil
+
+    @pid, _ = spawn_pgdog(CONFIG_DIR, wal_dir: @wal_dir)
+    wait_for_pgdog
+
+    expect(metric('two_pc_recovered_total')).to eq(0),
+                                                'clean-shutdown WAL should have left no in-flight txns to recover'
+  end
+
+  it 'second pgdog refuses to share the WAL directory' do
+    # First pgdog (started in before) holds the flock on @wal_dir/.lock.
+    # Spawning a second one against the same dir should fail to acquire
+    # the lock; enable_wal warns and continues without WAL durability,
+    # so we verify the failure via the log line.
+    log_path = File.join(CONFIG_DIR, 'pgdog-second.log')
+    second_pid = Process.spawn(
+      {
+        'PGDOG_TWO_PHASE_COMMIT_WAL_DIR' => @wal_dir,
+        'PGDOG_PORT' => '6433',
+      },
+      ENV['PGDOG_BIN'] || File.expand_path('../../target/release/pgdog', __dir__),
+      '--config', File.join(CONFIG_DIR, 'pgdog.toml'),
+      '--users', File.join(CONFIG_DIR, 'users.toml'),
+      out: log_path, err: %i[child out]
+    )
+
+    begin
+      # Give the second pgdog enough time to attempt enable_wal and log
+      # the failure.
+      sleep 2
+      stop_pgdog(second_pid)
+
+      log = File.read(log_path)
+      expect(log).to include('locked by another process'),
+                     "expected DirLocked error in second pgdog log; got:\n#{log}"
+    ensure
+      FileUtils.rm_f(log_path)
+    end
+  end
+end
diff --git a/integration/two_pc_crash_safety/pgdog.toml b/integration/two_pc_crash_safety/pgdog.toml
new file mode 100644
index 000000000..7c032d57b
--- /dev/null
+++ b/integration/two_pc_crash_safety/pgdog.toml
@@ -0,0 +1,40 @@
+[general]
+two_phase_commit = true
+openmetrics_port = 9090
+
+[[databases]]
+name = "pgdog"
+host = "127.0.0.1"
+shard = 0
+database_name = "shard_0"
+
+# shard_1 is reached via the toxiproxy listener so the spec can hold
+# requests in flight (latency toxic) and SIGKILL pgdog mid-2PC.
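+#
+# For reference, the spec holds traffic with a latency toxic, e.g.:
+#   Toxiproxy[:crash_safety_shard_1].toxic(:latency, latency: 30_000).apply { ... }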
+[[databases]]
+name = "pgdog"
+host = "127.0.0.1"
+port = 5435
+shard = 1
+database_name = "shard_1"
+
+[[sharded_tables]]
+database = "pgdog"
+column = "id"
+data_type = "bigint"
+
+[[sharded_mappings]]
+database = "pgdog"
+column = "id"
+kind = "list"
+values = [0]
+shard = 0
+
+[[sharded_mappings]]
+database = "pgdog"
+column = "id"
+kind = "list"
+values = [1]
+shard = 1
+
+[admin]
+password = "pgdog"
diff --git a/integration/two_pc_crash_safety/rspec_helper.rb b/integration/two_pc_crash_safety/rspec_helper.rb
new file mode 100644
index 000000000..3801eb026
--- /dev/null
+++ b/integration/two_pc_crash_safety/rspec_helper.rb
@@ -0,0 +1,142 @@
+# frozen_string_literal: true
+
+require 'fileutils'
+require 'net/http'
+require 'pg'
+require 'tmpdir'
+require 'toxiproxy'
+
+OPENMETRICS_PORT = 9090
+
+PGDOG_HOST = '127.0.0.1'
+PGDOG_PORT = 6432
+SHARDS = [
+  { db: 'shard_0', port: 5432 },
+  { db: 'shard_1', port: 5432 } # we always look at the *real* port for state checks
+].freeze
+
+def pgdog_conn(database: 'pgdog')
+  PG.connect(host: PGDOG_HOST, port: PGDOG_PORT, user: 'pgdog',
+             password: 'pgdog', dbname: database)
+end
+
+def shard_conn(shard)
+  PG.connect(host: '127.0.0.1', port: shard[:port], user: 'pgdog',
+             password: 'pgdog', dbname: shard[:db])
+end
+
+def prepared_xacts(shard)
+  c = shard_conn(shard)
+  rows = c.exec('SELECT gid FROM pg_prepared_xacts').to_a.map { |r| r['gid'] }
+  c.close
+  rows
+end
+
+def cleanup_prepared_xacts!
+  SHARDS.each do |shard|
+    c = shard_conn(shard)
+    c.exec('SELECT gid FROM pg_prepared_xacts').to_a.each do |row|
+      gid = c.escape_string(row['gid'])
+      c.exec("ROLLBACK PREPARED '#{gid}'")
+    end
+    c.exec('TRUNCATE crash_safety_test')
+    c.close
+  end
+end
+
+# Spawns pgdog as a child process with our test config. If `wal_dir`
+# is given, the spawned pgdog uses it (so a restart resumes the same
+# durable log); otherwise a fresh tmpdir is created. Returns
+# [pid, wal_dir].
+def spawn_pgdog(config_dir, wal_dir: nil)
+  wal_dir ||= Dir.mktmpdir('pgdog_wal_')
+  binary = ENV['PGDOG_BIN'] || File.expand_path('../../target/release/pgdog', __dir__)
+  unless File.exist?(binary)
+    system('cargo', 'build', '--release', chdir: File.expand_path('../..', __dir__)) ||
+      raise('cargo build failed')
+  end
+  log_path = File.join(config_dir, 'pgdog.log')
+  pid = Process.spawn(
+    { 'PGDOG_TWO_PHASE_COMMIT_WAL_DIR' => wal_dir },
+    binary,
+    '--config', File.join(config_dir, 'pgdog.toml'),
+    '--users', File.join(config_dir, 'users.toml'),
+    out: log_path, err: [:child, :out]
+  )
+  [pid, wal_dir]
+end
+
+def wait_for_pgdog(timeout: 10)
+  deadline = Time.now + timeout
+  loop do
+    begin
+      c = pgdog_conn
+      c.exec('SELECT 1')
+      c.close
+      return
+    rescue PG::Error
+      raise "pgdog did not become ready within #{timeout}s" if Time.now > deadline
+
+      sleep 0.1
+    end
+  end
+end
+
+def stop_pgdog(pid, signal: 'TERM', timeout: 10)
+  return unless pid
+
+  begin
+    Process.kill(signal, pid)
+  rescue Errno::ESRCH
+    return
+  end
+  deadline = Time.now + timeout
+  loop do
+    _, status = Process.waitpid2(pid, Process::WNOHANG)
+    return if status
+
+    if Time.now > deadline
+      Process.kill('KILL', pid) rescue nil
+      Process.waitpid(pid) rescue nil
+      return
+    end
+    sleep 0.05
+  end
+end
+
+# Wait for both shards' pg_prepared_xacts to drain. Returns true if it
+# happened within the deadline, false otherwise.
+def wait_for_no_prepared_xacts(timeout: 30)
+  deadline = Time.now + timeout
+  loop do
+    return true if SHARDS.all? { |s| prepared_xacts(s).empty? }
+    return false if Time.now > deadline
+
+    sleep 0.2
+  end
+end
+
+Toxiproxy.host = 'http://127.0.0.1:8474'
+
+# Fetch /metrics from pgdog and return the integer value of the named
+# counter, or nil if it isn't present.
+def metric(name)
+  body = Net::HTTP.get(URI("http://127.0.0.1:#{OPENMETRICS_PORT}/metrics"))
+  prefix = "#{name} "
+  line = body.lines.find { |l| l.start_with?(prefix) }
+  line&.split&.last&.to_i
+end
+
+# Synthesize a WAL containing a Begin + Committing pair for `gid` so
+# pgdog's recovery sees an in-flight Phase2 txn and drives COMMIT
+# PREPARED on its participants.
+def synthesize_phase2_wal(wal_dir, gid, user, database)
+  workspace = File.expand_path('../..', __dir__)
+  ok = system(
+    'cargo', 'run', '--quiet', '--release',
+    '-p', 'two-pc-crash-safety-wal-helper', '--',
+    wal_dir, gid, user, database,
+    chdir: workspace
+  )
+  raise 'wal_helper failed' unless ok
+end
diff --git a/integration/two_pc_crash_safety/run.sh b/integration/two_pc_crash_safety/run.sh
new file mode 100755
index 000000000..5e3641e4a
--- /dev/null
+++ b/integration/two_pc_crash_safety/run.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+set -e
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+
+bash "${SCRIPT_DIR}/setup.sh"
+
+pushd "${SCRIPT_DIR}" >/dev/null
+bundle install
+bundle exec rspec crash_recovery_spec.rb -fd
+popd >/dev/null
diff --git a/integration/two_pc_crash_safety/setup.sh b/integration/two_pc_crash_safety/setup.sh
new file mode 100755
index 000000000..2254d49ee
--- /dev/null
+++ b/integration/two_pc_crash_safety/setup.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+set -e
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+CLI="${SCRIPT_DIR}/../toxiproxy-cli"
+
+# Create the per-shard table so cross-shard transactions have something
+# to write. Routed by id: 0 -> shard_0, 1 -> shard_1 (matches the
+# sharded_mappings in pgdog.toml).
+export PGPASSWORD=pgdog
+psql -h 127.0.0.1 -p 5432 -U pgdog -d shard_0 \
+  -c 'CREATE TABLE IF NOT EXISTS crash_safety_test (id BIGINT PRIMARY KEY, value TEXT)'
+psql -h 127.0.0.1 -p 5432 -U pgdog -d shard_1 \
+  -c 'CREATE TABLE IF NOT EXISTS crash_safety_test (id BIGINT PRIMARY KEY, value TEXT)'
+
+# Toxiproxy: shard_1's traffic flows through :5435 so the spec can
+# inject a latency toxic and catch pgdog mid-2PC deterministically.
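+#
+# Resulting path (assumed by the spec):
+#   client -> pgdog -> 127.0.0.1:5435 (toxiproxy) -> 127.0.0.1:5432 (shard_1)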
+killall toxiproxy-server > /dev/null 2>&1 || true
+"${SCRIPT_DIR}/../toxiproxy-server" > /dev/null &
+sleep 1
+
+"${CLI}" delete crash_safety_shard_1 > /dev/null 2>&1 || true
+"${CLI}" create --listen :5435 --upstream 127.0.0.1:5432 crash_safety_shard_1
diff --git a/integration/two_pc_crash_safety/users.toml b/integration/two_pc_crash_safety/users.toml
new file mode 100644
index 000000000..581cdb75b
--- /dev/null
+++ b/integration/two_pc_crash_safety/users.toml
@@ -0,0 +1,4 @@
+[[users]]
+name = "pgdog"
+database = "pgdog"
+password = "pgdog"
diff --git a/integration/two_pc_crash_safety/wal_helper/Cargo.toml b/integration/two_pc_crash_safety/wal_helper/Cargo.toml
new file mode 100644
index 000000000..57c73732a
--- /dev/null
+++ b/integration/two_pc_crash_safety/wal_helper/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "two-pc-crash-safety-wal-helper"
+version = "0.0.0"
+edition = "2024"
+publish = false
+
+[[bin]]
+name = "wal_helper"
+path = "src/main.rs"
+
+[dependencies]
+bytes = "1"
+pgdog = { path = "../../../pgdog" }
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/integration/two_pc_crash_safety/wal_helper/src/main.rs b/integration/two_pc_crash_safety/wal_helper/src/main.rs
new file mode 100644
index 000000000..2481fe219
--- /dev/null
+++ b/integration/two_pc_crash_safety/wal_helper/src/main.rs
@@ -0,0 +1,48 @@
+//! Synthesize a 2PC WAL containing a Begin + Committing pair so the
+//! crash-safety integration spec can simulate the Phase2 recovery
+//! scenario without having to crash pgdog at exactly the right
+//! microsecond.
+//!
+//! Usage: `wal_helper <wal_dir> <gid> <user> <database>`
+
+use std::path::PathBuf;
+use std::str::FromStr;
+
+use bytes::BytesMut;
+
+use pgdog::frontend::client::query_engine::two_pc::TwoPcTransaction;
+use pgdog::frontend::client::query_engine::two_pc::wal::{
+    BeginPayload, Record, Segment, TxnPayload,
+};
+
+#[tokio::main]
+async fn main() {
+    let args: Vec<String> = std::env::args().collect();
+    if args.len() != 5 {
+        eprintln!("usage: {} <wal_dir> <gid> <user> <database>", args[0]);
+        std::process::exit(2);
+    }
+    let dir = PathBuf::from(&args[1]);
+    let txn = TwoPcTransaction::from_str(&args[2])
+        .unwrap_or_else(|_| panic!("invalid gid {:?}", args[2]));
+    let user = args[3].clone();
+    let database = args[4].clone();
+
+    std::fs::create_dir_all(&dir).expect("create wal dir");
+
+    let mut segment = Segment::create(&dir, 0).await.expect("create segment");
+
+    let mut buf = BytesMut::new();
+    Record::Begin(BeginPayload {
+        txn,
+        user,
+        database,
+    })
+    .encode(&mut buf)
+    .expect("encode begin");
+    Record::Committing(TxnPayload { txn })
+        .encode(&mut buf)
+        .expect("encode committing");
+
+    segment.commit(&buf, 2).await.expect("commit batch");
+}
diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs
index bcfb8c1cd..8b2236e48 100644
--- a/pgdog-config/src/general.rs
+++ b/pgdog-config/src/general.rs
@@ -512,6 +512,42 @@ pub struct General {
     #[serde(default)]
     pub two_phase_commit_auto: Option,
 
+    /// Directory where the two-phase commit write-ahead log is stored.
+    ///
+    /// **Note:** This setting cannot be changed at runtime. PgDog acquires an exclusive `flock` on `<wal_dir>/.lock` at startup. If the directory cannot be created or written to, or another PgDog process already holds the lock, the WAL is disabled and a warning is logged: 2PC will continue to function but will not be durable across restarts.
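+    ///
+    /// A minimal `pgdog.toml` sketch (the path is illustrative):
+    ///
+    /// ```toml
+    /// [general]
+    /// two_phase_commit = true
+    /// two_phase_commit_wal_dir = "/var/lib/pgdog/wal"
+    /// ```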
+    ///
+    /// _Default:_ `./pgdog_wal`
+    ///
+    /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_dir
+    #[serde(default = "General::two_phase_commit_wal_dir")]
+    pub two_phase_commit_wal_dir: Option<PathBuf>,
+
+    /// Maximum size, in bytes, of a single two-phase commit WAL segment file before it is rotated.
+    ///
+    /// _Default:_ `16777216` (16 MiB)
+    ///
+    /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_segment_size
+    #[serde(default = "General::two_phase_commit_wal_segment_size")]
+    pub two_phase_commit_wal_segment_size: u64,
+
+    /// How long, in milliseconds, the two-phase commit WAL writer waits to coalesce concurrent appends into a single fsync.
+    ///
+    /// **Note:** Setting this to `0` disables waiting for additional appends; records already queued in the channel when the writer wakes are still batched into one fsync. Higher values trade per-transaction commit latency for fewer fsyncs under load.
+    ///
+    /// _Default:_ `2`
+    ///
+    /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_fsync_interval
+    #[serde(default = "General::two_phase_commit_wal_fsync_interval")]
+    pub two_phase_commit_wal_fsync_interval: u64,
+
+    /// How often, in seconds, to write a checkpoint record to the two-phase commit WAL and garbage-collect old segments.
+    ///
+    /// _Default:_ `60`
+    ///
+    /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#two_phase_commit_wal_checkpoint_interval
+    #[serde(default = "General::two_phase_commit_wal_checkpoint_interval")]
+    pub two_phase_commit_wal_checkpoint_interval: u64,
+
     /// Enable expanded (`\x`) output for `EXPLAIN` results returned by PgDog's built-in query plan aggregation.
     #[serde(default = "General::expanded_explain")]
     pub expanded_explain: bool,
@@ -744,6 +780,11 @@ impl Default for General {
             log_dedup_threshold: 0,
             two_phase_commit: bool::default(),
             two_phase_commit_auto: None,
+            two_phase_commit_wal_dir: Self::two_phase_commit_wal_dir(),
+            two_phase_commit_wal_segment_size: Self::two_phase_commit_wal_segment_size(),
+            two_phase_commit_wal_fsync_interval: Self::two_phase_commit_wal_fsync_interval(),
+            two_phase_commit_wal_checkpoint_interval:
+                Self::two_phase_commit_wal_checkpoint_interval(),
             expanded_explain: Self::expanded_explain(),
             server_lifetime: Self::server_lifetime(),
             stats_period: Self::stats_period(),
@@ -902,6 +943,22 @@ impl General {
         Self::env_or_default("PGDOG_ROLLBACK_TIMEOUT", 5_000)
     }
 
+    fn two_phase_commit_wal_dir() -> Option<PathBuf> {
+        Self::env_option_string("PGDOG_TWO_PHASE_COMMIT_WAL_DIR").map(PathBuf::from)
+    }
+
+    fn two_phase_commit_wal_segment_size() -> u64 {
+        Self::env_or_default("PGDOG_TWO_PHASE_COMMIT_WAL_SEGMENT_SIZE", 16 * 1024 * 1024)
+    }
+
+    fn two_phase_commit_wal_fsync_interval() -> u64 {
+        Self::env_or_default("PGDOG_TWO_PHASE_COMMIT_WAL_FSYNC_INTERVAL", 2)
+    }
+
+    fn two_phase_commit_wal_checkpoint_interval() -> u64 {
+        Self::env_or_default("PGDOG_TWO_PHASE_COMMIT_WAL_CHECKPOINT_INTERVAL", 60)
+    }
+
     fn idle_timeout() -> u64 {
         Self::env_or_default(
             "PGDOG_IDLE_TIMEOUT",
diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml
index 9758fbd0a..d9aaf70f2 100644
--- a/pgdog/Cargo.toml
+++ b/pgdog/Cargo.toml
@@ -74,6 +74,7 @@ pgdog-stats = { path = "../pgdog-stats" }
 pgdog-postgres-types = { path = "../pgdog-postgres-types"}
 azure_identity = "0.34.0"
 azure_core = "0.34.0"
+crc32c = "0.6.8"
 
 [target.'cfg(not(target_env = "msvc"))'.dependencies]
 tikv-jemallocator = "0.6"
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/manager.rs b/pgdog/src/frontend/client/query_engine/two_pc/manager.rs
index 5d614fe91..2c1ff0611 100644
--- a/pgdog/src/frontend/client/query_engine/two_pc/manager.rs
+++ b/pgdog/src/frontend/client/query_engine/two_pc/manager.rs
@@ -1,9 +1,11 @@
 //! Global two-phase commit transaction manager.
+use arc_swap::ArcSwapOption;
 use fnv::FnvHashMap as HashMap;
 use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use std::{
     collections::VecDeque,
+    path::PathBuf,
     sync::{
         atomic::{AtomicBool, Ordering},
         Arc,
@@ -11,16 +13,17 @@ use std::{
     time::Duration,
 };
 use tokio::{select, spawn, sync::Notify, time::interval};
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
 
 use crate::{
     backend::{
         databases::User,
         pool::{Connection, Request},
     },
+    config::config,
     frontend::{
         client::query_engine::{
-            two_pc::{TwoPcGuard, TwoPcTransaction},
+            two_pc::{wal::Wal, TwoPcGuard, TwoPcStats, TwoPcTransaction},
             TwoPcPhase,
         },
         router::{
@@ -40,6 +43,11 @@ static MAINTENANCE: Duration = Duration::from_millis(333);
 pub struct Manager {
     inner: Arc>,
     notify: Arc,
+    /// Durable log handle. `None` until [`Self::enable_wal`] succeeds;
+    /// if WAL initialization fails or `enable_wal` is never called, the
+    /// manager continues to coordinate 2PC in memory only.
+    wal: Arc<ArcSwapOption<Wal>>,
+    stats: Arc<TwoPcStats>,
 }
 
 impl Manager {
@@ -56,6 +64,8 @@ impl Manager {
                 offline: AtomicBool::new(false),
                 done: Notify::new(),
             }),
+            wal: Arc::new(ArcSwapOption::const_empty()),
+            stats: Arc::new(TwoPcStats::default()),
         };
 
         let monitor = manager.clone();
@@ -66,6 +76,62 @@ impl Manager {
         manager
     }
 
+    /// Open the WAL at the configured directory, replay any in-flight
+    /// transactions back into this manager, and start the writer +
+    /// checkpoint tasks. If WAL initialization fails (lock contention,
+    /// disk error, corrupt segment that can't be quarantined), the
+    /// manager keeps running without durability and a warning is logged
+    /// so operators can investigate.
+    pub async fn enable_wal(&self, wal_dir: &PathBuf) {
+        match Wal::open(self, wal_dir).await {
+            Ok(wal) => {
+                self.wal.store(Some(Arc::new(wal)));
+                info!("[2pc] wal enabled");
+                spawn(Self::checkpoint_loop());
+            }
+            Err(err) => {
+                warn!(
+                    "[2pc] wal disabled: {}; 2pc will run without durability",
+                    err
+                );
+            }
+        }
+    }
+
+    /// Periodically ask the WAL writer to emit a checkpoint record so
+    /// older segments can be GC'd. A zero interval disables the loop.
+    async fn checkpoint_loop() {
+        let interval_secs = config()
+            .config
+            .general
+            .two_phase_commit_wal_checkpoint_interval;
+        if interval_secs == 0 {
+            return;
+        }
+        let manager = Self::get();
+        let mut tick = tokio::time::interval(Duration::from_secs(interval_secs));
+        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+        // First tick fires immediately; skip it so we don't checkpoint
+        // an empty WAL right at startup.
+        tick.tick().await;
+
+        loop {
+            tokio::select! {
+                _ = tick.tick() => {}
+                _ = manager.notify.done.notified() => return,
+            }
+            if manager.notify.offline.load(Ordering::Relaxed) {
+                return;
+            }
+            let Some(wal) = manager.wal.load_full() else {
+                continue;
+            };
+            if let Err(err) = wal.checkpoint().await {
+                warn!("[2pc] checkpoint failed: {}", err);
+            }
+        }
+    }
+
     #[cfg(test)]
     pub(super) fn transaction(&self, transaction: &TwoPcTransaction) -> Option<TransactionInfo> {
         self.inner.lock().transactions.get(transaction).cloned()
@@ -76,6 +142,11 @@ impl Manager {
         self.inner.lock().transactions.clone()
     }
 
+    /// Process-level 2PC counters.
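+    ///
+    /// Assuming the global accessor used elsewhere in this module, a
+    /// reader does: `Manager::get().stats().recovered_total()`.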
+    pub fn stats(&self) -> Arc<TwoPcStats> {
+        Arc::clone(&self.stats)
+    }
+
     /// Two-pc transaction finished.
     pub(super) async fn done(&self, transaction: &TwoPcTransaction) -> Result<(), Error> {
         self.remove(transaction).await;
@@ -83,21 +154,65 @@
         Ok(())
     }
 
-    /// Sync transaction state.
+    /// Record a phase transition for a 2PC transaction. Updates the
+    /// in-memory state first, then writes the corresponding WAL record
+    /// (Begin for Phase1, Committing for Phase2). The inner-first
+    /// ordering means checkpoint snapshots always see in-memory state
+    /// that's at least as fresh as any WAL record they might be
+    /// ordered against.
+    ///
+    /// If the WAL append fails the in-memory mutation is rolled back
+    /// and the operation is refused: returning Ok here would cause the
+    /// caller to issue PREPARE / COMMIT PREPARED to backends without a
+    /// durable record, which is exactly the orphan-prepared-xact case
+    /// the WAL exists to prevent.
     pub(super) async fn transaction_state(
         &self,
         transaction: &TwoPcTransaction,
         identifier: &Arc<User>,
         phase: TwoPcPhase,
     ) -> Result<TwoPcGuard, Error> {
-        {
+        let prior = {
             let mut guard = self.inner.lock();
+            let prior = guard.transactions.get(transaction).cloned();
             let entry = guard.transactions.entry(*transaction).or_default();
             entry.identifier = identifier.clone();
             entry.phase = phase;
-        }
+            prior
+        };
 
-        // TODO: Sync to durable backend.
+        if let Some(wal) = self.wal.load_full() {
+            let result = match phase {
+                TwoPcPhase::Phase1 => {
+                    wal.append_begin(
+                        *transaction,
+                        identifier.user.clone(),
+                        identifier.database.clone(),
+                    )
+                    .await
+                }
+                TwoPcPhase::Phase2 => wal.append_committing(*transaction).await,
+                TwoPcPhase::Rollback => {
+                    unreachable!("rollback is not a state transition; it's the cleanup direction")
+                }
+            };
+            if let Err(err) = result {
+                let mut guard = self.inner.lock();
+                match prior {
+                    Some(prior) => {
+                        guard.transactions.insert(*transaction, prior);
+                    }
+                    None => {
+                        guard.transactions.remove(transaction);
+                    }
+                }
+                warn!(
+                    "[2pc] wal append failed for {} ({}): {}; refusing 2pc operation",
+                    transaction, phase, err
+                );
+                return Err(Error::TwoPcWal(err));
+            }
+        }
 
         Ok(TwoPcGuard {
             transaction: *transaction,
@@ -105,6 +220,29 @@
         })
     }
 
+    /// Restore an in-flight 2PC transaction discovered during WAL
+    /// recovery. Inserts it into the transaction table and pushes it onto
+    /// the cleanup queue so the monitor task drives it to a terminal
+    /// state via [`Self::cleanup_phase`].
+    pub(super) fn restore_transaction(
+        &self,
+        transaction: TwoPcTransaction,
+        user: String,
+        database: String,
+        phase: TwoPcPhase,
+    ) {
+        let identifier = Arc::new(User { user, database });
+        {
+            let mut guard = self.inner.lock();
+            guard
+                .transactions
+                .insert(transaction, TransactionInfo { phase, identifier });
+            guard.queue.push_back(transaction);
+        }
+        self.stats.incr_recovered();
+        self.notify.notify.notify_one();
+    }
+
     pub(super) fn return_guard(&self, guard: &TwoPcGuard) {
         let exists = self
             .inner
@@ -162,7 +300,11 @@
 
     async fn remove(&self, transaction: &TwoPcTransaction) {
         self.inner.lock().transactions.remove(transaction);
-        // TODO: sync to durable stage manager here.
+        if let Some(wal) = self.wal.load_full() {
+            if let Err(err) = wal.append_end(*transaction).await {
+                warn!("[2pc] wal end record failed for {}: {}", transaction, err);
+            }
+        }
     }
 
     /// Reconnect to cluster if available and rollback the two-phase transaction.
@@ -206,6 +348,8 @@
     }
 
     /// Shutdown manager and wait for all transactions to be cleaned up.
+    /// Once the monitor has drained the cleanup queue, the WAL is shut
+    /// down too so any final End records make it to disk before exit.
     pub async fn shutdown(&self) {
        let waiter = self.notify.done.notified();
         self.notify.offline.store(true, Ordering::Relaxed);
@@ -214,6 +358,10 @@
         info!("cleaning up {} two-phase transactions", transactions);
 
         waiter.await;
+
+        if let Some(wal) = self.wal.load_full() {
+            wal.shutdown().await;
+        }
     }
 }
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/mod.rs b/pgdog/src/frontend/client/query_engine/two_pc/mod.rs
index 8e6e2cfda..b55ac017e 100644
--- a/pgdog/src/frontend/client/query_engine/two_pc/mod.rs
+++ b/pgdog/src/frontend/client/query_engine/two_pc/mod.rs
@@ -9,12 +9,15 @@ pub mod guard;
 pub mod manager;
 pub mod phase;
 pub mod server_transactions;
+pub mod stats;
 pub mod transaction;
+pub mod wal;
 
 pub use guard::TwoPcGuard;
 pub use manager::Manager;
 pub use phase::TwoPcPhase;
 pub(crate) use server_transactions::TwoPcTransactions;
+pub use stats::TwoPcStats;
 pub use transaction::TwoPcTransaction;
 
 #[cfg(test)]
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/stats.rs b/pgdog/src/frontend/client/query_engine/two_pc/stats.rs
new file mode 100644
index 000000000..f775276f0
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/stats.rs
@@ -0,0 +1,23 @@
+//! Process-level 2PC counters surfaced via the OpenMetrics endpoint.
+//!
+//! Counters are bumped from the manager during recovery and read by
+//! `pgdog::stats::two_pc::TwoPc::load`.
+
+use std::sync::atomic::{AtomicU64, Ordering};
+
+#[derive(Debug, Default)]
+pub struct TwoPcStats {
+    /// Total number of in-flight 2PC transactions restored from the
+    /// WAL during recovery since this pgdog process started.
+    recovered_total: AtomicU64,
+}
+
+impl TwoPcStats {
+    pub fn incr_recovered(&self) {
+        self.recovered_total.fetch_add(1, Ordering::Relaxed);
+    }
+
+    pub fn recovered_total(&self) -> u64 {
+        self.recovered_total.load(Ordering::Relaxed)
+    }
+}
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs b/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs
index f579764ed..c774d6fe3 100644
--- a/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs
+++ b/pgdog/src/frontend/client/query_engine/two_pc/transaction.rs
@@ -1,7 +1,8 @@
 use rand::{rng, Rng};
+use serde::{Deserialize, Serialize};
 use std::{fmt::Display, str::FromStr};
 
-#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Hash, Eq, Serialize, Deserialize)]
 pub struct TwoPcTransaction(usize);
 
 static PREFIX: &str = "__pgdog_2pc_";
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/error.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/error.rs
new file mode 100644
index 000000000..7d2fdfe83
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/error.rs
@@ -0,0 +1,84 @@
+//! Two-phase commit WAL errors.
+
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("encode: {0}")]
+    Encode(#[from] rmp_serde::encode::Error),
+
+    #[error("decode: {0}")]
+    Decode(#[from] rmp_serde::decode::Error),
+
+    #[error("crc mismatch: expected {expected:#010x}, got {actual:#010x}")]
+    Crc { expected: u32, actual: u32 },
+
+    #[error("invalid record tag {0}")]
+    InvalidTag(u8),
+
+    #[error("record body length is zero")]
+    EmptyRecord,
+
+    #[error("record of {0} bytes exceeds u32 framing")]
+    RecordTooLarge(usize),
+
+    #[error("io: {0}")]
+    Io(#[from] std::io::Error),
+
+    #[error("segment header is missing or has wrong magic")]
+    BadSegmentHeader,
+
+    #[error("segment filename is not a valid LSN: {0}")]
+    BadSegmentName(String),
+
+    #[error("writer task is no longer running")]
+    WriterGone,
+
+    #[error("wal directory {dir} is not accessible: {source}")]
+    DirNotAccessible {
+        dir: std::path::PathBuf,
+        #[source]
+        source: std::io::Error,
+    },
+
+    #[error("wal directory {dir} is not writable: {source}")]
+    DirNotWritable {
+        dir: std::path::PathBuf,
+        #[source]
+        source: std::io::Error,
+    },
+
+    #[error("torn tail: {unconsumed} unconsumed bytes at offset {offset}")]
+    TornTail { offset: u64, unconsumed: usize },
+
+    #[error("wal directory {dir} is locked by another process:\n{holder}")]
+    DirLocked {
+        dir: std::path::PathBuf,
+        holder: String,
+    },
+
+    /// A batch's write or fsync failed and the rewind that should have
+    /// restored on-disk consistency also failed. The segment's true
+    /// state on disk is unknown; the writer must rotate to a fresh
+    /// segment before accepting more writes.
+    #[error("segment broken: write or sync failed and rewind could not restore on-disk state")]
+    SegmentBroken,
+}
+
+impl Error {
+    /// True for errors that signal on-disk data is malformed or partial:
+    /// CRC mismatch, unknown tag, decode failure, missing magic, bad
+    /// filename, torn tail. Recovery quarantines these and skips the
+    /// segment instead of failing the whole process.
+    pub fn is_corruption(&self) -> bool {
+        matches!(
+            self,
+            Error::BadSegmentName(_)
+                | Error::BadSegmentHeader
+                | Error::Crc { .. }
+                | Error::InvalidTag(_)
+                | Error::EmptyRecord
+                | Error::Decode(_)
+                | Error::TornTail { .. }
+        )
+    }
+}
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/mod.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/mod.rs
new file mode 100644
index 000000000..6ec8abca8
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/mod.rs
@@ -0,0 +1,14 @@
+//! Two-phase commit write-ahead log.
+//!
+//! See [`record`] for the on-disk record format.
+
+mod error;
+mod record;
+mod recovery;
+mod segment;
+mod writer;
+
+pub use error::Error;
+pub use record::{BeginPayload, Record, TxnPayload};
+pub use segment::Segment;
+pub use writer::Wal;
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/record.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/record.rs
new file mode 100644
index 000000000..ead80e1c9
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/record.rs
@@ -0,0 +1,329 @@
+//! Two-phase commit WAL record format.
+//!
+//! On-disk framing per record:
+//!
+//! ```text
+//! +---------+---------+-----+----------------+
+//! | u32 LE  | u32 LE  | u8  | rmp-serde body |
+//! | bodylen | crc32c  | tag | payload        |
+//! +---------+---------+-----+----------------+
+//! ```
+//!
+//! - `bodylen` covers `tag + payload` (everything after the crc).
+//! - `crc32c` is computed over `tag + payload` and catches torn writes
+//!   and bit-rot.
+//! - `tag` is a stable [`u8`] discriminant defined by [`Tag`]. New record
+//!   kinds receive new tag values; **tag values should never be reused**.
+//! - `payload` is a [`rmp_serde`] encoding of a per-variant payload struct,
+//!   using field-name keys (`to_vec_named`) so fields can be added
+//!   additively under a single tag.
+//!
+//! There are four record kinds (see [`Record`]). The two write-ahead
+//! invariants that the [`super::Wal`] writer must honour:
+//!
+//! 1. [`Record::Begin`] is fsynced before any `PREPARE TRANSACTION` reaches
+//!    a shard. Otherwise a crash can leave prepared xacts with no record
+//!    of them.
+//! 2. [`Record::Committing`] is fsynced before any `COMMIT PREPARED` reaches
+//!    a shard. Otherwise a crash can leave a partial commit with no way to
+//!    know the coordinator had decided to commit.
+//!
+//! [`Record::End`] lets recovery forget a finished txn and is not safety
+//! critical. [`Record::Checkpoint`] is a snapshot of all active txns so
+//! older segments can be garbage-collected.
+//!
+//! # Format evolution rules
+//!
+//! These rules keep the on-disk format stable across versions:
+//!
+//! - **Tag values are immutable.** Once assigned, a [`Tag`] discriminant
+//!   should never be repurposed, even if the record kind is removed.
+//! - **Fields may be added.** New fields on an existing variant's payload
+//!   struct must carry `#[serde(default)]` so older versions that wrote
+//!   records without the field can still be deserialized.
+//! - **Fields may not be removed or renamed.** To change a variant's
+//!   shape, introduce a new tag with a new payload struct. The old tag
+//!   stays defined so historical records still decode during recovery.
+//! - **Type changes** on existing fields (e.g. changing the element or
+//!   integer type inside a `Vec`) also require a new tag.
+
+use bytes::{Buf, BufMut};
+use serde::{Deserialize, Serialize};
+
+use super::error::Error;
+use crate::frontend::client::query_engine::two_pc::TwoPcTransaction;
+
+/// On-disk record-kind discriminant.
+///
+/// Values are stable; see the format-evolution rules in the module docs.
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Tag {
+    /// Coordinator is about to issue `PREPARE TRANSACTION` to `shards`.
+    Begin = 1,
+    /// Coordinator has decided to commit; must drive all participants to
+    /// `COMMIT PREPARED`.
+    Committing = 2,
+    /// Coordinator is done with this txn; recovery may forget it.
+    End = 3,
+    /// Snapshot of all active txns for log GC.
+    Checkpoint = 4,
+}
+
+impl Tag {
+    fn from_u8(b: u8) -> Result<Self, u8> {
+        match b {
+            1 => Ok(Self::Begin),
+            2 => Ok(Self::Committing),
+            3 => Ok(Self::End),
+            4 => Ok(Self::Checkpoint),
+            other => Err(other),
+        }
+    }
+}
+
+/// A single WAL record.
+#[derive(Debug, Clone, PartialEq)]
+pub enum Record {
+    Begin(BeginPayload),
+    Committing(TxnPayload),
+    End(TxnPayload),
+    Checkpoint(CheckpointPayload),
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct BeginPayload {
+    pub txn: TwoPcTransaction,
+    pub user: String,
+    pub database: String,
+}
+
+/// Payload for records that carry only a transaction id
+/// ([`Record::Committing`] and [`Record::End`]).
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct TxnPayload {
+    pub txn: TwoPcTransaction,
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct CheckpointPayload {
+    pub active: Vec<CheckpointEntry>,
+}
+
+/// One row in a [`CheckpointPayload`].
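+    ///
+    /// A replay loop therefore looks like this (sketch; `handle` is a
+    /// placeholder for caller logic):
+    ///
+    /// ```ignore
+    /// let mut offset = 0;
+    /// while let Some(d) = Record::decode(&buf[offset..])? {
+    ///     handle(d.record);
+    ///     offset += d.consumed;
+    /// }
+    /// ```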
+    pub fn decode(buf: &[u8]) -> Result<Option<Decoded>, Error> {
+        if buf.len() < HEADER_BYTES {
+            return Ok(None);
+        }
+        let mut hdr = &buf[..HEADER_BYTES];
+        let body_len = hdr.get_u32_le() as usize;
+        if body_len == 0 {
+            return Err(Error::EmptyRecord);
+        }
+        let total = HEADER_BYTES + body_len;
+        if buf.len() < total {
+            return Ok(None);
+        }
+        let expected_crc = hdr.get_u32_le();
+        let body = &buf[HEADER_BYTES..total];
+        let actual_crc = crc32c::crc32c(body);
+        if actual_crc != expected_crc {
+            return Err(Error::Crc {
+                expected: expected_crc,
+                actual: actual_crc,
+            });
+        }
+        let tag = Tag::from_u8(body[0]).map_err(Error::InvalidTag)?;
+        let payload = &body[1..];
+        let record = match tag {
+            Tag::Begin => Record::Begin(rmp_serde::from_slice(payload)?),
+            Tag::Committing => Record::Committing(rmp_serde::from_slice(payload)?),
+            Tag::End => Record::End(rmp_serde::from_slice(payload)?),
+            Tag::Checkpoint => Record::Checkpoint(rmp_serde::from_slice(payload)?),
+        };
+        Ok(Some(Decoded {
+            record,
+            consumed: total,
+        }))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn round_trip(rec: Record) {
+        let mut buf = Vec::new();
+        rec.encode(&mut buf).unwrap();
+        let d = Record::decode(&buf).unwrap().unwrap();
+        assert_eq!(d.record, rec);
+        assert_eq!(d.consumed, buf.len());
+    }
+
+    #[test]
+    fn round_trip_begin() {
+        round_trip(Record::Begin(BeginPayload {
+            txn: TwoPcTransaction::new(),
+            user: "alice".into(),
+            database: "shop".into(),
+        }));
+    }
+
+    #[test]
+    fn round_trip_committing() {
+        round_trip(Record::Committing(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        }));
+    }
+
+    #[test]
+    fn round_trip_end() {
+        round_trip(Record::End(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        }));
+    }
+
+    #[test]
+    fn round_trip_checkpoint() {
+        round_trip(Record::Checkpoint(CheckpointPayload {
+            active: vec![
+                CheckpointEntry {
+                    txn: TwoPcTransaction::new(),
+                    user: "u1".into(),
+                    database: "d1".into(),
+                    decided: false,
+                },
+                CheckpointEntry {
+                    txn: TwoPcTransaction::new(),
+                    user: "u2".into(),
+                    database: "d2".into(),
+                    decided: true,
+                },
+            ],
+        }));
+    }
+
+    #[test]
+    fn need_more_when_truncated() {
+        let mut buf = Vec::new();
+        Record::End(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        })
+        .encode(&mut buf)
+        .unwrap();
+        for prefix_len in 0..buf.len() {
+            assert!(matches!(Record::decode(&buf[..prefix_len]), Ok(None)));
+        }
+    }
+
+    #[test]
+    fn crc_mismatch_is_error() {
+        let mut buf = Vec::new();
+        Record::End(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        })
+        .encode(&mut buf)
+        .unwrap();
+        let last = buf.len() - 1;
+        buf[last] ^= 0xff;
+        assert!(matches!(Record::decode(&buf), Err(Error::Crc { .. })));
+    }
+
+    #[test]
+    fn unknown_tag_is_error() {
+        let mut buf = Vec::new();
+        Record::End(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        })
+        .encode(&mut buf)
+        .unwrap();
+        // Tag sits right after the 8-byte framing header.
+        buf[HEADER_BYTES] = 99;
+        // Re-stamp the CRC so we hit the InvalidTag branch, not Crc.
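+        // (The stored crc covers tag + payload, so flipping the tag
+        // without recomputing it would trip the crc check first.)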
+        let crc = crc32c::crc32c(&buf[HEADER_BYTES..]);
+        buf[4..8].copy_from_slice(&crc.to_le_bytes());
+        assert!(matches!(Record::decode(&buf), Err(Error::InvalidTag(99))));
+    }
+
+    #[test]
+    fn multiple_records_decode_sequentially() {
+        let mut buf = Vec::new();
+        let a = Record::Begin(BeginPayload {
+            txn: TwoPcTransaction::new(),
+            user: "u".into(),
+            database: "d".into(),
+        });
+        let b = Record::Committing(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        });
+        a.encode(&mut buf).unwrap();
+        b.encode(&mut buf).unwrap();
+
+        let d1 = Record::decode(&buf).unwrap().unwrap();
+        assert_eq!(d1.record, a);
+        let d2 = Record::decode(&buf[d1.consumed..]).unwrap().unwrap();
+        assert_eq!(d2.record, b);
+        assert_eq!(d1.consumed + d2.consumed, buf.len());
+    }
+}
diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/recovery.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/recovery.rs
new file mode 100644
index 000000000..f93b3df44
--- /dev/null
+++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/recovery.rs
@@ -0,0 +1,321 @@
+//! Two-phase commit WAL recovery.
+//!
+//! Replays every record in every existing segment under the WAL directory
+//! to rebuild the set of in-flight 2PC transactions, then hands each one
+//! back to [`Manager`] via [`Manager::restore_transaction`] so the monitor
+//! task drives them to a terminal state via the existing
+//! `cleanup_phase` machinery.
+//!
+//! Returns a [`Recovered`] containing the writable [`Segment`] the writer
+//! task should continue appending to and the active-2PC snapshot the
+//! writer uses to serve checkpoints. If the directory had no segments, a
+//! fresh one is created at LSN 0.
+
+use std::path::Path;
+
+use fnv::FnvHashMap as HashMap;
+
+use super::error::Error;
+use super::record::{CheckpointEntry, Record};
+use super::segment::{list_segments, Segment, SegmentReader};
+use crate::frontend::client::query_engine::two_pc::{Manager, TwoPcPhase, TwoPcTransaction};
+
+/// Working entry held only for the duration of [`recover_transactions`].
+/// Each surviving entry becomes a [`Manager::restore_transaction`] call.
+struct Entry {
+    user: String,
+    database: String,
+    decided: bool,
+}
+
+/// What [`recover_transactions`] hands back to the WAL setup path.
+pub(super) struct Recovered {
+    /// The writable segment the writer task should continue appending to.
+    pub segment: Segment,
+    /// Initial active-2PC snapshot derived from replay so the writer
+    /// can serve checkpoints without rebuilding it.
+    pub snapshot: HashMap<TwoPcTransaction, CheckpointEntry>,
+}
+
+/// Scan every segment in `dir` in LSN order, hand each in-flight
+/// transaction to `manager`, and return a [`Recovered`] describing the
+/// writable segment plus the initial active-2PC snapshot.
+///
+/// Corrupt segments are renamed to `.wal.broken` and skipped. If any
+/// corruption is detected, undecided transactions are not restored:
+/// rolling one back could silently invert a committed transaction whose
+/// `Committing` record was lost with the corrupt segment. The operator
+/// handles orphan prepared xacts via `SHOW TRANSACTIONS` /
+/// `pg_prepared_xacts`. Genuine IO errors propagate; the caller treats
+/// those as "WAL not usable."
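+///
+/// Per-transaction outcome (mirrors the `(decided, corruption)` match
+/// in the body):
+///
+/// | decided | corruption seen | restored as                     |
+/// |---------|-----------------|---------------------------------|
+/// | yes     | either          | Phase2 (drives COMMIT PREPARED) |
+/// | no      | no              | Phase1 (rolls the prepare back) |
+/// | no      | yes             | skipped; operator reconciles    |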
+pub(super) async fn recover_transactions(
+    manager: &Manager,
+    dir: &Path,
+) -> Result<Recovered, Error> {
+    let segments = list_segments(dir).await?;
+    let mut working: HashMap<TwoPcTransaction, Entry> = HashMap::default();
+    let mut corruption = false;
+    let mut next_lsn: u64 = 0;
+
+    let Some((last_path, prior_paths)) = segments.split_last() else {
+        return Ok(Recovered {
+            segment: Segment::create(dir, 0).await?,
+            snapshot: HashMap::default(),
+        });
+    };
+
+    for path in prior_paths {
+        let mut reader = match SegmentReader::open(path).await {
+            Ok(reader) => reader,
+            Err(err) if err.is_corruption() => {
+                quarantine(path, &err).await;
+                corruption = true;
+                continue;
+            }
+            Err(err) => return Err(err),
+        };
+        let drained = drain(&mut reader, &mut working).await;
+        next_lsn = reader.next_lsn();
+        match drained {
+            Ok(()) => {}
+            Err(err) if err.is_corruption() => {
+                drop(reader);
+                quarantine(path, &err).await;
+                corruption = true;
+            }
+            Err(err) => return Err(err),
+        }
+    }
+
+    let last_writable = match SegmentReader::open(last_path).await {
+        Ok(mut reader) => {
+            let drained = drain(&mut reader, &mut working).await;
+            next_lsn = reader.next_lsn();
+            match drained {
+                Ok(()) | Err(Error::TornTail { .. }) => Some(reader.into_writable().await?),
+                Err(err) if err.is_corruption() => {
+                    drop(reader);
+                    quarantine(last_path, &err).await;
+                    corruption = true;
+                    None
+                }
+                Err(err) => return Err(err),
+            }
+        }
+        Err(err) if err.is_corruption() => {
+            quarantine(last_path, &err).await;
+            corruption = true;
+            None
+        }
+        Err(err) => return Err(err),
+    };
+
+    // Decided txns (Committing was logged) are safe to restore even if
+    // corruption was detected later: COMMIT PREPARED is idempotent and
+    // matches the decision we durably recorded. Undecided txns can only
+    // be restored when there was no corruption: otherwise their
+    // Committing might have been in a lost segment and rolling back
+    // would silently invert a committed transaction.
+    let mut snapshot: HashMap<TwoPcTransaction, CheckpointEntry> = HashMap::default();
+    for (txn, entry) in working {
+        let phase = match (entry.decided, corruption) {
+            (true, _) => TwoPcPhase::Phase2,
+            (false, false) => TwoPcPhase::Phase1,
+            (false, true) => continue,
+        };
+        snapshot.insert(
+            txn,
+            CheckpointEntry {
+                txn,
+                user: entry.user.clone(),
+                database: entry.database.clone(),
+                decided: entry.decided,
+            },
+        );
+        manager.restore_transaction(txn, entry.user, entry.database, phase);
+    }
+    if corruption {
+        tracing::warn!(
+            "[2pc] wal recovery detected corruption; undecided transactions \
+             must be reconciled via SHOW TRANSACTIONS / pg_prepared_xacts"
+        );
+    }
+
+    let segment = match last_writable {
+        Some(segment) => segment,
+        None => Segment::create(dir, next_lsn).await?,
+    };
+    Ok(Recovered { segment, snapshot })
+}
+
+/// Drain every record in `reader` into `working`. Errors propagate as
+/// usual; the caller decides what to do based on the error variant.
+async fn drain(
+    reader: &mut SegmentReader,
+    working: &mut HashMap<TwoPcTransaction, Entry>,
+) -> Result<(), Error> {
+    while let Some(record) = reader.next().await? {
+        apply(working, record);
+    }
+    Ok(())
+}
+
+/// Log corruption and rename the segment to `.broken` so
+/// subsequent recoveries skip it. Best-effort: rename failures are
+/// logged.
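+///
+/// e.g. `00000000000000000000.wal` (20-digit zero-padded start LSN)
+/// ends up as `00000000000000000000.wal.broken`.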
+async fn quarantine(path: &Path, err: &Error) { + tracing::warn!("[2pc] corrupt wal segment {}: {}", path.display(), err); + let broken = path.with_extension("wal.broken"); + if let Err(rename_err) = tokio::fs::rename(path, &broken).await { + tracing::warn!( + "[2pc] could not rename corrupt wal segment {} to {}: {}", + path.display(), + broken.display(), + rename_err + ); + } +} + +fn apply(working: &mut HashMap, record: Record) { + match record { + Record::Begin(p) => { + working.insert( + p.txn, + Entry { + user: p.user, + database: p.database, + decided: false, + }, + ); + } + Record::Committing(p) => { + if let Some(entry) = working.get_mut(&p.txn) { + entry.decided = true; + } + } + Record::End(p) => { + working.remove(&p.txn); + } + Record::Checkpoint(p) => { + working.clear(); + for CheckpointEntry { + txn, + user, + database, + decided, + .. + } in p.active + { + working.insert( + txn, + Entry { + user, + database, + decided, + }, + ); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::super::record::{BeginPayload, CheckpointPayload, TxnPayload}; + use super::*; + + fn txn(n: usize) -> TwoPcTransaction { + // Inner field is private; round-trip via Display/FromStr so tests + // can build deterministic ids. + format!("__pgdog_2pc_{}", n).parse().unwrap() + } + + #[test] + fn begin_inserts_undecided() { + let mut w: HashMap = HashMap::default(); + apply( + &mut w, + Record::Begin(BeginPayload { + txn: txn(1), + user: "alice".into(), + database: "shop".into(), + }), + ); + let entry = w.get(&txn(1)).unwrap(); + assert_eq!(entry.user, "alice"); + assert_eq!(entry.database, "shop"); + assert!(!entry.decided); + } + + #[test] + fn committing_marks_decided() { + let mut w: HashMap = HashMap::default(); + apply( + &mut w, + Record::Begin(BeginPayload { + txn: txn(1), + user: "u".into(), + database: "d".into(), + }), + ); + apply(&mut w, Record::Committing(TxnPayload { txn: txn(1) })); + assert!(w.get(&txn(1)).unwrap().decided); + } + + #[test] + fn committing_without_begin_is_ignored() { + let mut w: HashMap = HashMap::default(); + apply(&mut w, Record::Committing(TxnPayload { txn: txn(1) })); + assert!(w.is_empty()); + } + + #[test] + fn end_removes() { + let mut w: HashMap = HashMap::default(); + apply( + &mut w, + Record::Begin(BeginPayload { + txn: txn(1), + user: "u".into(), + database: "d".into(), + }), + ); + apply(&mut w, Record::End(TxnPayload { txn: txn(1) })); + assert!(w.is_empty()); + } + + #[test] + fn checkpoint_replaces_state() { + let mut w: HashMap = HashMap::default(); + apply( + &mut w, + Record::Begin(BeginPayload { + txn: txn(1), + user: "u1".into(), + database: "d1".into(), + }), + ); + apply( + &mut w, + Record::Begin(BeginPayload { + txn: txn(2), + user: "u2".into(), + database: "d2".into(), + }), + ); + apply( + &mut w, + Record::Checkpoint(CheckpointPayload { + active: vec![CheckpointEntry { + txn: txn(99), + user: "u99".into(), + database: "d99".into(), + decided: true, + }], + }), + ); + assert_eq!(w.len(), 1); + let entry = w.get(&txn(99)).unwrap(); + assert_eq!(entry.user, "u99"); + assert!(entry.decided); + } +} diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/segment.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/segment.rs new file mode 100644 index 000000000..d799380b3 --- /dev/null +++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/segment.rs @@ -0,0 +1,575 @@ +//! Two-phase commit WAL segment files. +//! +//! Each segment is a single file `.wal` in the WAL +//! directory. 
+//! by 12 reserved bytes) and is followed by a stream of records as
+//! described in [`super::record`].
+//!
+//! [`Segment`] is the writable handle over an open segment file; it is
+//! intentionally not `Send`-shared and is owned by a single writer task.
+//! Records are written in batches via [`Segment::commit`], which appends
+//! a pre-encoded buffer and fsyncs as one atomic operation (group commit).
+//!
+//! [`SegmentReader`] iterates an existing segment one record at a time.
+//! It tracks the byte offset of the last good record so that a torn or
+//! corrupt tail can be truncated when a reader is converted into a
+//! writable segment via [`SegmentReader::into_writable`].
+//!
+//! # Durability
+//!
+//! [`Segment::commit`] ends with `tokio::fs::File::sync_all`, which on
+//! Linux issues `fsync(2)` and provides true durability. On macOS,
+//! `fsync(2)` flushes only kernel buffers and does not guarantee that
+//! data has reached the physical device; true durability there requires
+//! `F_FULLFSYNC`, which is not exposed by tokio.
+
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
+
+use bytes::{Buf, BytesMut};
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
+use tracing::warn;
+
+use super::error::Error;
+use super::record::Record;
+
+const MAGIC: &[u8; 4] = b"PG2W";
+const HEADER_BYTES: u64 = 16;
+const READ_CHUNK: usize = 64 * 1024;
+const FILE_SUFFIX: &str = ".wal";
+
+/// Format a segment filename for the given start LSN.
+fn segment_filename(start_lsn: u64) -> String {
+    format!("{:020}{}", start_lsn, FILE_SUFFIX)
+}
+
+/// Parse a segment filename's start LSN.
+fn parse_segment_filename(name: &str) -> Result<u64, Error> {
+    let stem = name
+        .strip_suffix(FILE_SUFFIX)
+        .ok_or_else(|| Error::BadSegmentName(name.to_string()))?;
+    u64::from_str(stem).map_err(|_| Error::BadSegmentName(name.to_string()))
+}
+
+/// Delete every segment in `dir` whose contents lie strictly before
+/// `lsn`, i.e. segments whose successor's start LSN is `<= lsn`. The
+/// segment that *contains* `lsn` is preserved because it still has live
+/// records after the checkpoint that future recovery needs to replay.
+/// Per-file deletion errors are logged but don't abort the GC.
+pub async fn gc_before_lsn(dir: &Path, lsn: u64) -> Result<(), Error> {
+    let segments = list_segments(dir).await?;
+    let mut last_kept = None;
+    for (i, path) in segments.iter().enumerate() {
+        let name = path
+            .file_name()
+            .and_then(|n| n.to_str())
+            .ok_or_else(|| Error::BadSegmentName(path.display().to_string()))?;
+        if parse_segment_filename(name)? > lsn {
+            break;
+        }
+        last_kept = Some(i);
+    }
+    let Some(keep) = last_kept else {
+        return Ok(());
+    };
+    for path in &segments[..keep] {
+        if let Err(err) = tokio::fs::remove_file(path).await {
+            warn!(
+                "[2pc] could not delete old wal segment {}: {}",
+                path.display(),
+                err
+            );
+        }
+    }
+    Ok(())
+}
+
+/// List the WAL segment files in `dir`, sorted ascending by start LSN.
+///
+/// Files in `dir` whose names don't parse as a segment are skipped with a
+/// warning. The returned vec is empty if the directory contains no
+/// segments yet.
+pub async fn list_segments(dir: &Path) -> Result<Vec<PathBuf>, Error> {
+    let mut entries = tokio::fs::read_dir(dir).await?;
+    let mut segments: Vec<(u64, PathBuf)> = Vec::new();
+    while let Some(entry) = entries.next_entry().await? {
+        let name = match entry.file_name().into_string() {
+            Ok(n) => n,
+            Err(os) => {
+                warn!("skipping non-utf8 entry in wal dir: {:?}", os);
+                continue;
+            }
+        };
+        if !name.ends_with(FILE_SUFFIX) {
+            continue;
+        }
+        match parse_segment_filename(&name) {
+            Ok(lsn) => segments.push((lsn, entry.path())),
+            Err(err) => warn!("skipping malformed segment filename {}: {}", name, err),
+        }
+    }
+    segments.sort_by_key(|(lsn, _)| *lsn);
+    Ok(segments.into_iter().map(|(_, p)| p).collect())
+}
+
+/// Iterates records out of an existing segment file.
+///
+/// `next` returns one record at a time, reading from the file in chunks.
+/// Each terminal condition is reported explicitly:
+///
+/// - `Ok(Some(record))`: a record was decoded; iteration continues.
+/// - `Ok(None)`: clean end of stream (EOF with no leftover bytes).
+/// - `Err(Error::TornTail { .. })`: file ends mid-record. Normal at
+///   the last segment after a crash; suspicious anywhere else.
+/// - `Err(Error::Crc | InvalidTag | EmptyRecord | Decode)`: record
+///   framing or payload is corrupt.
+/// - `Err(Error::Io(_))`: disk-side IO error.
+///
+/// `last_good_offset` always points at the end of the last successfully
+/// decoded record so [`SegmentReader::into_writable`] can truncate the
+/// file there regardless of how iteration terminated.
+///
+/// After any terminal condition, subsequent `next` calls return
+/// `Ok(None)` (the error is reported once).
+#[derive(Debug)]
+pub struct SegmentReader {
+    file: File,
+    /// Identifies the segment for diagnostics.
+    #[allow(dead_code)]
+    start_lsn: u64,
+    next_lsn: u64,
+    /// Bytes that have been read from the file but not yet decoded.
+    buf: BytesMut,
+    /// File offset (in bytes) of the end of the last successfully decoded
+    /// record, or `HEADER_BYTES` if no records have been decoded yet.
+    last_good_offset: u64,
+    /// True once iteration has terminated; further `next` calls return
+    /// `Ok(None)`.
+    done: bool,
+}
+
+impl SegmentReader {
+    pub async fn open(path: &Path) -> Result<Self, Error> {
+        let name = path
+            .file_name()
+            .and_then(|n| n.to_str())
+            .ok_or_else(|| Error::BadSegmentName(path.display().to_string()))?;
+        let start_lsn = parse_segment_filename(name)?;
+
+        let mut file = OpenOptions::new().read(true).write(true).open(path).await?;
+
+        let mut header = [0u8; HEADER_BYTES as usize];
+        match file.read_exact(&mut header).await {
+            Ok(_) => {}
+            Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => {
+                return Err(Error::BadSegmentHeader);
+            }
+            Err(err) => return Err(err.into()),
+        }
+        if &header[..MAGIC.len()] != MAGIC {
+            return Err(Error::BadSegmentHeader);
+        }
+
+        Ok(Self {
+            file,
+            start_lsn,
+            next_lsn: start_lsn,
+            buf: BytesMut::new(),
+            last_good_offset: HEADER_BYTES,
+            done: false,
+        })
+    }
+
+    /// LSN of the next record this reader will return.
+    pub fn next_lsn(&self) -> u64 {
+        self.next_lsn
+    }
+
+    /// Decode and return the next record. See the type-level docs for
+    /// the full set of return values.
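+    ///
+    /// A typical drain loop (a sketch; mirrors what recovery and the
+    /// unit tests do, with error handling elided):
+    ///
+    /// ```ignore
+    /// let mut reader = SegmentReader::open(&path).await?;
+    /// while let Some(record) = reader.next().await? {
+    ///     // replay `record` into in-memory state
+    /// }
+    /// ```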
+    pub async fn next(&mut self) -> Result<Option<Record>, Error> {
+        if self.done {
+            return Ok(None);
+        }
+        loop {
+            match Record::decode(&self.buf) {
+                Ok(Some(d)) => {
+                    let consumed = d.consumed;
+                    let record = d.record;
+                    self.buf.advance(consumed);
+                    self.last_good_offset += consumed as u64;
+                    self.next_lsn += 1;
+                    return Ok(Some(record));
+                }
+                Ok(None) => {
+                    let read_more = self.read_chunk().await?;
+                    if !read_more {
+                        self.done = true;
+                        if self.buf.is_empty() {
+                            return Ok(None);
+                        }
+                        return Err(Error::TornTail {
+                            offset: self.last_good_offset,
+                            unconsumed: self.buf.len(),
+                        });
+                    }
+                }
+                Err(err) => {
+                    self.done = true;
+                    return Err(err);
+                }
+            }
+        }
+    }
+
+    /// Consume this reader and turn it into a writable [`Segment`],
+    /// truncating any torn or corrupt tail at `last_good_offset`. The
+    /// reader's underlying file handle is reused.
+    ///
+    /// The reader should already have been drained by repeated calls to
+    /// [`SegmentReader::next`]; if it hasn't, records past
+    /// `last_good_offset` will be lost.
+    pub async fn into_writable(self) -> Result<Segment, Error> {
+        let SegmentReader {
+            mut file,
+            start_lsn,
+            next_lsn,
+            last_good_offset,
+            ..
+        } = self;
+
+        let on_disk = file.metadata().await?.len();
+        if on_disk > last_good_offset {
+            file.set_len(last_good_offset).await?;
+            file.sync_all().await?;
+        }
+        file.seek(SeekFrom::Start(last_good_offset)).await?;
+
+        Ok(Segment {
+            file,
+            start_lsn,
+            next_lsn,
+            size_bytes: last_good_offset,
+        })
+    }
+
+    /// Read up to `READ_CHUNK` bytes into `buf`. Returns `Ok(false)` if
+    /// the read returned 0 bytes (EOF), `Ok(true)` if any bytes were read.
+    async fn read_chunk(&mut self) -> Result<bool, Error> {
+        let old_len = self.buf.len();
+        self.buf.resize(old_len + READ_CHUNK, 0);
+        let n = self.file.read(&mut self.buf[old_len..]).await?;
+        self.buf.truncate(old_len + n);
+        Ok(n > 0)
+    }
+}
+
+/// A writable WAL segment.
+///
+/// Owned by a single writer task. Records are written via
+/// [`Segment::commit`], which appends a pre-encoded batch and fsyncs
+/// atomically (group commit).
+#[derive(Debug)]
+pub struct Segment {
+    file: File,
+    /// Identifies the segment for diagnostics.
+    #[allow(dead_code)]
+    start_lsn: u64,
+    next_lsn: u64,
+    size_bytes: u64,
+}
+
+impl Segment {
+    /// Create a new empty segment file in `dir`, write the header, and fsync.
+    pub async fn create(dir: &Path, start_lsn: u64) -> Result<Self, Error> {
+        let path = dir.join(segment_filename(start_lsn));
+        let mut file = OpenOptions::new()
+            .create_new(true)
+            .write(true)
+            .read(true)
+            .open(&path)
+            .await?;
+
+        let mut header = [0u8; HEADER_BYTES as usize];
+        header[..MAGIC.len()].copy_from_slice(MAGIC);
+        file.write_all(&header).await?;
+        file.sync_all().await?;
+
+        Ok(Self {
+            file,
+            start_lsn,
+            next_lsn: start_lsn,
+            size_bytes: HEADER_BYTES,
+        })
+    }
+
+    /// Append a pre-encoded batch of `count` records and fsync.
+    /// Returns the LSN assigned to the first record in the batch;
+    /// subsequent records have LSNs `start + i`.
+    ///
+    /// On a write or fsync failure the segment is rewound to its
+    /// pre-call state so on-disk content matches the caller-visible
+    /// `Err`: the records did not land. If the rewind itself fails the
+    /// segment's true on-disk state is unknown and `Error::SegmentBroken`
+    /// is returned so the writer rotates to a fresh segment instead of
+    /// continuing to write at a corrupted offset.
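+    ///
+    /// Sketch of a two-record group commit (mirrors the unit tests
+    /// below; both records become durable under a single fsync):
+    ///
+    /// ```ignore
+    /// let mut buf = BytesMut::new();
+    /// first.encode(&mut buf)?;
+    /// second.encode(&mut buf)?;
+    /// let start_lsn = segment.commit(&buf, 2).await?; // LSNs: start_lsn, start_lsn + 1
+    /// ```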
+    pub async fn commit(&mut self, encoded: &[u8], count: u32) -> Result<u64, Error> {
+        let pre_size = self.size_bytes;
+        let pre_lsn = self.next_lsn;
+
+        if let Err(err) = self.file.write_all(encoded).await {
+            self.rewind_to(pre_size, pre_lsn).await?;
+            return Err(err.into());
+        }
+        self.size_bytes += encoded.len() as u64;
+        self.next_lsn += count as u64;
+
+        if let Err(err) = self.file.sync_all().await {
+            self.rewind_to(pre_size, pre_lsn).await?;
+            return Err(err.into());
+        }
+        Ok(pre_lsn)
+    }
+
+    /// Truncate the segment back to `size` and reset the next-LSN to
+    /// `next_lsn`, fsyncing the truncation. Used to roll back a batch
+    /// whose write or fsync failed so on-disk state matches what we
+    /// told callers: the records did not land. Any failure here means
+    /// the segment's on-disk state is unknown, so it returns
+    /// `Error::SegmentBroken` and the caller must rotate.
+    async fn rewind_to(&mut self, size: u64, next_lsn: u64) -> Result<(), Error> {
+        self.file
+            .set_len(size)
+            .await
+            .map_err(|_| Error::SegmentBroken)?;
+        self.file
+            .seek(SeekFrom::Start(size))
+            .await
+            .map_err(|_| Error::SegmentBroken)?;
+        self.file
+            .sync_all()
+            .await
+            .map_err(|_| Error::SegmentBroken)?;
+        self.size_bytes = size;
+        self.next_lsn = next_lsn;
+        Ok(())
+    }
+
+    /// LSN that will be assigned to the next [`Segment::commit`] call.
+    pub fn next_lsn(&self) -> u64 {
+        self.next_lsn
+    }
+
+    /// Total file size on disk, including the header.
+    pub fn size_bytes(&self) -> u64 {
+        self.size_bytes
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::super::record::{BeginPayload, TxnPayload};
+    use super::*;
+    use crate::frontend::client::query_engine::two_pc::TwoPcTransaction;
+    use tempfile::TempDir;
+
+    #[test]
+    fn filename_round_trip() {
+        for lsn in [0u64, 1, 16, 1234, u64::MAX] {
+            let name = segment_filename(lsn);
+            assert_eq!(parse_segment_filename(&name).unwrap(), lsn);
+            assert_eq!(name.len(), 24);
+        }
+    }
+
+    #[test]
+    fn bad_segment_name_rejected() {
+        assert!(parse_segment_filename("not-a-segment.txt").is_err());
+        assert!(parse_segment_filename("xyz.wal").is_err());
+    }
+
+    #[tokio::test]
+    async fn create_then_reopen_empty() {
+        let dir = TempDir::new().unwrap();
+        let seg = Segment::create(dir.path(), 0).await.unwrap();
+        assert_eq!(seg.next_lsn(), 0);
+        drop(seg);
+
+        let path = dir.path().join(segment_filename(0));
+        let mut reader = SegmentReader::open(&path).await.unwrap();
+        assert!(reader.next().await.unwrap().is_none());
+        assert_eq!(reader.last_good_offset, HEADER_BYTES);
+    }
+
+    #[tokio::test]
+    async fn append_sync_reopen_yields_records() {
+        let dir = TempDir::new().unwrap();
+        let mut seg = Segment::create(dir.path(), 0).await.unwrap();
+        let r1 = Record::Begin(BeginPayload {
+            txn: TwoPcTransaction::new(),
+            user: "u".into(),
+            database: "d".into(),
+        });
+        let r2 = Record::End(TxnPayload {
+            txn: TwoPcTransaction::new(),
+        });
+        let mut buf = BytesMut::new();
+        r1.encode(&mut buf).unwrap();
+        r2.encode(&mut buf).unwrap();
+        let start = seg.commit(&buf, 2).await.unwrap();
+        assert_eq!(start, 0);
+        assert_eq!(seg.next_lsn(), 2);
+        drop(seg);
+
+        let path = dir.path().join(segment_filename(0));
+        let mut reader = SegmentReader::open(&path).await.unwrap();
+        let mut records = Vec::new();
+        while let Some(r) = reader.next().await.unwrap() {
+            records.push(r);
+        }
+        assert_eq!(records, vec![r1, r2]);
+        assert_eq!(reader.next_lsn(), 2);
+    }
+
+    #[tokio::test]
+    async fn torn_tail_is_truncated_via_into_writable() {
+        let dir = TempDir::new().unwrap();
+        let path = {
+            let mut seg =
Segment::create(dir.path(), 0).await.unwrap(); + let mut buf = BytesMut::new(); + Record::Begin(BeginPayload { + txn: TwoPcTransaction::new(), + user: "u".into(), + database: "d".into(), + }) + .encode(&mut buf) + .unwrap(); + seg.commit(&buf, 1).await.unwrap(); + // Append half a second record's framing to simulate a torn write. + buf.clear(); + Record::End(TxnPayload { + txn: TwoPcTransaction::new(), + }) + .encode(&mut buf) + .unwrap(); + tokio::io::AsyncWriteExt::write_all(&mut seg.file, &buf[..buf.len() / 2]) + .await + .unwrap(); + tokio::io::AsyncWriteExt::flush(&mut seg.file) + .await + .unwrap(); + dir.path().join(segment_filename(0)) + }; + + let mut reader = SegmentReader::open(&path).await.unwrap(); + let mut records = Vec::new(); + let term = loop { + match reader.next().await { + Ok(Some(r)) => records.push(r), + Ok(None) => break Ok(()), + Err(err) => break Err(err), + } + }; + assert_eq!(records.len(), 1); + assert!(matches!(term, Err(Error::TornTail { .. }))); + let last_good = reader.last_good_offset; + let seg = reader.into_writable().await.unwrap(); + assert_eq!(seg.size_bytes(), last_good); + + let meta = tokio::fs::metadata(&path).await.unwrap(); + assert_eq!(meta.len(), last_good); + } + + #[tokio::test] + async fn corrupt_record_stops_iteration_at_corruption() { + let dir = TempDir::new().unwrap(); + let mut seg = Segment::create(dir.path(), 0).await.unwrap(); + let r1 = Record::Begin(BeginPayload { + txn: TwoPcTransaction::new(), + user: "u".into(), + database: "d".into(), + }); + let mut buf = BytesMut::new(); + r1.encode(&mut buf).unwrap(); + Record::End(TxnPayload { + txn: TwoPcTransaction::new(), + }) + .encode(&mut buf) + .unwrap(); + seg.commit(&buf, 2).await.unwrap(); + let path = dir.path().join(segment_filename(0)); + drop(seg); + + let mut on_disk = tokio::fs::read(&path).await.unwrap(); + let last = on_disk.len() - 1; + on_disk[last] ^= 0xff; + tokio::fs::write(&path, &on_disk).await.unwrap(); + + let mut reader = SegmentReader::open(&path).await.unwrap(); + let mut records = Vec::new(); + let term = loop { + match reader.next().await { + Ok(Some(r)) => records.push(r), + Ok(None) => break Ok(()), + Err(err) => break Err(err), + } + }; + assert_eq!(records, vec![r1]); + assert!(matches!(term, Err(Error::Crc { .. 
}))); + } + + #[tokio::test] + async fn bad_magic_is_rejected() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join(segment_filename(0)); + tokio::fs::write(&path, vec![0u8; HEADER_BYTES as usize]) + .await + .unwrap(); + assert!(matches!( + SegmentReader::open(&path).await, + Err(Error::BadSegmentHeader) + )); + } + + #[tokio::test] + async fn missing_header_is_rejected() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join(segment_filename(0)); + tokio::fs::write(&path, b"PG2W").await.unwrap(); + assert!(matches!( + SegmentReader::open(&path).await, + Err(Error::BadSegmentHeader) + )); + } + + #[tokio::test] + async fn many_records_span_multiple_read_chunks() { + let dir = TempDir::new().unwrap(); + let start_lsn = 100; + let mut seg = Segment::create(dir.path(), start_lsn).await.unwrap(); + let payload = "x".repeat(1024); + let mut expected = Vec::new(); + let mut buf = BytesMut::new(); + for _ in 0..200 { + let r = Record::Begin(BeginPayload { + txn: TwoPcTransaction::new(), + user: payload.clone(), + database: "d".into(), + }); + r.encode(&mut buf).unwrap(); + expected.push(r); + } + seg.commit(&buf, expected.len() as u32).await.unwrap(); + let path = dir.path().join(segment_filename(start_lsn)); + drop(seg); + + let mut reader = SegmentReader::open(&path).await.unwrap(); + let mut records = Vec::new(); + while let Some(r) = reader.next().await.unwrap() { + records.push(r); + } + assert_eq!(records, expected); + assert_eq!(reader.next_lsn(), start_lsn + expected.len() as u64); + } +} diff --git a/pgdog/src/frontend/client/query_engine/two_pc/wal/writer.rs b/pgdog/src/frontend/client/query_engine/two_pc/wal/writer.rs new file mode 100644 index 000000000..01477ec74 --- /dev/null +++ b/pgdog/src/frontend/client/query_engine/two_pc/wal/writer.rs @@ -0,0 +1,497 @@ +//! Two-phase commit WAL writer task. +//! +//! Owns the active [`Segment`] and serializes appends from many concurrent +//! callers behind a single fsync per batch (group commit). Callers send a +//! [`WriteRequest`] on the [`Wal`]'s mpsc channel and await a oneshot ack +//! that fires once the record is durable. +//! +//! Batching strategy: when at least one request arrives, the task drains +//! the channel non-blockingly to grab any other immediately-available +//! requests. If the batch is still smaller than [`MAX_BATCH`], the task +//! races a `recv` against a `sleep_until(deadline)` where `deadline` is +//! `fsync_interval` after the first request arrived. Once either limit is +//! hit, the batch is encoded into a single [`BytesMut`], written to the +//! segment in one `write_all`, and a single `sync_all` covers it. +//! +//! Segment rotation happens after the sync of any batch that pushed the +//! current segment over the configured size limit. +//! +//! The writer's body is wrapped in `catch_unwind` so a panic doesn't hang +//! shutdown: any unwinding is logged and the `done` notify still fires. 
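+//!
+//! From a caller's point of view, one transaction produces three acks
+//! (a sketch assembled from the `Wal` method docs below, not a verbatim
+//! call site):
+//!
+//! ```ignore
+//! wal.append_begin(txn, user, database).await?; // before any PREPARE is sent
+//! // ... PREPARE TRANSACTION on every participant ...
+//! wal.append_committing(txn).await?;            // point of no return
+//! // ... COMMIT PREPARED on every participant ...
+//! wal.append_end(txn).await?;                   // recovery may now forget txn
+//! ```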
+
+use std::io::{Read, Seek, SeekFrom, Write};
+use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use bytes::BytesMut;
+use futures::FutureExt;
+use tokio::sync::{mpsc, oneshot, Notify};
+use tokio::time::{sleep_until, Instant};
+use tracing::{error, warn};
+
+use fnv::FnvHashMap as HashMap;
+
+use super::error::Error;
+use super::record::{BeginPayload, CheckpointEntry, CheckpointPayload, Record, TxnPayload};
+use super::recovery;
+use super::segment::{gc_before_lsn, Segment};
+use crate::config::config;
+use crate::frontend::client::query_engine::two_pc::{Manager, TwoPcTransaction};
+
+/// Acquire an exclusive flock on `<dir>/.lock` and stamp it with our
+/// PID + start time. Returns the locked `File`; dropping it releases
+/// the lock. If another process holds the lock, the existing file is
+/// read back and surfaced verbatim in [`Error::DirLocked`] so an
+/// operator can see who's holding it.
+fn lock_dir(dir: &Path) -> Result<std::fs::File, Error> {
+    let lock_path = dir.join(".lock");
+    let mut file = std::fs::OpenOptions::new()
+        .read(true)
+        .write(true)
+        .create(true)
+        .truncate(false)
+        .open(&lock_path)
+        .map_err(|source| Error::DirNotAccessible {
+            dir: dir.to_path_buf(),
+            source,
+        })?;
+
+    match file.try_lock() {
+        Ok(()) => {}
+        Err(std::fs::TryLockError::WouldBlock) => {
+            let mut holder = String::new();
+            let _ = file.read_to_string(&mut holder);
+            return Err(Error::DirLocked {
+                dir: dir.to_path_buf(),
+                holder,
+            });
+        }
+        Err(std::fs::TryLockError::Error(source)) => {
+            return Err(Error::DirNotWritable {
+                dir: dir.to_path_buf(),
+                source,
+            });
+        }
+    }
+
+    let stamp = format!(
+        "pid={}\nstarted={}\n",
+        std::process::id(),
+        chrono::Utc::now().to_rfc3339(),
+    );
+    file.set_len(0).map_err(|source| Error::DirNotWritable {
+        dir: dir.to_path_buf(),
+        source,
+    })?;
+    file.seek(SeekFrom::Start(0))
+        .map_err(|source| Error::DirNotWritable {
+            dir: dir.to_path_buf(),
+            source,
+        })?;
+    file.write_all(stamp.as_bytes())
+        .map_err(|source| Error::DirNotWritable {
+            dir: dir.to_path_buf(),
+            source,
+        })?;
+    Ok(file)
+}
+
+/// Maximum number of records coalesced into a single fsync.
+const MAX_BATCH: usize = 1024;
+/// Initial capacity of the batch encoding buffer; grows on demand.
+const ENCODE_BUF_INITIAL: usize = 64 * 1024;
+/// Channel capacity for incoming write requests.
+const CHANNEL_CAPACITY: usize = 1024;
+
+/// What a [`WriteRequest`] asks the writer to append.
+pub(super) enum WalAppend {
+    /// Append this record verbatim.
+    Record(Record),
+    /// Materialize a [`Record::Checkpoint`] from the writer's snapshot
+    /// at this point in the batch, then append it.
+    Checkpoint,
+}
+
+/// One outstanding append request from a [`Wal`] caller.
+pub(super) struct WriteRequest {
+    pub append: WalAppend,
+    pub ack: oneshot::Sender<Result<u64, Arc<Error>>>,
+}
+
+#[derive(Debug, Default)]
+struct WalShutdown {
+    cancel: Notify,
+    cancelled: AtomicBool,
+    done: Notify,
+}
+
+/// Handle to the WAL writer task.
+#[derive(Debug, Clone)]
+pub struct Wal {
+    tx: mpsc::Sender<WriteRequest>,
+    shutdown: Arc<WalShutdown>,
+}
+
+impl Wal {
+    /// Take an exclusive flock on the configured WAL directory's
+    /// `.lock` so a second pgdog can't race us, replay any existing
+    /// log into `manager`, and spawn the writer task. The flock is
+    /// held for the lifetime of the writer task; on shutdown or panic
+    /// the underlying `File` drops and the kernel releases the lock.
+    ///
+    /// Returns `Err` if the directory isn't usable, another pgdog
+    /// already holds the dir, or recovery fails; the caller is
+    /// responsible for deciding whether to continue running without
+    /// WAL durability.
+    pub async fn open(manager: &Manager, dir: &PathBuf) -> Result<Self, Error> {
+        // Ensure the dir exists before lock_dir tries to open .lock in it.
+        tokio::fs::create_dir_all(dir)
+            .await
+            .map_err(|source| Error::DirNotAccessible {
+                dir: dir.to_path_buf(),
+                source,
+            })?;
+        let lock = lock_dir(dir)?;
+        let recovered = recovery::recover_transactions(manager, dir).await?;
+
+        let (tx, rx) = mpsc::channel::<WriteRequest>(CHANNEL_CAPACITY);
+        let shutdown = Arc::new(WalShutdown::default());
+
+        tokio::spawn({
+            let shutdown = Arc::clone(&shutdown);
+
+            async move {
+                let fut = std::panic::AssertUnwindSafe(run(
+                    recovered.segment,
+                    recovered.snapshot,
+                    rx,
+                    Arc::clone(&shutdown),
+                    lock,
+                ));
+                if fut.catch_unwind().await.is_err() {
+                    error!("2pc wal writer task panicked");
+                }
+                shutdown.done.notify_waiters();
+            }
+        });
+
+        Ok(Self { tx, shutdown })
+    }
+
+    /// Log that `txn` is about to issue PREPARE TRANSACTION on its
+    /// participants. Must complete before any PREPARE leaves the
+    /// coordinator.
+    pub async fn append_begin(
+        &self,
+        txn: TwoPcTransaction,
+        user: String,
+        database: String,
+    ) -> Result<u64, Arc<Error>> {
+        self.append(Record::Begin(BeginPayload {
+            txn,
+            user,
+            database,
+        }))
+        .await
+    }
+
+    /// Log that `txn` has crossed the point of no return: every
+    /// participant must now be driven to COMMIT PREPARED.
+    pub async fn append_committing(&self, txn: TwoPcTransaction) -> Result<u64, Arc<Error>> {
+        self.append(Record::Committing(TxnPayload { txn })).await
+    }
+
+    /// Log that `txn` is fully resolved (committed or aborted on every
+    /// participant) and recovery may forget it.
+    pub async fn append_end(&self, txn: TwoPcTransaction) -> Result<u64, Arc<Error>> {
+        self.append(Record::End(TxnPayload { txn })).await
+    }
+
+    /// Snapshot the active 2PC set into a [`Record::Checkpoint`] and
+    /// garbage-collect any segment fully superseded by it. The snapshot
+    /// is taken inside the writer task so any records ahead of this
+    /// marker in the same group-commit batch are reflected. Returns the
+    /// LSN of the checkpoint record.
+    pub async fn checkpoint(&self) -> Result<u64, Arc<Error>> {
+        let (ack, rx) = oneshot::channel();
+        self.tx
+            .send(WriteRequest {
+                append: WalAppend::Checkpoint,
+                ack,
+            })
+            .await
+            .map_err(|_| Arc::new(Error::WriterGone))?;
+        rx.await.map_err(|_| Arc::new(Error::WriterGone))?
+    }
+
+    /// Send a record to the writer task. Resolves once the record (and
+    /// any other records in its group-commit batch) have been fsynced.
+    async fn append(&self, record: Record) -> Result<u64, Arc<Error>> {
+        let (ack, rx) = oneshot::channel();
+        self.tx
+            .send(WriteRequest {
+                append: WalAppend::Record(record),
+                ack,
+            })
+            .await
+            .map_err(|_| Arc::new(Error::WriterGone))?;
+        rx.await.map_err(|_| Arc::new(Error::WriterGone))?
+    }
+
+    /// Signal the writer to stop accepting new records, drain any
+    /// in-flight requests, and exit. Resolves once the writer task has
+    /// finished (cleanly, via panic, or via a final fsync).
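+    ///
+    /// Hypothetical call site (a sketch; the real shutdown wiring lives
+    /// outside this module):
+    ///
+    /// ```ignore
+    /// wal.shutdown().await; // resolves after the final drain and fsync
+    /// ```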
+    pub async fn shutdown(&self) {
+        let waiter = self.shutdown.done.notified();
+        self.shutdown.cancelled.store(true, Ordering::Relaxed);
+        self.shutdown.cancel.notify_waiters();
+        waiter.await;
+    }
+}
+
+async fn run(
+    mut segment: Segment,
+    mut snapshot: HashMap<TwoPcTransaction, CheckpointEntry>,
+    mut rx: mpsc::Receiver<WriteRequest>,
+    shutdown: Arc<WalShutdown>,
+    // Held for the writer's lifetime so the kernel keeps our flock on
+    // `<dir>/.lock`; dropped when this function returns or panics.
+    _dir_lock: std::fs::File,
+) {
+    let mut batch: Vec<WriteRequest> = Vec::with_capacity(MAX_BATCH);
+    let mut encode_buf = BytesMut::with_capacity(ENCODE_BUF_INITIAL);
+    // Holds the in-flight GC if one is still running; awaited before
+    // the next GC spawns so segment GCs never overlap.
+    let mut gc_handle: Option<tokio::task::JoinHandle<()>> = None;
+    let wal_dir = config()
+        .config
+        .general
+        .two_phase_commit_wal_dir
+        .clone()
+        .expect("two_phase_commit_wal_dir must be set");
+
+    loop {
+        if shutdown.cancelled.load(Ordering::Relaxed) {
+            // Drain any remaining requests, fsync them, exit.
+            while let Ok(req) = rx.try_recv() {
+                batch.push(req);
+            }
+            if !batch.is_empty() {
+                // Drain-on-shutdown: process_batch acks the callers and
+                // warns on its own; the rotate/gc signal in the result
+                // has nothing to act on because we're about to return,
+                // and recovery handles whatever state we leave on disk.
+                let _ =
+                    process_batch(&mut segment, &mut batch, &mut encode_buf, &mut snapshot).await;
+            }
+            return;
+        }
+
+        // Wait for the first request or a wake from shutdown.
+        let first = tokio::select! {
+            biased;
+            _ = shutdown.cancel.notified() => continue,
+            req = rx.recv() => req,
+        };
+        let Some(first) = first else { return };
+        batch.push(first);
+        let deadline = Instant::now()
+            + Duration::from_millis(config().config.general.two_phase_commit_wal_fsync_interval);
+
+        // Greedy drain of immediately-available requests, no yields.
+        while batch.len() < MAX_BATCH {
+            match rx.try_recv() {
+                Ok(req) => batch.push(req),
+                Err(_) => break,
+            }
+        }
+
+        // Race more recv against the deadline (or cancellation).
+        while batch.len() < MAX_BATCH {
+            tokio::select! {
+                biased;
+                _ = shutdown.cancel.notified() => break,
+                _ = sleep_until(deadline) => break,
+                req = rx.recv() => match req {
+                    Some(req) => batch.push(req),
+                    None => break,
+                }
+            }
+        }
+
+        let outcome = process_batch(&mut segment, &mut batch, &mut encode_buf, &mut snapshot).await;
+
+        match outcome {
+            Ok(Some(lsn)) => {
+                // Run GC off the writer task; filesystem work shouldn't
+                // hold up the next batch. Chain through `gc_handle` so two
+                // GCs never race on the same directory: the new task awaits
+                // the previous one before starting.
+                let prev = gc_handle.take();
+                let wal_dir = wal_dir.clone();
+                gc_handle = Some(tokio::spawn(async move {
+                    if let Some(prev) = prev {
+                        let _ = prev.await;
+                    }
+                    if let Err(err) = gc_before_lsn(&wal_dir, lsn).await {
+                        warn!("[2pc] wal checkpoint gc failed: {}", err);
+                    }
+                }));
+            }
+            Ok(None) => {}
+            Err(ref err) if matches!(**err, Error::SegmentBroken) => {
+                // Broken segment can't take more writes. Failing to
+                // create a replacement here means the disk is gone
+                // and we have nowhere to log: panic.
+                segment = Segment::create(&wal_dir, segment.next_lsn()).await.expect(
+                    "2pc wal: cannot create new segment after segment broken; disk unusable",
+                );
+            }
+            Err(_) => {}
+        }
+
+        if segment.size_bytes() >= config().config.general.two_phase_commit_wal_segment_size {
+            match Segment::create(&wal_dir, segment.next_lsn()).await {
+                Ok(new_seg) => segment = new_seg,
+                Err(err) => {
+                    error!(
+                        "2pc wal: failed to rotate segment at lsn {}: {}",
+                        segment.next_lsn(),
+                        err
+                    );
+                    // Keep using the over-sized segment; it still works.
+                }
+            }
+        }
+    }
+}
+
+async fn process_batch(
+    segment: &mut Segment,
+    batch: &mut Vec<WriteRequest>,
+    encode_buf: &mut BytesMut,
+    snapshot: &mut HashMap<TwoPcTransaction, CheckpointEntry>,
+) -> Result<Option<u64>, Arc<Error>> {
+    encode_buf.clear();
+    let mut encode_err: Option<Error> = None;
+    let mut last_checkpoint_idx: Option<usize> = None;
+    // Records every snapshot mutation in batch order so we can roll
+    // back to the start-of-batch state if encode or sync fails.
+    let mut undo: Vec<Undo> = Vec::with_capacity(batch.len());
+
+    for (i, req) in batch.iter().enumerate() {
+        let result = match &req.append {
+            WalAppend::Record(r) => {
+                apply_to_snapshot(snapshot, r, &mut undo);
+                r.encode(encode_buf)
+            }
+            WalAppend::Checkpoint => {
+                last_checkpoint_idx = Some(i);
+                Record::Checkpoint(CheckpointPayload {
+                    active: snapshot.values().cloned().collect(),
+                })
+                .encode(encode_buf)
+            }
+        };
+        if let Err(err) = result {
+            encode_err = Some(err);
+            break;
+        }
+    }
+
+    if let Some(err) = encode_err {
+        replay_undo(snapshot, undo);
+        warn!("2pc wal: encode failed for batch: {}", err);
+        let shared = Arc::new(err);
+        for req in batch.drain(..) {
+            let _ = req.ack.send(Err(Arc::clone(&shared)));
+        }
+        return Err(shared);
+    }
+
+    let count = batch.len() as u32;
+    match segment.commit(encode_buf, count).await {
+        Ok(start) => {
+            for (i, req) in batch.drain(..).enumerate() {
+                let _ = req.ack.send(Ok(start + i as u64));
+            }
+            Ok(last_checkpoint_idx.map(|i| start + i as u64))
+        }
+        Err(err) => {
+            replay_undo(snapshot, undo);
+            warn!("2pc wal: write/sync failed for batch: {}", err);
+            let shared = Arc::new(err);
+            for req in batch.drain(..) {
+                let _ = req.ack.send(Err(Arc::clone(&shared)));
+            }
+            Err(shared)
+        }
+    }
+}
+
+/// One snapshot mutation captured for rollback.
+struct Undo {
+    txn: TwoPcTransaction,
+    /// Value the txn held before the mutation; `None` means the txn
+    /// wasn't in the snapshot at all and should be removed on undo.
+    prior: Option<CheckpointEntry>,
+}
+
+/// Apply `record` to `snapshot` and push an entry onto `undo` capturing
+/// what to revert to if the batch later fails.
+fn apply_to_snapshot(
+    snapshot: &mut HashMap<TwoPcTransaction, CheckpointEntry>,
+    record: &Record,
+    undo: &mut Vec<Undo>,
+) {
+    match record {
+        Record::Begin(p) => {
+            let prior = snapshot.insert(
+                p.txn,
+                CheckpointEntry {
+                    txn: p.txn,
+                    user: p.user.clone(),
+                    database: p.database.clone(),
+                    decided: false,
+                },
+            );
+            undo.push(Undo { txn: p.txn, prior });
+        }
+        Record::Committing(p) => {
+            if let Some(entry) = snapshot.get_mut(&p.txn) {
+                let prior = entry.clone();
+                entry.decided = true;
+                undo.push(Undo {
+                    txn: p.txn,
+                    prior: Some(prior),
+                });
+            }
+        }
+        Record::End(p) => {
+            if let Some(prior) = snapshot.remove(&p.txn) {
+                undo.push(Undo {
+                    txn: p.txn,
+                    prior: Some(prior),
+                });
+            }
+        }
+        Record::Checkpoint(_) => {}
+    }
+}
+
+/// Replay `undo` in reverse to restore `snapshot` to its state before
+/// the batch started.
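+///
+/// E.g. if a failed batch applied `Begin(t1)` (no prior entry) and then
+/// `End(t2)`, the undo log is `[{t1, prior: None}, {t2, prior: Some(..)}]`;
+/// replaying in reverse first re-inserts `t2`'s entry, then removes `t1`.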
+fn replay_undo(snapshot: &mut HashMap<TwoPcTransaction, CheckpointEntry>, undo: Vec<Undo>) {
+    for u in undo.into_iter().rev() {
+        match u.prior {
+            Some(entry) => {
+                snapshot.insert(u.txn, entry);
+            }
+            None => {
+                snapshot.remove(&u.txn);
+            }
+        }
+    }
+}
diff --git a/pgdog/src/frontend/error.rs b/pgdog/src/frontend/error.rs
index 3b8002dcd..6852702f0 100644
--- a/pgdog/src/frontend/error.rs
+++ b/pgdog/src/frontend/error.rs
@@ -1,6 +1,7 @@
 //! Frontend errors.
 
 use std::io::ErrorKind;
+use std::sync::Arc;
 
 use thiserror::Error;
 
@@ -73,6 +74,12 @@ pub enum Error {
     // to reach so deep into a module.
     #[error("{0}")]
     Multi(#[from] Box),
+
+    /// A 2PC WAL append failed; the operation is refused so the
+    /// caller does not issue PREPARE / COMMIT PREPARED to backends
+    /// without a durable record of it.
+    #[error("2pc wal: {0}")]
+    TwoPcWal(Arc<crate::frontend::client::query_engine::two_pc::wal::Error>),
 }
 
 impl From for Error {
diff --git a/pgdog/src/main.rs b/pgdog/src/main.rs
index 3d2312b2c..14f7f6c68 100644
--- a/pgdog/src/main.rs
+++ b/pgdog/src/main.rs
@@ -9,6 +9,7 @@ use pgdog::backend::databases;
 use pgdog::backend::pool::dns_cache::DnsCache;
 use pgdog::cli::{self, Commands};
 use pgdog::config::{self, config};
+use pgdog::frontend::client::query_engine::two_pc::Manager;
 use pgdog::frontend::listener::Listener;
 use pgdog::frontend::prepared_statements;
 use pgdog::plugin;
@@ -16,7 +17,7 @@ use pgdog::stats;
 use pgdog::util::pgdog_version;
 use pgdog::{healthcheck, net};
 use tokio::runtime::Builder;
-use tracing::{error, info};
+use tracing::{error, info, warn};
 
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     let args = cli::Cli::parse();
@@ -148,6 +149,14 @@ async fn pgdog(command: Option<Commands>) -> Result<(), Box<dyn std::error::Error>> {
     let clients = Clients::load();
@@ -26,13 +26,16 @@ async fn metrics(_: Request) -> Result
     }
 }
+impl From<u64> for MeasurementType {
+    fn from(value: u64) -> Self {
+        Self::Integer(value as i64)
+    }
+}
 impl From<usize> for MeasurementType {
     fn from(value: usize) -> Self {
         Self::Integer(value as i64)
diff --git a/pgdog/src/stats/two_pc.rs b/pgdog/src/stats/two_pc.rs
new file mode 100644
index 000000000..0253c1dd3
--- /dev/null
+++ b/pgdog/src/stats/two_pc.rs
@@ -0,0 +1,42 @@
+//! Two-phase commit metrics.
+
+use crate::frontend::client::query_engine::two_pc::Manager;
+
+use super::{Measurement, Metric, OpenMetric};
+
+pub struct TwoPc {
+    recovered_total: u64,
+}
+
+impl TwoPc {
+    pub fn load() -> Metric {
+        let stats = Manager::get().stats();
+        Metric::new(Self {
+            recovered_total: stats.recovered_total(),
+        })
+    }
+}
+
+impl OpenMetric for TwoPc {
+    fn name(&self) -> String {
+        "two_pc_recovered_total".into()
+    }
+
+    fn metric_type(&self) -> String {
+        "counter".into()
+    }
+
+    fn help(&self) -> Option<String> {
+        Some(
+            "Total number of in-flight 2PC transactions restored from the WAL during recovery."
+                .into(),
+        )
+    }
+
+    fn measurements(&self) -> Vec<Measurement> {
+        vec![Measurement {
+            labels: vec![],
+            measurement: self.recovered_total.into(),
+        }]
+    }
+}
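+
+// A scrape of the stats endpoint would include output along these lines
+// for this metric (a sketch of Prometheus/OpenMetrics text exposition,
+// assuming no transactions recovered yet):
+//
+//   # HELP two_pc_recovered_total Total number of in-flight 2PC transactions restored from the WAL during recovery.
+//   # TYPE two_pc_recovered_total counter
+//   two_pc_recovered_total 0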