diff --git a/internal/consistency/diff/schema_diff.go b/internal/consistency/diff/schema_diff.go
index 0b42499..9b31285 100644
--- a/internal/consistency/diff/schema_diff.go
+++ b/internal/consistency/diff/schema_diff.go
@@ -12,10 +12,13 @@
 package diff
 
 import (
+	"bufio"
 	"context"
 	"encoding/json"
 	"fmt"
 	"maps"
+	"os"
+	"slices"
 	"strconv"
 	"strings"
 	"time"
@@ -107,6 +110,58 @@ func NewSchemaDiffTask() *SchemaDiffCmd {
 	}
 }
 
+// parseSkipList populates c.skipTablesList from SkipTables (comma-separated)
+// and SkipFile (one entry per line), in that order.
+//
+// Entries may be bare ("table") or schema-qualified ("schema.table"). A
+// qualified entry must name c.SchemaName and is stored without the qualifier;
+// surrounding whitespace is trimmed and empty entries are dropped. An error is
+// returned when the skip file cannot be opened or read, when a qualifier names
+// a different schema, or when nothing follows the qualifier.
+func (c *SchemaDiffCmd) parseSkipList() error {
+	var raw []string
+	if c.SkipTables != "" {
+		raw = append(raw, strings.Split(c.SkipTables, ",")...)
+	}
+	if c.SkipFile != "" {
+		file, err := os.Open(c.SkipFile)
+		if err != nil {
+			return fmt.Errorf("could not open skip file: %w", err)
+		}
+		defer file.Close()
+		scanner := bufio.NewScanner(file)
+		for scanner.Scan() {
+			raw = append(raw, scanner.Text())
+		}
+		if err := scanner.Err(); err != nil {
+			return fmt.Errorf("error reading skip file: %w", err)
+		}
+	}
+
+	tables := make([]string, 0, len(raw))
+	for _, entry := range raw {
+		entry = strings.TrimSpace(entry)
+		if entry == "" {
+			continue
+		}
+		if schema, table, qualified := strings.Cut(entry, "."); qualified {
+			// Trim both halves so "public . orders" is read as "public.orders".
+			schema, table = strings.TrimSpace(schema), strings.TrimSpace(table)
+			if schema != c.SchemaName {
+				return fmt.Errorf("skip table %q: schema %q does not match target schema %q",
+					entry, schema, c.SchemaName)
+			}
+			if table == "" {
+				return fmt.Errorf("skip table %q: missing table name after schema qualifier", schema+".")
+			}
+			entry = table
+		}
+		tables = append(tables, entry)
+	}
+	c.skipTablesList = tables
+	return nil
+}
+
 func (c *SchemaDiffCmd) Validate() error {
 	if c.ClusterName == "" {
 		return fmt.Errorf("cluster name is required")
@@ -139,6 +194,10 @@ func (c *SchemaDiffCmd) RunChecks(skipValidation bool) error {
 		}
 	}
 
+	if err := c.parseSkipList(); err != nil {
+		return err
+	}
+
 	if err := utils.ReadClusterInfo(c); err != nil {
 		return err
 	}
@@ -340,7 +399,7 @@ func (task *SchemaDiffCmd) SchemaTableDiff() (err error) {
 		}
 	}
 
