Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@

All notable changes to ACE will be captured in this document. This project follows semantic versioning; the latest changes appear first.

## [v1.7.0] - 2026-03-20
## [v1.7.0] - 2026-03-23

### Added
- `table-repair --preserve-origin` flag to preserve replication origin node ID, LSN, and per-row commit timestamps during repair operations. Repaired rows retain the original source node's origin metadata instead of being stamped with the local node's identity, preventing replication conflicts when a failed node rejoins the cluster. Upserts are grouped by (origin, timestamp) into separate transactions to satisfy PostgreSQL's per-transaction replication origin session constraint; deletes commit in a preceding transaction.
- Runtime configuration reload via SIGHUP for all long-running ACE modes. The scheduler waits for in-flight jobs to complete before swapping config; the API server reloads immediately, including mTLS security config (CRL and allowed CN list). Works for `ace start`, `ace server`, and `ace start --component=all`.
- End-of-run summary for `schema-diff` and `repset-diff` listing identical, skipped, differed, missing, and errored tables with error reasons.

### Changed
- CLI migrated from urfave/cli v2 to v3 for native interspersed flag support, allowing flags to be placed before or after subcommands.
- Config reads are now thread-safe and snapshotted per HTTP request, per Merkle-tree task, and per CDC replication stream to prevent mid-operation drift during concurrent SIGHUP reloads.
- mTLS certificate validator is swapped atomically on SIGHUP using `atomic.Pointer` for lock-free reads on the request path.
- `schema-diff` and `repset-diff` now query tables from all nodes and report tables not present on every node, instead of silently using only the first node's table list. Tables are still compared across all nodes.
- `repset-diff` reports asymmetric repset membership when a table is in the repset on some nodes but not others.

### Fixed
- `repset-diff` was not working. Fixed and added tests.
- `schema-diff --skip-tables` now actually filters tables. Schema-qualified names (e.g. `myschema.mytable`) are also accepted; the schema prefix is validated against the target schema and stripped for matching.
- Replication origin LSN lookup rewritten to join through `spock.subscription` and `spock.node` instead of a broken LIKE pattern that never matched.
- `executeUpserts` no longer calls `resetReplicationOriginSession` before `tx.Commit()`, which was clearing the origin from WAL commit records.
- Unexpected scheduler exit in the SIGHUP reload loop is now handled gracefully.
- `schema-diff` and `repset-diff` silently excluded tables that failed during per-table comparison (e.g. missing primary key). Failed tables now appear in the summary with the error reason, and the task status is set to FAILED.

## [v1.6.0] - 2026-02-25

Expand Down
88 changes: 88 additions & 0 deletions internal/consistency/diff/diff_summary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// ///////////////////////////////////////////////////////////////////////////
//
// # ACE - Active Consistency Engine
//
// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/)
//
// This software is released under the PostgreSQL License:
// https://opensource.org/license/postgresql
//
// ///////////////////////////////////////////////////////////////////////////

package diff

import (
"fmt"
"strings"

"github.com/pgedge/ace/pkg/logger"
)

// MissingTableInfo describes a table that exists on some nodes but not on
// all of them.
type MissingTableInfo struct {
	// Table is the schema-qualified table name.
	Table string
	// PresentOn lists the names of the nodes where the table exists.
	PresentOn []string
	// MissingFrom lists the names of the nodes where the table does not exist.
	MissingFrom []string
}

// FailedTableInfo pairs a table whose diff could not be completed with the
// error that stopped it.
type FailedTableInfo struct {
	// Table is the schema-qualified table name.
	Table string
	// Err is the error that caused the failure.
	Err error
}

// DiffSummary accumulates the outcome of every table visited by a
// schema-diff or repset-diff run so that a human-readable report can be
// printed once the run is over.
type DiffSummary struct {
	// MatchedTables holds the tables reported as identical.
	MatchedTables []string
	// DifferedTables holds the tables reported as having differences.
	DifferedTables []string
	// FailedTables holds the tables that hit an error and could not be
	// compared, together with the error for each.
	FailedTables []FailedTableInfo
	// MissingTables holds the tables that are not present on all nodes.
	MissingTables []MissingTableInfo
	// SkippedTables holds the tables that were skipped.
	SkippedTables []string
}

