From f68264cd5b051db98256288d79b7912590f93960 Mon Sep 17 00:00:00 2001 From: Osvaldo Date: Mon, 18 May 2026 11:26:46 -0300 Subject: [PATCH] feat(raft): RAFT_* env vars + 3-node compose template Operationally, raft was hard to deploy: cfg.Raft was YAML-only, and deploy/docker-compose had no template for a multi-node raft cluster. This commit closes both gaps. ### RAFT_* env var bindings pkg/config/config.go gains overrides for every cfg.Raft field that operators tune: - RAFT_ENABLED (bool) - RAFT_SELF_ID (string) - RAFT_BIND_ADDR (string, e.g. ":7000") - RAFT_BOOTSTRAP (bool) - RAFT_PEERS ("id=host:port,id=host:port,...") - RAFT_HEARTBEAT_MS, RAFT_ELECTION_MS, RAFT_LEADER_LEASE_MS, RAFT_COMMIT_MS, RAFT_APPLY_TIMEOUT_SECONDS (int) RAFT_PEERS uses the same "id=addr,id=addr" shape as CLUSTER_NODES so manifests look uniform. Malformed entries (no '=', empty id, etc.) are silently dropped. ### Compose template New deploy/docker-compose/raft-cluster/{compose.yaml,README.md}: - 3 codeq services (node-a/b/c) on host ports 8080/8081/8082 - All three share a YAML anchor for the common env; only RAFT_SELF_ID, RAFT_BIND_ADDR, and RAFT_BOOTSTRAP differ - node-a has Bootstrap=true; the others wait via `depends_on` so node-a's transport is listening before they try to join - Per-node Pebble + artifacts volumes (no shared state) - Bridge network with docker DNS resolution between peers README walks through quick-start, status endpoint probing, failover testing (kill leader, wait ~2s, verify new leader on a survivor), and the path to multi-shard raft via PERSISTENCE_CONFIG.numShards. ### Tests - TestLoadConfigOptional_RaftEnvOverrides: every RAFT_* env var lands on the right cfg.Raft field. - TestLoadConfigOptional_RaftDisabledByDefault: no env means cfg.Raft.Enabled stays false. - TestLoadConfigOptional_RaftPeersIgnoresMalformed: malformed peer entries are dropped without failing the load. --- deploy/docker-compose/raft-cluster/README.md | 79 +++++++++++ .../docker-compose/raft-cluster/compose.yaml | 124 ++++++++++++++++++ pkg/config/config.go | 56 ++++++++ pkg/config/raft_env_test.go | 91 +++++++++++++ 4 files changed, 350 insertions(+) create mode 100644 deploy/docker-compose/raft-cluster/README.md create mode 100644 deploy/docker-compose/raft-cluster/compose.yaml create mode 100644 pkg/config/raft_env_test.go diff --git a/deploy/docker-compose/raft-cluster/README.md b/deploy/docker-compose/raft-cluster/README.md new file mode 100644 index 0000000..f62634b --- /dev/null +++ b/deploy/docker-compose/raft-cluster/README.md @@ -0,0 +1,79 @@ +# 3-node codeq cluster with raft replication + +Spins up three codeq processes joined into a raft cluster. Every Pebble +write is consensus-replicated across the three replicas; the leader +fails over automatically when a node dies. + +See [`docs/40-raft-replication.md`](../../../docs/40-raft-replication.md) +for the architecture, configuration knobs, status endpoint, and current +limitations. + +## Quick start + +```bash +# Build the image once (any image with codeq compiled in works). +docker build -f deploy/docker-compose/cluster/Dockerfile -t codeq-service:cluster . + +# Bring the cluster up. node-a bootstraps; node-b and node-c join via +# raft replication of the initial configuration. +docker compose -f deploy/docker-compose/raft-cluster/compose.yaml up -d + +# All three nodes are reachable: +curl -s http://localhost:8080/v1/codeq/raft/status | jq . # node-a +curl -s http://localhost:8081/v1/codeq/raft/status | jq . # node-b +curl -s http://localhost:8082/v1/codeq/raft/status | jq . # node-c +``` + +The status endpoint reports which node is leader for each raft group. +With the default single-shard config there is exactly one group. + +## Producing tasks + +Submit a task to the leader and watch it replicate: + +```bash +curl -s -X POST http://localhost:8080/v1/codeq/tasks \ + -H 'Authorization: Bearer dev-token' \ + -H 'Content-Type: application/json' \ + -d '{"command":"GENERATE_MASTER","payload":{"k":"v"},"priority":5}' + +# Read the same task back from node-b (local read, replicated state): +curl -s http://localhost:8081/v1/codeq/tasks/ \ + -H 'Authorization: Bearer dev-token' | jq . +``` + +If you hit a follower with the write, you'll get `400 not leader` — +client retries on a different node (`http://localhost:8081` / +`:8082`) until it finds the leader. Future server-side forwarding will +make this automatic; today the client retries. + +## Testing failover + +```bash +# Find the current leader. +curl -s http://localhost:8080/v1/codeq/raft/status | jq '.groups[] | select(.isLeader)' + +# Kill it. +docker compose -f deploy/docker-compose/raft-cluster/compose.yaml stop node-a + +# Wait ~2 seconds (default election timeout). +sleep 2 + +# The surviving nodes elected a new leader. +curl -s http://localhost:8081/v1/codeq/raft/status | jq . + +# Submit a write against the new leader; it lands and replicates. +``` + +## Going multi-shard (M2) + +For per-shard raft groups (one group per Pebble shard), add +`"numShards": 4` to `PERSISTENCE_CONFIG` and reserve N consecutive +ports per node. The compose template uses container-local port 7000 as +the base; each shard binds 7000 + shardIdx. Adjust as needed. + +## Mutual exclusion + +Raft mode is mutually exclusive with the legacy static-ring cluster +mode (`CLUSTER_ENABLED=true`). Don't enable both — startup will +reject the config. diff --git a/deploy/docker-compose/raft-cluster/compose.yaml b/deploy/docker-compose/raft-cluster/compose.yaml new file mode 100644 index 0000000..2d652a9 --- /dev/null +++ b/deploy/docker-compose/raft-cluster/compose.yaml @@ -0,0 +1,124 @@ +# 3-node codeq cluster with raft replication (M1+M2) on embedded +# Pebble. Every write goes through hashicorp/raft and lands on every +# replica's local Pebble via the FSM — automatic failover, no Redis, +# no external coordinator. +# +# Topology: +# node-a: HTTP :8080 host, raft :7000 inside the bridge network +# node-b: HTTP :8081 host, raft :7000 inside (different container) +# node-c: HTTP :8082 host, raft :7000 inside (different container) +# +# Only node-a is `RAFT_BOOTSTRAP=true`. After the first run raft has +# persisted state and ignores the flag on subsequent restarts. Set the +# others to false during initial cluster formation — they'll receive +# the cluster configuration from the leader via AppendEntries. +# +# To enable multi-shard raft (M2), add `"numShards": 4` (or N) to +# PERSISTENCE_CONFIG. Each shard becomes its own raft group on +# bindAddr+shardIdx — reserve N consecutive ports per node. +# +# See docs/40-raft-replication.md for the architecture, startup +# ordering rules, status endpoint, and operational caveats. + +name: codeq-raft + +services: + node-a: + image: ${CODEQ_IMAGE:-codeq-service:cluster} + container_name: codeq-raft-node-a + hostname: node-a + restart: unless-stopped + ports: + - "8080:8080" + environment: &raft_env + PORT: "8080" + ENV: dev + GIN_MODE: release + LOG_LEVEL: ${CODEQ_LOG_LEVEL:-warn} + LOG_FORMAT: json + DEFAULT_LEASE_SECONDS: "60" + REQUEUE_INSPECT_LIMIT: "200" + MAX_ATTEMPTS_DEFAULT: "5" + BACKOFF_POLICY: exp_full_jitter + BACKOFF_BASE_SECONDS: "5" + BACKOFF_MAX_SECONDS: "900" + WEBHOOK_HMAC_SECRET: dev-cluster-secret + LOCAL_ARTIFACTS_DIR: /var/lib/codeq/artifacts + WORKER_AUDIENCE: codeq-worker + PERSISTENCE_PROVIDER: pebble + PERSISTENCE_CONFIG: '{"path":"/var/lib/codeq/pebble"}' + RAFT_ENABLED: "true" + # RAFT_PEERS is "id=host:port,..." resolved via docker DNS. + # All three peers run raft on container-local port 7000. + RAFT_PEERS: "node-a=node-a:7000,node-b=node-b:7000,node-c=node-c:7000" + PRODUCER_AUTH_PROVIDER: static + PRODUCER_AUTH_CONFIG: |- + {"token":"dev-token","subject":"producer-dev","email":"dev@codeq.local","raw":{"role":"ADMIN","tenantId":"dev-tenant"}} + WORKER_AUTH_PROVIDER: static + WORKER_AUTH_CONFIG: |- + {"token":"dev-token","subject":"worker-dev","scopes":["codeq:claim","codeq:heartbeat","codeq:abandon","codeq:nack","codeq:result","codeq:subscribe"],"eventTypes":["*"],"raw":{"tenantId":"dev-tenant"}} + # Node-specific overrides + RAFT_SELF_ID: node-a + RAFT_BIND_ADDR: ":7000" + RAFT_BOOTSTRAP: "true" + volumes: + - node-a-data:/var/lib/codeq/pebble + - node-a-art:/var/lib/codeq/artifacts + networks: + - raft + + node-b: + image: ${CODEQ_IMAGE:-codeq-service:cluster} + container_name: codeq-raft-node-b + hostname: node-b + restart: unless-stopped + ports: + - "8081:8080" + environment: + <<: *raft_env + RAFT_SELF_ID: node-b + RAFT_BIND_ADDR: ":7000" + RAFT_BOOTSTRAP: "false" + volumes: + - node-b-data:/var/lib/codeq/pebble + - node-b-art:/var/lib/codeq/artifacts + networks: + - raft + # Wait for node-a's transport to be listening before starting — + # avoids node-a's first election rounds hitting connection-refused. + depends_on: + node-a: + condition: service_started + + node-c: + image: ${CODEQ_IMAGE:-codeq-service:cluster} + container_name: codeq-raft-node-c + hostname: node-c + restart: unless-stopped + ports: + - "8082:8080" + environment: + <<: *raft_env + RAFT_SELF_ID: node-c + RAFT_BIND_ADDR: ":7000" + RAFT_BOOTSTRAP: "false" + volumes: + - node-c-data:/var/lib/codeq/pebble + - node-c-art:/var/lib/codeq/artifacts + networks: + - raft + depends_on: + node-a: + condition: service_started + +volumes: + node-a-data: + node-a-art: + node-b-data: + node-b-art: + node-c-data: + node-c-art: + +networks: + raft: + driver: bridge diff --git a/pkg/config/config.go b/pkg/config/config.go index b2c74ec..7412f38 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -241,6 +241,62 @@ func applyEnvAndDefaults(c *Config) { c.Cluster.Nodes = nodes } } + // Raft overrides. RAFT_PEERS uses the same "id=addr,id=addr,..." + // shape as CLUSTER_NODES so deployment manifests look consistent. + if v := os.Getenv("RAFT_ENABLED"); v != "" { + c.Raft.Enabled = strings.EqualFold(v, "true") || v == "1" || strings.EqualFold(v, "yes") + } + if v := os.Getenv("RAFT_SELF_ID"); v != "" { + c.Raft.SelfID = v + } + if v := os.Getenv("RAFT_BIND_ADDR"); v != "" { + c.Raft.BindAddr = v + } + if v := os.Getenv("RAFT_BOOTSTRAP"); v != "" { + c.Raft.Bootstrap = strings.EqualFold(v, "true") || v == "1" || strings.EqualFold(v, "yes") + } + if v := os.Getenv("RAFT_PEERS"); v != "" { + peers := make(map[string]string) + for _, pair := range strings.Split(v, ",") { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + eq := strings.Index(pair, "=") + if eq <= 0 || eq == len(pair)-1 { + continue + } + peers[strings.TrimSpace(pair[:eq])] = strings.TrimSpace(pair[eq+1:]) + } + if len(peers) > 0 { + c.Raft.Peers = peers + } + } + if v := os.Getenv("RAFT_HEARTBEAT_MS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Raft.HeartbeatMS = n + } + } + if v := os.Getenv("RAFT_ELECTION_MS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Raft.ElectionMS = n + } + } + if v := os.Getenv("RAFT_LEADER_LEASE_MS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Raft.LeaderLeaseMS = n + } + } + if v := os.Getenv("RAFT_COMMIT_MS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Raft.CommitMS = n + } + } + if v := os.Getenv("RAFT_APPLY_TIMEOUT_SECONDS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Raft.ApplyTimeoutSeconds = n + } + } if v := os.Getenv("TRACING_ENABLED"); v != "" { c.TracingEnabled = strings.EqualFold(v, "true") || v == "1" || strings.EqualFold(v, "yes") } diff --git a/pkg/config/raft_env_test.go b/pkg/config/raft_env_test.go new file mode 100644 index 0000000..db8dc65 --- /dev/null +++ b/pkg/config/raft_env_test.go @@ -0,0 +1,91 @@ +package config + +import ( + "testing" +) + +func TestLoadConfigOptional_RaftEnvOverrides(t *testing.T) { + t.Setenv("RAFT_ENABLED", "true") + t.Setenv("RAFT_SELF_ID", "node-7") + t.Setenv("RAFT_BIND_ADDR", ":7100") + t.Setenv("RAFT_BOOTSTRAP", "true") + t.Setenv("RAFT_PEERS", "node-1=host1:7000,node-2=host2:7000,node-7=host7:7100") + t.Setenv("RAFT_HEARTBEAT_MS", "250") + t.Setenv("RAFT_ELECTION_MS", "500") + t.Setenv("RAFT_LEADER_LEASE_MS", "200") + t.Setenv("RAFT_COMMIT_MS", "20") + t.Setenv("RAFT_APPLY_TIMEOUT_SECONDS", "8") + + cfg, err := LoadConfigOptional("") + if err != nil { + t.Fatalf("LoadConfigOptional: %v", err) + } + if !cfg.Raft.Enabled { + t.Errorf("Raft.Enabled: want true, got false") + } + if cfg.Raft.SelfID != "node-7" { + t.Errorf("Raft.SelfID: want node-7, got %q", cfg.Raft.SelfID) + } + if cfg.Raft.BindAddr != ":7100" { + t.Errorf("Raft.BindAddr: want :7100, got %q", cfg.Raft.BindAddr) + } + if !cfg.Raft.Bootstrap { + t.Errorf("Raft.Bootstrap: want true, got false") + } + wantPeers := map[string]string{ + "node-1": "host1:7000", + "node-2": "host2:7000", + "node-7": "host7:7100", + } + if len(cfg.Raft.Peers) != len(wantPeers) { + t.Fatalf("Raft.Peers: want %d entries, got %d (%+v)", len(wantPeers), len(cfg.Raft.Peers), cfg.Raft.Peers) + } + for id, addr := range wantPeers { + if got := cfg.Raft.Peers[id]; got != addr { + t.Errorf("Raft.Peers[%s]: want %q, got %q", id, addr, got) + } + } + if cfg.Raft.HeartbeatMS != 250 { + t.Errorf("Raft.HeartbeatMS: want 250, got %d", cfg.Raft.HeartbeatMS) + } + if cfg.Raft.ElectionMS != 500 { + t.Errorf("Raft.ElectionMS: want 500, got %d", cfg.Raft.ElectionMS) + } + if cfg.Raft.LeaderLeaseMS != 200 { + t.Errorf("Raft.LeaderLeaseMS: want 200, got %d", cfg.Raft.LeaderLeaseMS) + } + if cfg.Raft.CommitMS != 20 { + t.Errorf("Raft.CommitMS: want 20, got %d", cfg.Raft.CommitMS) + } + if cfg.Raft.ApplyTimeoutSeconds != 8 { + t.Errorf("Raft.ApplyTimeoutSeconds: want 8, got %d", cfg.Raft.ApplyTimeoutSeconds) + } +} + +func TestLoadConfigOptional_RaftDisabledByDefault(t *testing.T) { + cfg, err := LoadConfigOptional("") + if err != nil { + t.Fatalf("LoadConfigOptional: %v", err) + } + if cfg.Raft.Enabled { + t.Errorf("default Raft.Enabled: want false, got true") + } +} + +func TestLoadConfigOptional_RaftPeersIgnoresMalformed(t *testing.T) { + t.Setenv("RAFT_PEERS", "node-1=ok:7000,malformed-no-equals,=missing-id,trailing-empty=,node-2=ok:7001,") + cfg, err := LoadConfigOptional("") + if err != nil { + t.Fatalf("LoadConfigOptional: %v", err) + } + if got := cfg.Raft.Peers["node-1"]; got != "ok:7000" { + t.Errorf("node-1: want ok:7000, got %q", got) + } + if got := cfg.Raft.Peers["node-2"]; got != "ok:7001" { + t.Errorf("node-2: want ok:7001, got %q", got) + } + // Malformed entries are silently dropped. + if len(cfg.Raft.Peers) != 2 { + t.Errorf("len(Peers): want 2 (malformed dropped), got %d: %+v", len(cfg.Raft.Peers), cfg.Raft.Peers) + } +}