Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 120 additions & 45 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,75 +1418,150 @@ func StartSchedulerCLI(ctx *cli.Context) error {
return fmt.Errorf("invalid component %q (expected scheduler, api, or all)", component)
}

jobs, err := scheduler.BuildJobsFromConfig(config.Cfg)
if err != nil {
return err
}

// runCtx is canceled on SIGINT or SIGTERM – triggers a full shutdown.
runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

type runner struct {
name string
run func(context.Context) error
}

var runners []runner

if runScheduler {
if len(jobs) == 0 {
logger.Info("scheduler: no enabled jobs found in configuration")
} else {
for _, job := range jobs {
logger.Info("scheduler: registering job %s", job.Name)
}
runners = append(runners, runner{
name: "scheduler",
run: func(ctx context.Context) error {
return scheduler.RunJobs(ctx, jobs)
},
})
}
}
// sighupCh receives SIGHUP signals for in-place config reload.
sighupCh := make(chan os.Signal, 1)
signal.Notify(sighupCh, syscall.SIGHUP)
defer signal.Stop(sighupCh)

// Start the API server once. It does not need to restart on reload because
// it handles on-demand requests rather than reading scheduled job config.
if runAPI {
if ok, apiErr := canStartAPIServer(config.Cfg); ok {
apiServer, err := server.New(config.Cfg)
if err != nil {
return fmt.Errorf("api server init failed: %w", err)
}
runners = append(runners, runner{
name: "api-server",
run: func(ctx context.Context) error {
return apiServer.Run(ctx)
},
})
go func() {
if err := apiServer.Run(runCtx); err != nil && !errors.Is(err, context.Canceled) {
logger.Error("api server error: %v", err)
stop()
}
}()
} else if component == "api" {
return fmt.Errorf("api server requested but cannot start: %w", apiErr)
} else {
logger.Info("api server not started: %v", apiErr)
}
}

if len(runners) == 0 {
if !runScheduler {
// API-only mode: just wait for shutdown.
<-runCtx.Done()
return nil
}

errCh := make(chan error, len(runners))
for _, r := range runners {
go func(r runner) {
errCh <- r.run(runCtx)
}(r)
}
// schedulerReloadLoop runs the scheduler and restarts it on each valid
// SIGHUP. It returns only when runCtx is canceled (SIGINT/SIGTERM).
return schedulerReloadLoop(runCtx, sighupCh)
}

