From bb521b07753e668f8d389c6b21483965489c29c1 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Thu, 19 Feb 2026 13:37:14 +0100 Subject: [PATCH 1/4] Add per-subscription conflict statistics using PG18 custom pgstat kind Register a custom pgstat kind (SPOCK_PGSTAT_KIND_LRCONFLICTS) via the PG18 pgstat_register_kind() infrastructure to track replication conflict counts per subscription. Unlike vanilla PostgreSQL subscription stats, spock statistics use MyDatabaseId as the key because spock node, origin, and subscription IDs are unique only within a database. The implementation introduces spock-specific stat types (Spock_Stat_StatSubEntry, Spock_Stat_PendingSubEntry) sized to SPOCK_CONFLICT_NUM_TYPES (6 types, excluding SPOCK_CT_DELETE_LATE which is not yet tracked). Column names in the SQL-callable function are kept in sync with the SpockConflictType enum via designated initializers in SpockConflictStatColNames[]. Currently only SPOCK_CT_UPDATE_MISSING conflicts are actively reported (from spock_apply_heap.c). Reporting from spock_report_conflict() is left as a TODO until SPOCK_CT_DELETE_LATE is either included in the stats array or filtered out to prevent overflow. SQL functions: spock.get_subscription_stats(oid), spock.reset_subscription_stats(oid). Includes a regression test exercising UPDATE_MISSING counting, counter increment, and stats reset. --- Makefile | 8 +- include/spock_conflict.h | 8 +- include/spock_conflict_stat.h | 46 ++++ sql/spock--6.0.0-devel.sql | 23 ++ src/spock.c | 8 + src/spock_apply_heap.c | 12 ++ src/spock_conflict.c | 12 ++ src/spock_conflict_stat.c | 255 +++++++++++++++++++++++ src/spock_functions.c | 9 + tests/regress/expected/conflict_stat.out | 142 +++++++++++++ tests/regress/sql/conflict_stat.sql | 91 ++++++++ 11 files changed, 612 insertions(+), 2 deletions(-) create mode 100644 include/spock_conflict_stat.h create mode 100644 src/spock_conflict_stat.c create mode 100644 tests/regress/expected/conflict_stat.out create mode 100644 tests/regress/sql/conflict_stat.sql diff --git a/Makefile b/Makefile index 66bdcd08..7e040cae 100644 --- a/Makefile +++ b/Makefile @@ -50,8 +50,14 @@ all: spock.control # ----------------------------------------------------------------------------- # Regression tests # ----------------------------------------------------------------------------- +# PG18+ only tests +REGRESS_PG18 = +ifeq ($(shell test $(PGVER) -ge 18 && echo yes),yes) +REGRESS_PG18 = conflict_stat +endif + REGRESS = preseed infofuncs init_fail init preseed_check basic conflict_secondary_unique \ - excluded_schema \ + excluded_schema $(REGRESS_PG18) \ toasted replication_set matview bidirectional primary_key \ interfaces foreign_key copy sequence triggers parallel functions row_filter \ row_filter_sampling att_list column_filter apply_delay \ diff --git a/include/spock_conflict.h b/include/spock_conflict.h index e805baa9..d0a41d78 100644 --- a/include/spock_conflict.h +++ b/include/spock_conflict.h @@ -51,7 +51,7 @@ extern bool spock_save_resolutions; typedef enum { /* The row to be inserted violates unique constraint */ - SPOCK_CT_INSERT_EXISTS, + SPOCK_CT_INSERT_EXISTS = 0, /* The row to be updated was modified by a different origin */ SPOCK_CT_UPDATE_ORIGIN_DIFFERS, @@ -76,6 +76,12 @@ typedef enum } SpockConflictType; +/* + * SPOCK_CT_DELETE_LATE is excluded because it is not yet tracked in conflict + * statistics. + */ +#define SPOCK_CONFLICT_NUM_TYPES (SPOCK_CT_DELETE_MISSING + 1) + extern int spock_conflict_resolver; extern int spock_conflict_log_level; extern bool spock_save_resolutions; diff --git a/include/spock_conflict_stat.h b/include/spock_conflict_stat.h new file mode 100644 index 00000000..492f8446 --- /dev/null +++ b/include/spock_conflict_stat.h @@ -0,0 +1,46 @@ +/*------------------------------------------------------------------------- + * + * spock_conflict_stat.h + * spock subscription conflict statistics + * + * Copyright (c) 2022-2026, pgEdge, Inc. + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, The Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#ifndef SPOCK_CONFLICT_STAT_H +#define SPOCK_CONFLICT_STAT_H + +#include "postgres.h" + +#if PG_VERSION_NUM >= 180000 + +#include "pgstat.h" + +#include "spock_conflict.h" + +/* Shared memory stats entry for spock subscription conflicts */ +typedef struct Spock_Stat_StatSubEntry +{ + PgStat_Counter conflict_count[SPOCK_CONFLICT_NUM_TYPES]; + TimestampTz stat_reset_timestamp; +} Spock_Stat_StatSubEntry; + +/* Pending (backend-local) entry for spock subscription conflicts */ +typedef struct Spock_Stat_PendingSubEntry +{ + PgStat_Counter conflict_count[SPOCK_CONFLICT_NUM_TYPES]; +} Spock_Stat_PendingSubEntry; + +extern void spock_stat_register_conflict_stat(void); + +extern void spock_stat_report_subscription_conflict(Oid subid, + SpockConflictType type); +extern void spock_stat_create_subscription(Oid subid); +extern void spock_stat_drop_subscription(Oid subid); +extern Spock_Stat_StatSubEntry *spock_stat_fetch_stat_subscription(Oid subid); + +#endif /* PG_VERSION_NUM >= 180000 */ + +#endif /* SPOCK_CONFLICT_STAT_H */ \ No newline at end of file diff --git a/sql/spock--6.0.0-devel.sql b/sql/spock--6.0.0-devel.sql index 116face1..ae51d34e 100644 --- a/sql/spock--6.0.0-devel.sql +++ b/sql/spock--6.0.0-devel.sql @@ -712,6 +712,29 @@ BEGIN END; $$ LANGUAGE plpgsql; +-- ---- +-- Subscription conflict statistics +-- ---- +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C CALLED ON NULL INPUT VOLATILE; + -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass, diff --git a/src/spock.c b/src/spock.c index d989a6c1..02ce0c9d 100644 --- a/src/spock.c +++ b/src/spock.c @@ -53,6 +53,9 @@ #include "pgstat.h" #include "spock_apply.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_executor.h" #include "spock_node.h" #include "spock_conflict.h" @@ -1224,4 +1227,9 @@ _PG_init(void) /* Security label provider hook */ register_label_provider(SPOCK_SECLABEL_PROVIDER, spock_object_relabel); + +#if PG_VERSION_NUM >= 180000 + /* Spock replication conflict statistics */ + spock_stat_register_conflict_stat(); +#endif } diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index f6ba7457..7722cf93 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -64,6 +64,7 @@ #include "spock_common.h" #include "spock_conflict.h" +#include "spock_conflict_stat.h" #include "spock_executor.h" #include "spock_node.h" #include "spock_proto_native.h" @@ -1067,6 +1068,17 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, /* SPOCK_CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */ SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index]; +#if PG_VERSION_NUM >= 180000 + if (!MyApplyWorker->use_try_block) + /* + * To avoid duplicated messages complain only in case we are on the + * successful path way. We don't count the conflict if something + * goes wrong already because the update logic is broken yet and + * this conflict may be misleading. + */ + spock_stat_report_subscription_conflict(MyApplyWorker->subid, + SPOCK_CT_UPDATE_MISSING); +#endif /* * The tuple to be updated could not be found. Do nothing except for * emitting a log message. TODO: Add pkey information as well. diff --git a/src/spock_conflict.c b/src/spock_conflict.c index e79a519f..b369273f 100644 --- a/src/spock_conflict.c +++ b/src/spock_conflict.c @@ -51,6 +51,9 @@ #include "spock.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_proto_native.h" #include "spock_node.h" #include "spock_worker.h" @@ -372,6 +375,15 @@ spock_report_conflict(SpockConflictType conflict_type, handle_stats_counter(rel->rel, MyApplyWorker->subid, SPOCK_STATS_CONFLICT_COUNT, 1); +#if PG_VERSION_NUM >= 180000 + /* + * TODO: Can't enable until SPOCK_CT_DELETE_LATE is either included in + * SPOCK_CONFLICT_NUM_TYPES or filtered out here — passing it as-is would + * overflow the conflict_count[] array. + * + * spock_stat_report_subscription_conflict(MyApplyWorker->subid, conflict_type); + */ +#endif /* If configured log resolution to spock.resolutions table */ spock_conflict_log_table(conflict_type, rel, localtuple, oldkey, remotetuple, applytuple, resolution, diff --git a/src/spock_conflict_stat.c b/src/spock_conflict_stat.c new file mode 100644 index 00000000..9f490dd4 --- /dev/null +++ b/src/spock_conflict_stat.c @@ -0,0 +1,255 @@ +/*------------------------------------------------------------------------- + * + * spock_conflict_stat.c + * spock subscription conflict statistics + * + * NOTE: Unlike PostgreSQL subscription statistics, Spock statistics cannot be + * cluster-wide because spock node ID, origin ID, and subscription ID are + * unique only within a database. Therefore, we use MyDatabaseId to identify + * each statistics entry. + * + * Copyright (c) 2022-2026, pgEdge, Inc. + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, The Regents of the University of California + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#if PG_VERSION_NUM >= 180000 + +#include "funcapi.h" +#include "utils/pgstat_internal.h" + +#include "spock.h" +#include "spock_conflict_stat.h" + +/* + * Kind ID reserved for statistics of spock replication conflicts. + * TODO: ask Michael Paquier about exact numbers and conflict detection + */ +#define SPOCK_PGSTAT_KIND_LRCONFLICTS 27 + +/* Shared memory wrapper for spock subscription conflict stats */ +typedef struct Spock_Stat_Subscription +{ + PgStatShared_Common header; + Spock_Stat_StatSubEntry stats; +} Spock_Stat_Subscription; + +/* + * Column names for spock_get_subscription_stats(), indexed by + * SpockConflictType. Kept in sync with the enum via designated initializers + * so that reordering the enum produces a compile-time error rather than + * silently wrong output. + */ +static const char *const SpockConflictStatColNames[SPOCK_CONFLICT_NUM_TYPES] = { + [SPOCK_CT_INSERT_EXISTS] = "confl_insert_exists", + [SPOCK_CT_UPDATE_ORIGIN_DIFFERS] = "confl_update_origin_differs", + [SPOCK_CT_UPDATE_EXISTS] = "confl_update_exists", + [SPOCK_CT_UPDATE_MISSING] = "confl_update_missing", + [SPOCK_CT_DELETE_ORIGIN_DIFFERS] = "confl_delete_origin_differs", + [SPOCK_CT_DELETE_MISSING] = "confl_delete_missing", +}; + +PG_FUNCTION_INFO_V1(spock_get_subscription_stats); +PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); + +static bool spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, + bool nowait); +static void spock_stat_subscription_reset_timestamp_cb( + PgStatShared_Common *header, + TimestampTz ts); + +/* + * We rely on the pgstat infrastructure here, employing spock's own conflict + * detection algorithm with custom statistics storage. + */ + +static const PgStat_KindInfo spock_conflict_stat = { + .name = "spock_conflict_stat", + .fixed_amount = false, + .write_to_file = true, + + .shared_size = sizeof(Spock_Stat_Subscription), + .shared_data_off = offsetof(Spock_Stat_Subscription, stats), + .shared_data_len = sizeof(((Spock_Stat_Subscription *) 0)->stats), + .pending_size = sizeof(Spock_Stat_PendingSubEntry), + + .flush_pending_cb = spock_stat_subscription_flush_cb, + .reset_timestamp_cb = spock_stat_subscription_reset_timestamp_cb, +}; + +void +spock_stat_register_conflict_stat(void) +{ + pgstat_register_kind(SPOCK_PGSTAT_KIND_LRCONFLICTS, &spock_conflict_stat); +} + +/* + * Report a subscription conflict. + */ +void +spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) +{ + PgStat_EntryRef *entry_ref; + Spock_Stat_PendingSubEntry *pending; + + Assert(type >= 0 && type < SPOCK_CONFLICT_NUM_TYPES); + + entry_ref = pgstat_prep_pending_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid, NULL); + pending = entry_ref->pending; + pending->conflict_count[type]++; +} + +/* + * Report creating the subscription. + */ +void +spock_stat_create_subscription(Oid subid) +{ + /* Ensures that stats are dropped if transaction rolls back */ + pgstat_create_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid); + + /* Create and initialize the subscription stats entry */ + pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, + true, NULL); + pgstat_reset_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, 0); +} + +/* + * Report dropping the subscription. + * + * Ensures that stats are dropped if transaction commits. + */ +void +spock_stat_drop_subscription(Oid subid) +{ + pgstat_drop_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, + MyDatabaseId, subid); +} + +/* + * Support function for the SQL-callable pgstat* functions. Returns + * the collected statistics for one subscription or NULL. + */ +Spock_Stat_StatSubEntry * +spock_stat_fetch_stat_subscription(Oid subid) +{ + return (Spock_Stat_StatSubEntry *) + pgstat_fetch_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); +} + +/* + * Get the subscription statistics for the given subscription. If the + * subscription statistics is not available, return all-zeros stats. + */ +Datum +spock_get_subscription_stats(PG_FUNCTION_ARGS) +{ +#define SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS (1 + SPOCK_CONFLICT_NUM_TYPES + 1) + Oid subid = PG_GETARG_OID(0); + TupleDesc tupdesc; + Datum values[SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; + bool nulls[SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0}; + Spock_Stat_StatSubEntry *subentry; + Spock_Stat_StatSubEntry allzero; + int i = 0; + AttrNumber attnum = 1; + + /* Get subscription stats */ + subentry = spock_stat_fetch_stat_subscription(subid); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS); + TupleDescInitEntry(tupdesc, attnum++, "subid", + OIDOID, -1, 0); + for (int c = 0; c < SPOCK_CONFLICT_NUM_TYPES; c++) + TupleDescInitEntry(tupdesc, attnum++, SpockConflictStatColNames[c], + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, attnum++, "stats_reset", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + if (!subentry) + { + /* If the subscription is not found, initialise its stats */ + memset(&allzero, 0, sizeof(Spock_Stat_StatSubEntry)); + subentry = &allzero; + } + + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* conflict counts */ + for (int nconflict = 0; nconflict < SPOCK_CONFLICT_NUM_TYPES; nconflict++) + values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]); + + /* stats_reset */ + if (subentry->stat_reset_timestamp == 0) + nulls[i] = true; + else + values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp); + + Assert(i + 1 == SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS); + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} +#undef SPOCK_STAT_GET_SUBSCRIPTION_STATS_COLS + +/* Reset subscription stats (a specific one or all of them) */ +Datum +spock_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + Oid subid; + + if (PG_ARGISNULL(0)) + { + /* Clear all subscription stats */ + pgstat_reset_of_kind(SPOCK_PGSTAT_KIND_LRCONFLICTS); + } + else + { + subid = PG_GETARG_OID(0); + + if (!OidIsValid(subid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid subscription OID %u", subid))); + pgstat_reset(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); + } + + PG_RETURN_VOID(); +} + +static bool +spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) +{ + Spock_Stat_PendingSubEntry *localent; + Spock_Stat_Subscription *shsubent; + + localent = (Spock_Stat_PendingSubEntry *) entry_ref->pending; + shsubent = (Spock_Stat_Subscription *) entry_ref->shared_stats; + + /* localent always has non-zero content */ + + if (!pgstat_lock_entry(entry_ref, nowait)) + return false; + + for (int i = 0; i < SPOCK_CONFLICT_NUM_TYPES; i++) + shsubent->stats.conflict_count[i] += localent->conflict_count[i]; + + pgstat_unlock_entry(entry_ref); + return true; +} + +static void +spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +{ + ((Spock_Stat_Subscription *) header)->stats.stat_reset_timestamp = ts; +} + +#endif /* PG_VERSION_NUM >= 180000 */ diff --git a/src/spock_functions.c b/src/spock_functions.c index a7836321..5525fdf0 100644 --- a/src/spock_functions.c +++ b/src/spock_functions.c @@ -86,6 +86,9 @@ #include "spock_apply.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 +#include "spock_conflict_stat.h" +#endif #include "spock_dependency.h" #include "spock_executor.h" #include "spock_node.h" @@ -590,6 +593,9 @@ spock_create_subscription(PG_FUNCTION_ARGS) sub.skip_schema = textarray_to_list(skip_schema_names); create_subscription(&sub); +#if PG_VERSION_NUM >= 180000 + spock_stat_create_subscription(sub.id); +#endif /* Create progress entry to track commit ts per local/remote origin */ spock_group_attach(MyDatabaseId, localnode->node->id, originif.nodeid); @@ -664,6 +670,9 @@ spock_drop_subscription(PG_FUNCTION_ARGS) /* Drop the actual subscription. */ drop_subscription(sub->id); +#if PG_VERSION_NUM >= 180000 + spock_stat_drop_subscription(sub->id); +#endif /* * The rest is different depending on if we are doing this on provider diff --git a/tests/regress/expected/conflict_stat.out b/tests/regress/expected/conflict_stat.out new file mode 100644 index 00000000..2801b17c --- /dev/null +++ b/tests/regress/expected/conflict_stat.out @@ -0,0 +1,142 @@ +-- Test: conflict statistics for UPDATE_MISSING conflicts (PG18+ only) +SELECT * FROM spock_regress_variables() +\gset +\c :provider_dsn +-- Create a simple table for conflict testing +SELECT spock.replicate_ddl($$ + CREATE TABLE public.conflict_stat_test ( + id integer PRIMARY KEY, + data text + ); +$$); + replicate_ddl +--------------- + t +(1 row) + +SELECT * FROM spock.repset_add_table('default', 'conflict_stat_test'); + repset_add_table +------------------ + t +(1 row) + +-- Seed initial rows +INSERT INTO conflict_stat_test VALUES (1, 'row1'); +INSERT INTO conflict_stat_test VALUES (2, 'row2'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Get subscription OID for stats queries +SELECT sub_id AS test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset +-- Reset conflict stats before the test +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +-- Verify counters are zero initially +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Delete a row on subscriber only to set up UPDATE_MISSING +DELETE FROM conflict_stat_test WHERE id = 1; +TRUNCATE spock.exception_log; +\c :provider_dsn +-- Update the row that no longer exists on subscriber +UPDATE conflict_stat_test SET data = 'updated_row1' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +-- Row id=1 should still be missing on subscriber (update was skipped) +SELECT * FROM conflict_stat_test ORDER BY id; + id | data +----+------ + 2 | row2 +(1 row) + +-- The UPDATE_MISSING conflict should be logged in exception_log +SELECT operation, table_name FROM spock.exception_log; + operation | table_name +-----------+-------------------- + UPDATE | conflict_stat_test +(1 row) + +-- Verify that the UPDATE_MISSING conflict was counted +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 1 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Provoke a second UPDATE_MISSING to confirm counter increments +DELETE FROM conflict_stat_test WHERE id = 2; +TRUNCATE spock.exception_log; +\c :provider_dsn +UPDATE conflict_stat_test SET data = 'updated_row2' WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + wait_slot_confirm_lsn +----------------------- + +(1 row) + +\c :subscriber_dsn +SELECT * FROM conflict_stat_test ORDER BY id; + id | data +----+------ +(0 rows) + +-- Counter should now be 2 +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 2 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Test reset: clear the stats and verify counter goes back to zero +SELECT spock.reset_subscription_stats(:test_sub_id); + reset_subscription_stats +-------------------------- + +(1 row) + +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + confl_update_missing | confl_insert_exists | confl_update_origin_differs | confl_update_exists | confl_delete_origin_differs | confl_delete_missing +----------------------+---------------------+-----------------------------+---------------------+-----------------------------+---------------------- + 0 | 0 | 0 | 0 | 0 | 0 +(1 row) + +-- Cleanup +TRUNCATE spock.exception_log; +\c :provider_dsn +SELECT spock.replicate_ddl($$ DROP TABLE public.conflict_stat_test CASCADE; $$); +NOTICE: drop cascades to table conflict_stat_test membership in replication set default + replicate_ddl +--------------- + t +(1 row) + diff --git a/tests/regress/sql/conflict_stat.sql b/tests/regress/sql/conflict_stat.sql new file mode 100644 index 00000000..f6d74a3e --- /dev/null +++ b/tests/regress/sql/conflict_stat.sql @@ -0,0 +1,91 @@ +-- Test: conflict statistics for UPDATE_MISSING conflicts (PG18+ only) +SELECT * FROM spock_regress_variables() +\gset + +\c :provider_dsn + +-- Create a simple table for conflict testing +SELECT spock.replicate_ddl($$ + CREATE TABLE public.conflict_stat_test ( + id integer PRIMARY KEY, + data text + ); +$$); + +SELECT * FROM spock.repset_add_table('default', 'conflict_stat_test'); + +-- Seed initial rows +INSERT INTO conflict_stat_test VALUES (1, 'row1'); +INSERT INTO conflict_stat_test VALUES (2, 'row2'); +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Get subscription OID for stats queries +SELECT sub_id AS test_sub_id FROM spock.subscription + WHERE sub_name = 'test_subscription' \gset + +-- Reset conflict stats before the test +SELECT spock.reset_subscription_stats(:test_sub_id); + +-- Verify counters are zero initially +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Delete a row on subscriber only to set up UPDATE_MISSING +DELETE FROM conflict_stat_test WHERE id = 1; +TRUNCATE spock.exception_log; + +\c :provider_dsn + +-- Update the row that no longer exists on subscriber +UPDATE conflict_stat_test SET data = 'updated_row1' WHERE id = 1; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +-- Row id=1 should still be missing on subscriber (update was skipped) +SELECT * FROM conflict_stat_test ORDER BY id; + +-- The UPDATE_MISSING conflict should be logged in exception_log +SELECT operation, table_name FROM spock.exception_log; + +-- Verify that the UPDATE_MISSING conflict was counted +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Provoke a second UPDATE_MISSING to confirm counter increments +DELETE FROM conflict_stat_test WHERE id = 2; +TRUNCATE spock.exception_log; + +\c :provider_dsn + +UPDATE conflict_stat_test SET data = 'updated_row2' WHERE id = 2; +SELECT spock.wait_slot_confirm_lsn(NULL, NULL); + +\c :subscriber_dsn + +SELECT * FROM conflict_stat_test ORDER BY id; + +-- Counter should now be 2 +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Test reset: clear the stats and verify counter goes back to zero +SELECT spock.reset_subscription_stats(:test_sub_id); + +SELECT confl_update_missing, + confl_insert_exists,confl_update_origin_differs,confl_update_exists, + confl_delete_origin_differs,confl_delete_missing +FROM spock.get_subscription_stats(:test_sub_id); + +-- Cleanup +TRUNCATE spock.exception_log; +\c :provider_dsn +SELECT spock.replicate_ddl($$ DROP TABLE public.conflict_stat_test CASCADE; $$); From e0740836a7130ea0b792d31b77eb4ff23167af63 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 20 Feb 2026 10:36:19 +0100 Subject: [PATCH 2/4] Fix replication_set regression test for deterministic output Truncate the log on the first subscriber visit so subsequent checks are not polluted. Also drop the non-deterministic command_counter column from exception_log queries and order by (table_schema, table_name, remote_commit_ts) with COLLATE "C" instead, making the output stable across runs. --- tests/regress/expected/replication_set.out | 50 +++++++++++----------- tests/regress/sql/replication_set.sql | 12 ++++-- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/tests/regress/expected/replication_set.out b/tests/regress/expected/replication_set.out index 398ea9ac..5d917efb 100644 --- a/tests/regress/expected/replication_set.out +++ b/tests/regress/expected/replication_set.out @@ -223,6 +223,8 @@ NOTICE: drop cascades to 2 other objects (1 row) \c :subscriber_dsn +-- First time come by to the subscriber node. Clean the history in exception_log +TRUNCATE spock.exception_log; SELECT * FROM spock.replication_set; set_id | set_nodeid | set_name | replicate_insert | replicate_update | replicate_delete | replicate_truncate ------------+------------+---------------------+------------------+------------------+------------------+-------------------- @@ -445,7 +447,7 @@ SELECT * FROM spoc_102l ORDER BY x; -- Check exception log format SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -454,15 +456,15 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+------------+-----------+----------------------------------------------------+------------------------------------------ - 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+------------+-----------+----------------------------------------------------+------------------------------------------ + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid (6 rows) \c :provider_dsn @@ -562,7 +564,7 @@ SELECT * FROM spoc_102g_u ORDER BY x; (2 rows) SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -571,19 +573,19 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; - command_counter | table_schema | table_name | operation | remote_new_tup | error_message ------------------+--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- - 1 | | | INSERT | | Spock can't find relation - 2 | | | INSERT | | Spock can't find relation - 3 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 4 | | | INSERT | | Spock can't find relation - 5 | | | INSERT | | Spock can't find relation - 6 | public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 7 | | | UPDATE | | Spock can't find relation - 8 | | | UPDATE | | Spock can't find relation - 9 | public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid - 10 | public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; + table_schema | table_name | operation | remote_new_tup | error_message +--------------+-------------+-----------+----------------------------------------------------+-------------------------------------------------------------------------------------------------------- + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | INSERT | | Spock can't find relation + | | UPDATE | | Spock can't find relation + | | UPDATE | | Spock can't find relation + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g | INSERT | [{"value": -4, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102g_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | Spock can't find relation with oid + public | spoc_102l_u | UPDATE | [{"value": -3, "attname": "x", "atttype": "int4"}] | logical replication did not find row to be updated in replication target relation (public.spoc_102l_u) (10 rows) \c :provider_dsn diff --git a/tests/regress/sql/replication_set.sql b/tests/regress/sql/replication_set.sql index b9132a73..dae09930 100644 --- a/tests/regress/sql/replication_set.sql +++ b/tests/regress/sql/replication_set.sql @@ -98,6 +98,10 @@ SELECT spock.replicate_ddl($$ $$); \c :subscriber_dsn + +-- First time come by to the subscriber node. Clean the history in exception_log +TRUNCATE spock.exception_log; + SELECT * FROM spock.replication_set; -- Issue SPOC-102 @@ -203,7 +207,7 @@ SELECT * FROM spoc_102l ORDER BY x; -- Check exception log format SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -212,7 +216,7 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g,spoc_102l CASCADE'); @@ -257,7 +261,7 @@ SELECT * FROM spoc_102l_u ORDER BY x; SELECT * FROM spoc_102g_u ORDER BY x; SELECT - command_counter,table_schema,table_name,operation, + table_schema,table_name,operation, remote_new_tup, -- Replace OIDs with placeholder for deterministic test output regexp_replace( @@ -266,7 +270,7 @@ SELECT 'OID \d+', 'OID ', 'g' ) AS error_message FROM spock.exception_log -ORDER BY command_counter; +ORDER BY table_schema COLLATE "C",table_name COLLATE "C",remote_commit_ts; \c :provider_dsn SELECT spock.replicate_ddl('DROP TABLE IF EXISTS spoc_102g_u,spoc_102l_u CASCADE'); From 153c7b0d4fb7ac5cf5da97a3fa7351c83e5a0b70 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Fri, 20 Feb 2026 10:58:09 +0100 Subject: [PATCH 3/4] Changes after the C-rabbit review. Guard spock_conflict_stat.h include and improve conflict type validation --- .github/workflows/installcheck.yml | 17 +++++++ src/spock_apply_heap.c | 9 ++-- src/spock_conflict_stat.c | 78 +++++++++++++++++++++++++----- 3 files changed, 88 insertions(+), 16 deletions(-) diff --git a/.github/workflows/installcheck.yml b/.github/workflows/installcheck.yml index 2ed6825f..59b40fe1 100644 --- a/.github/workflows/installcheck.yml +++ b/.github/workflows/installcheck.yml @@ -53,6 +53,7 @@ jobs: version: latest - name: Start docker cluster + id: start_cluster run: | cd ${GITHUB_WORKSPACE}/tests/docker/ # To minimize regression tests difference, override pgedge.env with @@ -63,6 +64,22 @@ jobs: PGVER=${{ matrix.pgver }} DBUSER=regression DBNAME=regression \ docker compose up --build --wait -d timeout-minutes: 20 + continue-on-error: true + + - name: Diagnose cluster startup failure + if: steps.start_cluster.outcome == 'failure' + run: | + cd ${GITHUB_WORKSPACE}/tests/docker/ + echo "=== Docker container status ===" + docker compose ps -a + for node in n1 n2 n3; do + echo "" + echo "=== Container logs: $node ===" + docker compose logs pgedge-$node 2>&1 | tail -80 || true + echo "" + echo "=== PostgreSQL logfile: $node ===" + docker compose cp pgedge-$node:/home/pgedge/logfile.log /dev/stdout 2>/dev/null | tail -80 || echo "(not available)" + done - name: Run installcheck on node n1 id: installcheck diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index 7722cf93..47efa55e 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -64,7 +64,9 @@ #include "spock_common.h" #include "spock_conflict.h" +#if PG_VERSION_NUM >= 180000 #include "spock_conflict_stat.h" +#endif #include "spock_executor.h" #include "spock_node.h" #include "spock_proto_native.h" @@ -1071,10 +1073,9 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, #if PG_VERSION_NUM >= 180000 if (!MyApplyWorker->use_try_block) /* - * To avoid duplicated messages complain only in case we are on the - * successful path way. We don't count the conflict if something - * goes wrong already because the update logic is broken yet and - * this conflict may be misleading. + * To avoid duplicate messages, only report the conflict on the + * successful pathway. We skip counting when the update logic has + * already failed because the conflict would be misleading. */ spock_stat_report_subscription_conflict(MyApplyWorker->subid, SPOCK_CT_UPDATE_MISSING); diff --git a/src/spock_conflict_stat.c b/src/spock_conflict_stat.c index 9f490dd4..7d5fc311 100644 --- a/src/spock_conflict_stat.c +++ b/src/spock_conflict_stat.c @@ -16,25 +16,29 @@ */ #include "postgres.h" -#if PG_VERSION_NUM >= 180000 - #include "funcapi.h" #include "utils/pgstat_internal.h" #include "spock.h" + +PG_FUNCTION_INFO_V1(spock_get_subscription_stats); +PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); + +#if PG_VERSION_NUM >= 180000 #include "spock_conflict_stat.h" /* * Kind ID reserved for statistics of spock replication conflicts. - * TODO: ask Michael Paquier about exact numbers and conflict detection + * TODO: see https://wiki.postgresql.org/wiki/CustomCumulativeStats to choose + * specific value in production */ -#define SPOCK_PGSTAT_KIND_LRCONFLICTS 27 +#define SPOCK_PGSTAT_KIND_LRCONFLICTS 28 /* Shared memory wrapper for spock subscription conflict stats */ typedef struct Spock_Stat_Subscription { PgStatShared_Common header; - Spock_Stat_StatSubEntry stats; + Spock_Stat_StatSubEntry stats; } Spock_Stat_Subscription; /* @@ -52,9 +56,6 @@ static const char *const SpockConflictStatColNames[SPOCK_CONFLICT_NUM_TYPES] = { [SPOCK_CT_DELETE_MISSING] = "confl_delete_missing", }; -PG_FUNCTION_INFO_V1(spock_get_subscription_stats); -PG_FUNCTION_INFO_V1(spock_reset_subscription_stats); - static bool spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait); static void spock_stat_subscription_reset_timestamp_cb( @@ -95,7 +96,15 @@ spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) PgStat_EntryRef *entry_ref; Spock_Stat_PendingSubEntry *pending; - Assert(type >= 0 && type < SPOCK_CONFLICT_NUM_TYPES); + if (type != SPOCK_CT_UPDATE_MISSING) + /* + * Should happen only in development. Detect it as fast as possible + * with the highest error level that does not crash the instance. + */ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("unexpected conflict type %d reported for subscription %u", + type, subid))); entry_ref = pgstat_prep_pending_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, NULL); @@ -109,13 +118,31 @@ spock_stat_report_subscription_conflict(Oid subid, SpockConflictType type) void spock_stat_create_subscription(Oid subid) { + PgStat_EntryRef *ref; + /* Ensures that stats are dropped if transaction rolls back */ pgstat_create_transactional(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid); /* Create and initialize the subscription stats entry */ - pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, - true, NULL); + ref = pgstat_get_entry_ref(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, + true, NULL); + + if (pg_atomic_read_u32(&ref->shared_entry->refcount) != 2) + /* + * Should never happen: a new subscription stats entry should have + * exactly two references (the hashtable entry and our own). A higher + * count means a stale entry from a previous subscription with the same + * OID was not properly cleaned up. + */ + ereport(WARNING, + (errmsg("conflict statistics entry for subscription %u " + "already has %u references", + subid, + pg_atomic_read_u32(&ref->shared_entry->refcount)), + errhint("This may indicate that a previous subscription with " + "the same OID was not fully dropped."))); + pgstat_reset_entry(SPOCK_PGSTAT_KIND_LRCONFLICTS, MyDatabaseId, subid, 0); } @@ -247,9 +274,36 @@ spock_stat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait) } static void -spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, TimestampTz ts) +spock_stat_subscription_reset_timestamp_cb(PgStatShared_Common *header, + TimestampTz ts) { ((Spock_Stat_Subscription *) header)->stats.stat_reset_timestamp = ts; } #endif /* PG_VERSION_NUM >= 180000 */ + +#if PG_VERSION_NUM < 180000 + +/* + * XXX: implement conflict statistics gathering, if needed + */ + +Datum +spock_get_subscription_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("spock conflict statistics require PostgreSQL 18 or later"))); + PG_RETURN_NULL(); /* unreachable; suppress compiler warning */ +} + +Datum +spock_reset_subscription_stats(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("spock conflict statistics require PostgreSQL 18 or later"))); + PG_RETURN_NULL(); /* unreachable; suppress compiler warning */ +} + +#endif /* PG_VERSION_NUM < 180000 */ From 9a373937ef94fcd35a80e9360a05c94b5b7ce764 Mon Sep 17 00:00:00 2001 From: "Andrei V. Lepikhov" Date: Wed, 25 Feb 2026 09:52:58 +0100 Subject: [PATCH 4/4] Changes after Ibrar's review --- sql/spock--5.0.4--6.0.0-devel.sql | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/sql/spock--5.0.4--6.0.0-devel.sql b/sql/spock--5.0.4--6.0.0-devel.sql index 7695cd1d..dd83be6a 100644 --- a/sql/spock--5.0.4--6.0.0-devel.sql +++ b/sql/spock--5.0.4--6.0.0-devel.sql @@ -68,6 +68,29 @@ SET conflict_type = CASE conflict_type ELSE conflict_type END; +-- ---- +-- Subscription conflict statistics +-- ---- +CREATE FUNCTION spock.get_subscription_stats( + subid oid, + OUT subid oid, + OUT confl_insert_exists bigint, + OUT confl_update_origin_differs bigint, + OUT confl_update_exists bigint, + OUT confl_update_missing bigint, + OUT confl_delete_origin_differs bigint, + OUT confl_delete_missing bigint, + OUT stats_reset timestamptz +) +RETURNS record +AS 'MODULE_PATHNAME', 'spock_get_subscription_stats' +LANGUAGE C STABLE; + +CREATE FUNCTION spock.reset_subscription_stats(subid oid DEFAULT NULL) +RETURNS void +AS 'MODULE_PATHNAME', 'spock_reset_subscription_stats' +LANGUAGE C CALLED ON NULL INPUT VOLATILE; + -- Set delta_apply security label on specific column CREATE FUNCTION spock.delta_apply( rel regclass,