From 452c1e9df63adb9c6911723780ea1fc4dfdc3668 Mon Sep 17 00:00:00 2001 From: Osvaldo Date: Mon, 18 May 2026 11:49:26 -0300 Subject: [PATCH] feat(raft): server-side 307 redirect on ErrNotLeader MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Writes that hit a follower now respond with HTTP 307 Temporary Redirect + Location header pointing at the leader's HTTP URL. Go's http.Client follows 307 transparently (RFC 7231 preserves method + body), so naïve clients land on the leader in one extra round-trip instead of bouncing back with an error. Wiring (layered to avoid circular deps): - pkg/domain/errors.go (new): LeaderHint interface — just error + LeaderHTTPAddr() string. Shared vocabulary between the storage layer that constructs the error and the HTTP layer that interprets it. - internal/repository/pebble.NotLeaderError: typed error with LeaderURL field. Satisfies domain.LeaderHint. errors.Is(err, ErrNotLeader) still matches because NotLeaderError.Is matches the sentinel — existing callers don't change. - pebble.Replicator gains LeaderHTTPAddr() — the raft.DB already exposes it (PR #589). pebble's Set/Delete/CommitBatch build NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()} when the local node isn't the leader. - internal/controllers/respond.go: respondWriteError + maybeRedirectLeader helpers. errors.As against domain.LeaderHint, write 307 + Location built from leader URL + the original request URI. - Controllers updated to call the helpers on service errors: create_task, claim_task, submit_result, heartbeat, nack, abandon. Batch endpoints kept on the per-item error path (mixed-shard batches can't redirect cleanly). Test: - TestRaft_307RedirectsToLeader: 3-node single-shard cluster. Identifies leader + follower via probe writes; sends to follower with redirect-following disabled, asserts 307 + Location starts with leader URL + ends with /v1/codeq/tasks. Then re-sends with standard http.Client (follows redirects automatically), asserts 202. ~210ms run. All existing tests still pass. --- .../controllers/abandon_task_controller.go | 3 + internal/controllers/claim_task_controller.go | 2 +- .../controllers/create_task_controller.go | 2 +- internal/controllers/heartbeat_controller.go | 3 + internal/controllers/nack_task_controller.go | 3 + internal/controllers/respond.go | 50 ++++ .../controllers/submit_result_controller.go | 5 + internal/repository/pebble/db.go | 45 +++- pkg/app/raft_307_redirect_test.go | 227 ++++++++++++++++++ pkg/domain/errors.go | 14 ++ 10 files changed, 345 insertions(+), 9 deletions(-) create mode 100644 internal/controllers/respond.go create mode 100644 pkg/app/raft_307_redirect_test.go create mode 100644 pkg/domain/errors.go diff --git a/internal/controllers/abandon_task_controller.go b/internal/controllers/abandon_task_controller.go index d185527..65d58b7 100644 --- a/internal/controllers/abandon_task_controller.go +++ b/internal/controllers/abandon_task_controller.go @@ -23,6 +23,9 @@ func (h *abandonController) Handle(c *gin.Context) { return } if err := h.svc.Abandon(c.Request.Context(), taskID, claims.Subject); err != nil { + if maybeRedirectLeader(c, err) { + return + } status := http.StatusInternalServerError if err.Error() == "not-owner" { status = http.StatusForbidden diff --git a/internal/controllers/claim_task_controller.go b/internal/controllers/claim_task_controller.go index 49c689f..f65632e 100644 --- a/internal/controllers/claim_task_controller.go +++ b/internal/controllers/claim_task_controller.go @@ -68,7 +68,7 @@ func (h *claimTaskController) Handle(c *gin.Context) { task, ok, err := h.svc.ClaimTask(c.Request.Context(), claims.Subject, req.Commands, req.LeaseSeconds, req.WaitSeconds, tenantID) if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + respondWriteError(c, err) return } if !ok { diff --git a/internal/controllers/create_task_controller.go b/internal/controllers/create_task_controller.go index ba9f2b7..ee64bc8 100644 --- a/internal/controllers/create_task_controller.go +++ b/internal/controllers/create_task_controller.go @@ -59,7 +59,7 @@ func (h *createTaskController) Handle(c *gin.Context) { task, err := h.svc.CreateTask(c.Request.Context(), req.Command, payloadJSON, req.Priority, req.Webhook, req.MaxAttempts, req.Idempotency, runAt, req.DelaySecs, tenantID) if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + respondWriteError(c, err) return } c.JSON(http.StatusAccepted, task) diff --git a/internal/controllers/heartbeat_controller.go b/internal/controllers/heartbeat_controller.go index 29e28e1..27b169b 100644 --- a/internal/controllers/heartbeat_controller.go +++ b/internal/controllers/heartbeat_controller.go @@ -32,6 +32,9 @@ func (h *heartbeatController) Handle(c *gin.Context) { return } if err := h.svc.Heartbeat(c.Request.Context(), taskID, claims.Subject, req.ExtendSeconds); err != nil { + if maybeRedirectLeader(c, err) { + return + } status := http.StatusInternalServerError if err.Error() == "not-owner" { status = http.StatusForbidden diff --git a/internal/controllers/nack_task_controller.go b/internal/controllers/nack_task_controller.go index 48f84ee..13379dc 100644 --- a/internal/controllers/nack_task_controller.go +++ b/internal/controllers/nack_task_controller.go @@ -34,6 +34,9 @@ func (h *nackController) Handle(c *gin.Context) { } delay, movedToDLQ, err := h.svc.NackTask(c.Request.Context(), taskID, claims.Subject, req.DelaySeconds, req.Reason) if err != nil { + if maybeRedirectLeader(c, err) { + return + } status := http.StatusInternalServerError switch err.Error() { case "not-owner": diff --git a/internal/controllers/respond.go b/internal/controllers/respond.go new file mode 100644 index 0000000..d7214ff --- /dev/null +++ b/internal/controllers/respond.go @@ -0,0 +1,50 @@ +package controllers + +import ( + "errors" + "net/http" + "strings" + + "github.com/gin-gonic/gin" + + "github.com/osvaldoandrade/codeq/pkg/domain" +) + +// respondWriteError writes the right response for a service-level +// error from a WRITE controller (create, claim, submit, nack, etc.). +// If the error indicates "not leader" with a known leader URL, replies +// with HTTP 307 Temporary Redirect so a well-behaved HTTP client +// follows to the leader transparently. Otherwise falls back to the +// usual 400. +// +// HTTP 307 (vs the older 302) preserves the request method and body +// across the redirect (RFC 7231), which is what we want for POST /tasks +// and friends. Go's http.Client follows automatically when the +// request body supports Seek (bytes.Reader / strings.Reader / GetBody). +func respondWriteError(c *gin.Context, err error) { + if maybeRedirectLeader(c, err) { + return + } + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) +} + +// maybeRedirectLeader writes a 307 to the leader and returns true when +// err is a "not leader" hint with a known leader URL. Otherwise it +// returns false and writes nothing. +func maybeRedirectLeader(c *gin.Context, err error) bool { + var hint domain.LeaderHint + if !errors.As(err, &hint) { + return false + } + addr := hint.LeaderHTTPAddr() + if addr == "" { + return false + } + location := strings.TrimSuffix(addr, "/") + c.Request.URL.RequestURI() + c.Header("Location", location) + c.JSON(http.StatusTemporaryRedirect, gin.H{ + "error": "not leader", + "leader": addr, + }) + return true +} diff --git a/internal/controllers/submit_result_controller.go b/internal/controllers/submit_result_controller.go index 8759de3..dfeb34b 100644 --- a/internal/controllers/submit_result_controller.go +++ b/internal/controllers/submit_result_controller.go @@ -31,6 +31,11 @@ func (h *submitResultController) Handle(c *gin.Context) { req.WorkerID = claims.Subject rec, err := h.svc.Submit(c.Request.Context(), id, req) if err != nil { + // 307-redirect "not leader" before the per-error status map + // — otherwise a NotLeaderError would fall into 400. + if maybeRedirectLeader(c, err) { + return + } status := http.StatusBadRequest switch err.Error() { case "task not found": diff --git a/internal/repository/pebble/db.go b/internal/repository/pebble/db.go index a9710fa..021e19a 100644 --- a/internal/repository/pebble/db.go +++ b/internal/repository/pebble/db.go @@ -21,14 +21,45 @@ import ( type Replicator interface { IsLeader() bool Replicate(repr []byte) error + // LeaderHTTPAddr returns the leader's HTTP base URL (per the + // configured peer map) when known, or "" otherwise. Used to fill + // the LeaderURL on NotLeaderError so controllers can respond with + // HTTP 307 + Location. + LeaderHTTPAddr() string } -// ErrNotLeader is returned by write APIs when this DB is attached to a -// replicator and the local node isn't the leader. The repository code -// surfaces it to the service layer; the cluster router (not the storage -// layer) decides whether to forward to a leader. +// ErrNotLeader is the sentinel returned by write APIs when this DB is +// attached to a replicator and the local node isn't the leader. The +// concrete return value is a *NotLeaderError, which carries the +// leader hint; bare errors.Is(err, ErrNotLeader) still works because +// NotLeaderError.Is matches the sentinel. var ErrNotLeader = errors.New("pebble: not leader") +// NotLeaderError is the typed error returned by write APIs in raft +// mode when the local node isn't the leader. LeaderURL is the +// leader's HTTP base URL when known (resolved from the configured +// peer map) — controllers use it to write a 307 redirect. +// +// Satisfies pkg/domain.LeaderHint via the LeaderHTTPAddr() method. +type NotLeaderError struct { + LeaderURL string +} + +func (e *NotLeaderError) Error() string { + if e.LeaderURL != "" { + return "pebble: not leader (try " + e.LeaderURL + ")" + } + return "pebble: not leader" +} + +// LeaderHTTPAddr returns the configured HTTP URL for the current +// leader, or "" if unknown. Satisfies pkg/domain.LeaderHint. +func (e *NotLeaderError) LeaderHTTPAddr() string { return e.LeaderURL } + +// Is matches the package-level ErrNotLeader sentinel so existing +// errors.Is callers keep working without code changes. +func (e *NotLeaderError) Is(target error) bool { return target == ErrNotLeader } + // DB wraps a *pebble.DB with the helpers the repository implementations // need: a process-wide monotonic sequence counter (used to order pending // queue entries within a priority bucket) and a few convenience methods @@ -244,7 +275,7 @@ func (d *DB) Has(key []byte) (bool, error) { func (d *DB) Set(key, value []byte) error { if d.repl != nil { if !d.repl.IsLeader() { - return ErrNotLeader + return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()} } b := d.db.NewBatch() defer b.Close() @@ -260,7 +291,7 @@ func (d *DB) Set(key, value []byte) error { func (d *DB) Delete(key []byte) error { if d.repl != nil { if !d.repl.IsLeader() { - return ErrNotLeader + return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()} } b := d.db.NewBatch() defer b.Close() @@ -291,7 +322,7 @@ func (d *DB) Batch() *pebbledb.Batch { func (d *DB) CommitBatch(b *pebbledb.Batch) error { if d.repl != nil { if !d.repl.IsLeader() { - return ErrNotLeader + return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()} } // Replicator owns serialization; the local pebble write happens // on every node via the FSM. The caller still owns b and must diff --git a/pkg/app/raft_307_redirect_test.go b/pkg/app/raft_307_redirect_test.go new file mode 100644 index 0000000..db67b68 --- /dev/null +++ b/pkg/app/raft_307_redirect_test.go @@ -0,0 +1,227 @@ +package app + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + _ "github.com/osvaldoandrade/codeq/pkg/auth/static" + "github.com/osvaldoandrade/codeq/pkg/config" +) + +// TestRaft_307RedirectsToLeader verifies that hitting a follower with +// a write returns HTTP 307 + Location pointing at the leader's HTTP +// URL. Go's http.Client follows the redirect transparently. +// +// Single-shard 3-node setup so the test can pre-grab a known leader +// and a known follower without race. +func TestRaft_307RedirectsToLeader(t *testing.T) { + ports := pickThreeFreePorts(t) + peers := map[string]string{ + "node-1": "127.0.0.1:" + ports[0], + "node-2": "127.0.0.1:" + ports[1], + "node-3": "127.0.0.1:" + ports[2], + } + + // Start the apps. node-1 bootstraps; node-2 and node-3 follow. + // We need their HTTP URLs to fill PeerHTTPAddrs — created BEFORE + // the apps so each app's config can reference the others. + srvs := make([]*httptest.Server, 3) + apps := make([]*Application, 3) + httpAddrs := make(map[string]string) + + // Pre-listen on the HTTP ports via httptest's port-picking, then + // reuse: build httptest.NewUnstartedServer, learn its URL, build + // PeerHTTPAddrs, then start. The trick — httptest.NewUnstartedServer + // needs a Handler at construction but we don't have one yet + // (Application engine is built inside startup). So: start with a + // placeholder, learn the URL, then swap. + ids := []string{"node-1", "node-2", "node-3"} + for i, id := range ids { + s := httptest.NewUnstartedServer(http.NotFoundHandler()) + s.Start() + httpAddrs[id] = s.URL + srvs[i] = s + } + t.Cleanup(func() { + for _, s := range srvs { + if s != nil { + s.Close() + } + } + for _, a := range apps { + if a != nil && a.TracingShutdown != nil { + _ = a.TracingShutdown(context.Background()) + } + } + }) + + // Start the apps. Followers first to be listening before + // bootstrapper dials. + for i, id := range []string{"node-2", "node-3", "node-1"} { + // Map id back to index in ids/peers/srvs. + var origIdx int + for k, v := range ids { + if v == id { + origIdx = k + break + } + } + bootstrap := id == "node-1" + _ = i + app, err := newRaftAppFor307(t, id, peers, httpAddrs, bootstrap) + if err != nil { + t.Fatalf("[%s] NewApplication: %v", id, err) + } + SetupMappings(app) + // Swap the httptest server's handler from NotFound to the app + // engine. The httptest server keeps its listener and URL. + srvs[origIdx].Config.Handler = app.Engine + apps[origIdx] = app + } + + // Wait until any node accepts a write — at that point its raft + // group has elected. + deadline := time.Now().Add(5 * time.Second) + leader := -1 + for time.Now().Before(deadline) { + for i := range ids { + if probeWrite(srvs[i].URL) == http.StatusAccepted { + leader = i + break + } + } + if leader >= 0 { + break + } + time.Sleep(50 * time.Millisecond) + } + if leader < 0 { + t.Fatal("no leader emerged within 5s") + } + t.Logf("leader is %s", ids[leader]) + + // Pick a follower deterministically. + follower := (leader + 1) % len(ids) + t.Logf("follower: %s", ids[follower]) + + // Send a write to the follower WITHOUT following redirects, so we + // can inspect the 307 response directly. + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + Timeout: 3 * time.Second, + } + body := `{"command":"GENERATE_MASTER","payload":{"k":"v"},"priority":5}` + req, _ := http.NewRequest(http.MethodPost, srvs[follower].URL+"/v1/codeq/tasks", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer dev-token") + req.Header.Set("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("POST to follower: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusTemporaryRedirect { + t.Fatalf("expected 307 from follower, got %d", resp.StatusCode) + } + location := resp.Header.Get("Location") + if location == "" { + t.Fatal("307 response missing Location header") + } + if !strings.HasPrefix(location, srvs[leader].URL) { + t.Errorf("Location %q should start with leader URL %q", location, srvs[leader].URL) + } + if !strings.HasSuffix(location, "/v1/codeq/tasks") { + t.Errorf("Location %q should end with the request path", location) + } + + // Verify a follow-the-redirect client (standard Go default) + // succeeds with a single high-level call. + followClient := &http.Client{Timeout: 3 * time.Second} + req2, _ := http.NewRequest(http.MethodPost, srvs[follower].URL+"/v1/codeq/tasks", strings.NewReader(body)) + req2.Header.Set("Authorization", "Bearer dev-token") + req2.Header.Set("Content-Type", "application/json") + resp2, err := followClient.Do(req2) + if err != nil { + t.Fatalf("POST with redirect follow: %v", err) + } + defer resp2.Body.Close() + if resp2.StatusCode != http.StatusAccepted { + t.Errorf("after redirect: want 202, got %d", resp2.StatusCode) + } +} + +func probeWrite(serverURL string) int { + body := `{"command":"GENERATE_MASTER","payload":{"probe":true},"priority":5}` + req, _ := http.NewRequest(http.MethodPost, serverURL+"/v1/codeq/tasks", strings.NewReader(body)) + req.Header.Set("Authorization", "Bearer dev-token") + req.Header.Set("Content-Type", "application/json") + client := &http.Client{ + Timeout: 500 * time.Millisecond, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Do(req) + if err != nil { + return 0 + } + resp.Body.Close() + return resp.StatusCode +} + +func newRaftAppFor307(t *testing.T, id string, peers, httpAddrs map[string]string, bootstrap bool) (*Application, error) { + t.Helper() + pcfg, _ := json.Marshal(map[string]any{"path": t.TempDir() + "/pebble"}) + cfg := &config.Config{ + Port: 0, + Timezone: "UTC", + LogLevel: "error", + LogFormat: "json", + Env: "dev", + DefaultLeaseSeconds: 60, + RequeueInspectLimit: 50, + LocalArtifactsDir: t.TempDir(), + MaxAttemptsDefault: 5, + BackoffPolicy: "fixed", + BackoffBaseSeconds: 1, + BackoffMaxSeconds: 3, + WebhookHmacSecret: "secret", + WorkerAudience: "codeq-worker", + SubscriptionMinIntervalSeconds: 5, + SubscriptionCleanupIntervalSeconds: 60, + ResultWebhookMaxAttempts: 1, + ResultWebhookBaseBackoffSeconds: 1, + ResultWebhookMaxBackoffSeconds: 2, + ProducerAuthProvider: "static", + ProducerAuthConfig: json.RawMessage(`{"token":"dev-token","subject":"producer-dev","email":"dev@codeq.local","raw":{"role":"ADMIN","tenantId":"dev-tenant"}}`), + WorkerAuthProvider: "static", + WorkerAuthConfig: json.RawMessage(`{"token":"dev-token","subject":"worker-dev","scopes":["codeq:claim","codeq:heartbeat","codeq:abandon","codeq:nack","codeq:result","codeq:subscribe"],"eventTypes":["*"],"raw":{"tenantId":"dev-tenant"}}`), + PersistenceProvider: "pebble", + PersistenceConfig: pcfg, + RedisAddr: "127.0.0.1:0", + Raft: config.RaftConfig{ + Enabled: true, + SelfID: id, + BindAddr: peers[id], + Bootstrap: bootstrap, + Peers: peers, + PeerHTTPAddrs: httpAddrs, + HeartbeatMS: 50, + ElectionMS: 50, + LeaderLeaseMS: 50, + CommitMS: 10, + ApplyTimeoutSeconds: 3, + }, + } + if err := cfg.Validate(); err != nil { + return nil, err + } + return NewApplication(cfg) +} diff --git a/pkg/domain/errors.go b/pkg/domain/errors.go new file mode 100644 index 0000000..acdfff2 --- /dev/null +++ b/pkg/domain/errors.go @@ -0,0 +1,14 @@ +package domain + +// LeaderHint is satisfied by errors that carry a hint pointing at the +// raft group's current leader (HTTP base URL, e.g. "http://node-2:8080"). +// The HTTP layer uses errors.As against this interface to detect a +// "not leader" error and respond with a 307 redirect to the leader. +// +// The interface lives in pkg/domain so the storage layer (where the +// error is constructed) and the HTTP layer (where it's interpreted) +// share a vocabulary without introducing a circular dependency. +type LeaderHint interface { + error + LeaderHTTPAddr() string +}