Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,32 @@ type Config struct {
// setting of Postgres `search_path`.
Schema string

// SoftStopTimeout is the maximum amount of time that the client will wait
// for running jobs to finish during a stop before their contexts are
// cancelled. After the timeout elapses, the client escalates to a hard stop
// by cancelling the context of all running jobs. This applies whether stop is
// initiated by calling Stop or by cancelling the context passed to Start;
// StopAndCancel is the exception — see below.
//
// In combination with signal.NotifyContext on the context passed to Start,
// this can simplify graceful stop to:
//
// ctx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
// defer stop()
//
// if err := client.Start(ctx); err != nil { ... }
// <-client.Stopped()
//
// The signal cancels the Start context, which initiates a soft stop. If
// running jobs haven't finished after SoftStopTimeout, their contexts are
// automatically cancelled to trigger a hard stop.
//
// StopAndCancel bypasses the timeout entirely and cancels job contexts
// immediately.
//
// Defaults to no timeout (wait indefinitely for jobs to finish).
SoftStopTimeout time.Duration

// SkipJobKindValidation causes the job kind format validation check to be
// skipped. This is available as an interim stopgap for users that have
// invalid job kind names, but would rather disable the check rather than
Expand Down Expand Up @@ -458,6 +484,7 @@ func (c *Config) WithDefaults() *Config {
RescueStuckJobsAfter: cmp.Or(c.RescueStuckJobsAfter, rescueAfter),
RetryPolicy: retryPolicy,
Schema: c.Schema,
SoftStopTimeout: c.SoftStopTimeout,
SkipJobKindValidation: c.SkipJobKindValidation,
SkipUnknownJobCheck: c.SkipUnknownJobCheck,
Test: c.Test,
Expand Down Expand Up @@ -1084,10 +1111,19 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
workCtx, workCancel := context.WithCancelCause(ctx)
// We use separate contexts for fetching and working to allow for a
// graceful stop. When SoftStopTimeout is configured, the work context
// is detached from the start context so that cancelling the start
// context initiates a soft stop (with timeout escalation) rather than
// an immediate hard stop. When SoftStopTimeout is not configured, the
// work context inherits from the start context to preserve the
// existing behavior where cancelling the start context is equivalent
// to StopAndCancel.
workParentCtx := ctx
if c.config.SoftStopTimeout > 0 {
workParentCtx = context.WithoutCancel(ctx)
}
workCtx, workCancel := context.WithCancelCause(workParentCtx)

// Client available to executors and to various service hooks.
fetchCtx := withClient(fetchCtx, c)
Expand Down Expand Up @@ -1151,6 +1187,18 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

// If SoftStopTimeout is configured, start a timer that will cancel
// the work context (escalating to a hard stop) if producers don't
// finish in time. StopAndCancel also calls workCancel, in which case
// this timer is a harmless no-op because the context is already done.
if c.config.SoftStopTimeout > 0 {
softStopTimer := time.AfterFunc(c.config.SoftStopTimeout, func() {
c.baseService.Logger.WarnContext(ctx, c.baseService.Name+": Soft stop timeout; cancelling remaining job contexts", slog.Duration("soft_stop_timeout", c.config.SoftStopTimeout))
c.workCancel(rivercommon.ErrStop)
})
defer softStopTimer.Stop()
}

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
startstop.StopAllParallel(producersAsServices()...)
Expand Down Expand Up @@ -1183,6 +1231,10 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// complete before exiting. If the provided context is done before shutdown has
// completed, Stop will return immediately with the context's error.
//
// If SoftStopTimeout is configured, running job contexts will be automatically
// cancelled after the timeout elapses, escalating to a hard stop. This also
// applies when stop is initiated by cancelling the context passed to Start.
//
// There's no need to call this method if a hard stop has already been initiated
// by cancelling the context passed to Start or by calling StopAndCancel.
func (c *Client[TTx]) Stop(ctx context.Context) error {
Expand Down Expand Up @@ -1211,6 +1263,12 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
// This can also be initiated by cancelling the context passed to Start. There is
// no need to call this method if the context passed to Start is cancelled
// instead.
//
// In most cases, using Stop with SoftStopTimeout configured is preferable to
// calling StopAndCancel directly. SoftStopTimeout gives running jobs a chance
// to finish before automatically escalating to context cancellation, providing
// graceful stop semantics without requiring manual orchestration of Stop and
// StopAndCancel.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrStop)
Expand Down
116 changes: 116 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,6 +2361,122 @@ func Test_Client_StopAndCancel(t *testing.T) {
})
}

func Test_Client_SoftStopTimeout(t *testing.T) {
	t.Parallel()

	ctx := context.Background()

	type JobArgs struct {
		testutil.JobArgsReflectKind[JobArgs]
	}

	t.Run("EscalatesToHardStopAfterTimeout", func(t *testing.T) {
		t.Parallel()

		var (
			jobStartedChan = make(chan struct{})
			jobDoneChan    = make(chan struct{})
		)

		config := newTestConfig(t, "")
		config.SoftStopTimeout = 100 * time.Millisecond

		AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
			close(jobStartedChan)
			// Block until the job's context is cancelled; a soft stop alone
			// can never finish this worker.
			<-ctx.Done()
			close(jobDoneChan)
			return nil
		}))

		client := runNewTestClient(ctx, t, config)

		_, err := client.Insert(ctx, JobArgs{}, nil)
		require.NoError(t, err)

		riversharedtest.WaitOrTimeout(t, jobStartedChan)

		// Initiate a soft stop. The worker above won't return on its own, so
		// the 100ms SoftStopTimeout must escalate to a hard stop for Stop to
		// return successfully.
		require.NoError(t, client.Stop(ctx))

		// By the time Stop has returned, the job's context must have been
		// cancelled, meaning jobDoneChan is already closed.
		select {
		case <-jobDoneChan:
		default:
			t.Fatal("expected job to have been cancelled by soft stop timeout")
		}
	})

	t.Run("SoftStopSucceedsBeforeTimeout", func(t *testing.T) {
		t.Parallel()

		jobStartedChan := make(chan struct{})

		config := newTestConfig(t, "")
		config.SoftStopTimeout = 5 * time.Second

		// A worker that completes immediately, so the soft stop never needs
		// to escalate.
		AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
			close(jobStartedChan)
			return nil
		}))

		client := runNewTestClient(ctx, t, config)

		_, err := client.Insert(ctx, JobArgs{}, nil)
		require.NoError(t, err)

		riversharedtest.WaitOrTimeout(t, jobStartedChan)

		// The job finishes on its own, so Stop should complete promptly and
		// the 5s escalation timer should never fire.
		require.NoError(t, client.Stop(ctx))
	})

	t.Run("ContextCancellationEscalatesAfterTimeout", func(t *testing.T) {
		t.Parallel()

		var (
			jobStartedChan = make(chan struct{})
			jobDoneChan    = make(chan struct{})
		)

		config := newTestConfig(t, "")
		config.SoftStopTimeout = 100 * time.Millisecond

		AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
			close(jobStartedChan)
			// Only a hard stop's context cancellation releases this worker.
			<-ctx.Done()
			close(jobDoneChan)
			return nil
		}))

		var (
			dbPool = riversharedtest.DBPool(ctx, t)
			driver = riverpgxv5.New(dbPool)
			schema = riverdbtest.TestSchema(ctx, t, driver, nil)
		)
		config.Schema = schema

		client, err := NewClient(driver, config)
		require.NoError(t, err)

		startCtx, startCtxCancel := context.WithCancel(ctx)
		defer startCtxCancel()

		require.NoError(t, client.Start(startCtx))

		_, err = client.Insert(ctx, JobArgs{}, nil)
		require.NoError(t, err)

		riversharedtest.WaitOrTimeout(t, jobStartedChan)

		// Cancelling the start context begins a soft stop, which should
		// escalate to a hard stop once SoftStopTimeout elapses.
		startCtxCancel()

		riversharedtest.WaitOrTimeout(t, client.Stopped())

		// Once fully stopped, the job's context must have been cancelled.
		select {
		case <-jobDoneChan:
		default:
			t.Fatal("expected job to have been cancelled by soft stop timeout")
		}
	})
}

