Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions deploy/docker-compose/raft-cluster/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# 3-node codeq cluster with raft replication

Spins up three codeq processes joined into a raft cluster. Every Pebble
write is consensus-replicated across the three replicas; the leader
fails over automatically when a node dies.

See [`docs/40-raft-replication.md`](../../../docs/40-raft-replication.md)
for the architecture, configuration knobs, status endpoint, and current
limitations.

## Quick start

```bash
# Build the image once (any image with codeq compiled in works).
docker build -f deploy/docker-compose/cluster/Dockerfile -t codeq-service:cluster .

# Bring the cluster up. node-a bootstraps; node-b and node-c join via
# raft replication of the initial configuration.
docker compose -f deploy/docker-compose/raft-cluster/compose.yaml up -d

# All three nodes are reachable:
curl -s http://localhost:8080/v1/codeq/raft/status | jq . # node-a
curl -s http://localhost:8081/v1/codeq/raft/status | jq . # node-b
curl -s http://localhost:8082/v1/codeq/raft/status | jq . # node-c
```

The status endpoint reports which node is leader for each raft group.
With the default single-shard config there is exactly one group.

## Producing tasks

Submit a task to the leader and watch it replicate:

```bash
curl -s -X POST http://localhost:8080/v1/codeq/tasks \
-H 'Authorization: Bearer dev-token' \
-H 'Content-Type: application/json' \
-d '{"command":"GENERATE_MASTER","payload":{"k":"v"},"priority":5}'

# Read the same task back from node-b (local read, replicated state):
curl -s http://localhost:8081/v1/codeq/tasks/<id> \
-H 'Authorization: Bearer dev-token' | jq .
```

If you hit a follower with the write, you'll get `400 not leader` —
client retries on a different node (`http://localhost:8081` /
`:8082`) until it finds the leader. Future server-side forwarding will
make this automatic; today the client retries.

## Testing failover

```bash
# Find the current leader.
curl -s http://localhost:8080/v1/codeq/raft/status | jq '.groups[] | select(.isLeader)'

# Kill it.
docker compose -f deploy/docker-compose/raft-cluster/compose.yaml stop node-a

# Wait ~2 seconds (default election timeout).
sleep 2

# The surviving nodes elected a new leader.
curl -s http://localhost:8081/v1/codeq/raft/status | jq .

# Submit a write against the new leader; it lands and replicates.
```

## Going multi-shard (M2)

For per-shard raft groups (one group per Pebble shard), add
`"numShards": 4` to `PERSISTENCE_CONFIG` and reserve N consecutive
ports per node. The compose template uses container-local port 7000 as
the base; each shard binds 7000 + shardIdx. Adjust as needed.

## Mutual exclusion

Raft mode is mutually exclusive with the legacy static-ring cluster
mode (`CLUSTER_ENABLED=true`). Don't enable both — startup will
reject the config.
124 changes: 124 additions & 0 deletions deploy/docker-compose/raft-cluster/compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# 3-node codeq cluster with raft replication (M1+M2) on embedded
# Pebble. Every write goes through hashicorp/raft and lands on every
# replica's local Pebble via the FSM — automatic failover, no Redis,
# no external coordinator.
#
# Topology:
# node-a: HTTP :8080 host, raft :7000 inside the bridge network
# node-b: HTTP :8081 host, raft :7000 inside (different container)
# node-c: HTTP :8082 host, raft :7000 inside (different container)
#
# Only node-a is `RAFT_BOOTSTRAP=true`. After the first run raft has
# persisted state and ignores the flag on subsequent restarts. Set the
# others to false during initial cluster formation — they'll receive
# the cluster configuration from the leader via AppendEntries.
#
# To enable multi-shard raft (M2), add `"numShards": 4` (or N) to
# PERSISTENCE_CONFIG. Each shard becomes its own raft group on
# bindAddr+shardIdx — reserve N consecutive ports per node.
#
# See docs/40-raft-replication.md for the architecture, startup
# ordering rules, status endpoint, and operational caveats.

name: codeq-raft