-	var tablesProcessed, tablesFailed int
+	var tablesProcessed, tablesFailed, tablesSkipped int
 	var failedTables []string
 
 	defer func() {
@@ -356,10 +415,11 @@ func (task *SchemaDiffCmd) SchemaTableDiff() (err error) {
 
 		if recorder != nil && recorder.Created() {
 			ctx := map[string]any{
-				"tables_total":  len(task.tableList),
-				"tables_diffed": tablesProcessed,
-				"tables_failed": tablesFailed,
-				"ddl_only":      task.DDLOnly,
+				"tables_total":   len(task.tableList),
+				"tables_diffed":  tablesProcessed,
+				"tables_failed":  tablesFailed,
+				"tables_skipped": tablesSkipped,
+				"ddl_only":       task.DDLOnly,
 			}
 			if len(failedTables) > 0 {
 				ctx["failed_tables"] = failedTables
@@ -396,6 +456,14 @@ func (task *SchemaDiffCmd) SchemaTableDiff() (err error) {
 		}
 	}
 	for _, tableName := range task.tableList {
+		if slices.Contains(task.skipTablesList, tableName) {
+			if !task.Quiet {
+				logger.Info("Skipping table: %s", tableName)
+			}
+			tablesSkipped++
+			continue
+		}
+
 		qualifiedTableName := fmt.Sprintf("%s.%s", task.SchemaName, tableName)
 		if !task.Quiet {
 			logger.Info("Diffing table: %s", qualifiedTableName)
diff --git a/internal/consistency/diff/schema_diff_test.go b/internal/consistency/diff/schema_diff_test.go
new file mode 100644
index 0000000..f454a2a
--- /dev/null
+++ b/internal/consistency/diff/schema_diff_test.go
@@ -0,0 +1,263 @@
+// ///////////////////////////////////////////////////////////////////////////
+//
+// # ACE - Active Consistency Engine
+//
+// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/)
+//
+// This software is released under the PostgreSQL License:
+// https://opensource.org/license/postgresql
+//
+// ///////////////////////////////////////////////////////////////////////////
+
+package diff
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+)
+
+// writeSkipFile writes each of lines plus a newline to a new temp file and returns its path.
+func writeSkipFile(t *testing.T, lines ...string) string {
+	t.Helper()
+	f, err := os.CreateTemp(t.TempDir(), "skip-*.txt")
+	if err != nil {
+		t.Fatalf("create temp skip file: %v", err)
+	}
+	for _, line := range lines {
+		if _, err := f.WriteString(line + "\n"); err != nil {
+			t.Fatalf("write skip file: %v", err)
+		}
+	}
+	if err := f.Close(); err != nil {
+		t.Fatalf("close skip file: %v", err)
+	}
+	return f.Name()
+}
+
+// TestParseSkipList_Empty verifies that with neither SkipTables nor SkipFile
+// set, parseSkipList returns no error and leaves skipTablesList empty.
+func TestParseSkipList_Empty(t *testing.T) {
+	cmd := &SchemaDiffCmd{SchemaName: "public"}
+	if err := cmd.parseSkipList(); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(cmd.skipTablesList) != 0 {
+		t.Errorf("skipTablesList = %v, want empty", cmd.skipTablesList)
+	}
+}
+
+// TestParseSkipList_FromFlag verifies that comma-separated SkipTables entries
+// become individual list items with the schema qualifier removed.
+func TestParseSkipList_FromFlag(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipTables: "public.orders,public.audit_log,public.sessions",
+	}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"orders", "audit_log", "sessions"}
+	if got := len(task.skipTablesList); got != len(expected) {
+		t.Fatalf("skipTablesList len = %d, want %d", got, len(expected))
+	}
+	for idx, name := range expected {
+		if got := task.skipTablesList[idx]; got != name {
+			t.Errorf("skipTablesList[%d] = %q, want %q", idx, got, name)
+		}
+	}
+}
+
+// TestParseSkipList_FromFile checks that one-per-line entries in a skip file
+// are loaded in file order with their schema qualifier removed.
+func TestParseSkipList_FromFile(t *testing.T) {
+	skipPath := writeSkipFile(t, "public.orders", "public.audit_log")
+	task := &SchemaDiffCmd{SchemaName: "public", SkipFile: skipPath}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"orders", "audit_log"}
+	if got := len(task.skipTablesList); got != len(expected) {
+		t.Fatalf("skipTablesList len = %d, want %d", got, len(expected))
+	}
+	for idx, name := range expected {
+		if got := task.skipTablesList[idx]; got != name {
+			t.Errorf("skipTablesList[%d] = %q, want %q", idx, got, name)
+		}
+	}
+}
+
+// TestParseSkipList_FromBoth checks that SkipTables and SkipFile entries are
+// combined, with the SkipTables entries placed first.
+func TestParseSkipList_FromBoth(t *testing.T) {
+	skipPath := writeSkipFile(t, "public.from_file")
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipTables: "public.from_flag",
+		SkipFile:   skipPath,
+	}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"from_flag", "from_file"}
+	if got := task.skipTablesList; len(got) != len(expected) {
+		t.Fatalf("skipTablesList = %v, want %v", got, expected)
+	}
+	for idx, name := range expected {
+		if got := task.skipTablesList[idx]; got != name {
+			t.Errorf("skipTablesList[%d] = %q, want %q", idx, got, name)
+		}
+	}
+}
+
+// TestParseSkipList_MissingFile checks that pointing SkipFile at a path that
+// does not exist yields an error.
+func TestParseSkipList_MissingFile(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipFile:   filepath.Join(t.TempDir(), "does-not-exist.txt"),
+	}
+	if parseErr := task.parseSkipList(); parseErr == nil {
+		t.Fatal("expected error for missing skip file, got nil")
+	}
+}
+
+// TestParseSkipList_SingleEntry checks that a SkipTables value containing no
+// comma produces exactly one entry.
+func TestParseSkipList_SingleEntry(t *testing.T) {
+	task := &SchemaDiffCmd{SchemaName: "public", SkipTables: "public.orders"}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	if count := len(task.skipTablesList); count != 1 {
+		t.Fatalf("skipTablesList len = %d, want 1", count)
+	}
+	if first := task.skipTablesList[0]; first != "orders" {
+		t.Errorf("skipTablesList[0] = %q, want %q", first, "orders")
+	}
+}
+
+// TestParseSkipList_UnqualifiedNames checks that table names given without a
+// schema qualifier are stored as written.
+func TestParseSkipList_UnqualifiedNames(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipTables: "orders,audit_log",
+	}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"orders", "audit_log"}
+	if got := len(task.skipTablesList); got != len(expected) {
+		t.Fatalf("skipTablesList len = %d, want %d", got, len(expected))
+	}
+	for idx, name := range expected {
+		if got := task.skipTablesList[idx]; got != name {
+			t.Errorf("skipTablesList[%d] = %q, want %q", idx, got, name)
+		}
+	}
+}
+
+// TestParseSkipList_WrongSchema checks that an entry qualified with a schema
+// other than SchemaName is rejected with a schema-mismatch error.
+func TestParseSkipList_WrongSchema(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipTables: "other_schema.orders",
+	}
+	parseErr := task.parseSkipList()
+	if parseErr == nil {
+		t.Fatal("expected error for mismatched schema, got nil")
+	}
+	if msg := parseErr.Error(); !strings.Contains(msg, "does not match target schema") {
+		t.Errorf("error = %q, want it to mention schema mismatch", msg)
+	}
+}
+
+// TestParseSkipList_MixedQualifiedAndBare checks that qualified and bare
+// entries can be combined in one SkipTables value and keep their order.
+func TestParseSkipList_MixedQualifiedAndBare(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "myschema",
+		SkipTables: "myschema.orders,audit_log,myschema.sessions",
+	}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"orders", "audit_log", "sessions"}
+	if got := task.skipTablesList; len(got) != len(expected) {
+		t.Fatalf("skipTablesList = %v, want %v", got, expected)
+	}
+	for idx, name := range expected {
+		if got := task.skipTablesList[idx]; got != name {
+			t.Errorf("skipTablesList[%d] = %q, want %q", idx, got, name)
+		}
+	}
+}
+
+// TestParseSkipList_WhitespaceHandling checks that whitespace around each
+// comma-separated entry is removed before the entry is stored.
+func TestParseSkipList_WhitespaceHandling(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipTables: " orders , public.audit_log ",
+	}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"orders", "audit_log"}
+	if got := task.skipTablesList; len(got) != len(expected) {
+		t.Fatalf("skipTablesList = %v, want %v", got, expected)
+	}
+	for idx, name := range expected {
+		if got := task.skipTablesList[idx]; got != name {
+			t.Errorf("skipTablesList[%d] = %q, want %q", idx, got, name)
+		}
+	}
+}
+
+// TestParseSkipList_EmptyLinesInFile checks that blank lines in a skip file
+// do not add entries; only the resulting list length is asserted.
+func TestParseSkipList_EmptyLinesInFile(t *testing.T) {
+	skipPath := writeSkipFile(t, "orders", "", "audit_log", "")
+	task := &SchemaDiffCmd{SchemaName: "public", SkipFile: skipPath}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	expected := []string{"orders", "audit_log"}
+	if got := task.skipTablesList; len(got) != len(expected) {
+		t.Fatalf("skipTablesList = %v (len=%d), want %v", got, len(got), expected)
+	}
+}
+
+// TestParseSkipList_TrailingComma checks that the empty field after a trailing
+// comma is dropped instead of being stored as an entry.
+func TestParseSkipList_TrailingComma(t *testing.T) {
+	task := &SchemaDiffCmd{SchemaName: "public", SkipTables: "orders,"}
+	if parseErr := task.parseSkipList(); parseErr != nil {
+		t.Fatalf("unexpected error: %v", parseErr)
+	}
+	if got := task.skipTablesList; len(got) != 1 {
+		t.Fatalf("skipTablesList = %v (len=%d), want 1 entry", got, len(got))
+	}
+	if first := task.skipTablesList[0]; first != "orders" {
+		t.Errorf("skipTablesList[0] = %q, want %q", first, "orders")
+	}
+}
+
+// TestParseSkipList_EmptyTableAfterSchema checks that an entry consisting of
+// a schema qualifier with nothing after the dot (e.g. "public.") is an error.
+func TestParseSkipList_EmptyTableAfterSchema(t *testing.T) {
+	task := &SchemaDiffCmd{
+		SchemaName: "public",
+		SkipTables: "public.",
+	}
+	parseErr := task.parseSkipList()
+	if parseErr == nil {
+		t.Fatal("expected error for empty table name after schema qualifier, got nil")
+	}
+	if msg := parseErr.Error(); !strings.Contains(msg, "missing table name") {
+		t.Errorf("error = %q, want it to mention missing table name", msg)
+	}
+}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index c56a27f..3108121 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -113,8 +113,11 @@ type CertAuthConfig struct {
 	CACertFile string `yaml:"ca_cert_file"`
 }
 