type callbackWithCustomTimeoutArgs struct {
TimeoutValue time.Duration `json:"timeout"`
}
Expand Down
32 changes: 27 additions & 5 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,39 @@ that inserted job kinds have a worker that can run them.

### Stopping

The client should also be stopped on program shutdown:
The client should also be stopped on program shutdown. There are a number of
ways to go about this (see [graceful shutdown]), but the shortest is to cancel
the context sent to `Start` when the program's ready to stop. For example, to
stop on `SIGINT`/`SIGTERM`:

```go
// Stop fetching new work and wait for active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
SoftStopTimeout: 10 * time.Second,
...
})
if err != nil {
panic(err)
}

signalCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// Start the client. On SIGINT/SIGTERM it stops fetching new work and waits for
// active jobs to finish, cancelling any that remain after SoftStopTimeout.
if err := riverClient.Start(signalCtx); err != nil {
panic(err)
}

<-riverClient.Stopped()
```

There are some complexities around ensuring clients stop cleanly, but also in a
timely manner. See [graceful shutdown] for more details on River's stop modes.
Alternatively, use an explicit call to `Stop`:

```go
if err := riverClient.Stop(ctx); err != nil {
panic(err)
}
```

[Insert-only clients](/docs/insert-only-clients) will insert jobs, but not work
them, and don't need to be started or stopped.
Expand Down
Loading
Loading