Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/controllers/abandon_task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func (h *abandonController) Handle(c *gin.Context) {
return
}
if err := h.svc.Abandon(c.Request.Context(), taskID, claims.Subject); err != nil {
if maybeRedirectLeader(c, err) {
return
}
status := http.StatusInternalServerError
if err.Error() == "not-owner" {
status = http.StatusForbidden
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/claim_task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (h *claimTaskController) Handle(c *gin.Context) {

task, ok, err := h.svc.ClaimTask(c.Request.Context(), claims.Subject, req.Commands, req.LeaseSeconds, req.WaitSeconds, tenantID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
respondWriteError(c, err)
return
}
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion internal/controllers/create_task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *createTaskController) Handle(c *gin.Context) {

task, err := h.svc.CreateTask(c.Request.Context(), req.Command, payloadJSON, req.Priority, req.Webhook, req.MaxAttempts, req.Idempotency, runAt, req.DelaySecs, tenantID)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
respondWriteError(c, err)
return
}
c.JSON(http.StatusAccepted, task)
Expand Down
3 changes: 3 additions & 0 deletions internal/controllers/heartbeat_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func (h *heartbeatController) Handle(c *gin.Context) {
return
}
if err := h.svc.Heartbeat(c.Request.Context(), taskID, claims.Subject, req.ExtendSeconds); err != nil {
if maybeRedirectLeader(c, err) {
return
}
status := http.StatusInternalServerError
if err.Error() == "not-owner" {
status = http.StatusForbidden
Expand Down
3 changes: 3 additions & 0 deletions internal/controllers/nack_task_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func (h *nackController) Handle(c *gin.Context) {
}
delay, movedToDLQ, err := h.svc.NackTask(c.Request.Context(), taskID, claims.Subject, req.DelaySeconds, req.Reason)
if err != nil {
if maybeRedirectLeader(c, err) {
return
}
status := http.StatusInternalServerError
switch err.Error() {
case "not-owner":
Expand Down
50 changes: 50 additions & 0 deletions internal/controllers/respond.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package controllers

import (
"errors"
"net/http"
"strings"

"github.com/gin-gonic/gin"

"github.com/osvaldoandrade/codeq/pkg/domain"
)

// respondWriteError writes the right response for a service-level
// error from a WRITE controller (create, claim, submit, nack, etc.).
// If the error indicates "not leader" with a known leader URL, replies
// with HTTP 307 Temporary Redirect so a well-behaved HTTP client
// follows to the leader transparently. Otherwise falls back to the
// usual 400.
//
// HTTP 307 (vs the older 302) preserves the request method and body
// across the redirect (RFC 7231), which is what we want for POST /tasks
// and friends. Go's http.Client follows automatically when the
// request body supports Seek (bytes.Reader / strings.Reader / GetBody).
func respondWriteError(c *gin.Context, err error) {
if maybeRedirectLeader(c, err) {
return
}
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}

// maybeRedirectLeader writes a 307 to the leader and returns true when
// err is a "not leader" hint with a known leader URL. Otherwise it
// returns false and writes nothing.
func maybeRedirectLeader(c *gin.Context, err error) bool {
var hint domain.LeaderHint
if !errors.As(err, &hint) {
return false
}
addr := hint.LeaderHTTPAddr()
if addr == "" {
return false
}
location := strings.TrimSuffix(addr, "/") + c.Request.URL.RequestURI()
c.Header("Location", location)
c.JSON(http.StatusTemporaryRedirect, gin.H{
"error": "not leader",
"leader": addr,
})
return true
}
5 changes: 5 additions & 0 deletions internal/controllers/submit_result_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (h *submitResultController) Handle(c *gin.Context) {
req.WorkerID = claims.Subject
rec, err := h.svc.Submit(c.Request.Context(), id, req)
if err != nil {
// 307-redirect "not leader" before the per-error status map
// — otherwise a NotLeaderError would fall into 400.
if maybeRedirectLeader(c, err) {
return
}
status := http.StatusBadRequest
switch err.Error() {
case "task not found":
Expand Down
45 changes: 38 additions & 7 deletions internal/repository/pebble/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,45 @@ import (
type Replicator interface {
IsLeader() bool
Replicate(repr []byte) error
// LeaderHTTPAddr returns the leader's HTTP base URL (per the
// configured peer map) when known, or "" otherwise. Used to fill
// the LeaderURL on NotLeaderError so controllers can respond with
// HTTP 307 + Location.
LeaderHTTPAddr() string
}

// ErrNotLeader is returned by write APIs when this DB is attached to a
// replicator and the local node isn't the leader. The repository code
// surfaces it to the service layer; the cluster router (not the storage
// layer) decides whether to forward to a leader.
// ErrNotLeader is the sentinel returned by write APIs when this DB is
// attached to a replicator and the local node isn't the leader. The
// concrete return value is a *NotLeaderError, which carries the
// leader hint; bare errors.Is(err, ErrNotLeader) still works because
// NotLeaderError.Is matches the sentinel.
var ErrNotLeader = errors.New("pebble: not leader")

// NotLeaderError is the typed error returned by write APIs in raft
// mode when the local node isn't the leader. LeaderURL is the
// leader's HTTP base URL when known (resolved from the configured
// peer map) — controllers use it to write a 307 redirect.
//
// Satisfies pkg/domain.LeaderHint via the LeaderHTTPAddr() method.
type NotLeaderError struct {
LeaderURL string
}

func (e *NotLeaderError) Error() string {
if e.LeaderURL != "" {
return "pebble: not leader (try " + e.LeaderURL + ")"
}
return "pebble: not leader"
}

// LeaderHTTPAddr returns the configured HTTP URL for the current
// leader, or "" if unknown. Satisfies pkg/domain.LeaderHint.
func (e *NotLeaderError) LeaderHTTPAddr() string { return e.LeaderURL }

// Is matches the package-level ErrNotLeader sentinel so existing
// errors.Is callers keep working without code changes.
func (e *NotLeaderError) Is(target error) bool { return target == ErrNotLeader }

// DB wraps a *pebble.DB with the helpers the repository implementations
// need: a process-wide monotonic sequence counter (used to order pending
// queue entries within a priority bucket) and a few convenience methods
Expand Down Expand Up @@ -244,7 +275,7 @@ func (d *DB) Has(key []byte) (bool, error) {
func (d *DB) Set(key, value []byte) error {
if d.repl != nil {
if !d.repl.IsLeader() {
return ErrNotLeader
return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()}
}
b := d.db.NewBatch()
defer b.Close()
Expand All @@ -260,7 +291,7 @@ func (d *DB) Set(key, value []byte) error {
func (d *DB) Delete(key []byte) error {
if d.repl != nil {
if !d.repl.IsLeader() {
return ErrNotLeader
return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()}
}
b := d.db.NewBatch()
defer b.Close()
Expand Down Expand Up @@ -291,7 +322,7 @@ func (d *DB) Batch() *pebbledb.Batch {
func (d *DB) CommitBatch(b *pebbledb.Batch) error {
if d.repl != nil {
if !d.repl.IsLeader() {
return ErrNotLeader
return &NotLeaderError{LeaderURL: d.repl.LeaderHTTPAddr()}
}
// Replicator owns serialization; the local pebble write happens
// on every node via the FSM. The caller still owns b and must
Expand Down
Loading
Loading