From 61cb9009d686b35bd71238be53fb2a8ce59fead8 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Wed, 18 Mar 2026 10:11:58 -0700 Subject: [PATCH 1/2] Fix template to avoid type error --- db/queries/templates.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/queries/templates.go b/db/queries/templates.go index 3c43987..e0e4fce 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -612,7 +612,7 @@ var SQLTemplates = Templates{ `SELECT indexname FROM pg_indexes WHERE schemaname = $1;`, )), CheckRepSetExists: template.Must(template.New("checkRepSetExists").Parse( - `SELECT set_name FROM spock.replication_set WHERE set_name = $1;`, + `SELECT EXISTS(SELECT 1 FROM spock.replication_set WHERE set_name = $1);`, )), GetTablesInRepSet: template.Must(template.New("getTablesInRepSet").Parse( `SELECT concat_ws('.', nspname, relname) FROM spock.tables where set_name = $1;`, From 7b132757db92a6c570f05b0d34f79efaf19a9484 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Wed, 18 Mar 2026 17:16:47 -0700 Subject: [PATCH 2/2] tests: add unit and integration tests for repset-diff Unit tests (17): validation, skip-list parsing, skip matching against schema-qualified table names (documenting that bare names silently fail to match), NewTask defaults, CloneForSchedule field propagation, and getter/setter coverage. Integration tests (6): identical tables (no diff), divergent table (verifies diff JSON content including schema, table, primary key, row count, node assignment, and actual row values), skip-tables flag and file, nonexistent repset error, and multi-table processing. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/consistency/diff/repset_diff_test.go | 374 ++++++++++++++++++ tests/integration/repset_diff_test.go | 303 ++++++++++++++ 2 files changed, 677 insertions(+) create mode 100644 internal/consistency/diff/repset_diff_test.go create mode 100644 tests/integration/repset_diff_test.go diff --git a/internal/consistency/diff/repset_diff_test.go b/internal/consistency/diff/repset_diff_test.go new file mode 100644 index 0000000..18a9e91 --- /dev/null +++ b/internal/consistency/diff/repset_diff_test.go @@ -0,0 +1,374 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +package diff + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +// --------------------------------------------------------------------------- +// Validate +// --------------------------------------------------------------------------- + +func TestRepsetDiffCmd_Validate(t *testing.T) { + tests := []struct { + name string + cmd RepsetDiffCmd + wantErr bool + errContains string + }{ + { + name: "missing cluster name", + cmd: RepsetDiffCmd{RepsetName: "default"}, + wantErr: true, + errContains: "cluster name is required", + }, + { + name: "missing repset name", + cmd: RepsetDiffCmd{ClusterName: "test_cluster"}, + wantErr: true, + errContains: "repset name is required", + }, + { + name: "valid", + cmd: RepsetDiffCmd{ClusterName: "test_cluster", RepsetName: "default"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cmd.Validate() + if tc.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + if tc.errContains != "" && !strings.Contains(err.Error(), tc.errContains) { + t.Errorf("error = %q, want it to contain %q", err.Error(), tc.errContains) + } + } else if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +// --------------------------------------------------------------------------- +// parseSkipList +// --------------------------------------------------------------------------- + +func TestRepsetDiffCmd_ParseSkipList_Empty(t *testing.T) { + cmd := &RepsetDiffCmd{} + if err := cmd.parseSkipList(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(cmd.skipTablesList) != 0 { + t.Errorf("skipTablesList = %v, want empty", cmd.skipTablesList) + } +} + +func TestRepsetDiffCmd_ParseSkipList_FromFlag(t *testing.T) { + cmd := &RepsetDiffCmd{SkipTables: "orders,audit_log,sessions"} + if err := cmd.parseSkipList(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + assertStringSlice(t, cmd.skipTablesList, []string{"orders", "audit_log", "sessions"}) +} + +func TestRepsetDiffCmd_ParseSkipList_FromFile(t *testing.T) { + path := writeRepsetSkipFile(t, "orders", "audit_log") + cmd := &RepsetDiffCmd{SkipFile: path} + if err := cmd.parseSkipList(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + assertStringSlice(t, cmd.skipTablesList, []string{"orders", "audit_log"}) +} + +func TestRepsetDiffCmd_ParseSkipList_FromBoth(t *testing.T) { + path := writeRepsetSkipFile(t, "from_file") + cmd := &RepsetDiffCmd{SkipTables: "from_flag", SkipFile: path} + if err := cmd.parseSkipList(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + assertStringSlice(t, cmd.skipTablesList, []string{"from_flag", "from_file"}) +} + +func TestRepsetDiffCmd_ParseSkipList_MissingFile(t *testing.T) { + cmd := &RepsetDiffCmd{ + SkipFile: filepath.Join(t.TempDir(), "does-not-exist.txt"), + } + if err := cmd.parseSkipList(); err == nil { + t.Fatal("expected error for missing skip file, got nil") + } +} + +// TestRepsetDiffCmd_ParseSkipList_SchemaQualified verifies that schema-qualified +// names pass through unchanged. Unlike SchemaDiffCmd, repset's parseSkipList +// does NOT strip the schema prefix. +func TestRepsetDiffCmd_ParseSkipList_SchemaQualified(t *testing.T) { + cmd := &RepsetDiffCmd{SkipTables: "public.orders,myschema.audit_log"} + if err := cmd.parseSkipList(); err != nil { + t.Fatalf("unexpected error: %v", err) + } + assertStringSlice(t, cmd.skipTablesList, []string{"public.orders", "myschema.audit_log"}) +} + +// --------------------------------------------------------------------------- +// Skip matching (simulates the loop in RepsetDiff lines 284-298) +// +// GetTablesInRepSet returns schema-qualified names ("public.orders"), so the +// skip list must also use schema-qualified names to match. These tests verify +// that behavior and document the mismatch when bare names are used. +// --------------------------------------------------------------------------- + +// simulateSkipMatching reproduces the skip-matching loop from RepsetDiff. +// Returns the tables that would actually be diffed (i.e. not skipped). +func simulateSkipMatching(tableList, skipTablesList []string) []string { + var diffed []string + for _, tableName := range tableList { + var skipped bool + for _, skip := range skipTablesList { + if strings.TrimSpace(skip) == tableName { + skipped = true + break + } + } + if !skipped { + diffed = append(diffed, tableName) + } + } + return diffed +} + +func TestSkipMatching_SchemaQualifiedMatch(t *testing.T) { + // GetTablesInRepSet returns schema-qualified names. + tableList := []string{"public.orders", "public.users", "public.audit_log"} + skipList := []string{"public.users"} + + diffed := simulateSkipMatching(tableList, skipList) + assertStringSlice(t, diffed, []string{"public.orders", "public.audit_log"}) +} + +func TestSkipMatching_BareNameDoesNotMatchQualified(t *testing.T) { + // A bare "users" will NOT match "public.users" — this documents current behavior. + tableList := []string{"public.orders", "public.users"} + skipList := []string{"users"} + + diffed := simulateSkipMatching(tableList, skipList) + // "users" doesn't match "public.users", so nothing is skipped. + assertStringSlice(t, diffed, []string{"public.orders", "public.users"}) +} + +func TestSkipMatching_MultipleSkips(t *testing.T) { + tableList := []string{"public.a", "public.b", "public.c", "public.d"} + skipList := []string{"public.b", "public.d"} + + diffed := simulateSkipMatching(tableList, skipList) + assertStringSlice(t, diffed, []string{"public.a", "public.c"}) +} + +func TestSkipMatching_EmptySkipList(t *testing.T) { + tableList := []string{"public.orders", "public.users"} + + diffed := simulateSkipMatching(tableList, nil) + assertStringSlice(t, diffed, []string{"public.orders", "public.users"}) +} + +func TestSkipMatching_AllSkipped(t *testing.T) { + tableList := []string{"public.orders"} + skipList := []string{"public.orders"} + + diffed := simulateSkipMatching(tableList, skipList) + if len(diffed) != 0 { + t.Errorf("expected empty, got %v", diffed) + } +} + +func TestSkipMatching_WhitespaceInSkipEntry(t *testing.T) { + tableList := []string{"public.orders"} + skipList := []string{" public.orders "} + + diffed := simulateSkipMatching(tableList, skipList) + if len(diffed) != 0 { + t.Errorf("expected skip to match after trimming, got %v", diffed) + } +} + +func TestSkipMatching_DifferentSchemas(t *testing.T) { + tableList := []string{"public.orders", "sales.orders"} + skipList := []string{"public.orders"} + + diffed := simulateSkipMatching(tableList, skipList) + assertStringSlice(t, diffed, []string{"sales.orders"}) +} + +// --------------------------------------------------------------------------- +// NewRepsetDiffTask / CloneForSchedule +// --------------------------------------------------------------------------- + +func TestRepsetDiffCmd_NewTask(t *testing.T) { + task := NewRepsetDiffTask() + if task.TaskID == "" { + t.Error("TaskID should not be empty") + } + if task.TaskType != "REPSET_DIFF" { + t.Errorf("TaskType = %q, want %q", task.TaskType, "REPSET_DIFF") + } + if task.TaskStatus != "PENDING" { + t.Errorf("TaskStatus = %q, want %q", task.TaskStatus, "PENDING") + } + if task.Ctx == nil { + t.Error("Ctx should not be nil") + } +} + +func TestRepsetDiffCmd_CloneForSchedule(t *testing.T) { + original := &RepsetDiffCmd{ + ClusterName: "prod", + DBName: "mydb", + RepsetName: "default", + Nodes: "n1,n2", + SkipTables: "foo,bar", + SkipFile: "/tmp/skip.txt", + Quiet: true, + BlockSize: 5000, + ConcurrencyFactor: 0.75, + CompareUnitSize: 50, + Output: "json", + TableFilter: "id > 10", + OverrideBlockSize: true, + SkipDBUpdate: true, + TaskStorePath: "/tmp/store.db", + } + + clone := original.CloneForSchedule(original.Ctx) + + // All config fields should be copied. + if clone.ClusterName != original.ClusterName { + t.Errorf("ClusterName = %q, want %q", clone.ClusterName, original.ClusterName) + } + if clone.DBName != original.DBName { + t.Errorf("DBName = %q, want %q", clone.DBName, original.DBName) + } + if clone.RepsetName != original.RepsetName { + t.Errorf("RepsetName = %q, want %q", clone.RepsetName, original.RepsetName) + } + if clone.Nodes != original.Nodes { + t.Errorf("Nodes = %q, want %q", clone.Nodes, original.Nodes) + } + if clone.SkipTables != original.SkipTables { + t.Errorf("SkipTables = %q, want %q", clone.SkipTables, original.SkipTables) + } + if clone.SkipFile != original.SkipFile { + t.Errorf("SkipFile = %q, want %q", clone.SkipFile, original.SkipFile) + } + if clone.Quiet != original.Quiet { + t.Errorf("Quiet = %v, want %v", clone.Quiet, original.Quiet) + } + if clone.BlockSize != original.BlockSize { + t.Errorf("BlockSize = %d, want %d", clone.BlockSize, original.BlockSize) + } + if clone.ConcurrencyFactor != original.ConcurrencyFactor { + t.Errorf("ConcurrencyFactor = %f, want %f", clone.ConcurrencyFactor, original.ConcurrencyFactor) + } + if clone.CompareUnitSize != original.CompareUnitSize { + t.Errorf("CompareUnitSize = %d, want %d", clone.CompareUnitSize, original.CompareUnitSize) + } + if clone.Output != original.Output { + t.Errorf("Output = %q, want %q", clone.Output, original.Output) + } + if clone.TableFilter != original.TableFilter { + t.Errorf("TableFilter = %q, want %q", clone.TableFilter, original.TableFilter) + } + if clone.OverrideBlockSize != original.OverrideBlockSize { + t.Errorf("OverrideBlockSize = %v, want %v", clone.OverrideBlockSize, original.OverrideBlockSize) + } + if clone.SkipDBUpdate != original.SkipDBUpdate { + t.Errorf("SkipDBUpdate = %v, want %v", clone.SkipDBUpdate, original.SkipDBUpdate) + } + if clone.TaskStorePath != original.TaskStorePath { + t.Errorf("TaskStorePath = %q, want %q", clone.TaskStorePath, original.TaskStorePath) + } + // Clone gets a fresh TaskID. + if clone.TaskID == original.TaskID { + t.Error("clone should have a new TaskID") + } +} + +// --------------------------------------------------------------------------- +// Getter/setter coverage +// --------------------------------------------------------------------------- + +func TestRepsetDiffCmd_Getters(t *testing.T) { + cmd := &RepsetDiffCmd{ + ClusterName: "mycluster", + DBName: "mydb", + Nodes: "n1,n2", + } + + if cmd.GetClusterName() != "mycluster" { + t.Errorf("GetClusterName() = %q", cmd.GetClusterName()) + } + if cmd.GetDBName() != "mydb" { + t.Errorf("GetDBName() = %q", cmd.GetDBName()) + } + if cmd.GetNodes() != "n1,n2" { + t.Errorf("GetNodes() = %q", cmd.GetNodes()) + } + + cmd.SetDBName("otherdb") + if cmd.GetDBName() != "otherdb" { + t.Errorf("after SetDBName: GetDBName() = %q", cmd.GetDBName()) + } + + cmd.SetNodeList([]string{"a", "b"}) + assertStringSlice(t, cmd.GetNodeList(), []string{"a", "b"}) + + nodes := []map[string]any{{"Name": "n1"}} + cmd.SetClusterNodes(nodes) + if len(cmd.GetClusterNodes()) != 1 || cmd.GetClusterNodes()[0]["Name"] != "n1" { + t.Errorf("GetClusterNodes() = %v", cmd.GetClusterNodes()) + } +} + +// --------------------------------------------------------------------------- +// helpers +// --------------------------------------------------------------------------- + +func writeRepsetSkipFile(t *testing.T, lines ...string) string { + t.Helper() + f, err := os.CreateTemp(t.TempDir(), "skip-*.txt") + if err != nil { + t.Fatalf("create temp skip file: %v", err) + } + for _, line := range lines { + if _, err := f.WriteString(line + "\n"); err != nil { + t.Fatalf("write skip file: %v", err) + } + } + f.Close() + return f.Name() +} + +func assertStringSlice(t *testing.T, got, want []string) { + t.Helper() + if len(got) != len(want) { + t.Fatalf("got %v (len=%d), want %v (len=%d)", got, len(got), want, len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Errorf("[%d] = %q, want %q", i, got[i], want[i]) + } + } +} diff --git a/tests/integration/repset_diff_test.go b/tests/integration/repset_diff_test.go new file mode 100644 index 0000000..936d5cc --- /dev/null +++ b/tests/integration/repset_diff_test.go @@ -0,0 +1,303 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +package integration + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pgedge/ace/internal/consistency/diff" + "github.com/pgedge/ace/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newTestRepsetDiffTask builds a RepsetDiffCmd wired to the shared test cluster. +func newTestRepsetDiffTask(repsetName string) *diff.RepsetDiffCmd { + task := diff.NewRepsetDiffTask() + task.ClusterName = pgCluster.ClusterName + task.DBName = dbName + task.RepsetName = repsetName + task.Nodes = "all" + task.Output = "json" + task.Quiet = true + task.SkipDBUpdate = true + task.BlockSize = 10000 + task.CompareUnitSize = 100 + task.ConcurrencyFactor = 1 + return task +} + +// createRepsetDiffTable creates a table on both nodes and adds it to the given +// replication set. Data is inserted BEFORE joining the repset so spock does not +// replicate it, allowing controlled divergence. +// +// When divergent is true, node1 gets an extra row (id=99) that node2 does not +// have. Returns the schema-qualified table name. Cleanup is automatic. +func createRepsetDiffTable(t *testing.T, tableName, repsetName string, divergent bool) string { + t.Helper() + ctx := context.Background() + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + createSQL := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id INT PRIMARY KEY, + val TEXT + )`, qualifiedTableName) + + pools := []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} + nodeNames := []string{serviceN1, serviceN2} + + for i, pool := range pools { + _, err := pool.Exec(ctx, createSQL) + require.NoError(t, err, "create table on %s", nodeNames[i]) + } + + // Insert baseline rows on both nodes BEFORE adding to repset. + for i, pool := range pools { + _, err := pool.Exec(ctx, fmt.Sprintf( + `INSERT INTO %s (id, val) VALUES (1, 'same'), (2, 'same') + ON CONFLICT DO NOTHING`, qualifiedTableName)) + require.NoError(t, err, "insert baseline on %s", nodeNames[i]) + } + + if divergent { + _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( + `INSERT INTO %s (id, val) VALUES (99, 'only_on_n1')`, qualifiedTableName)) + require.NoError(t, err, "insert divergent row on n1") + } + + // Now add to repset. + for i, pool := range pools { + _, err := pool.Exec(ctx, + fmt.Sprintf(`SELECT spock.repset_add_table('%s', '%s');`, repsetName, qualifiedTableName)) + require.NoError(t, err, "add table to repset on %s", nodeNames[i]) + } + + t.Cleanup(func() { + for _, pool := range pools { + pool.Exec(ctx, fmt.Sprintf( + `SELECT spock.repset_remove_table('%s', '%s');`, repsetName, qualifiedTableName)) //nolint:errcheck + pool.Exec(ctx, fmt.Sprintf(`DROP TABLE IF EXISTS %s CASCADE`, qualifiedTableName)) //nolint:errcheck + } + files, _ := filepath.Glob(fmt.Sprintf("%s_%s_diffs-*.json", testSchema, tableName)) + for _, f := range files { + os.Remove(f) + } + }) + + return qualifiedTableName +} + +// repsetDiffFilesForTable returns all json diff files for the given table. +func repsetDiffFilesForTable(t *testing.T, tableName string) []string { + t.Helper() + files, err := filepath.Glob(fmt.Sprintf("%s_%s_diffs-*.json", testSchema, tableName)) + require.NoError(t, err) + return files +} + +// readDiffOutput parses a diff JSON file into a DiffOutput struct. +func readDiffOutput(t *testing.T, path string) types.DiffOutput { + t.Helper() + data, err := os.ReadFile(path) + require.NoError(t, err, "read diff file %s", path) + var out types.DiffOutput + require.NoError(t, json.Unmarshal(data, &out), "parse diff file %s", path) + return out +} + +// totalDiffRows returns the sum of row counts across all node pairs in a DiffOutput. +func totalDiffRows(d types.DiffOutput) int { + total := 0 + for _, pair := range d.NodeDiffs { + for _, rows := range pair.Rows { + total += len(rows) + } + } + return total +} + +// TestRepsetDiff_IdenticalTables verifies that repset-diff produces no diff +// files when tables in the replication set are identical across nodes. +func TestRepsetDiff_IdenticalTables(t *testing.T) { + const tableName = "repset_diff_identical" + createRepsetDiffTable(t, tableName, "default", false) + + task := newTestRepsetDiffTask("default") + err := diff.RepsetDiff(task) + require.NoError(t, err) + + files := repsetDiffFilesForTable(t, tableName) + require.Empty(t, files, + "expected no diff file for identical table, found: %v", files) +} + +// TestRepsetDiff_DivergentTable verifies that repset-diff detects the extra +// row on node1, reports the correct node pair, diff count, and row content. +func TestRepsetDiff_DivergentTable(t *testing.T) { + const tableName = "repset_diff_divergent" + createRepsetDiffTable(t, tableName, "default", true) + + task := newTestRepsetDiffTask("default") + err := diff.RepsetDiff(task) + require.NoError(t, err) + + files := repsetDiffFilesForTable(t, tableName) + require.Len(t, files, 1, "expected exactly one diff file") + + diffOut := readDiffOutput(t, files[0]) + + // Summary should reference the correct schema/table. + assert.Equal(t, testSchema, diffOut.Summary.Schema) + assert.Equal(t, tableName, diffOut.Summary.Table) + assert.Contains(t, diffOut.Summary.PrimaryKey, "id") + + // Exactly one diff row (id=99 exists on n1 but not n2). + assert.Equal(t, 1, totalDiffRows(diffOut), + "expected 1 diff row (id=99 on n1 only)") + + // The row should appear under the n1 key of the node pair. + for _, pair := range diffOut.NodeDiffs { + n1Rows := pair.Rows[serviceN1] + assert.Len(t, n1Rows, 1, "n1 should have 1 extra row") + if len(n1Rows) > 0 { + id, _ := n1Rows[0].Get("id") + assert.Equal(t, float64(99), id, "divergent row should be id=99") + val, _ := n1Rows[0].Get("val") + assert.Equal(t, "only_on_n1", val) + } + n2Rows := pair.Rows[serviceN2] + assert.Empty(t, n2Rows, "n2 should have no extra rows") + } +} + +// TestRepsetDiff_SkipTables verifies that --skip-tables suppresses diffing for +// the named table (no diff file), and that without it the diff is produced with +// correct content. +func TestRepsetDiff_SkipTables(t *testing.T) { + const tableName = "repset_diff_skip" + qualifiedTableName := createRepsetDiffTable(t, tableName, "default", true) + + t.Run("SkippedTableProducesNoDiffFile", func(t *testing.T) { + for _, f := range repsetDiffFilesForTable(t, tableName) { + _ = os.Remove(f) + } + + task := newTestRepsetDiffTask("default") + task.SkipTables = qualifiedTableName + + err := diff.RepsetDiff(task) + require.NoError(t, err) + + files := repsetDiffFilesForTable(t, tableName) + require.Empty(t, files, + "expected no diff file for skipped table, found: %v", files) + }) + + t.Run("NotSkippedTableProducesDiffFile", func(t *testing.T) { + for _, f := range repsetDiffFilesForTable(t, tableName) { + _ = os.Remove(f) + } + t.Cleanup(func() { + for _, f := range repsetDiffFilesForTable(t, tableName) { + os.Remove(f) + } + }) + + task := newTestRepsetDiffTask("default") + + err := diff.RepsetDiff(task) + require.NoError(t, err) + + files := repsetDiffFilesForTable(t, tableName) + require.NotEmpty(t, files, + "expected diff file for table %q when not skipped", tableName) + + diffOut := readDiffOutput(t, files[0]) + assert.Equal(t, 1, totalDiffRows(diffOut), + "expected 1 diff row (id=99 on n1 only)") + }) +} + +// TestRepsetDiff_SkipTablesFile verifies that --skip-file works end-to-end. +func TestRepsetDiff_SkipTablesFile(t *testing.T) { + const tableName = "repset_diff_skipfile" + qualifiedTableName := createRepsetDiffTable(t, tableName, "default", true) + + skipFile, err := os.CreateTemp(t.TempDir(), "skip-*.txt") + require.NoError(t, err) + _, err = fmt.Fprintln(skipFile, qualifiedTableName) + require.NoError(t, err) + skipFile.Close() + + task := newTestRepsetDiffTask("default") + task.SkipFile = skipFile.Name() + + err = diff.RepsetDiff(task) + require.NoError(t, err) + + files := repsetDiffFilesForTable(t, tableName) + require.Empty(t, files, + "expected no diff file for table listed in skip file, found: %v", files) +} + +// TestRepsetDiff_NonExistentRepset verifies that referencing a replication set +// that does not exist produces a clear error. +func TestRepsetDiff_NonExistentRepset(t *testing.T) { + task := newTestRepsetDiffTask("does_not_exist_repset") + + err := diff.RepsetDiff(task) + require.Error(t, err) + require.Contains(t, err.Error(), "not found", + "error should indicate repset was not found") +} + +// TestRepsetDiff_MultipleTables verifies that repset-diff processes all tables +// in the replication set: no diff file for the identical table, and a diff file +// with correct content for the divergent one. +func TestRepsetDiff_MultipleTables(t *testing.T) { + const identicalTable = "repset_multi_identical" + const divergentTable = "repset_multi_divergent" + + createRepsetDiffTable(t, identicalTable, "default", false) + createRepsetDiffTable(t, divergentTable, "default", true) + + task := newTestRepsetDiffTask("default") + err := diff.RepsetDiff(task) + require.NoError(t, err) + + identicalFiles := repsetDiffFilesForTable(t, identicalTable) + require.Empty(t, identicalFiles, + "expected no diff file for identical table, found: %v", identicalFiles) + + divergentFiles := repsetDiffFilesForTable(t, divergentTable) + require.Len(t, divergentFiles, 1, "expected exactly one diff file for divergent table") + + diffOut := readDiffOutput(t, divergentFiles[0]) + assert.Equal(t, divergentTable, diffOut.Summary.Table) + assert.Equal(t, 1, totalDiffRows(diffOut), + "expected 1 diff row for divergent table") + + // Verify the specific row. + for _, pair := range diffOut.NodeDiffs { + n1Rows := pair.Rows[serviceN1] + require.Len(t, n1Rows, 1) + id, _ := n1Rows[0].Get("id") + assert.Equal(t, float64(99), id) + } +}