for i := 0; i < len(runners); i++ {
if err := <-errCh; err != nil && !errors.Is(err, context.Canceled) {
stop()
// schedulerReloadLoop is the heart of the SIGHUP feature.
//
// Design:
// 1. Build jobs from the current config and start the gocron scheduler.
// 2. Block waiting for either a shutdown signal (runCtx.Done) or SIGHUP.
// 3. On SIGHUP:
// a. Load and validate the new config (parse YAML + dry-run job build).
// b. If invalid: log the error and keep the current scheduler running.
// c. If valid: cancel the per-iteration scheduler context, which causes
// gocron.Shutdown() to drain all in-flight jobs before returning.
// Then atomically swap in the new config and loop back to step 1.
// 4. On SIGINT/SIGTERM: drain in-flight jobs via schedCancel and return.
func schedulerReloadLoop(
runCtx context.Context,
sighupCh <-chan os.Signal,
) error {
for {
currentCfg := config.Get()

jobs, err := scheduler.BuildJobsFromConfig(currentCfg)
if err != nil {
return err
}
}

return nil
if len(jobs) == 0 {
logger.Info("scheduler: no enabled jobs found in configuration")
} else {
for _, job := range jobs {
logger.Info("scheduler: registering job %s", job.Name)
}
}

// Per-iteration context: canceled to trigger graceful scheduler drain.
schedCtx, schedCancel := context.WithCancel(runCtx)
schedDone := make(chan error, 1)

go func() {
if len(jobs) == 0 {
// No jobs: park until the context is canceled.
<-schedCtx.Done()
schedDone <- nil
return
}
schedDone <- scheduler.RunJobs(schedCtx, jobs)
}()

// Inner loop: handle repeated SIGHUPs without restarting for invalid configs.
reloaded := false
for !reloaded {
select {
case <-runCtx.Done():
// SIGINT / SIGTERM: drain in-flight ops and exit.
schedCancel()
if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil

case err := <-schedDone:
// The scheduler exited on its own – without being told to via
// schedCancel. This is unexpected (RunJobs normally blocks until
// its context is canceled). Treat a real error as fatal; a nil
// or Canceled result means it exited cleanly and we just stop.
schedCancel()
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil

case <-sighupCh:
logger.Info("scheduler: received SIGHUP – reloading configuration from %s", config.CfgPath)

newCfg, loadErr := config.Reload(config.CfgPath)
if loadErr != nil {
logger.Error("scheduler: config reload failed (keeping current config): %v", loadErr)
continue // wait for next signal
}

// Validate the new config by doing a dry-run job build.
if _, buildErr := scheduler.BuildJobsFromConfig(newCfg); buildErr != nil {
logger.Error("scheduler: new config rejected (keeping current config): %v", buildErr)
continue // wait for next signal
}

// New config is valid. Drain in-flight jobs, then swap.
logger.Info("scheduler: new configuration valid – draining in-flight operations")
schedCancel()
if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) {
logger.Error("scheduler: error while draining for reload: %v", err)
// The scheduler exited with a real error; propagate it.
return err
}

// Atomic config swap.
config.Set(newCfg)
logger.Info("scheduler: configuration reloaded successfully")
reloaded = true // break inner loop → outer loop restarts scheduler
}
}
// schedCancel is always called before reaching here (either inside the
// SIGTERM branch, which returns, or inside the SIGHUP branch above).
schedCancel()
}
}

func StartAPIServerCLI(ctx *cli.Context) error {
Expand Down
49 changes: 45 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package config
import (
"os"
"strings"
"sync"

"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -113,9 +114,17 @@ type CertAuthConfig struct {
}

// Cfg holds the loaded config for the whole app.
// Reads may use Cfg directly; concurrent reloads must go through Set/Get.
var Cfg *Config

// Load reads and parses path into a Config.
// CfgPath is the path from which the current config was loaded.
// It is set by Init and used by the SIGHUP reload handler.
var CfgPath string

// cfgMu protects Cfg and CfgPath during live reloads.
var cfgMu sync.RWMutex

// Load reads and parses path into a Config without applying it.
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
Expand All @@ -128,20 +137,52 @@ func Load(path string) (*Config, error) {
return &c, nil
}

// Init loads the config and assigns it to the package variable.
// Init loads the config from path and assigns it to the package variables.
func Init(path string) error {
c, err := Load(path)
if err != nil {
return err
}
cfgMu.Lock()
Cfg = c
CfgPath = path
cfgMu.Unlock()
return nil
}

// Reload loads a new Config from path and returns it for validation.
// The caller is responsible for calling Set to apply the new config.
// Reload does NOT modify the active Cfg; use it as a dry-run / validation step.
func Reload(path string) (*Config, error) {
if path == "" {
cfgMu.RLock()
path = CfgPath
cfgMu.RUnlock()
}
return Load(path)
}

// Set atomically replaces the active configuration.
func Set(c *Config) {
cfgMu.Lock()
Cfg = c
cfgMu.Unlock()
}

// Get returns the active configuration under a read lock.
// Prefer Get() over reading Cfg directly in code that runs concurrently with
// SIGHUP reload (e.g. the scheduler loop).
func Get() *Config {
cfgMu.RLock()
defer cfgMu.RUnlock()
return Cfg
}

// DefaultCluster returns the trimmed default cluster name from the loaded config.
func DefaultCluster() string {
if Cfg == nil {
c := Get()
if c == nil {
return ""
}
return strings.TrimSpace(Cfg.DefaultCluster)
return strings.TrimSpace(c.DefaultCluster)
}
Loading