// PrintAndFinalize writes the collected per-table outcomes to the logger and
// reports whether any table failed.
//
// commandName prefixes the headline and is repeated inside the returned
// error; label names the scope that was compared (e.g. "schema public" or
// "repset default"). After the headline, one section per outcome is emitted
// in a fixed order: identical, skipped, differed, missing, errored. A section
// with no entries is omitted. Identical and skipped tables are logged at Info
// level, differed and missing tables at Warn level, and errored tables at
// Error level.
//
// The result is nil unless FailedTables is non-empty; in that case the error
// carries the number of failed tables and their names.
func (s *DiffSummary) PrintAndFinalize(commandName, label string) error {
	matched, skipped := len(s.MatchedTables), len(s.SkippedTables)
	differed, missing := len(s.DifferedTables), len(s.MissingTables)
	failed := len(s.FailedTables)

	// The headline counts every table that landed in any bucket.
	logger.Info("%s summary: %d table(s) in %s", commandName, matched+differed+failed+skipped+missing, label)

	if matched != 0 {
		logger.Info(" %d table(s) are identical:", matched)
		for i := range s.MatchedTables {
			logger.Info(" - %s", s.MatchedTables[i])
		}
	}

	if skipped != 0 {
		logger.Info(" %d table(s) were skipped:", skipped)
		for i := range s.SkippedTables {
			logger.Info(" - %s", s.SkippedTables[i])
		}
	}

	if differed != 0 {
		logger.Warn(" %d table(s) have differences:", differed)
		for i := range s.DifferedTables {
			logger.Warn(" - %s", s.DifferedTables[i])
		}
	}

	if missing != 0 {
		logger.Warn(" %d table(s) not present on all nodes:", missing)
		for i := range s.MissingTables {
			entry := s.MissingTables[i]
			foundOn := strings.Join(entry.PresentOn, ", ")
			absentFrom := strings.Join(entry.MissingFrom, ", ")
			logger.Warn(" - %s found on [%s] but missing from [%s]", entry.Table, foundOn, absentFrom)
		}
	}

	// Only failures turn the summary into an error; everything else is
	// informational.
	if failed == 0 {
		return nil
	}

	logger.Error(" %d table(s) encountered errors and could not be compared:", failed)
	var failedNames []string
	for i := range s.FailedTables {
		entry := s.FailedTables[i]
		logger.Error(" - %s: %v", entry.Table, entry.Err)
		failedNames = append(failedNames, entry.Table)
	}
	return fmt.Errorf("%d table(s) failed during %s: %v", failed, commandName, failedNames)
}
130 changes: 99 additions & 31 deletions internal/consistency/diff/repset_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"maps"
"os"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -43,6 +44,7 @@ type RepsetDiffCmd struct {
SkipFile string
skipTablesList []string
tableList []string
missingTables []MissingTableInfo
nodeList []string
clusterNodes []map[string]any
database types.Database
Expand Down Expand Up @@ -129,38 +131,90 @@ func (c *RepsetDiffCmd) RunChecks(skipValidation bool) error {
return fmt.Errorf("no nodes found in cluster config")
}

firstNode := c.clusterNodes[0]
// Query repset tables from every node that has the repset and build a union.
// In a uni-directional setup the repset may only exist on the publisher, but
// the tables themselves exist on all nodes, so we still diff them across
// every node.
tablePresence := make(map[string]map[string]bool) // table -> {nodeName: true}
var repsetNodeNames []string

for _, nodeInfo := range c.clusterNodes {
nodeName := nodeInfo["Name"].(string)

nodeWithDBInfo := make(map[string]any)
maps.Copy(nodeWithDBInfo, nodeInfo)
utils.ApplyDatabaseCredentials(nodeWithDBInfo, c.database)
if portVal, ok := nodeWithDBInfo["Port"]; ok {
if portFloat, isFloat := portVal.(float64); isFloat {
nodeWithDBInfo["Port"] = strconv.Itoa(int(portFloat))
}
}

nodeWithDBInfo := make(map[string]any)
maps.Copy(nodeWithDBInfo, firstNode)
utils.ApplyDatabaseCredentials(nodeWithDBInfo, c.database)
pool, err := auth.GetClusterNodeConnection(c.Ctx, nodeWithDBInfo, c.connOpts())
if err != nil {
return fmt.Errorf("could not connect to node %s: %w", nodeName, err)
}

if portVal, ok := nodeWithDBInfo["Port"]; ok {
if portFloat, isFloat := portVal.(float64); isFloat {
nodeWithDBInfo["Port"] = strconv.Itoa(int(portFloat))
repsetExists, err := queries.CheckRepSetExists(c.Ctx, pool, c.RepsetName)
if err != nil {
pool.Close()
return fmt.Errorf("could not check if repset exists on node %s: %w", nodeName, err)
}
}
if !repsetExists {
pool.Close()
logger.Warn("repset %s not found on node %s, skipping for table discovery", c.RepsetName, nodeName)
continue
}
repsetNodeNames = append(repsetNodeNames, nodeName)

pool, err := auth.GetClusterNodeConnection(c.Ctx, nodeWithDBInfo, c.connOpts())
if err != nil {
return fmt.Errorf("could not connect to database: %w", err)
}
defer pool.Close()
tables, err := queries.GetTablesInRepSet(c.Ctx, pool, c.RepsetName)
pool.Close()
if err != nil {
return fmt.Errorf("could not get tables in repset on node %s: %w", nodeName, err)
}

repsetExists, err := queries.CheckRepSetExists(c.Ctx, pool, c.RepsetName)
if err != nil {
return fmt.Errorf("could not check if repset exists: %w", err)
for _, t := range tables {
if tablePresence[t] == nil {
tablePresence[t] = make(map[string]bool)
}
tablePresence[t][nodeName] = true
}
}
if !repsetExists {
return fmt.Errorf("repset %s not found", c.RepsetName)

if len(repsetNodeNames) == 0 {
return fmt.Errorf("repset %s not found on any node", c.RepsetName)
}

tables, err := queries.GetTablesInRepSet(c.Ctx, pool, c.RepsetName)
if err != nil {
return fmt.Errorf("could not get tables in repset: %w", err)
// Build the full table list (union) and track tables not in the repset
// on every node. All tables are still diffed (the data exists on all
// nodes), but asymmetric repset membership is reported in the summary.
var allTables []string
var missingTables []MissingTableInfo
for table, presence := range tablePresence {
allTables = append(allTables, table)
if len(presence) < len(repsetNodeNames) {
var presentOn, missingFrom []string
for _, n := range repsetNodeNames {
if presence[n] {
presentOn = append(presentOn, n)
} else {
missingFrom = append(missingFrom, n)
}
}
missingTables = append(missingTables, MissingTableInfo{
Table: table,
PresentOn: presentOn,
MissingFrom: missingFrom,
})
}
}
sort.Strings(allTables)
sort.Slice(missingTables, func(i, j int) bool {
return missingTables[i].Table < missingTables[j].Table
})

c.tableList = tables
c.tableList = allTables
c.missingTables = missingTables

if len(c.tableList) == 0 {
return fmt.Errorf("no tables found in repset %s", c.RepsetName)
Expand Down Expand Up @@ -230,8 +284,10 @@ func RepsetDiff(task *RepsetDiffCmd) (err error) {
}
}

var tablesProcessed, tablesFailed, tablesSkipped int
var failedTables []string
var tablesProcessed, tablesFailed int
var failedTables []FailedTableInfo
var skippedTables []string
var summary DiffSummary

defer func() {
finishedAt := time.Now()
Expand All @@ -249,10 +305,14 @@ func RepsetDiff(task *RepsetDiffCmd) (err error) {
"tables_total": len(task.tableList),
"tables_diffed": tablesProcessed,
"tables_failed": tablesFailed,
"tables_skipped": tablesSkipped,
"tables_skipped": len(skippedTables),
}
if len(failedTables) > 0 {
ctx["failed_tables"] = failedTables
names := make([]string, len(failedTables))
for i, ft := range failedTables {
names[i] = ft.Table
}
ctx["failed_tables"] = names
}
if err != nil {
ctx["error"] = err.Error()
Expand Down Expand Up @@ -293,7 +353,7 @@ func RepsetDiff(task *RepsetDiffCmd) (err error) {
}
}
if skipped {
tablesSkipped++
skippedTables = append(skippedTables, tableName)
continue
}

Expand All @@ -320,27 +380,35 @@ func RepsetDiff(task *RepsetDiffCmd) (err error) {
if err := tdTask.Validate(); err != nil {
logger.Warn("validation for table %s failed: %v", tableName, err)
tablesFailed++
failedTables = append(failedTables, tableName)
failedTables = append(failedTables, FailedTableInfo{Table: tableName, Err: err})
continue
}

if err := tdTask.RunChecks(true); err != nil {
logger.Warn("checks for table %s failed: %v", tableName, err)
tablesFailed++
failedTables = append(failedTables, tableName)
failedTables = append(failedTables, FailedTableInfo{Table: tableName, Err: err})
continue
}
if err := tdTask.ExecuteTask(); err != nil {
logger.Warn("error during comparison for table %s: %v", tableName, err)
tablesFailed++
failedTables = append(failedTables, tableName)
failedTables = append(failedTables, FailedTableInfo{Table: tableName, Err: err})
continue
}

if len(tdTask.DiffResult.NodeDiffs) > 0 {
summary.DifferedTables = append(summary.DifferedTables, tableName)
} else {
summary.MatchedTables = append(summary.MatchedTables, tableName)
}
tablesProcessed++
}

return nil
summary.FailedTables = failedTables
summary.SkippedTables = skippedTables
summary.MissingTables = task.missingTables
return summary.PrintAndFinalize("Repset diff", "repset "+task.RepsetName)
}

func (task *RepsetDiffCmd) CloneForSchedule(ctx context.Context) *RepsetDiffCmd {
Expand Down
Loading
Loading