services:
node-a:
image: ${CODEQ_IMAGE:-codeq-service:cluster}
container_name: codeq-raft-node-a
hostname: node-a
restart: unless-stopped
ports:
- "8080:8080"
environment: &raft_env
PORT: "8080"
ENV: dev
GIN_MODE: release
LOG_LEVEL: ${CODEQ_LOG_LEVEL:-warn}
LOG_FORMAT: json
DEFAULT_LEASE_SECONDS: "60"
REQUEUE_INSPECT_LIMIT: "200"
MAX_ATTEMPTS_DEFAULT: "5"
BACKOFF_POLICY: exp_full_jitter
BACKOFF_BASE_SECONDS: "5"
BACKOFF_MAX_SECONDS: "900"
WEBHOOK_HMAC_SECRET: dev-cluster-secret
LOCAL_ARTIFACTS_DIR: /var/lib/codeq/artifacts
WORKER_AUDIENCE: codeq-worker
PERSISTENCE_PROVIDER: pebble
PERSISTENCE_CONFIG: '{"path":"/var/lib/codeq/pebble"}'
RAFT_ENABLED: "true"
# RAFT_PEERS is "id=host:port,..." resolved via docker DNS.
# All three peers run raft on container-local port 7000.
RAFT_PEERS: "node-a=node-a:7000,node-b=node-b:7000,node-c=node-c:7000"
PRODUCER_AUTH_PROVIDER: static
PRODUCER_AUTH_CONFIG: |-
{"token":"dev-token","subject":"producer-dev","email":"dev@codeq.local","raw":{"role":"ADMIN","tenantId":"dev-tenant"}}
WORKER_AUTH_PROVIDER: static
WORKER_AUTH_CONFIG: |-
{"token":"dev-token","subject":"worker-dev","scopes":["codeq:claim","codeq:heartbeat","codeq:abandon","codeq:nack","codeq:result","codeq:subscribe"],"eventTypes":["*"],"raw":{"tenantId":"dev-tenant"}}
# Node-specific overrides
RAFT_SELF_ID: node-a
RAFT_BIND_ADDR: ":7000"
RAFT_BOOTSTRAP: "true"
volumes:
- node-a-data:/var/lib/codeq/pebble
- node-a-art:/var/lib/codeq/artifacts
networks:
- raft

node-b:
image: ${CODEQ_IMAGE:-codeq-service:cluster}
container_name: codeq-raft-node-b
hostname: node-b
restart: unless-stopped
ports:
- "8081:8080"
environment:
<<: *raft_env
RAFT_SELF_ID: node-b
RAFT_BIND_ADDR: ":7000"
RAFT_BOOTSTRAP: "false"
volumes:
- node-b-data:/var/lib/codeq/pebble
- node-b-art:/var/lib/codeq/artifacts
networks:
- raft
# Wait for node-a's transport to be listening before starting —
# avoids node-a's first election rounds hitting connection-refused.
depends_on:
node-a:
condition: service_started

node-c:
image: ${CODEQ_IMAGE:-codeq-service:cluster}
container_name: codeq-raft-node-c
hostname: node-c
restart: unless-stopped
ports:
- "8082:8080"
environment:
<<: *raft_env
RAFT_SELF_ID: node-c
RAFT_BIND_ADDR: ":7000"
RAFT_BOOTSTRAP: "false"
volumes:
- node-c-data:/var/lib/codeq/pebble
- node-c-art:/var/lib/codeq/artifacts
networks:
- raft
depends_on:
node-a:
condition: service_started

volumes:
node-a-data:
node-a-art:
node-b-data:
node-b-art:
node-c-data:
node-c-art:

networks:
raft:
driver: bridge
56 changes: 56 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,62 @@ func applyEnvAndDefaults(c *Config) {
c.Cluster.Nodes = nodes
}
}
// Raft overrides. RAFT_PEERS uses the same "id=addr,id=addr,..."
// shape as CLUSTER_NODES so deployment manifests look consistent.
if v := os.Getenv("RAFT_ENABLED"); v != "" {
c.Raft.Enabled = strings.EqualFold(v, "true") || v == "1" || strings.EqualFold(v, "yes")
}
if v := os.Getenv("RAFT_SELF_ID"); v != "" {
c.Raft.SelfID = v
}
if v := os.Getenv("RAFT_BIND_ADDR"); v != "" {
c.Raft.BindAddr = v
}
if v := os.Getenv("RAFT_BOOTSTRAP"); v != "" {
c.Raft.Bootstrap = strings.EqualFold(v, "true") || v == "1" || strings.EqualFold(v, "yes")
}
if v := os.Getenv("RAFT_PEERS"); v != "" {
peers := make(map[string]string)
for _, pair := range strings.Split(v, ",") {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}
eq := strings.Index(pair, "=")
if eq <= 0 || eq == len(pair)-1 {
continue
}
peers[strings.TrimSpace(pair[:eq])] = strings.TrimSpace(pair[eq+1:])
}
if len(peers) > 0 {
c.Raft.Peers = peers
}
}
if v := os.Getenv("RAFT_HEARTBEAT_MS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Raft.HeartbeatMS = n
}
}
if v := os.Getenv("RAFT_ELECTION_MS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Raft.ElectionMS = n
}
}
if v := os.Getenv("RAFT_LEADER_LEASE_MS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Raft.LeaderLeaseMS = n
}
}
if v := os.Getenv("RAFT_COMMIT_MS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Raft.CommitMS = n
}
}
if v := os.Getenv("RAFT_APPLY_TIMEOUT_SECONDS"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
c.Raft.ApplyTimeoutSeconds = n
}
}
if v := os.Getenv("TRACING_ENABLED"); v != "" {
c.TracingEnabled = strings.EqualFold(v, "true") || v == "1" || strings.EqualFold(v, "yes")
}
Expand Down
91 changes: 91 additions & 0 deletions pkg/config/raft_env_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package config

