diff --git a/docs/scheduling.md b/docs/scheduling.md
index 92911d9..84683a0 100644
--- a/docs/scheduling.md
+++ b/docs/scheduling.md
@@ -41,6 +41,16 @@ For long-lived automation, define jobs in `ace.yaml` and let `./ace start` orche
 - `schedule_jobs`: describes each job (what to run and with which arguments).
 - `schedule_config`: pairs a job name with either a frequency or a cron expression and marks it enabled/disabled.
 
+### Reloading the Configuration at Runtime
+
+To apply configuration changes without restarting the process, send the running process a SIGHUP (signal 1), for example `kill -HUP <ace-pid>`. All long-running ACE modes handle the signal:
+
+- **`ace start` / `ace start --component=scheduler`** — waits for in-flight jobs to complete, then swaps in the new configuration.
+- **`ace start --component=api` / `ace server`** — applies the new configuration immediately and reloads the mTLS security config (certificate revocation list and allowed CN list).
+- **`ace start --component=all`** — both of the above.
+
+The CA certificate used to verify client TLS handshakes is read once at startup; replacing it still requires a restart.
+
 ### Sample Configuration
 
 ```yaml
diff --git a/internal/api/http/config_reload_test.go b/internal/api/http/config_reload_test.go
new file mode 100644
index 0000000..f5d6d16
--- /dev/null
+++ b/internal/api/http/config_reload_test.go
@@ -0,0 +1,92 @@
+package server
+
+import (
+	"testing"
+
+	"github.com/pgedge/ace/pkg/config"
+)
+
+// TestResolversPickUpReloadedConfig verifies that the handler resolver
+// functions read from the config snapshot passed to them, so that a
+// SIGHUP-triggered config.Set takes effect for subsequent API requests
+// while keeping each request internally consistent.
+func TestResolversPickUpReloadedConfig(t *testing.T) {
+	original := config.Get()
+	t.Cleanup(func() {
+		if original != nil {
+			config.Set(original)
+		} else {
+			config.Set(&config.Config{})
+		}
+	})
+
+	// Set initial config.
+	config.Set(&config.Config{
+		TableDiff: config.DiffConfig{
+			DiffBlockSize:     5000,
+			ConcurrencyFactor: 0.25,
+			CompareUnitSize:   2000,
+			MaxDiffRows:       100,
+		},
+		Server: config.ServerConfig{
+			TaskStorePath: "/tmp/old-tasks.db",
+		},
+	})
+
+	s := &APIServer{}
+	cfg := config.Get()
+
+	// Verify resolvers return initial values (0 = "use config default").
+	if got := s.resolveBlockSize(cfg, 0); got != 5000 {
+		t.Errorf("resolveBlockSize: got %d, want 5000", got)
+	}
+	if got := s.resolveConcurrency(cfg, 0); got != 0.25 {
+		t.Errorf("resolveConcurrency: got %f, want 0.25", got)
+	}
+	if got := s.resolveCompareUnitSize(cfg, 0); got != 2000 {
+		t.Errorf("resolveCompareUnitSize: got %d, want 2000", got)
+	}
+	if got := s.resolveMaxDiffRows(cfg, 0); got != 100 {
+		t.Errorf("resolveMaxDiffRows: got %d, want 100", got)
+	}
+	if got := cfg.Server.TaskStorePath; got != "/tmp/old-tasks.db" {
+		t.Errorf("TaskStorePath: got %q, want /tmp/old-tasks.db", got)
+	}
+
+	// Simulate SIGHUP: swap in new config.
+	config.Set(&config.Config{
+		TableDiff: config.DiffConfig{
+			DiffBlockSize:     9999,
+			ConcurrencyFactor: 0.75,
+			CompareUnitSize:   4000,
+			MaxDiffRows:       500,
+		},
+		Server: config.ServerConfig{
+			TaskStorePath: "/tmp/new-tasks.db",
+		},
+	})
+
+	newCfg := config.Get()
+
+	// Verify resolvers now return the reloaded values.
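+	// (A requested value of 0 means "use the config default", so these
+	// assertions read the reloaded snapshot rather than an override.)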
+ if got := s.resolveBlockSize(newCfg, 0); got != 9999 { + t.Errorf("after reload resolveBlockSize: got %d, want 9999", got) + } + if got := s.resolveConcurrency(newCfg, 0); got != 0.75 { + t.Errorf("after reload resolveConcurrency: got %f, want 0.75", got) + } + if got := s.resolveCompareUnitSize(newCfg, 0); got != 4000 { + t.Errorf("after reload resolveCompareUnitSize: got %d, want 4000", got) + } + if got := s.resolveMaxDiffRows(newCfg, 0); got != 500 { + t.Errorf("after reload resolveMaxDiffRows: got %d, want 500", got) + } + if got := newCfg.Server.TaskStorePath; got != "/tmp/new-tasks.db" { + t.Errorf("after reload TaskStorePath: got %q, want /tmp/new-tasks.db", got) + } + + // Verify old snapshot still returns old values (per-request consistency). + if got := s.resolveBlockSize(cfg, 0); got != 5000 { + t.Errorf("old snapshot resolveBlockSize: got %d, want 5000", got) + } +} diff --git a/internal/api/http/handler.go b/internal/api/http/handler.go index fe70b55..d978534 100644 --- a/internal/api/http/handler.go +++ b/internal/api/http/handler.go @@ -202,14 +202,16 @@ func (s *APIServer) handleTableDiff(w http.ResponseWriter, r *http.Request) { return } + cfg := config.Get() + task := diff.NewTableDiffTask() task.ClusterName = cluster task.QualifiedTableName = tableName task.DBName = strings.TrimSpace(req.DBName) - task.BlockSize = s.resolveBlockSize(req.BlockSize) - task.ConcurrencyFactor = s.resolveConcurrency(req.Concurrency) - task.CompareUnitSize = s.resolveCompareUnitSize(req.CompareUnitSize) - task.MaxDiffRows = s.resolveMaxDiffRows(req.MaxDiffRows) + task.BlockSize = s.resolveBlockSize(cfg, req.BlockSize) + task.ConcurrencyFactor = s.resolveConcurrency(cfg, req.Concurrency) + task.CompareUnitSize = s.resolveCompareUnitSize(cfg, req.CompareUnitSize) + task.MaxDiffRows = s.resolveMaxDiffRows(cfg, req.MaxDiffRows) task.Output = "json" task.Nodes = s.resolveNodes(req.Nodes) task.TableFilter = strings.TrimSpace(req.TableFilter) @@ -220,7 +222,7 @@ func (s *APIServer) handleTableDiff(w http.ResponseWriter, r *http.Request) { task.QuietMode = req.Quiet task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = cfg.Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -249,41 +251,41 @@ func (s *APIServer) handleTableDiff(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusAccepted, resp) } -func (s *APIServer) resolveBlockSize(requested int) int { +func (s *APIServer) resolveBlockSize(cfg *config.Config, requested int) int { if requested > 0 { return requested } - if cfg := s.cfg; cfg != nil && cfg.TableDiff.DiffBlockSize > 0 { + if cfg != nil && cfg.TableDiff.DiffBlockSize > 0 { return cfg.TableDiff.DiffBlockSize } return 100000 } -func (s *APIServer) resolveConcurrency(requested float64) float64 { +func (s *APIServer) resolveConcurrency(cfg *config.Config, requested float64) float64 { if requested > 0 { return requested } - if cfg := s.cfg; cfg != nil && cfg.TableDiff.ConcurrencyFactor > 0 { + if cfg != nil && cfg.TableDiff.ConcurrencyFactor > 0 { return cfg.TableDiff.ConcurrencyFactor } return 0.5 } -func (s *APIServer) resolveCompareUnitSize(requested int) int { +func (s *APIServer) resolveCompareUnitSize(cfg *config.Config, requested int) int { if requested > 0 { return requested } - if cfg := s.cfg; cfg != nil && cfg.TableDiff.CompareUnitSize > 0 { + if cfg != nil && cfg.TableDiff.CompareUnitSize > 0 { return cfg.TableDiff.CompareUnitSize 
} return 10000 } -func (s *APIServer) resolveMaxDiffRows(requested int64) int64 { +func (s *APIServer) resolveMaxDiffRows(cfg *config.Config, requested int64) int64 { if requested > 0 { return requested } - if cfg := s.cfg; cfg != nil && cfg.TableDiff.MaxDiffRows > 0 { + if cfg != nil && cfg.TableDiff.MaxDiffRows > 0 { return cfg.TableDiff.MaxDiffRows } return 0 @@ -360,7 +362,7 @@ func (s *APIServer) handleTableRerun(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := s.enqueueTask(task.TaskID, func(ctx context.Context) error { task.Ctx = ctx @@ -441,7 +443,7 @@ func (s *APIServer) handleTableRepair(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := task.ValidateAndPrepare(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -506,7 +508,7 @@ func (s *APIServer) handleSpockDiff(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -561,6 +563,8 @@ func (s *APIServer) handleSchemaDiff(w http.ResponseWriter, r *http.Request) { return } + cfg := config.Get() + task := diff.NewSchemaDiffTask() task.ClusterName = cluster task.SchemaName = schema @@ -570,9 +574,9 @@ func (s *APIServer) handleSchemaDiff(w http.ResponseWriter, r *http.Request) { task.SkipFile = req.SkipFile task.Quiet = req.Quiet task.DDLOnly = req.DDLOnly - task.BlockSize = s.resolveBlockSize(req.BlockSize) - task.ConcurrencyFactor = s.resolveConcurrency(req.Concurrency) - task.CompareUnitSize = s.resolveCompareUnitSize(req.CompareUnitSize) + task.BlockSize = s.resolveBlockSize(cfg, req.BlockSize) + task.ConcurrencyFactor = s.resolveConcurrency(cfg, req.Concurrency) + task.CompareUnitSize = s.resolveCompareUnitSize(cfg, req.CompareUnitSize) task.Output = strings.TrimSpace(req.Output) if task.Output == "" { task.Output = "json" @@ -581,7 +585,7 @@ func (s *APIServer) handleSchemaDiff(w http.ResponseWriter, r *http.Request) { task.Ctx = r.Context() task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = cfg.Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -642,6 +646,8 @@ func (s *APIServer) handleRepsetDiff(w http.ResponseWriter, r *http.Request) { return } + cfg := config.Get() + task := diff.NewRepsetDiffTask() task.ClusterName = cluster task.RepsetName = repset @@ -650,9 +656,9 @@ func (s *APIServer) handleRepsetDiff(w http.ResponseWriter, r *http.Request) { task.SkipTables = req.SkipTables task.SkipFile = req.SkipFile task.Quiet = req.Quiet - task.BlockSize = s.resolveBlockSize(req.BlockSize) - task.ConcurrencyFactor = s.resolveConcurrency(req.Concurrency) - task.CompareUnitSize = s.resolveCompareUnitSize(req.CompareUnitSize) + task.BlockSize = s.resolveBlockSize(cfg, req.BlockSize) + task.ConcurrencyFactor = s.resolveConcurrency(cfg, req.Concurrency) + task.CompareUnitSize = s.resolveCompareUnitSize(cfg, req.CompareUnitSize) task.Output = 
strings.TrimSpace(req.Output) if task.Output == "" { task.Output = "json" @@ -663,7 +669,7 @@ func (s *APIServer) handleRepsetDiff(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = cfg.Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -741,7 +747,7 @@ func (s *APIServer) handleMtreeInit(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := s.enqueueTask(task.TaskID, func(ctx context.Context) error { task.Ctx = ctx @@ -799,7 +805,7 @@ func (s *APIServer) handleMtreeTeardown(w http.ResponseWriter, r *http.Request) task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := s.enqueueTask(task.TaskID, func(ctx context.Context) error { task.Ctx = ctx @@ -863,7 +869,7 @@ func (s *APIServer) handleMtreeTeardownTable(w http.ResponseWriter, r *http.Requ task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := s.enqueueTask(task.TaskID, func(ctx context.Context) error { task.Ctx = ctx @@ -934,7 +940,7 @@ func (s *APIServer) handleMtreeBuild(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -1009,7 +1015,7 @@ func (s *APIServer) handleMtreeUpdate(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) @@ -1088,7 +1094,7 @@ func (s *APIServer) handleMtreeDiff(w http.ResponseWriter, r *http.Request) { task.InvokeMethod = "api" task.SkipDBUpdate = false task.TaskStore = s.taskStore - task.TaskStorePath = s.cfg.Server.TaskStorePath + task.TaskStorePath = config.Get().Server.TaskStorePath if err := task.Validate(); err != nil { writeError(w, http.StatusBadRequest, err.Error()) diff --git a/internal/api/http/server.go b/internal/api/http/server.go index 50f80c6..a9455f6 100644 --- a/internal/api/http/server.go +++ b/internal/api/http/server.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/pgedge/ace/pkg/config" @@ -18,9 +19,8 @@ import ( ) type APIServer struct { - cfg *config.Config server *http.Server - validator *certValidator + validator atomic.Pointer[certValidator] taskStore *taskstore.Store listenAddr string jobCtx context.Context @@ -62,12 +62,11 @@ func New(cfg *config.Config) (*APIServer, error) { mux := http.NewServeMux() apiServer := &APIServer{ - cfg: cfg, - validator: validator, taskStore: taskStore, listenAddr: fmt.Sprintf("%s:%d", srvCfg.ListenAddress, srvCfg.ListenPort), jobCtx: context.Background(), } + apiServer.validator.Store(validator) 
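+	// The validator lives in an atomic.Pointer (see the struct change
+	// above), so ReloadSecurityConfig can swap it in place without taking
+	// a lock on the request path.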
mux.Handle("/api/v1/table-diff", apiServer.authenticated(http.HandlerFunc(apiServer.handleTableDiff))) mux.Handle("/api/v1/table-rerun", apiServer.authenticated(http.HandlerFunc(apiServer.handleTableRerun))) @@ -175,6 +174,19 @@ type clientInfo struct { type clientContextKey struct{} +// ReloadSecurityConfig rebuilds the certValidator from cfg and atomically +// swaps it in so that subsequent requests use the updated allowedCNs and CRL. +// Note: the TLS CA pool used for handshake verification requires a restart to +// change; only allowedCNs and CRL changes take effect without a restart. +func (s *APIServer) ReloadSecurityConfig(cfg *config.Config) error { + v, err := newCertValidator(cfg) + if err != nil { + return err + } + s.validator.Store(v) + return nil +} + func (s *APIServer) authenticated(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.TLS == nil || len(r.TLS.PeerCertificates) == 0 { @@ -182,7 +194,7 @@ func (s *APIServer) authenticated(next http.Handler) http.Handler { return } clientCert := r.TLS.PeerCertificates[0] - role, err := s.validator.Validate(clientCert) + role, err := s.validator.Load().Validate(clientCert) if err != nil { logger.Warn("client certificate validation failed: %v", err) writeError(w, http.StatusUnauthorized, err.Error()) diff --git a/internal/api/http/validator_test.go b/internal/api/http/validator_test.go index 7afd766..6cdfe96 100644 --- a/internal/api/http/validator_test.go +++ b/internal/api/http/validator_test.go @@ -79,6 +79,91 @@ func TestCertValidatorRejectsRevokedCertificate(t *testing.T) { } } +func TestAPIServerRejectsCertRevokedAfterConfigReload(t *testing.T) { + caCert, caKey, caPEM := newTestCA(t) + tempDir := t.TempDir() + caPath := filepath.Join(tempDir, "ca.pem") + if err := os.WriteFile(caPath, caPEM, 0o600); err != nil { + t.Fatalf("failed to write CA file: %v", err) + } + + aliceCert := newTestClientCert(t, caCert, caKey, "alice", 10) + + // Build server as at startup – no CRL, alice passes. + s := &APIServer{} + v, err := newCertValidator(&config.Config{ + CertAuth: config.CertAuthConfig{CACertFile: caPath}, + Server: config.ServerConfig{AllowedCNs: []string{"alice"}}, + }) + if err != nil { + t.Fatalf("failed to create initial validator: %v", err) + } + s.validator.Store(v) + + if _, err := s.validator.Load().Validate(aliceCert); err != nil { + t.Fatalf("alice should pass before reload: %v", err) + } + + // Write a CRL that revokes alice. + crlData := newTestCRL(t, caCert, caKey, aliceCert) + crlPath := filepath.Join(tempDir, "clients.crl") + if err := os.WriteFile(crlPath, crlData, 0o600); err != nil { + t.Fatalf("failed to write CRL file: %v", err) + } + + // Simulate SIGHUP: reload security config with CRL. + if err := s.ReloadSecurityConfig(&config.Config{ + CertAuth: config.CertAuthConfig{CACertFile: caPath}, + Server: config.ServerConfig{AllowedCNs: []string{"alice"}, ClientCRLFile: crlPath}, + }); err != nil { + t.Fatalf("ReloadSecurityConfig failed: %v", err) + } + + // After reload, alice must be rejected. 
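+	// (Load observes the validator that ReloadSecurityConfig just stored.)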
+ if _, err := s.validator.Load().Validate(aliceCert); err == nil { + t.Error("server should reject alice after CRL config reload") + } +} + +func TestAPIServerAcceptsNewCNAfterConfigReload(t *testing.T) { + caCert, caKey, caPEM := newTestCA(t) + tempDir := t.TempDir() + caPath := filepath.Join(tempDir, "ca.pem") + if err := os.WriteFile(caPath, caPEM, 0o600); err != nil { + t.Fatalf("failed to write CA file: %v", err) + } + + bobCert := newTestClientCert(t, caCert, caKey, "bob", 3) + + // Build server as at startup – only alice allowed, bob rejected. + s := &APIServer{} + v, err := newCertValidator(&config.Config{ + CertAuth: config.CertAuthConfig{CACertFile: caPath}, + Server: config.ServerConfig{AllowedCNs: []string{"alice"}}, + }) + if err != nil { + t.Fatalf("failed to create initial validator: %v", err) + } + s.validator.Store(v) + + if _, err := s.validator.Load().Validate(bobCert); err == nil { + t.Fatalf("bob should be rejected before reload") + } + + // Simulate SIGHUP: reload security config to also allow bob. + if err := s.ReloadSecurityConfig(&config.Config{ + CertAuth: config.CertAuthConfig{CACertFile: caPath}, + Server: config.ServerConfig{AllowedCNs: []string{"alice", "bob"}}, + }); err != nil { + t.Fatalf("ReloadSecurityConfig failed: %v", err) + } + + // After reload, bob must be accepted. + if _, err := s.validator.Load().Validate(bobCert); err != nil { + t.Errorf("server should accept bob after allowedCNs config reload: %v", err) + } +} + func newTestCA(t *testing.T) (*x509.Certificate, *rsa.PrivateKey, []byte) { t.Helper() key, err := rsa.GenerateKey(rand.Reader, 2048) diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 706c562..2ed0e87 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -1399,7 +1399,8 @@ func RepsetDiffCLI(cmd *cli.Command) error { } func StartSchedulerCLI(_ context.Context, cmd *cli.Command) error { - if config.Cfg == nil { + cfg := config.Get() + if cfg == nil { return fmt.Errorf("configuration not loaded; run inside a directory with ace.yaml or set ACE_CONFIG") } @@ -1429,9 +1430,11 @@ func StartSchedulerCLI(_ context.Context, cmd *cli.Command) error { // Start the API server once. It does not need to restart on reload because // it handles on-demand requests rather than reading scheduled job config. + var apiServer *server.APIServer if runAPI { - if ok, apiErr := canStartAPIServer(config.Cfg); ok { - apiServer, err := server.New(config.Cfg) + if ok, apiErr := canStartAPIServer(cfg); ok { + var err error + apiServer, err = server.New(cfg) if err != nil { return fmt.Errorf("api server init failed: %w", err) } @@ -1449,14 +1452,15 @@ func StartSchedulerCLI(_ context.Context, cmd *cli.Command) error { } if !runScheduler { - // API-only mode: just wait for shutdown. + // API-only mode: reload config on SIGHUP, wait for shutdown. + go runConfigReloadLoop(runCtx, sighupCh, apiServer) <-runCtx.Done() return nil } // schedulerReloadLoop runs the scheduler and restarts it on each valid // SIGHUP. It returns only when runCtx is canceled (SIGINT/SIGTERM). - return schedulerReloadLoop(runCtx, sighupCh) + return schedulerReloadLoop(runCtx, sighupCh, apiServer) } // schedulerReloadLoop is the heart of the SIGHUP feature. 
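
Both `StartSchedulerCLI` and `StartAPIServerCLI` now pair a cancellable run context (SIGINT/SIGTERM) with a dedicated SIGHUP channel. For reviewers, here is a self-contained sketch of that signal wiring; the `reload` helper stands in for the `config.Reload`, validate, `config.Set` sequence and is illustrative, not part of this diff:

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// SIGINT/SIGTERM cancel the run context and end the process...
	runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// ...while SIGHUP arrives on its own channel so it can be handled
	// repeatedly without shutting anything down.
	sighupCh := make(chan os.Signal, 1)
	signal.Notify(sighupCh, syscall.SIGHUP)
	defer signal.Stop(sighupCh)

	for {
		select {
		case <-runCtx.Done():
			return
		case <-sighupCh:
			if err := reload(); err != nil {
				log.Printf("reload failed, keeping current config: %v", err)
				continue // wait for the next HUP
			}
			log.Print("configuration reloaded")
		}
	}
}

// reload is a placeholder for config.Reload, validation, and config.Set.
func reload() error { return nil }
```

The capacity-1 buffer means a HUP that lands while a reload is in progress is queued, and any extras in that window are coalesced rather than lost.
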
@@ -1474,6 +1478,7 @@ func StartSchedulerCLI(_ context.Context, cmd *cli.Command) error { func schedulerReloadLoop( runCtx context.Context, sighupCh <-chan os.Signal, + apiServer *server.APIServer, ) error { for { currentCfg := config.Get() @@ -1529,9 +1534,9 @@ func schedulerReloadLoop( return nil case <-sighupCh: - logger.Info("scheduler: received SIGHUP – reloading configuration from %s", config.CfgPath) + logger.Info("scheduler: received SIGHUP – reloading configuration") - newCfg, loadErr := config.Reload(config.CfgPath) + newCfg, loadErr := config.Reload() if loadErr != nil { logger.Error("scheduler: config reload failed (keeping current config): %v", loadErr) continue // wait for next signal @@ -1554,6 +1559,11 @@ func schedulerReloadLoop( // Atomic config swap. config.Set(newCfg) + if apiServer != nil { + if err := apiServer.ReloadSecurityConfig(newCfg); err != nil { + logger.Warn("scheduler: security config reload failed (mTLS config unchanged): %v", err) + } + } logger.Info("scheduler: configuration reloaded successfully") reloaded = true // break inner loop → outer loop restarts scheduler } @@ -1565,15 +1575,16 @@ func schedulerReloadLoop( } func StartAPIServerCLI(_ context.Context, cmd *cli.Command) error { - if config.Cfg == nil { + cfg := config.Get() + if cfg == nil { return fmt.Errorf("configuration not loaded; run inside a directory with ace.yaml or set ACE_CONFIG") } - if ok, err := canStartAPIServer(config.Cfg); !ok { + if ok, err := canStartAPIServer(cfg); !ok { return err } - apiServer, err := server.New(config.Cfg) + apiServer, err := server.New(cfg) if err != nil { return err } @@ -1581,9 +1592,43 @@ func StartAPIServerCLI(_ context.Context, cmd *cli.Command) error { runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() + sighupCh := make(chan os.Signal, 1) + signal.Notify(sighupCh, syscall.SIGHUP) + defer signal.Stop(sighupCh) + go runConfigReloadLoop(runCtx, sighupCh, apiServer) + return apiServer.Run(runCtx) } +// runConfigReloadLoop reloads config on each SIGHUP until the channel +// is closed or ctx is canceled. Used by API-only and standalone-API code paths. 
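+// On a failed reload (read, parse, or validation error) it logs and keeps
+// the currently active configuration, mirroring schedulerReloadLoop.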
+func runConfigReloadLoop(ctx context.Context, ch <-chan os.Signal, apiServer *server.APIServer) { + for { + select { + case <-ctx.Done(): + return + case <-ch: + logger.Info("api: received SIGHUP – reloading configuration") + newCfg, err := config.Reload() + if err != nil { + logger.Error("api: config reload failed (keeping current config): %v", err) + continue + } + if ok, valErr := canStartAPIServer(newCfg); !ok { + logger.Error("api: new config rejected (keeping current config): %v", valErr) + continue + } + config.Set(newCfg) + if apiServer != nil { + if err := apiServer.ReloadSecurityConfig(newCfg); err != nil { + logger.Warn("api: security config reload failed (mTLS config unchanged): %v", err) + } + } + logger.Info("api: configuration reloaded successfully") + } + } +} + func canStartAPIServer(cfg *config.Config) (bool, error) { if cfg == nil { return false, fmt.Errorf("configuration not loaded") diff --git a/internal/consistency/diff/table_diff.go b/internal/consistency/diff/table_diff.go index 7fd9e28..4ca5bb1 100644 --- a/internal/consistency/diff/table_diff.go +++ b/internal/consistency/diff/table_diff.go @@ -727,18 +727,19 @@ func (t *TableDiffTask) Validate() error { return fmt.Errorf("cluster_name and table_name are required arguments") } - if t.BlockSize > config.Cfg.TableDiff.MaxBlockSize && !t.OverrideBlockSize { - return fmt.Errorf("block row size should be <= %d", config.Cfg.TableDiff.MaxBlockSize) + cfg := config.Get() + if t.BlockSize > cfg.TableDiff.MaxBlockSize && !t.OverrideBlockSize { + return fmt.Errorf("block row size should be <= %d", cfg.TableDiff.MaxBlockSize) } - if t.BlockSize < config.Cfg.TableDiff.MinBlockSize && !t.OverrideBlockSize { - return fmt.Errorf("block row size should be >= %d", config.Cfg.TableDiff.MinBlockSize) + if t.BlockSize < cfg.TableDiff.MinBlockSize && !t.OverrideBlockSize { + return fmt.Errorf("block row size should be >= %d", cfg.TableDiff.MinBlockSize) } if t.MaxDiffRows < 0 { return fmt.Errorf("max_diff_rows must be >= 0") } - if t.MaxDiffRows == 0 && config.Cfg.TableDiff.MaxDiffRows > 0 { - t.MaxDiffRows = config.Cfg.TableDiff.MaxDiffRows + if t.MaxDiffRows == 0 && cfg.TableDiff.MaxDiffRows > 0 { + t.MaxDiffRows = cfg.TableDiff.MaxDiffRows } if t.ConcurrencyFactor > 4.0 || t.ConcurrencyFactor <= 0 { diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index ac9e84c..475a39d 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -47,8 +47,14 @@ import ( "github.com/vbauerster/mpb/v8/decor" ) -func aceSchema() string { - return config.Cfg.MTree.Schema +// aceSchema returns the PostgreSQL schema used for merkle-tree objects. +// The value is captured from config on first call so that a mid-task +// SIGHUP reload cannot cause mixed-schema references within one task. 
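+// The lazy write is unsynchronized; it assumes the first call happens on
+// the task's main goroutine before any worker goroutines are spawned.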
+func (m *MerkleTreeTask) aceSchema() string { + if m.mtreeSchema == "" { + m.mtreeSchema = config.Get().MTree.Schema + } + return m.mtreeSchema } const ( @@ -84,8 +90,10 @@ type MerkleTreeTask struct { TaskStore *taskstore.Store TaskStorePath string - DiffResult types.DiffOutput - diffMutex sync.Mutex + mtreeSchema string + + DiffResult types.DiffOutput + diffMutex sync.Mutex diffRowKeySets map[string]map[string]map[string]struct{} StartTime time.Time SpockNodeNames map[string]string @@ -782,7 +790,7 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { return fmt.Errorf("validation failed: %w", err) } - cfg := config.Cfg.MTree.CDC + cfg := config.Get().MTree.CDC for _, nodeInfo := range m.ClusterNodes { logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) @@ -798,8 +806,8 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { } defer pool.Close() - if err = queries.CreateSchema(m.Ctx, pool, aceSchema()); err != nil { - return fmt.Errorf("failed to create schema '%s': %w", aceSchema(), err) + if err = queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { + return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) } tx, err := pool.Begin(m.Ctx) @@ -877,7 +885,7 @@ func (m *MerkleTreeTask) MtreeTeardownTable() (err error) { return fmt.Errorf("validation failed: %w", err) } - cfg := config.Cfg.MTree.CDC + cfg := config.Get().MTree.CDC for _, nodeInfo := range m.ClusterNodes { logger.Info("Tearing down Merkle tree objects for table '%s' on node: %s", m.QualifiedTableName, nodeInfo["Name"]) @@ -913,7 +921,7 @@ func (m *MerkleTreeTask) MtreeTeardownTable() (err error) { } defer tx.Rollback(m.Ctx) - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() err = queries.DropMtreeTable(m.Ctx, tx, mtreeTableName) if err != nil { @@ -936,7 +944,7 @@ func (m *MerkleTreeTask) MtreeTeardownTable() (err error) { logger.Info("Metadata for table %s deleted on node %s", m.QualifiedTableName, nodeInfo["Name"]) if !m.SimplePrimaryKey { - compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} compositeTypeName := compositeTypeIdentifier.Sanitize() err = queries.DropCompositeType(m.Ctx, tx, compositeTypeName) if err != nil { @@ -1095,7 +1103,7 @@ func (m *MerkleTreeTask) Validate() error { if m.Mode == "listen" { return nil } - cfg := config.Cfg.MTree.Diff + cfg := config.Get().MTree.Diff if m.BlockSize != 0 && !m.OverrideBlockSize { if m.BlockSize > cfg.MaxBlockSize { @@ -1246,7 +1254,7 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { var blockRanges []types.BlockRange var numBlocks int - cfg := config.Cfg.MTree.CDC + cfg := config.Get().MTree.CDC pools := make(map[string]*pgxpool.Pool, len(m.ClusterNodes)) numWorkers := int(math.Ceil(float64(runtime.NumCPU()) * m.MaxCpuRatio * 2)) @@ -1484,7 +1492,7 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { } if !m.NoCDC { - cdcCfg := config.Cfg.MTree.CDC + cdcCfg := config.Get().MTree.CDC timeout := 30 * time.Second if cdcCfg.CDCProcessingTimeout > 0 { timeout = time.Duration(cdcCfg.CDCProcessingTimeout) * time.Second @@ -1549,7 +1557,7 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { var compositeTypeName 
string if !m.SimplePrimaryKey { - compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} compositeTypeName = compositeTypeIdentifier.Sanitize() dt, err := conn.Conn().LoadType(m.Ctx, compositeTypeName) if err != nil { @@ -1564,7 +1572,7 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { } defer tx.Rollback(m.Ctx) - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() // Check if stored hashes use an older algorithm and need full recomputation. @@ -1694,12 +1702,12 @@ func (m *MerkleTreeTask) UpdateMtree(skipAllChecks bool) (err error) { } func (m *MerkleTreeTask) splitBlocks(tx pgx.Tx, blocksToSplit []types.BlockRange) ([]int64, error) { - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() isComposite := !m.SimplePrimaryKey var modifiedPositions []int64 - compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} compositeTypeName := compositeTypeIdentifier.Sanitize() currentBlocks := make([]types.BlockRange, len(blocksToSplit)) @@ -1816,7 +1824,7 @@ func (m *MerkleTreeTask) splitBlocks(tx pgx.Tx, blocksToSplit []types.BlockRange func (m *MerkleTreeTask) performMerges(tx pgx.Tx) ([]int64, error) { var allModifiedPositions []int64 - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() for { @@ -1884,7 +1892,7 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { logger.Warn("mtree diff: unable to load spock node names; using raw node_origin values: %v", err) } nodePairs := getNodePairs(m.ClusterNodes) - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() allNodePairBatches := make(map[string]struct { @@ -2016,7 +2024,7 @@ func (m *MerkleTreeTask) DiffMtree() (err error) { func (m *MerkleTreeTask) findMismatchedLeaves(pool1, pool2 *pgxpool.Pool, parentLevel int, parentPosition int64) (map[int64]bool, error) { mismatched := make(map[int64]bool) - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() children1, err := queries.GetNodeChildren(m.Ctx, pool1, mtreeTableName, parentLevel, int(parentPosition)) @@ -2077,7 +2085,7 @@ func (m *MerkleTreeTask) findMismatchedLeaves(pool1, pool2 *pgxpool.Pool, parent } func (m *MerkleTreeTask) getPkeyBatches(pool1, pool2 *pgxpool.Pool, mismatchedPositions []int64) ([][2][]any, error) { - mtreeTableIdentifier 
:= pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() leafRanges1, err := queries.GetLeafRanges(m.Ctx, pool1, mtreeTableName, mismatchedPositions, m.SimplePrimaryKey, m.Key) @@ -2274,12 +2282,12 @@ func boundaryToSlice(b any) []any { } func (m *MerkleTreeTask) mergeBlocks(tx pgx.Tx, blocksToMerge []types.BlockRange) ([]int64, error) { - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() isComposite := !m.SimplePrimaryKey var modifiedPositions []int64 - compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} compositeTypeName := compositeTypeIdentifier.Sanitize() if err := queries.DeleteParentNodes(m.Ctx, tx, mtreeTableName); err != nil { @@ -2330,7 +2338,7 @@ func (m *MerkleTreeTask) mergeBlocks(tx pgx.Tx, blocksToMerge []types.BlockRange } func (m *MerkleTreeTask) buildParentNodes(conn queries.DBQuerier) error { - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() var err error @@ -2377,7 +2385,7 @@ type LeafHashResult struct { } func (m *MerkleTreeTask) computeLeafHashes(pool *pgxpool.Pool, tx pgx.Tx, ranges []types.BlockRange, numWorkers int, barMessage string) error { - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() jobs := make(chan types.BlockRange, len(ranges)) @@ -2447,7 +2455,7 @@ func (m *MerkleTreeTask) leafHashWorker(wg *sync.WaitGroup, jobs <-chan types.Bl } func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []types.BlockRange) error { - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} if m.SimplePrimaryKey { if err := queries.InsertBlockRangesBatchSimple(m.Ctx, conn, mtreeTableIdentifier.Sanitize(), ranges); err != nil { @@ -2464,9 +2472,9 @@ func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []type func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlocks int) error { - err := queries.CreateSchema(m.Ctx, tx, aceSchema()) + err := queries.CreateSchema(m.Ctx, tx, m.aceSchema()) if err != nil { - return fmt.Errorf("failed to create schema '%s': %w", aceSchema(), err) + return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) } err = queries.CreateXORFunction(m.Ctx, tx) @@ -2484,7 +2492,7 @@ func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlock return fmt.Errorf("failed to update metadata: %w", err) } - mtreeTableIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} + mtreeTableIdentifier := 
pgx.Identifier{m.aceSchema(), fmt.Sprintf("ace_mtree_%s_%s", m.Schema, m.Table)} mtreeTableName := mtreeTableIdentifier.Sanitize() err = queries.DropMtreeTable(m.Ctx, tx, mtreeTableName) if err != nil { @@ -2510,7 +2518,7 @@ func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlock keyTypeColumns[i] = fmt.Sprintf("%s %s", pgx.Identifier{col}.Sanitize(), colType) } - compositeTypeIdentifier := pgx.Identifier{aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} + compositeTypeIdentifier := pgx.Identifier{m.aceSchema(), fmt.Sprintf("%s_%s_key_type", m.Schema, m.Table)} compositeTypeName := compositeTypeIdentifier.Sanitize() err = queries.DropCompositeType(m.Ctx, tx, compositeTypeName) diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 234a195..82f68a5 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -58,7 +58,9 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } processingCtx := context.WithoutCancel(ctx) - cfg := config.Cfg.MTree.CDC + mtreeCfg := config.Get().MTree + cfg := mtreeCfg.CDC + mtreeSchema := mtreeCfg.Schema publication := cfg.PublicationName slotName := cfg.SlotName var startLSNStr string @@ -123,8 +125,8 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont var lastLSNVal atomic.Uint64 lastLSNVal.Store(uint64(lastLSN)) flushInterval := 10 * time.Second - if config.Cfg.MTree.CDC.CDCMetadataFlushSec > 0 { - flushInterval = time.Duration(config.Cfg.MTree.CDC.CDCMetadataFlushSec) * time.Second + if cfg.CDCMetadataFlushSec > 0 { + flushInterval = time.Duration(cfg.CDCMetadataFlushSec) * time.Second } lastFlushTime := time.Now() var conn *pgconn.PgConn @@ -321,7 +323,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont defer wg.Done() workerSem <- struct{}{} defer func() { <-workerSem }() - if err := processChanges(processingCtx, pool, c); err != nil { + if err := processChanges(processingCtx, pool, c, mtreeSchema); err != nil { select { case errCh <- err: default: @@ -531,7 +533,7 @@ var quotedOIDs = map[uint32]bool{ 2950: true, // uuid } -func processChanges(ctx context.Context, pool *pgxpool.Pool, changes []cdcMsg) error { +func processChanges(ctx context.Context, pool *pgxpool.Pool, changes []cdcMsg, mtreeSchema string) error { logger.Debug("processChanges called with %d changes", len(changes)) tx, err := pool.Begin(ctx) if err != nil { @@ -551,14 +553,14 @@ func processChanges(ctx context.Context, pool *pgxpool.Pool, changes []cdcMsg) e firstChange := tableChanges[0] schema := firstChange.schema table := firstChange.table - mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", config.Cfg.MTree.Schema, schema, table) + mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", mtreeSchema, schema, table) logger.Debug("Processing %d changes for table %s.%s (mtree: %s)", len(tableChanges), schema, table, mtreeTable) var inserts, deletes, updates []string relation := firstChange.relation isComposite := len(getPrimaryKeyColumns(relation)) > 1 - compositeTypeName := fmt.Sprintf("%s.%s_%s_key_type", config.Cfg.MTree.Schema, schema, table) + compositeTypeName := fmt.Sprintf("%s.%s_%s_key_type", mtreeSchema, schema, table) var pkeyType string if !isComposite { for _, col := range relation.Columns { diff --git a/internal/infra/cdc/setup.go b/internal/infra/cdc/setup.go index 2ea1a16..a135cb8 100644 --- a/internal/infra/cdc/setup.go +++ b/internal/infra/cdc/setup.go @@ -39,7 +39,7 @@ func SetupPublication(ctx 
context.Context, db queries.DBQuerier, publicationName } func SetupReplicationSlot(ctx context.Context, nodeInfo map[string]any) (pglogrepl.LSN, error) { - cfg := config.Cfg.MTree.CDC + cfg := config.Get().MTree.CDC slot := cfg.SlotName pool, err := auth.GetClusterNodeConnection(ctx, nodeInfo, auth.ConnectionOptions{}) diff --git a/internal/infra/db/auth.go b/internal/infra/db/auth.go index 780ba23..247eac6 100644 --- a/internal/infra/db/auth.go +++ b/internal/infra/db/auth.go @@ -95,7 +95,7 @@ func toConnectionString(node map[string]any, dbName string) string { parts = append(parts, "dbname="+dbToUse) } - cfg := config.Cfg + cfg := config.Get() if cfg != nil { pgCfg := cfg.Postgres if pgCfg.ConnectionTimeout > 0 { @@ -208,7 +208,7 @@ func applyPostgresPoolConfig(poolCfg *pgxpool.Config) { if poolCfg == nil || poolCfg.ConnConfig == nil { return } - cfg := config.Cfg + cfg := config.Get() if cfg == nil { return } @@ -226,7 +226,7 @@ func applyPostgresPgconnConfig(pgCfg *pgconn.Config) { if pgCfg == nil { return } - cfg := config.Cfg + cfg := config.Get() if cfg == nil { return } diff --git a/pkg/config/config.go b/pkg/config/config.go index 3108121..1bd1e31 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -113,11 +113,9 @@ type CertAuthConfig struct { CACertFile string `yaml:"ca_cert_file"` } -// Cfg holds the loaded config for the whole app. Direct reads of Cfg are -// only safe during initialization, before the scheduler or SIGHUP reload -// loop starts. Any code running concurrently with Set() or Init() — such -// as scheduler goroutines or request handlers — must call Get() instead to -// obtain a consistent snapshot under the read lock. +// Cfg holds the loaded config for the whole app. +// Use Get() for any code that runs concurrently with SIGHUP reload; bare +// reads of Cfg are only safe during single-threaded startup. var Cfg *Config // CfgPath is the path from which the current config was loaded. @@ -153,15 +151,12 @@ func Init(path string) error { return nil } -// Reload loads a new Config from path and returns it for validation. -// The caller is responsible for calling Set to apply the new config. -// Reload does NOT modify the active Cfg; use it as a dry-run / validation step. -func Reload(path string) (*Config, error) { - if path == "" { - cfgMu.RLock() - path = CfgPath - cfgMu.RUnlock() - } +// Reload re-reads the configuration file and returns the parsed result +// without replacing the active config. +func Reload() (*Config, error) { + cfgMu.RLock() + path := CfgPath + cfgMu.RUnlock() return Load(path) }
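
Everything above hinges on `config.Get` and `config.Set` handing out immutable snapshots. Their implementation is not part of this diff beyond the `cfgMu` usage visible in `Reload`, but the contract the handlers rely on is roughly the following (a minimal sketch, assuming an RWMutex-guarded pointer; the real accessors may differ in detail):

```go
package config

import "sync"

// Sketch only: the real Config struct is defined earlier in config.go.
type Config struct{ /* fields elided */ }

var (
	cfgMu sync.RWMutex
	Cfg   *Config // active snapshot; read via Get, replaced via Set
)

// Get returns the active snapshot under the read lock. Callers capture the
// pointer once per request or task and pass it down, so a concurrent Set
// cannot produce a mixed view within a single request.
func Get() *Config {
	cfgMu.RLock()
	defer cfgMu.RUnlock()
	return Cfg
}

// Set publishes a new snapshot. Readers still holding the old pointer keep
// a coherent (if stale) view until they next call Get.
func Set(c *Config) {
	cfgMu.Lock()
	defer cfgMu.Unlock()
	Cfg = c
}
```

Because Set swaps the pointer instead of mutating the struct, a task that captured `cfg` before a SIGHUP keeps reading consistent values for its whole lifetime, which is exactly what `TestResolversPickUpReloadedConfig` asserts.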