-// Cfg holds the loaded config for the whole app.
-// Reads may use Cfg directly; concurrent reloads must go through Set/Get.
+// Cfg holds the loaded config for the whole app. Direct reads of Cfg are
+// only safe during initialization, before the scheduler or SIGHUP reload
+// loop starts. Any code running concurrently with Set() or Init() — such
+// as scheduler goroutines or request handlers — must call Get() instead to
+// obtain a consistent snapshot under the read lock.
 var Cfg *Config
 
 // CfgPath is the path from which the current config was loaded.
diff --git a/tests/integration/schema_diff_test.go b/tests/integration/schema_diff_test.go
new file mode 100644
index 0000000..48eb4a9
--- /dev/null
+++ b/tests/integration/schema_diff_test.go
@@ -0,0 +1,165 @@
+// ///////////////////////////////////////////////////////////////////////////
+//
+// # ACE - Active Consistency Engine
+//
+// Copyright (C) 2023 - 2025, pgEdge (https://www.pgedge.com/)
+//
+// This software is released under the PostgreSQL License:
+// https://opensource.org/license/postgresql
+//
+// ///////////////////////////////////////////////////////////////////////////
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/pgedge/ace/internal/consistency/diff"
+	"github.com/stretchr/testify/require"
+)
+
+// newTestSchemaDiffTask returns a quiet, JSON-output SchemaDiffCmd for pgCluster and dbName.
+func newTestSchemaDiffTask(schemaName, nodes string) *diff.SchemaDiffCmd {
+	cmd := diff.NewSchemaDiffTask()
+	cmd.ClusterName = pgCluster.ClusterName
+	cmd.DBName = dbName
+	cmd.SchemaName = schemaName
+	cmd.Nodes = nodes
+	cmd.BlockSize = 10000
+	cmd.CompareUnitSize = 100
+	cmd.ConcurrencyFactor = 1
+	cmd.Output = "json"
+	cmd.Quiet = true
+	cmd.SkipDBUpdate = true
+	return cmd
+}
+
+// createDivergentTable creates tableName (if absent) on node1 and node2 and
+// inserts one row on node1 only, so the table's contents are meant to differ
+// between the nodes. A t.Cleanup drops the table and removes its diff files.
+func createDivergentTable(t *testing.T, tableName string) {
+	t.Helper()
+	bg := context.Background()
+	ddl := fmt.Sprintf(`
+		CREATE TABLE IF NOT EXISTS %s.%s (
+			id INT PRIMARY KEY,
+			val TEXT
+		)`, testSchema, tableName)
+
+	for _, nodePool := range []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} {
+		_, createErr := nodePool.Exec(bg, ddl)
+		require.NoError(t, createErr, "create table %s", tableName)
+	}
+
+	// Only node1 receives a row; nothing is inserted on node2, so the data is meant to diverge.
+	_, insertErr := pgCluster.Node1Pool.Exec(bg,
+		fmt.Sprintf(`INSERT INTO %s.%s (id, val) VALUES (1, 'divergent')`, testSchema, tableName),
+	)
+	require.NoError(t, insertErr, "insert into %s on node1", tableName)
+
+	t.Cleanup(func() {
+		dropStmt := fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s CASCADE`, testSchema, tableName)
+		for _, nodePool := range []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} {
+			nodePool.Exec(bg, dropStmt) //nolint:errcheck – best-effort cleanup
+		}
+		// Delete diff output matching this table's file-name pattern.
+		leftovers, _ := filepath.Glob(fmt.Sprintf("%s_%s_diffs-*.json", testSchema, tableName))
+		for _, diffPath := range leftovers {
+			os.Remove(diffPath)
+		}
+	})
+}
+
+// diffFilesForTable returns the files matching "<testSchema>_<tableName>_diffs-*.json".
+func diffFilesForTable(t *testing.T, tableName string) []string {
+	t.Helper()
+	matches, globErr := filepath.Glob(fmt.Sprintf("%s_%s_diffs-*.json", testSchema, tableName))
+	require.NoError(t, globErr)
+	return matches
+}
+
+// TestSchemaDiff_SkipTablesFlag checks that a table named in SkipTables is
+// left out of a SchemaTableDiff run, using the diff file as the observable.
+//
+//  1. Create a table whose rows differ between node1 and node2.
+//  2. Run SchemaTableDiff with the table in SkipTables: no diff file appears.
+//  3. Run it again without SkipTables: a diff file appears, so step 2 was not
+//     passing vacuously.
+//
+// NOTE(review): the skip list is parsed in RunChecks; confirm this path runs it.
+func TestSchemaDiff_SkipTablesFlag(t *testing.T) {
+	const skipTableName = "schema_diff_skip_test"
+
+	createDivergentTable(t, skipTableName)
+
+	nodeList := fmt.Sprintf("%s,%s", serviceN1, serviceN2)
+
+	t.Run("SkippedTableProducesNoDiffFile", func(t *testing.T) {
+		for _, stale := range diffFilesForTable(t, skipTableName) {
+			_ = os.Remove(stale)
+		}
+
+		cmd := newTestSchemaDiffTask(testSchema, nodeList)
+		cmd.SkipTables = skipTableName
+
+		runErr := cmd.SchemaTableDiff()
+		require.NoError(t, runErr)
+
+		produced := diffFilesForTable(t, skipTableName)
+		require.Empty(t, produced,
+			"expected no diff file for skipped table %q, found: %v", skipTableName, produced)
+	})
+
+	t.Run("NotSkippedTableProducesDiffFile", func(t *testing.T) {
+		for _, stale := range diffFilesForTable(t, skipTableName) {
+			_ = os.Remove(stale)
+		}
+
+		t.Cleanup(func() {
+			for _, written := range diffFilesForTable(t, skipTableName) {
+				os.Remove(written)
+			}
+		})
+
+		cmd := newTestSchemaDiffTask(testSchema, nodeList)
+		// No skip list here: the table must go through the normal diff path.
+
+		runErr := cmd.SchemaTableDiff()
+		require.NoError(t, runErr)
+
+		produced := diffFilesForTable(t, skipTableName)
+		require.NotEmpty(t, produced,
+			"expected a diff file for table %q when not skipped", skipTableName)
+	})
+}
+
+// TestSchemaDiff_SkipTablesFile checks the SkipFile path end to end: a table
+// named in the skip file must not produce a diff file.
+func TestSchemaDiff_SkipTablesFile(t *testing.T) {
+	const skipTableName = "schema_diff_skipfile_test"
+
+	createDivergentTable(t, skipTableName)
+
+	// Put the table name on its own line in a temp skip file.
+	listFile, ioErr := os.CreateTemp(t.TempDir(), "skip-*.txt")
+	require.NoError(t, ioErr)
+	_, ioErr = fmt.Fprintln(listFile, skipTableName)
+	require.NoError(t, ioErr)
+	listFile.Close()
+
+	nodeList := fmt.Sprintf("%s,%s", serviceN1, serviceN2)
+	cmd := newTestSchemaDiffTask(testSchema, nodeList)
+	cmd.SkipFile = listFile.Name()
+
+	ioErr = cmd.SchemaTableDiff()
+	require.NoError(t, ioErr)
+
+	produced := diffFilesForTable(t, skipTableName)
+	require.Empty(t, produced,
+		"expected no diff file for table listed in skip file, found: %v", produced)
+}