import (
"testing"
)

func TestLoadConfigOptional_RaftEnvOverrides(t *testing.T) {
t.Setenv("RAFT_ENABLED", "true")
t.Setenv("RAFT_SELF_ID", "node-7")
t.Setenv("RAFT_BIND_ADDR", ":7100")
t.Setenv("RAFT_BOOTSTRAP", "true")
t.Setenv("RAFT_PEERS", "node-1=host1:7000,node-2=host2:7000,node-7=host7:7100")
t.Setenv("RAFT_HEARTBEAT_MS", "250")
t.Setenv("RAFT_ELECTION_MS", "500")
t.Setenv("RAFT_LEADER_LEASE_MS", "200")
t.Setenv("RAFT_COMMIT_MS", "20")
t.Setenv("RAFT_APPLY_TIMEOUT_SECONDS", "8")

cfg, err := LoadConfigOptional("")
if err != nil {
t.Fatalf("LoadConfigOptional: %v", err)
}
if !cfg.Raft.Enabled {
t.Errorf("Raft.Enabled: want true, got false")
}
if cfg.Raft.SelfID != "node-7" {
t.Errorf("Raft.SelfID: want node-7, got %q", cfg.Raft.SelfID)
}
if cfg.Raft.BindAddr != ":7100" {
t.Errorf("Raft.BindAddr: want :7100, got %q", cfg.Raft.BindAddr)
}
if !cfg.Raft.Bootstrap {
t.Errorf("Raft.Bootstrap: want true, got false")
}
wantPeers := map[string]string{
"node-1": "host1:7000",
"node-2": "host2:7000",
"node-7": "host7:7100",
}
if len(cfg.Raft.Peers) != len(wantPeers) {
t.Fatalf("Raft.Peers: want %d entries, got %d (%+v)", len(wantPeers), len(cfg.Raft.Peers), cfg.Raft.Peers)
}
for id, addr := range wantPeers {
if got := cfg.Raft.Peers[id]; got != addr {
t.Errorf("Raft.Peers[%s]: want %q, got %q", id, addr, got)
}
}
if cfg.Raft.HeartbeatMS != 250 {
t.Errorf("Raft.HeartbeatMS: want 250, got %d", cfg.Raft.HeartbeatMS)
}
if cfg.Raft.ElectionMS != 500 {
t.Errorf("Raft.ElectionMS: want 500, got %d", cfg.Raft.ElectionMS)
}
if cfg.Raft.LeaderLeaseMS != 200 {
t.Errorf("Raft.LeaderLeaseMS: want 200, got %d", cfg.Raft.LeaderLeaseMS)
}
if cfg.Raft.CommitMS != 20 {
t.Errorf("Raft.CommitMS: want 20, got %d", cfg.Raft.CommitMS)
}
if cfg.Raft.ApplyTimeoutSeconds != 8 {
t.Errorf("Raft.ApplyTimeoutSeconds: want 8, got %d", cfg.Raft.ApplyTimeoutSeconds)
}
}

func TestLoadConfigOptional_RaftDisabledByDefault(t *testing.T) {
cfg, err := LoadConfigOptional("")
if err != nil {
t.Fatalf("LoadConfigOptional: %v", err)
}
if cfg.Raft.Enabled {
t.Errorf("default Raft.Enabled: want false, got true")
}
}

func TestLoadConfigOptional_RaftPeersIgnoresMalformed(t *testing.T) {
t.Setenv("RAFT_PEERS", "node-1=ok:7000,malformed-no-equals,=missing-id,trailing-empty=,node-2=ok:7001,")
cfg, err := LoadConfigOptional("")
if err != nil {
t.Fatalf("LoadConfigOptional: %v", err)
}
if got := cfg.Raft.Peers["node-1"]; got != "ok:7000" {
t.Errorf("node-1: want ok:7000, got %q", got)
}
if got := cfg.Raft.Peers["node-2"]; got != "ok:7001" {
t.Errorf("node-2: want ok:7001, got %q", got)
}
// Malformed entries are silently dropped.
if len(cfg.Raft.Peers) != 2 {
t.Errorf("len(Peers): want 2 (malformed dropped), got %d: %+v", len(cfg.Raft.Peers), cfg.Raft.Peers)
}
}
Loading