diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 1e10639..6cd5b65 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -1418,49 +1418,29 @@ func StartSchedulerCLI(ctx *cli.Context) error { return fmt.Errorf("invalid component %q (expected scheduler, api, or all)", component) } - jobs, err := scheduler.BuildJobsFromConfig(config.Cfg) - if err != nil { - return err - } - + // runCtx is canceled on SIGINT or SIGTERM – triggers a full shutdown. runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - type runner struct { - name string - run func(context.Context) error - } - - var runners []runner - - if runScheduler { - if len(jobs) == 0 { - logger.Info("scheduler: no enabled jobs found in configuration") - } else { - for _, job := range jobs { - logger.Info("scheduler: registering job %s", job.Name) - } - runners = append(runners, runner{ - name: "scheduler", - run: func(ctx context.Context) error { - return scheduler.RunJobs(ctx, jobs) - }, - }) - } - } + // sighupCh receives SIGHUP signals for in-place config reload. + sighupCh := make(chan os.Signal, 1) + signal.Notify(sighupCh, syscall.SIGHUP) + defer signal.Stop(sighupCh) + // Start the API server once: it keeps the config snapshot it was built + // with across SIGHUP reloads (only scheduler jobs are rebuilt on reload). 
if runAPI { if ok, apiErr := canStartAPIServer(config.Cfg); ok { apiServer, err := server.New(config.Cfg) if err != nil { return fmt.Errorf("api server init failed: %w", err) } - runners = append(runners, runner{ - name: "api-server", - run: func(ctx context.Context) error { - return apiServer.Run(ctx) - }, - }) + go func() { + if err := apiServer.Run(runCtx); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("api server error: %v", err) + stop() + } + }() } else if component == "api" { return fmt.Errorf("api server requested but cannot start: %w", apiErr) } else { @@ -1468,25 +1448,120 @@ func StartSchedulerCLI(ctx *cli.Context) error { } } - if len(runners) == 0 { + if !runScheduler { + // API-only mode: just wait for shutdown. + <-runCtx.Done() return nil } - errCh := make(chan error, len(runners)) - for _, r := range runners { - go func(r runner) { - errCh <- r.run(runCtx) - }(r) - } + // schedulerReloadLoop runs the scheduler and restarts it on each valid + // SIGHUP. It returns only when runCtx is canceled (SIGINT/SIGTERM). + return schedulerReloadLoop(runCtx, sighupCh) +} - for i := 0; i < len(runners); i++ { - if err := <-errCh; err != nil && !errors.Is(err, context.Canceled) { - stop() +// schedulerReloadLoop is the heart of the SIGHUP feature. +// +// Design: +// 1. Build jobs from the current config and start the gocron scheduler. +// 2. Block waiting for either a shutdown signal (runCtx.Done) or SIGHUP. +// 3. On SIGHUP: +// a. Load and validate the new config (parse YAML + dry-run job build). +// b. If invalid: log the error and keep the current scheduler running. +// c. If valid: cancel the per-iteration scheduler context, which causes +// gocron.Shutdown() to drain all in-flight jobs before returning. +// Then atomically swap in the new config and loop back to step 1. +// 4. On SIGINT/SIGTERM: drain in-flight jobs via schedCancel and return. 
+func schedulerReloadLoop( + runCtx context.Context, + sighupCh <-chan os.Signal, +) error { + for { + currentCfg := config.Get() + + jobs, err := scheduler.BuildJobsFromConfig(currentCfg) + if err != nil { return err } - } - return nil + if len(jobs) == 0 { + logger.Info("scheduler: no enabled jobs found in configuration") + } else { + for _, job := range jobs { + logger.Info("scheduler: registering job %s", job.Name) + } + } + + // Per-iteration context: canceled to trigger graceful scheduler drain. + schedCtx, schedCancel := context.WithCancel(runCtx) + schedDone := make(chan error, 1) + + go func() { + if len(jobs) == 0 { + // No jobs: park until the context is canceled. + <-schedCtx.Done() + schedDone <- nil + return + } + schedDone <- scheduler.RunJobs(schedCtx, jobs) + }() + + // Inner loop: handle repeated SIGHUPs without restarting for invalid configs. + reloaded := false + for !reloaded { + select { + case <-runCtx.Done(): + // SIGINT / SIGTERM: drain in-flight ops and exit. + schedCancel() + if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil + + case err := <-schedDone: + // The scheduler exited on its own – without being told to via + // schedCancel. This is unexpected (RunJobs normally blocks until + // its context is canceled). Treat a real error as fatal; a nil + // or Canceled result means it exited cleanly and we just stop. + schedCancel() + if err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil + + case <-sighupCh: + logger.Info("scheduler: received SIGHUP – reloading configuration from %s", config.CfgPath) + + newCfg, loadErr := config.Reload(config.CfgPath) + if loadErr != nil { + logger.Error("scheduler: config reload failed (keeping current config): %v", loadErr) + continue // wait for next signal + } + + // Validate the new config by doing a dry-run job build. 
+ if _, buildErr := scheduler.BuildJobsFromConfig(newCfg); buildErr != nil { + logger.Error("scheduler: new config rejected (keeping current config): %v", buildErr) + continue // wait for next signal + } + + // New config is valid. Drain in-flight jobs, then swap. + logger.Info("scheduler: new configuration valid – draining in-flight operations") + schedCancel() + if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) { + logger.Error("scheduler: error while draining for reload: %v", err) + // The scheduler exited with a real error; propagate it. + return err + } + + // Atomic config swap. + config.Set(newCfg) + logger.Info("scheduler: configuration reloaded successfully") + reloaded = true // break inner loop → outer loop restarts scheduler + } + } + // The inner loop is exited only via the successful-reload path, which has + // already called schedCancel; this extra call is a harmless defensive no-op. + schedCancel() + } } func StartAPIServerCLI(ctx *cli.Context) error { diff --git a/pkg/config/config.go b/pkg/config/config.go index 32c42f0..c56a27f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -14,6 +14,7 @@ package config import ( "os" "strings" + "sync" "gopkg.in/yaml.v3" ) @@ -113,9 +114,17 @@ type CertAuthConfig struct { } // Cfg holds the loaded config for the whole app. +// Reads may use Cfg directly; concurrent reloads must go through Set/Get. var Cfg *Config -// Load reads and parses path into a Config. +// CfgPath is the path from which the current config was loaded. +// It is set by Init and used by the SIGHUP reload handler. +var CfgPath string + +// cfgMu protects Cfg and CfgPath during live reloads. +var cfgMu sync.RWMutex + +// Load reads and parses path into a Config without applying it. func Load(path string) (*Config, error) { data, err := os.ReadFile(path) if err != nil { return nil, err } @@ -128,20 +137,52 @@ func Load(path string) (*Config, error) { return &c, nil } -// Init loads the config and assigns it to the package variable. 
+// Init loads the config from path and assigns it to the package variables. func Init(path string) error { c, err := Load(path) if err != nil { return err } + cfgMu.Lock() Cfg = c + CfgPath = path + cfgMu.Unlock() return nil } +// Reload loads a new Config from path and returns it for validation. +// The caller is responsible for calling Set to apply the new config. +// Reload does NOT modify the active Cfg; use it as a dry-run / validation step. +func Reload(path string) (*Config, error) { + if path == "" { + cfgMu.RLock() + path = CfgPath + cfgMu.RUnlock() + } + return Load(path) +} + +// Set atomically replaces the active configuration. +func Set(c *Config) { + cfgMu.Lock() + Cfg = c + cfgMu.Unlock() +} + +// Get returns the active configuration under a read lock. +// Prefer Get() over reading Cfg directly in code that runs concurrently with +// SIGHUP reload (e.g. the scheduler loop). +func Get() *Config { + cfgMu.RLock() + defer cfgMu.RUnlock() + return Cfg +} + // DefaultCluster returns the trimmed default cluster name from the loaded config. func DefaultCluster() string { - if Cfg == nil { + c := Get() + if c == nil { return "" } - return strings.TrimSpace(Cfg.DefaultCluster) + return strings.TrimSpace(c.DefaultCluster) }