diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7782a12b8..6e9be8d23 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -170,6 +170,8 @@ jobs: run: bash integration/complex/run.sh - name: Dry run run: bash integration/dry_run/run.sh + - name: Resharding + run: bash integration/resharding/run.sh - name: Ensure PgDog stopped run: | if pgrep -x pgdog > /dev/null; then diff --git a/.rwx/integration.yml b/.rwx/integration.yml deleted file mode 100644 index 8051c7704..000000000 --- a/.rwx/integration.yml +++ /dev/null @@ -1,613 +0,0 @@ -on: - github: - pull_request: - init: - commit-sha: ${{ event.git.sha }} - branch: ${{ event.git.branch }} - push: - if: ${{ event.git.branch == "main" }} - init: - commit-sha: ${{ event.git.sha }} - branch: ${{ event.git.branch }} - cli: - init: - commit-sha: ${{ event.git.sha }} - branch: local - -base: - image: ubuntu:24.04 - config: rwx/base 1.0.0 - -concurrency-pools: - - id: pgdogdev/pgdog:integration:${{ init.branch }} - if: ${{ init.branch != "main" && init.branch != "local" }} - capacity: 1 - on-overflow: cancel-running - -aliases: - build-filter: &build-filter - - Cargo.toml - - Cargo.lock - - docker-compose.yml - - examples/demo/** - - integration/** - - pgdog/** - - pgdog-config/** - - pgdog-macros/** - - pgdog-plugin/** - - pgdog-postgres-types/** - - pgdog-stats/** - - pgdog-vector/** - - plugins/** - - scripts/** - postgres-bg-processes: &postgres-bg–processes - - key: postgres - run: | - sudo service postgresql start - sudo -u postgres createuser --superuser --login "$USER" || true - sudo -u postgres createdb "$USER" || true - sudo -u postgres psql -c 'ALTER SYSTEM SET wal_level TO logical;' - sudo -u postgres psql -c 'ALTER SYSTEM SET max_prepared_transactions TO 1000;' - sudo -u postgres psql -c 'ALTER SYSTEM SET max_connections TO 1000;' - sudo -u postgres psql -c 'ALTER SYSTEM SET max_worker_processes TO 64'; - sudo -u postgres psql -c 'ALTER SYSTEM SET max_wal_senders TO 32'; - sudo -u postgres psql -c 'ALTER SYSTEM SET max_replication_slots TO 32'; - sudo service postgresql restart - touch .pg_isready - ready-check: | - [ -f .pg_isready ] && pg_isready -U "$USER" -h 127.0.0.1 - -tasks: - - key: code - call: git/clone 2.0.7 - with: - repository: https://github.com/pgdogdev/pgdog.git - ref: ${{ init.commit-sha }} - submodules: false - preserve-git-dir: true - - - key: rust - call: rust-lang/install 1.0.6 - with: - rust-version: 1.95.0 - - - key: integration-node - call: nodejs/install 1.1.13 - with: - node-version: "20.11.1" - - - key: integration-system-deps - run: | - sudo apt-get update - if [ -e /usr/bin/pg_config.libpq-dev ] && ! sudo dpkg-divert --list /usr/bin/pg_config >/dev/null 2>&1; then - sudo rm -f /usr/bin/pg_config.libpq-dev - fi - sudo apt-get install -y \ - build-essential \ - clang \ - curl \ - default-jdk \ - golang-go \ - lcov \ - libpq-dev \ - libpq5 \ - libssl-dev \ - libyaml-dev \ - lsof \ - mold \ - php-cli \ - php-pgsql \ - pkg-config \ - postgresql \ - postgresql-client \ - psmisc \ - python3 \ - python3-pip \ - python3-venv \ - python3-virtualenv \ - ruby-full - sudo install -d /usr/share/keyrings - curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo gpg --dearmor -o /usr/share/keyrings/postgresql.gpg - echo "deb [signed-by=/usr/share/keyrings/postgresql.gpg] https://apt.postgresql.org/pub/repos/apt $(. /etc/os-release && echo "${VERSION_CODENAME}")-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list >/dev/null - sudo apt-get update - sudo apt-get install -y postgresql-client-18 - sudo apt-get remove -y cmake || true - sudo pip3 install --break-system-packages cmake==3.31.6 - cmake --version - sudo gem install bundler - sudo curl -SL https://github.com/docker/compose/releases/download/v2.36.1/docker-compose-linux-x86_64 -o /usr/local/bin/docker-compose - sudo chmod +x /usr/local/bin/docker-compose - sudo apt-get clean - cache: - ttl: 7 days - - - key: cargo-nextest - use: [integration-system-deps, rust] - run: cargo install cargo-nextest --version "0.9.78" --locked - - - key: cargo-llvm-cov - use: [cargo-nextest] - run: cargo install cargo-llvm-cov --locked --version "0.6.10" - - - key: codecov-cli - use: integration-system-deps - run: | - curl -fsSL -o codecov https://cli.codecov.io/v11.2.8/linux/codecov - chmod +x codecov - - - key: integration-build-pgdog-cov - use: [code, rust, integration-system-deps, cargo-nextest, cargo-llvm-cov] - agent: - cpus: 16 - memory: 32gb - tmpfs: true - env: - RUSTFLAGS: "-C link-dead-code" - filter: *build-filter - run: | - cargo llvm-cov clean --workspace - mkdir -p target/llvm-cov-target/profiles - cargo llvm-cov run --no-report --release --package pgdog --bin pgdog -- --help - rm -f target/llvm-cov-target/profiles/*.profraw - rm -f target/llvm-cov-target/profiles/.last_snapshot - rm -rf target/llvm-cov-target/reports - - BIN_PATH=$(find target/llvm-cov-target -type f -path '*/release/pgdog' | head -n 1) - if [ -z "$BIN_PATH" ]; then - echo "Instrumented PgDog binary not found" >&2 - exit 1 - fi - - printf '%s\n' "$(realpath "$BIN_PATH")" > "$RWX_ENV/PGDOG_BIN" - - - key: integration-pgbench - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 15m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/pgbench-%p-%m.profraw" - - bash integration/setup.sh - - source integration/common.sh - run_pgdog "$PWD/integration" - wait_for_pgdog - psql "postgres://admin:pgdog@127.0.0.1:6432/admin" -c "SET query_timeout TO 10000" - timeout --signal=TERM --kill-after=30s 10m bash integration/pgbench/dev.sh - timeout --signal=TERM --kill-after=30s 10m bash integration/pgbench/stress.sh - stop_pgdog - - cargo llvm-cov report --release --package pgdog --lcov --output-path pgbench.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-pgbench - path: pgbench.lcov - - - key: integration-schema-sync - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/schema-sync-%p-%m.profraw" - - bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 8m bash integration/schema_sync/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path schema-sync.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-schema-sync - path: schema-sync.lcov - - - key: integration-go - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 15m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/go-%p-%m.profraw" - - max_attempts=2 - attempt=1 - while true; do - echo "integration-go attempt ${attempt}/${max_attempts}" - bash integration/setup.sh - if timeout --signal=TERM --kill-after=30s 13m bash integration/go/run.sh; then - break - fi - if [ "${attempt}" -ge "${max_attempts}" ]; then - exit 1 - fi - attempt=$((attempt + 1)) - sleep 5 - done - - cargo llvm-cov report --release --package pgdog --lcov --output-path go.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-go - path: go.lcov - - - key: integration-js - use: [integration-build-pgdog-cov, integration-node] - background-processes: *postgres-bg–processes - timeout: 10m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/js-%p-%m.profraw" - - bash integration/setup.sh - - pushd integration/js/pg_tests - npm pkg set type=module - popd - timeout --signal=TERM --kill-after=30s 6m bash integration/js/pg_tests/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path js.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-js - path: js.lcov - - - key: integration-ruby - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 10m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/ruby-%p-%m.profraw" - - cleanup() { - killall -TERM toxiproxy-server 2>/dev/null || true - killall -TERM pgdog 2>/dev/null || true - sleep 1 - killall -KILL toxiproxy-server 2>/dev/null || true - killall -KILL pgdog 2>/dev/null || true - } - trap cleanup EXIT - - bash integration/setup.sh - bash integration/toxi/setup.sh - - export PGDOG_KEEP_RUNNING=1 - timeout --signal=TERM --kill-after=30s 6m bash integration/ruby/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path ruby.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-ruby - path: ruby.lcov - - - key: integration-java - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 10m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/java-%p-%m.profraw" - - bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 6m bash integration/java/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path java.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-java - path: java.lcov - - - key: integration-mirror - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 10m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/mirror-%p-%m.profraw" - - bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 6m bash integration/mirror/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path mirror.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-mirror - path: mirror.lcov - - - key: integration-sql - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 10m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/sql-%p-%m.profraw" - - bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 8m bash integration/sql/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path sql.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-sql - path: sql.lcov - - - key: integration-toxi - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 12m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/toxi-%p-%m.profraw" - - cleanup() { - killall -TERM toxiproxy-server 2>/dev/null || true - killall -TERM pgdog 2>/dev/null || true - sleep 1 - killall -KILL toxiproxy-server 2>/dev/null || true - killall -KILL pgdog 2>/dev/null || true - } - trap cleanup EXIT - - bash integration/setup.sh - bash integration/toxi/setup.sh - - TOXI_LOG=/tmp/integration-toxi.log - : > "$TOXI_LOG" - - set +e - timeout --signal=TERM --kill-after=30s 8m bash integration/toxi/run.sh >"$TOXI_LOG" 2>&1 & - suite_pid=$! - - while kill -0 "$suite_pid" 2>/dev/null; do - if grep -Eq '[0-9]+ examples, 0 failures' "$TOXI_LOG"; then - killall -TERM pgdog 2>/dev/null || true - killall -TERM toxiproxy-server 2>/dev/null || true - sleep 2 - killall -KILL pgdog 2>/dev/null || true - killall -KILL toxiproxy-server 2>/dev/null || true - break - fi - sleep 1 - done - - wait "$suite_pid" - suite_status=$? - cat "$TOXI_LOG" - - if [ "$suite_status" -eq 124 ] && grep -Eq '[0-9]+ examples, 0 failures' "$TOXI_LOG"; then - suite_status=0 - fi - set -e - - if [ "$suite_status" -ne 0 ]; then - exit "$suite_status" - fi - - cargo llvm-cov report --release --package pgdog --lcov --output-path toxi.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-toxi - path: toxi.lcov - - - key: integration-rust - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 20m - agent: - cpus: 16 - memory: 32gb - tmpfs: true - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/rust-%p-%m.profraw" - - cleanup() { - killall -TERM toxiproxy-server 2>/dev/null || true - killall -TERM pgdog 2>/dev/null || true - sleep 1 - killall -KILL toxiproxy-server 2>/dev/null || true - killall -KILL pgdog 2>/dev/null || true - } - trap cleanup EXIT - - bash integration/setup.sh - bash integration/toxi/setup.sh - - export PGDOG_KEEP_RUNNING=1 - timeout --signal=TERM --kill-after=30s 10m bash integration/rust/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path rust.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-rust - path: rust.lcov - - - key: integration-copy-data - use: integration-build-pgdog-cov - docker: true - background-processes: *postgres-bg–processes - timeout: 20m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/copy-data-%p-%m.profraw" - - bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 18m bash integration/copy_data/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path copy-data.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-copy-data - path: copy-data.lcov - - - key: integration-resharding - use: integration-build-pgdog-cov - docker: true - timeout: 20m - agent: - cpus: 4 - memory: 16gb - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/resharding-%p-%m.profraw" - - cleanup() { - (cd integration/resharding && docker-compose down >/dev/null 2>&1 || true) - killall -TERM pgdog 2>/dev/null || true - sleep 1 - killall -KILL pgdog 2>/dev/null || true - } - trap cleanup EXIT - - timeout --signal=TERM --kill-after=90s 16m bash integration/resharding/dev.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path resharding.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-resharding - path: resharding.lcov - - - key: integration-python - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 12m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/python-%p-%m.profraw" - - bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 10m bash integration/python/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path python.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-python - path: python.lcov - - - key: integration-load-balancer - use: integration-build-pgdog-cov - docker: true - timeout: 20m - agent: - cpus: 4 - memory: 16gb - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/load-balancer-%p-%m.profraw" - - cleanup() { - (cd integration/load_balancer && docker-compose down >/dev/null 2>&1 || true) - killall -TERM pgdog 2>/dev/null || true - sleep 1 - killall -KILL pgdog 2>/dev/null || true - } - trap cleanup EXIT - - timeout --signal=TERM --kill-after=30s 16m bash integration/load_balancer/run.sh - cargo llvm-cov report --release --package pgdog --lcov --output-path load-balancer.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-load-balancer - path: load-balancer.lcov - - - key: integration-complex - use: integration-build-pgdog-cov - background-processes: *postgres-bg–processes - timeout: 15m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/complex-%p-%m.profraw" - - bash integration/setup.sh - - pushd integration/python - virtualenv venv - source venv/bin/activate - pip install -r requirements.txt - deactivate - popd - - timeout --signal=TERM --kill-after=30s 10m bash integration/complex/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path complex.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-complex - path: complex.lcov - - - key: integration-dry-run - use: [integration-build-pgdog-cov, integration-node] - background-processes: *postgres-bg–processes - timeout: 10m - run: | - export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/dry-run-%p-%m.profraw" - - bash integration/setup.sh - - pushd integration/dry_run - npm pkg set type=module - popd - timeout --signal=TERM --kill-after=30s 8m bash integration/dry_run/run.sh - - cargo llvm-cov report --release --package pgdog --lcov --output-path dry-run.lcov - outputs: - filesystem: false - artifacts: - - key: lcov-dry-run - path: dry-run.lcov - - - key: integration-lcov - use: integration-system-deps - run: | - lcov \ - -a "$LCOV_PGBENCH" \ - -a "$LCOV_SCHEMA_SYNC" \ - -a "$LCOV_GO" \ - -a "$LCOV_JS" \ - -a "$LCOV_RUBY" \ - -a "$LCOV_JAVA" \ - -a "$LCOV_MIRROR" \ - -a "$LCOV_SQL" \ - -a "$LCOV_TOXI" \ - -a "$LCOV_RUST" \ - -a "$LCOV_COPY_DATA" \ - -a "$LCOV_RESHARDING" \ - -a "$LCOV_PYTHON" \ - -a "$LCOV_LOAD_BALANCER" \ - -a "$LCOV_COMPLEX" \ - -a "$LCOV_DRY_RUN" \ - -o integration.lcov - env: - LCOV_PGBENCH: ${{ tasks.integration-pgbench.artifacts.lcov-pgbench }} - LCOV_SCHEMA_SYNC: ${{ tasks.integration-schema-sync.artifacts.lcov-schema-sync }} - LCOV_GO: ${{ tasks.integration-go.artifacts.lcov-go }} - LCOV_JS: ${{ tasks.integration-js.artifacts.lcov-js }} - LCOV_RUBY: ${{ tasks.integration-ruby.artifacts.lcov-ruby }} - LCOV_JAVA: ${{ tasks.integration-java.artifacts.lcov-java }} - LCOV_MIRROR: ${{ tasks.integration-mirror.artifacts.lcov-mirror }} - LCOV_SQL: ${{ tasks.integration-sql.artifacts.lcov-sql }} - LCOV_TOXI: ${{ tasks.integration-toxi.artifacts.lcov-toxi }} - LCOV_RUST: ${{ tasks.integration-rust.artifacts.lcov-rust }} - LCOV_COPY_DATA: ${{ tasks.integration-copy-data.artifacts.lcov-copy-data }} - LCOV_RESHARDING: ${{ tasks.integration-resharding.artifacts.lcov-resharding }} - LCOV_PYTHON: ${{ tasks.integration-python.artifacts.lcov-python }} - LCOV_LOAD_BALANCER: ${{ tasks.integration-load-balancer.artifacts.lcov-load-balancer }} - LCOV_COMPLEX: ${{ tasks.integration-complex.artifacts.lcov-complex }} - LCOV_DRY_RUN: ${{ tasks.integration-dry-run.artifacts.lcov-dry-run }} - outputs: - artifacts: - - key: integration-lcov - path: integration.lcov - - - key: integration-codecov-upload - use: [code, codecov-cli] - env: - CODECOV_TOKEN: ${{ vaults.pgdog-development.secrets.CODECOV_TOKEN }} - LCOV_FILE: ${{ tasks.integration-lcov.artifacts.integration-lcov }} - run: | - ./codecov --verbose upload-process \ - --disable-search \ - --fail-on-error \ - -F integration \ - -f "$LCOV_FILE" - outputs: - filesystem: false diff --git a/.rwx/rust.yml b/.rwx/rust.yml deleted file mode 100644 index fb298e523..000000000 --- a/.rwx/rust.yml +++ /dev/null @@ -1,349 +0,0 @@ -on: - github: - pull_request: - init: - commit-sha: ${{ event.git.sha }} - branch: ${{ event.git.branch }} - push: - if: ${{ event.git.branch == "main" }} - init: - commit-sha: ${{ event.git.sha }} - branch: ${{ event.git.branch }} - cli: - init: - commit-sha: ${{ event.git.sha }} - branch: local - -base: - image: ubuntu:24.04 - config: rwx/base 1.0.0 - -concurrency-pools: - - id: pgdogdev/pgdog:rust:${{ init.branch }} - if: ${{ init.branch != "main" && init.branch != "local" }} - capacity: 1 - on-overflow: cancel-running - -aliases: - shared-filter: &shared-filter - - "!.claude" - - "!.git" - - "!.gitattributes" - - "!.github" - - "!.gitignore" - - "!.rwx" - - "!AGENTS.md" - - "!CLAUDE.md" - - "!CONTRIBUTING.md" - - "!LICENSE" - - "!README.md" - - "!SECURITY.md" - - "!docs" - -tasks: - - key: code - call: git/clone 2.0.7 - with: - repository: https://github.com/pgdogdev/pgdog.git - ref: ${{ init.commit-sha }} - preserve-git-dir: true - submodules: false - - - key: rust - call: rust-lang/install 1.0.6 - with: - rust-version: 1.95.0 - - - key: system-packages - run: | - sudo apt-get update - if [ -e /usr/bin/pg_config.libpq-dev ] && ! sudo dpkg-divert --list /usr/bin/pg_config >/dev/null 2>&1; then - sudo rm -f /usr/bin/pg_config.libpq-dev - fi - sudo apt-get install -y \ - build-essential \ - clang \ - curl \ - lcov \ - libpq-dev \ - libssl-dev \ - mold \ - pkg-config \ - postgresql \ - postgresql-client \ - psmisc \ - python3-pip \ - python3-venv \ - python3-virtualenv \ - ruby-full - sudo apt-get remove -y cmake || true - sudo pip3 install --break-system-packages cmake==3.31.6 - cmake --version - sudo gem install bundler - sudo apt-get clean - cache: - ttl: 7 days - - - key: cargo-nextest - use: [system-packages, rust] - run: cargo install cargo-nextest --version "0.9.78" --locked - - - key: cargo-llvm-cov - use: [cargo-nextest] - run: cargo install cargo-llvm-cov --locked --version "0.6.10" - - - key: codecov-cli - use: system-packages - run: | - curl -fsSL -o codecov https://cli.codecov.io/v11.2.8/linux/codecov - chmod +x codecov - - - key: cargo-build-workspace - use: [system-packages, code, rust, cargo-nextest] - agent: - cpus: 16 - memory: 32gb - tmpfs: true - run: cargo test --workspace --no-run - outputs: - filesystem: - deduplicate: true - filter: *shared-filter - - - key: cargo-build-workspace-cov - use: [system-packages, code, rust, cargo-nextest, cargo-llvm-cov] - agent: - cpus: 16 - memory: 32gb - tmpfs: true - run: cargo llvm-cov nextest --workspace --no-clean -E 'none()' --no-tests=pass - outputs: - filesystem: - deduplicate: true - filter: ["!**/*.profraw"] - filter: *shared-filter - - - key: cargo-test-pgdog - use: cargo-build-workspace-cov - timeout: 20m - run: | - sudo service postgresql start - sudo -u postgres createuser --superuser --login "$USER" || true - sudo -u postgres createdb "$USER" || true - - bash integration/setup.sh || (sudo service postgresql restart && bash integration/setup.sh) - - cargo llvm-cov nextest \ - --workspace \ - --no-clean \ - --lcov \ - --output-path lcov-pgdog.info \ - --no-fail-fast \ - --test-threads=1 \ - -E 'package(pgdog)' - outputs: - filesystem: false - artifacts: - - key: lcov-pgdog - path: lcov-pgdog.info - filter: *shared-filter - - - key: cargo-test-pgdog-config - use: cargo-build-workspace-cov - run: | - cargo llvm-cov nextest \ - --workspace \ - --no-clean \ - --lcov \ - --output-path lcov-pgdog-config.info \ - --no-fail-fast \ - --test-threads=1 \ - -E 'package(pgdog-config)' - outputs: - filesystem: false - artifacts: - - key: lcov-pgdog-config - path: lcov-pgdog-config.info - filter: *shared-filter - - - key: cargo-test-pgdog-vector - use: cargo-build-workspace-cov - run: | - cargo llvm-cov nextest \ - --workspace \ - --no-clean \ - --lcov \ - --output-path lcov-pgdog-vector.info \ - --no-fail-fast \ - --test-threads=1 \ - -E 'package(pgdog-vector)' - outputs: - filesystem: false - artifacts: - - key: lcov-pgdog-vector - path: lcov-pgdog-vector.info - filter: *shared-filter - - - key: cargo-test-pgdog-stats - use: cargo-build-workspace-cov - run: | - cargo llvm-cov nextest \ - --workspace \ - --no-clean \ - --lcov \ - --output-path lcov-pgdog-stats.info \ - --no-fail-fast \ - --test-threads=1 \ - -E 'package(pgdog-stats)' - outputs: - filesystem: false - artifacts: - - key: lcov-pgdog-stats - path: lcov-pgdog-stats.info - filter: *shared-filter - - - key: cargo-test-pgdog-postgres-types - use: cargo-build-workspace-cov - run: | - cargo llvm-cov nextest \ - --workspace \ - --no-clean \ - --lcov \ - --output-path lcov-pgdog-postgres-types.info \ - --no-fail-fast \ - --test-threads=1 \ - -E 'package(pgdog-postgres-types)' - outputs: - filesystem: false - artifacts: - - key: lcov-pgdog-postgres-types - path: lcov-pgdog-postgres-types.info - filter: *shared-filter - - - key: rust-core-lcov - use: system-packages - run: | - lcov \ - -a "$LCOV_PGDOG" \ - -a "$LCOV_PGDOG_CONFIG" \ - -a "$LCOV_PGDOG_VECTOR" \ - -a "$LCOV_PGDOG_STATS" \ - -a "$LCOV_PGDOG_POSTGRES_TYPES" \ - -o lcov.info - env: - LCOV_PGDOG: ${{ tasks.cargo-test-pgdog.artifacts.lcov-pgdog }} - LCOV_PGDOG_CONFIG: ${{ tasks.cargo-test-pgdog-config.artifacts.lcov-pgdog-config }} - LCOV_PGDOG_VECTOR: ${{ tasks.cargo-test-pgdog-vector.artifacts.lcov-pgdog-vector }} - LCOV_PGDOG_STATS: ${{ tasks.cargo-test-pgdog-stats.artifacts.lcov-pgdog-stats }} - LCOV_PGDOG_POSTGRES_TYPES: ${{ tasks.cargo-test-pgdog-postgres-types.artifacts.lcov-pgdog-postgres-types }} - outputs: - artifacts: - - key: core-lcov - path: lcov.info - - - key: rust-codecov-upload - use: [code, codecov-cli] - env: - CODECOV_TOKEN: ${{ vaults.pgdog-development.secrets.CODECOV_TOKEN }} - LCOV_FILE: ${{ tasks.rust-core-lcov.artifacts.core-lcov }} - run: | - ./codecov --verbose upload-process \ - --disable-search \ - --fail-on-error \ - -F unit \ - -f "$LCOV_FILE" - outputs: - filesystem: false - - - key: rust-fmt-check - use: [code, rust] - run: cargo fmt --all -- --check - outputs: - filesystem: false - filter: *shared-filter - - - key: rust-pgdog-clippy - use: [system-packages, code, rust] - agent: - cpus: 4 - run: cd pgdog && cargo clippy - outputs: - filesystem: false - filter: *shared-filter - - - key: rust-workspace-build - use: [system-packages, code, rust] - agent: - cpus: 8 - run: cargo build - outputs: - filesystem: false - filter: *shared-filter - - - key: rust-workspace-check-release - use: [system-packages, code, rust] - agent: - cpus: 4 - run: cargo check --release - outputs: - filesystem: false - filter: *shared-filter - - - key: rust-doc-tests - use: cargo-build-workspace - run: cargo test --doc - outputs: - filesystem: false - filter: *shared-filter - - - key: rust-plugin-unit - use: cargo-build-workspace - timeout: 30m - run: | - set +e - cargo nextest run -E 'package(pgdog-example-plugin)' --no-fail-fast - status=$? - set -e - - if [ "$status" -ne 0 ]; then - printf '%s\n' \ - "pgdog-example-plugin unit tests failed with exit code $status" \ - > "$RWX_WARNINGS/plugin-unit" - fi - - exit 0 - outputs: - filesystem: false - filter: *shared-filter - - - key: rust-plugin-integration - use: [system-packages, code, rust] - agent: - cpus: 8 - memory: 16gb - timeout: 30m - run: | - sudo service postgresql start - sudo -u postgres createuser --superuser --login "$USER" || true - sudo -u postgres createdb "$USER" || true - - bash integration/setup.sh || (sudo service postgresql restart && bash integration/setup.sh) - - cargo build --release -p pgdog --bin pgdog - export PGDOG_BIN="$PWD/target/release/pgdog" - - set +e - timeout --signal=TERM --kill-after=30s 11m bash integration/plugins/run.sh - status=$? - set -e - - if [ "$status" -ne 0 ]; then - printf '%s\n' \ - "pgdog-example-plugin integration tests failed with exit code $status" \ - > "$RWX_WARNINGS/plugin-integration" - fi - - exit 0 - outputs: - filesystem: false - filter: *shared-filter diff --git a/integration/resharding/docker-compose.yaml b/integration/resharding/docker-compose.yaml index b89eef8c0..26a26475b 100644 --- a/integration/resharding/docker-compose.yaml +++ b/integration/resharding/docker-compose.yaml @@ -1,6 +1,6 @@ services: source_0: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ -14,7 +14,7 @@ services: - postgres source_1: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ -28,7 +28,7 @@ services: - postgres destination_0: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ -40,7 +40,7 @@ services: - postgres destination_1: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog diff --git a/integration/resharding/pgdog.toml b/integration/resharding/pgdog.toml index 2d627feb6..5aa5e7bf4 100644 --- a/integration/resharding/pgdog.toml +++ b/integration/resharding/pgdog.toml @@ -51,6 +51,3 @@ data_type = "bigint" [admin] password = "pgdog" user = "pgdog" - -[replication] -pg_dump_path = "/usr/lib/postgresql/18/bin/pg_dump" diff --git a/integration/resharding/run.sh b/integration/resharding/run.sh new file mode 100755 index 000000000..71638fcf5 --- /dev/null +++ b/integration/resharding/run.sh @@ -0,0 +1,16 @@ +#!/bin/bash +set -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +# Safety net: docker-compose down and any stray pgdog processes are cleaned up +# on exit even if dev.sh is interrupted mid-flight by timeout or signal. +cleanup() { + (cd "${SCRIPT_DIR}" && docker-compose down >/dev/null 2>&1 || true) + killall -TERM pgdog 2>/dev/null || true + sleep 1 + killall -KILL pgdog 2>/dev/null || true +} +trap cleanup EXIT INT TERM + +timeout --signal=TERM --kill-after=90s 16m bash "${SCRIPT_DIR}/dev.sh" diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 77f9f60a3..6097f7e2a 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -7,8 +7,8 @@ use pgdog_config::QueryParserEngine; use tokio::sync::Notify; use tokio::task::JoinHandle; use tokio::time::Instant; -use tokio::{select, spawn, time::interval}; -use tracing::{debug, info, warn}; +use tokio::{select, spawn, time::interval, time::timeout}; +use tracing::{debug, error, info, warn}; use super::super::{publisher::Table, Error, TableValidationError, TableValidationErrors}; use super::ReplicationSlot; @@ -214,6 +214,12 @@ impl Publisher { stream.set_current_lsn(slot.lsn().lsn); let mut check_lag = interval(Duration::from_secs(1)); + // Skip missed ticks: if replication_lag() takes longer than 1 second + // (e.g. due to a slow or half-open server_meta connection), the default + // Burst mode would re-fire the tick immediately, starving slot.replicate() + // and freezing the WAL stream. Skip mode ensures at most one lag check + // per second regardless of how long the previous one took. + check_lag.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let replication_lag = self.replication_lag.clone(); let stop = self.stop.clone(); let last_transaction = self.last_transaction.clone(); @@ -223,66 +229,102 @@ impl Publisher { // Replicate in parallel. let handle = spawn(async move { - slot.start_replication().await?; - let progress = Progress::new_stream(); - - loop { - select! { - _ = stop.notified() => { - slot.stop_replication().await?; - } + let result = async { + slot.start_replication().await?; + let progress = Progress::new_stream(); + + loop { + select! { + _ = stop.notified() => { + slot.stop_replication().await?; + } - // This is cancellation-safe. - replication_data = slot.replicate(Duration::MAX) => { - let replication_data = replication_data?; - - match replication_data { - Some(ReplicationData::CopyData(data)) => { - let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = - data.replication_meta() - { - if ka.reply() { - slot.status_update(stream.status_update()).await?; - } - debug!( - "origin at lsn {} [{}]", - Lsn::from_i64(ka.wal_end), - slot.server()?.addr() - ); - ka.wal_end - } else { - if let Some(status_update) = stream.handle(data).await? { - slot.status_update(status_update).await?; - *last_transaction.lock() = Some(Instant::now()); - } - stream.lsn() - }; - progress.update(stream.bytes_sharded(), lsn); - } - Some(ReplicationData::CopyDone) => (), - None => { - slot.drop_slot().await?; - break; + // This is cancellation-safe. + replication_data = slot.replicate(Duration::MAX) => { + let replication_data = replication_data?; + + match replication_data { + Some(ReplicationData::CopyData(data)) => { + let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = + data.replication_meta() + { + if ka.reply() { + slot.status_update(stream.status_update()).await?; + } + debug!( + "origin at lsn {} [{}]", + Lsn::from_i64(ka.wal_end), + slot.server()?.addr() + ); + ka.wal_end + } else { + if let Some(status_update) = stream.handle(data).await? { + slot.status_update(status_update).await?; + *last_transaction.lock() = Some(Instant::now()); + } + stream.lsn() + }; + progress.update(stream.bytes_sharded(), lsn); + } + Some(ReplicationData::CopyDone) => (), + None => { + slot.drop_slot().await?; + break; + } } } - } - - _ = check_lag.tick() => { - let lag = slot.replication_lag().await?; - let mut guard = replication_lag.lock(); - guard.insert(number, lag); + _ = check_lag.tick() => { + // Timeout guards against a hung server_meta connection + // (half-open TCP in CI Docker environments) that would + // otherwise block this branch — and therefore slot.replicate() + // — indefinitely. + match timeout( + Duration::from_secs(5), + slot.replication_lag(), + ).await { + Ok(Ok(lag)) => { + let mut guard = replication_lag.lock(); + guard.insert(number, lag); + } + Ok(Err(err)) => { + warn!( + "replication lag check failed [{}]: {}", + source_cluster.name(), + err + ); + } + Err(_) => { + warn!( + "replication lag check timed out [{}]", + source_cluster.name() + ); + // Drop server_meta so the next call creates a fresh connection. + slot.reset_server_meta(); + } + } - let missed = stream.missed_rows(); - if missed.non_zero() { - warn!("replication {} => {} has missing rows: {}", source_cluster.name(), dest.name(), missed); + let missed = stream.missed_rows(); + if missed.non_zero() { + warn!("replication {} => {} has missing rows: {}", source_cluster.name(), dest.name(), missed); + } } - } } - } - Ok::<(), Error>(()) + Ok::<(), Error>(()) + }.await; + + if let Err(ref err) = result { + error!( + "replication stream {} => {} shard={} terminated: {}", + source_cluster.name(), + dest.name(), + number, + err + ); + } + result }); streams.push(handle); diff --git a/pgdog/src/backend/replication/logical/publisher/slot.rs b/pgdog/src/backend/replication/logical/publisher/slot.rs index 4636bf101..3a65b6f97 100644 --- a/pgdog/src/backend/replication/logical/publisher/slot.rs +++ b/pgdog/src/backend/replication/logical/publisher/slot.rs @@ -126,6 +126,12 @@ impl ReplicationSlot { } } + /// Drop the server_meta connection so the next call to `server_meta()` creates + /// a fresh one. Called after a lag-check timeout to discard a half-open connection. + pub fn reset_server_meta(&mut self) { + self.server_meta = None; + } + /// Replication lag in bytes for this slot. pub async fn replication_lag(&mut self) -> Result { let query = format!( diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 0a065b207..3dd2d33c9 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -265,11 +265,16 @@ impl StreamSubscriber { } for conn in &mut conns { - // Keep server connections always synchronized. - for _ in 0..2 { + // Postgres can send asynchronous messages (Notice 'N', ParameterStatus 'S', + // NotificationResponse 'A') between Bind/Execute responses at any time. + // Count only BindComplete and CommandComplete; loop over everything else. + let mut got_bind = false; + let mut got_cmd = false; + while !got_bind || !got_cmd { let msg = conn.read().await?; match msg.code() { 'C' => { + got_cmd = true; let cmd = CommandComplete::try_from(msg)?; let rows = cmd .rows()? @@ -285,12 +290,22 @@ impl StreamSubscriber { } } } - '2' => (), + '2' => { + got_bind = true; + } 'E' => { return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( msg.to_bytes()?, )?))) } + // Skip asynchronous server messages that may appear between responses. + 'N' | 'S' | 'A' => { + trace!( + "[{}] async message during send: {}", + conn.addr(), + msg.code() + ); + } c => return Err(Error::SendOutOfSync(c)), } } @@ -419,18 +434,29 @@ impl StreamSubscriber { server.flush().await?; } for server in &mut self.connections { - // Drain responses from server. - let msg = server.read().await?; - trace!("[{}] --> {:?}", server.addr(), msg); - - match msg.code() { - 'E' => { - return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, - )?))) + // Loop until ReadyForQuery, skipping any asynchronous server messages + // (Notice 'N', ParameterStatus 'S', NotificationResponse 'A') that Postgres + // may send at any point during the Sync/ReadyForQuery exchange. + loop { + let msg = server.read().await?; + trace!("[{}] --> {:?}", server.addr(), msg); + + match msg.code() { + 'E' => { + return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( + msg.to_bytes()?, + )?))) + } + 'Z' => break, + 'N' | 'S' | 'A' => { + trace!( + "[{}] async message during commit: {}", + server.addr(), + msg.code() + ); + } + c => return Err(Error::CommitOutOfSync(c)), } - 'Z' => (), - c => return Err(Error::CommitOutOfSync(c)), } } @@ -502,8 +528,12 @@ impl StreamSubscriber { } for server in &mut self.connections { - let num_messages = if self.in_transaction { 4 } else { 5 }; - for _ in 0..num_messages { + // Expect 4 ParseComplete ('1') responses, one per statement. + // If not in a transaction a Sync was sent, so there is also a + // ReadyForQuery ('Z') terminator. In either case, skip asynchronous + // server messages (Notice 'N', ParameterStatus 'S', NotificationResponse 'A'). + let mut parse_completes = 0; + loop { let msg = server.read().await?; trace!("[{}] --> {:?}", server.addr(), msg); @@ -513,8 +543,20 @@ impl StreamSubscriber { msg.to_bytes()?, )?))) } - 'Z' => break, - '1' => continue, + 'Z' => break, // ReadyForQuery ends the exchange (not-in-transaction path) + '1' => { + parse_completes += 1; + if self.in_transaction && parse_completes == 4 { + break; // Flush path: stop after 4 ParseCompletes + } + } + 'N' | 'S' | 'A' => { + trace!( + "[{}] async message during relation prepare: {}", + server.addr(), + msg.code() + ); + } c => return Err(Error::RelationOutOfSync(c)), } }