From f733c563c0e64320c2086c736119cfbc67641e19 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Mon, 30 Jun 2025 19:01:06 +0200 Subject: [PATCH 1/3] Handle generated columns on ghostferry write operations. This PR modifies all `INSERT` logic so virtual (a.k.a generated) MySQL columns are not attempted to insert into, which otherwise breaks the ferrying process. See also https://github.com/Shopify/ghostferry/issues/338. --- dml_events.go | 51 +++++++++++---------- row_batch.go | 66 +++++++++++++++++++++++++--- table_schema_cache.go | 2 +- test/go/dml_events_test.go | 30 +++++++++++++ test/go/row_batch_test.go | 26 +++++++++++ test/go/table_schema_cache_test.go | 5 +++ testhelpers/integration_test_case.go | 2 +- 7 files changed, 152 insertions(+), 30 deletions(-) diff --git a/dml_events.go b/dml_events.go index 7b8a00f1d..365bce84a 100644 --- a/dml_events.go +++ b/dml_events.go @@ -42,14 +42,14 @@ type RowData []interface{} // https://github.com/Shopify/ghostferry/issues/165. // // In summary: -// - This code receives values from both go-sql-driver/mysql and -// go-mysql-org/go-mysql. -// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte -// slice for unsigned integer. -// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for -// unsigned integer. -// - We currently make this function deal with both cases. In the future we can -// investigate alternative solutions. +// - This code receives values from both go-sql-driver/mysql and +// go-mysql-org/go-mysql. +// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte +// slice for unsigned integer. +// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for +// unsigned integer. +// - We currently make this function deal with both cases. In the future we can +// investigate alternative solutions. 
func (r RowData) GetUint64(colIdx int) (uint64, error) { u64, ok := Uint64Value(r[colIdx]) if ok { @@ -174,8 +174,8 @@ func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, e query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(quotedColumnNames(e.table), ",") + ")" + - " VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")" + " (" + strings.Join(quotedColumnNamesForInsert(e.table), ",") + ")" + + " VALUES (" + buildStringListForInsertValues(e.table, e.newValues) + ")" return query, nil } @@ -323,10 +323,14 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re } } -func quotedColumnNames(table *TableSchema) []string { - cols := make([]string, len(table.Columns)) - for i, column := range table.Columns { - cols[i] = QuoteField(column.Name) +func quotedColumnNamesForInsert(table *TableSchema) []string { + cols := []string{} + + for _, c := range table.Columns { + if c.IsVirtual { + continue + } + cols = append(cols, QuoteField(c.Name)) } return cols @@ -347,15 +351,18 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData return nil } -func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string { +func buildStringListForInsertValues(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { - buffer = append(buffer, ',') + if table.Columns[i].IsVirtual { + continue } - buffer = appendEscapedValue(buffer, value, columns[i]) + if len(buffer) != 0 { + buffer = append(buffer, ',') + } + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } return string(buffer) @@ -504,10 +511,10 @@ func Int64Value(value interface{}) (int64, bool) { // // This is specifically mentioned in the the below link: // -// When BINARY values are stored, they are right-padded with the pad value -// to the specified length. 
The pad value is 0x00 (the zero byte). Values -// are right-padded with 0x00 for inserts, and no trailing bytes are removed -// for retrievals. +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. // // ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte { diff --git a/row_batch.go b/row_batch.go index 7a865a6f6..8e666f573 100644 --- a/row_batch.go +++ b/row_batch.go @@ -64,23 +64,77 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface return "", nil, err } - valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)" + vcm := e.virtualColumnsMap() + valuesStr := "(" + strings.Repeat("?,", e.activeColumnCount(vcm)-1) + "?)" valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr + " (" + e.quotedFields(vcm) + ") VALUES " + valuesStr - return query, e.flattenRowData(), nil + return query, e.flattenRowData(vcm), nil } -func (e *RowBatch) flattenRowData() []interface{} { - rowSize := len(e.values[0]) +// virtualColumnsMap returns a map of given columns (by index) -> whether the column is virtual (i.e. generated). +func (e *RowBatch) virtualColumnsMap() map[int]bool { + res := map[int]bool{} + + for i, name := range e.columns { + isVirtual := false + for _, c := range e.table.Columns { + if name == c.Name && c.IsVirtual { + isVirtual = true + break + } + } + + res[i] = isVirtual + } + + return res +} + +// activeColumnCount returns the number of active (non-virtual) columns for this RowBatch. 
+func (e *RowBatch) activeColumnCount(vcm map[int]bool) int { + if vcm == nil { + return len(e.columns) + } + + count := 0 + for _, isVirtual := range vcm { + if !isVirtual { + count++ + } + } + return count +} + +// quotedFields returns a string with comma-separated quoted field names for INSERTs. +func (e *RowBatch) quotedFields(vcm map[int]bool) string { + cols := []string{} + for i, name := range e.columns { + if vcm != nil && vcm[i] { + continue + } + cols = append(cols, name) + } + + return strings.Join(QuoteFields(cols), ",") +} + +// flattenRowData flattens RowData values into a single array for INSERTs. +func (e *RowBatch) flattenRowData(vcm map[int]bool) []interface{} { + rowSize := e.activeColumnCount(vcm) flattened := make([]interface{}, rowSize*len(e.values)) for rowIdx, row := range e.values { + i := 0 for colIdx, col := range row { - flattened[rowIdx*rowSize+colIdx] = col + if vcm != nil && vcm[colIdx] { + continue + } + flattened[rowIdx*rowSize+i] = col + i++ } } diff --git a/table_schema_cache.go b/table_schema_cache.go index 32f6d0418..1541ff7d3 100644 --- a/table_schema_cache.go +++ b/table_schema_cache.go @@ -94,7 +94,7 @@ func (t *TableSchema) RowMd5Query() string { _, isCompressed := t.CompressedColumnsForVerification[column.Name] _, isIgnored := t.IgnoredColumnsForVerification[column.Name] - if isCompressed || isIgnored { + if isCompressed || isIgnored || column.IsVirtual { continue } diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index d0b2cec48..c78c30846 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -83,6 +83,36 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3) } +func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQueryWithVirtualColumns() { + rowsEvent := &replication.RowsEvent{ + Table: 
this.tableMapEvent, + Rows: [][]interface{}{ + {1000, []byte("val1"), true}, + {1001, []byte("val2"), false}, + {1002, "{\"val\": 42.0}", false}, + }, + } + + // column 'col1' (#0) is generated so we should not insert into it. + this.targetTable.Columns[0].IsVirtual = true + + dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(3, len(dmlEvents)) + + q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val1',1)", q1) + + q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val2',0)", q2) + + q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (CAST('{\"val\": 42.0}' AS JSON),0)", q3) +} + func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() { rowsEvent := &replication.RowsEvent{ Table: this.tableMapEvent, diff --git a/test/go/row_batch_test.go b/test/go/row_batch_test.go index 62ffbf9dd..633402dea 100644 --- a/test/go/row_batch_test.go +++ b/test/go/row_batch_test.go @@ -73,6 +73,32 @@ func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQuery() { this.Require().Equal(expected, v1) } +func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQueryWithVirtualColumns() { + vals := []ghostferry.RowData{ + ghostferry.RowData{1000, []byte("val1"), true}, + ghostferry.RowData{1001, []byte("val2"), true}, + ghostferry.RowData{1002, []byte("val3"), true}, + } + + // column 'col2' (#1) is generated so we should not insert into it. 
+ this.targetTable.Columns[1].IsVirtual = true + + batch := ghostferry.NewRowBatch(this.sourceTable, vals, 0) + this.Require().Equal(vals, batch.Values()) + + q1, v1, err := batch.AsSQLQuery(this.targetTable.Schema, this.targetTable.Name) + this.Require().Nil(err) + this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col3`) VALUES (?,?),(?,?),(?,?)", q1) + + expected := []interface{}{ + 1000, true, + 1001, true, + 1002, true, + } + + this.Require().Equal(expected, v1) +} + func (this *RowBatchTestSuite) TestRowBatchWithWrongColumnsReturnsError() { vals := []ghostferry.RowData{ ghostferry.RowData{1000, []byte("val0"), true}, diff --git a/test/go/table_schema_cache_test.go b/test/go/table_schema_cache_test.go index fc2bd18fc..3f206be71 100644 --- a/test/go/table_schema_cache_test.go +++ b/test/go/table_schema_cache_test.go @@ -333,6 +333,11 @@ func (this *TableSchemaCacheTestSuite) TestTableRowMd5Query() { query := table.RowMd5Query() this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query) + table = tables[0] + table.Columns[0].IsVirtual = true + query = table.RowMd5Query() + this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query) + table = tables[1] table.CompressedColumnsForVerification = map[string]string{"data": "SNAPPY"} query = table.RowMd5Query() diff --git a/testhelpers/integration_test_case.go b/testhelpers/integration_test_case.go index 5f4f767ff..34d8f32fc 100644 --- a/testhelpers/integration_test_case.go +++ b/testhelpers/integration_test_case.go @@ -122,7 +122,7 @@ func (this *IntegrationTestCase) VerifyData() { } if !verificationResult.DataCorrect { - this.T.Fatalf(verificationResult.Message) + this.T.Fatal(verificationResult.Message) } } From 32b12ea1dba4a6861a9e7d56fc26a4e157235dd1 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Tue, 17 Mar 2026 
15:02:58 +0100 Subject: [PATCH 2/3] Improve filtering logic, fix integration tests. --- dml_events.go | 68 ++++++++------ row_batch.go | 70 ++------------ table_schema_cache.go | 115 ++++++++++++++++++++--- test/go/dml_events_test.go | 32 +------ test/go/row_batch_test.go | 26 ----- test/go/table_schema_cache_test.go | 25 ++++- test/helpers/db_helper.rb | 11 ++- test/integration/ddl_events_test.rb | 2 +- test/integration/inline_verifier_test.rb | 20 ++-- 9 files changed, 192 insertions(+), 177 deletions(-) diff --git a/dml_events.go b/dml_events.go index 365bce84a..bde66777f 100644 --- a/dml_events.go +++ b/dml_events.go @@ -168,14 +168,15 @@ func (e *BinlogInsertEvent) NewValues() RowData { } func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error) { - if err := verifyValuesHasTheSameLengthAsColumns(e.table, e.newValues); err != nil { + filteredNewValues, err := e.table.FilterGeneratedColumnsOnRowData(e.newValues) + if err != nil { return "", err } query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(quotedColumnNamesForInsert(e.table), ",") + ")" + - " VALUES (" + buildStringListForInsertValues(e.table, e.newValues) + ")" + " (" + strings.Join(quotedColumnNames(e.table), ",") + ")" + + " VALUES (" + buildStringListForValues(e.table, filteredNewValues) + ")" return query, nil } @@ -227,8 +228,8 @@ func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, e } query := "UPDATE " + QuotedTableNameFromString(schemaName, tableName) + - " SET " + buildStringMapForSet(e.table.Columns, e.newValues) + - " WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues) + " SET " + buildStringMapForSet(e.table, e.newValues) + + " WHERE " + buildStringMapForWhere(e.table, e.oldValues) return query, nil } @@ -269,7 +270,7 @@ func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, e } query := "DELETE FROM " + QuotedTableNameFromString(schemaName, 
tableName) + - " WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues) + " WHERE " + buildStringMapForWhere(e.table, e.oldValues) return query, nil } @@ -281,16 +282,22 @@ func (e *BinlogDeleteEvent) PaginationKey() (string, error) { func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error) { rowsEvent := ev.Event.(*replication.RowsEvent) - for _, row := range rowsEvent.Rows { - if len(row) != len(table.Columns) { + for _, rawRow := range rowsEvent.Rows { + if len(rawRow) != len(table.Columns) { return nil, fmt.Errorf( "table %s.%s has %d columns but event has %d columns instead", table.Schema, table.Name, len(table.Columns), - len(row), + len(rawRow), ) } + + row, err := table.FilterGeneratedColumnsOnRowData(rawRow) + if err != nil { + return nil, err + } + for i, col := range table.Columns { if col.IsUnsigned { switch v := row[i].(type) { @@ -323,14 +330,10 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re } } -func quotedColumnNamesForInsert(table *TableSchema) []string { - cols := []string{} - - for _, c := range table.Columns { - if c.IsVirtual { - continue - } - cols = append(cols, QuoteField(c.Name)) +func quotedColumnNames(table *TableSchema) []string { + cols := make([]string, 0, len(table.Columns)) + for _, name := range table.NonGeneratedColumnNames() { + cols = append(cols, QuoteField(name)) } return cols @@ -351,56 +354,59 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData return nil } -func buildStringListForInsertValues(table *TableSchema, values []interface{}) string { +func buildStringListForValues(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if table.Columns[i].IsVirtual { - continue - } - - if len(buffer) != 0 { + if len(buffer) > 0 { buffer = append(buffer, ',') } + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } 
return string(buffer) } -func buildStringMapForWhere(columns []schema.TableColumn, values []interface{}) string { +func buildStringMapForWhere(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { + if table.IsColumnIndexGenerated(i) { + continue + } + if len(buffer) > 0 { buffer = append(buffer, " AND "...) } - buffer = append(buffer, QuoteField(columns[i].Name)...) + buffer = append(buffer, QuoteField(table.Columns[i].Name)...) if isNilValue(value) { // "WHERE value = NULL" will never match rows. buffer = append(buffer, " IS NULL"...) } else { buffer = append(buffer, '=') - buffer = appendEscapedValue(buffer, value, columns[i]) + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } } return string(buffer) } -func buildStringMapForSet(columns []schema.TableColumn, values []interface{}) string { +func buildStringMapForSet(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { + if table.IsColumnIndexGenerated(i) { + continue + } + if len(buffer) > 0 { buffer = append(buffer, ',') } - buffer = append(buffer, QuoteField(columns[i].Name)...) + buffer = append(buffer, QuoteField(table.Columns[i].Name)...) 
buffer = append(buffer, '=') - buffer = appendEscapedValue(buffer, value, columns[i]) + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } return string(buffer) diff --git a/row_batch.go b/row_batch.go index 8e666f573..3d10dd244 100644 --- a/row_batch.go +++ b/row_batch.go @@ -64,77 +64,27 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface return "", nil, err } - vcm := e.virtualColumnsMap() - valuesStr := "(" + strings.Repeat("?,", e.activeColumnCount(vcm)-1) + "?)" + filteredColumns := e.table.NonGeneratedColumnNames() + + valuesStr := "(" + strings.Repeat("?,", len(filteredColumns)-1) + "?)" valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + e.quotedFields(vcm) + ") VALUES " + valuesStr - - return query, e.flattenRowData(vcm), nil -} - -// virtualColumnsMap returns a map of given columns (by index) -> whether the column is virtual (i.e. generated). -func (e *RowBatch) virtualColumnsMap() map[int]bool { - res := map[int]bool{} - - for i, name := range e.columns { - isVirtual := false - for _, c := range e.table.Columns { - if name == c.Name && c.IsVirtual { - isVirtual = true - break - } - } - - res[i] = isVirtual - } - - return res -} - -// activeColumnCount returns the number of active (non-virtual) columns for this RowBatch. -func (e *RowBatch) activeColumnCount(vcm map[int]bool) int { - if vcm == nil { - return len(e.columns) - } - - count := 0 - for _, isVirtual := range vcm { - if !isVirtual { - count++ - } - } - return count -} - -// quotedFields returns a string with comma-separated quoted field names for INSERTs. 
-func (e *RowBatch) quotedFields(vcm map[int]bool) string { - cols := []string{} - for i, name := range e.columns { - if vcm != nil && vcm[i] { - continue - } - cols = append(cols, name) - } + " (" + strings.Join(QuoteFields(filteredColumns), ",") + ") VALUES " + valuesStr - return strings.Join(QuoteFields(cols), ",") + return query, e.flattenRowData(), nil } -// flattenRowData flattens RowData values into a single array for INSERTs. -func (e *RowBatch) flattenRowData(vcm map[int]bool) []interface{} { - rowSize := e.activeColumnCount(vcm) - flattened := make([]interface{}, rowSize*len(e.values)) +func (e *RowBatch) flattenRowData() []interface{} { + flattened := make([]interface{}, 0, len(e.values)) - for rowIdx, row := range e.values { - i := 0 + for _, row := range e.values { for colIdx, col := range row { - if vcm != nil && vcm[colIdx] { + if e.table.IsColumnIndexGenerated(colIdx) { continue } - flattened[rowIdx*rowSize+i] = col - i++ + flattened = append(flattened, col) } } diff --git a/table_schema_cache.go b/table_schema_cache.go index 1541ff7d3..d060aa047 100644 --- a/table_schema_cache.go +++ b/table_schema_cache.go @@ -46,6 +46,89 @@ type TableSchema struct { rowMd5Query string } +// IsColumnGenerated evaluates whether a go_myslq.schema.TableColumn is generated or not. +func IsColumnGenerated(tc *schema.TableColumn) bool { + return tc.IsVirtual || tc.IsStored +} + +// IsColumnIndexGenerated evaluates whether a TableSchema column is generated, by index. +func (t *TableSchema) IsColumnIndexGenerated(idx int) bool { + return IsColumnGenerated(&t.Columns[idx]) +} + +// Evaluates whether a TableSchema column is generated, by name. +func (t *TableSchema) IsColumnNameGenerated(name string) bool { + for _, col := range t.Columns { + if name == col.Name && IsColumnGenerated(&col) { + return true + } + } + + return false +} + +// Returns a count of total, generated and non-generated columns for a TableSchema. 
+func (t *TableSchema) ColumnsCount() (int, int, int) { + var generated int + + for _, col := range t.Columns { + if IsColumnGenerated(&col) { + generated += 1 + } + } + + return len(t.Columns), generated, len(t.Columns) - generated +} + +// Returns a list of all non-generated column names for a TableSchema, in schema order. +func (t *TableSchema) NonGeneratedColumnNames() []string { + res := make([]string, 0, len(t.Columns)) + + for _, col := range t.Columns { + if IsColumnGenerated(&col) { + continue + } + res = append(res, col.Name) + } + + return res +} + +// FilterGeneratedColumnsOnRowData takes a row (as slice of RowData elements) and returns +// a copy with elements for generated columns removed. +func (t *TableSchema) FilterGeneratedColumnsOnRowData(row []interface{}) ([]interface{}, error) { + columnsCount, _, nonGeneratedColumnsCount := t.ColumnsCount() + + if len(row) != columnsCount { + return nil, fmt.Errorf( + "table %s.%s has %d columns but row has %d columns instead", + t.Schema, + t.Name, + columnsCount, + len(row), + ) + } + + res := make([]interface{}, 0, len(row)) + for i, val := range row { + if t.IsColumnIndexGenerated(i) { + continue + } + res = append(res, val) + } + + if len(res) != nonGeneratedColumnsCount { + return nil, fmt.Errorf( + "table %s.%s has %d updatable columns but processed row has %d updatable columns instead", + t.Schema, + t.Name, + nonGeneratedColumnsCount, + len(res), + ) + } + return res, nil +} + // This query returns the MD5 hash for a row on this table. This query is valid // for both the source and the target shard. 
// @@ -90,11 +173,12 @@ func (t *TableSchema) RowMd5Query() string { } columns := make([]schema.TableColumn, 0, len(t.Columns)) - for _, column := range t.Columns { + for i, column := range t.Columns { _, isCompressed := t.CompressedColumnsForVerification[column.Name] _, isIgnored := t.IgnoredColumnsForVerification[column.Name] + isGenerated := t.IsColumnIndexGenerated(i) - if isCompressed || isIgnored || column.IsVirtual { + if isCompressed || isIgnored || isGenerated { continue } @@ -151,6 +235,19 @@ func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry) return tablesWithData, emptyTables, nil } +// removeInvisibleIndeces removes all invisible idx references from a go_mysql.schema.Table. +func removeInvisibleIndexes(ts *schema.Table) { + j := 0 + for i, index := range ts.Indexes { + if !index.Visible { + continue + } + ts.Indexes[j] = ts.Indexes[i] + j++ + } + ts.Indexes = ts.Indexes[:j] +} + func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, forceIndexConfig ForceIndexConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error) { logger := logrus.WithField("tag", "table_schema_cache") @@ -189,14 +286,8 @@ func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig Col return tableSchemaCache, err } - // Filter out invisible indexes - visibleIndexes := make([]*schema.Index, 0, len(tableSchema.Indexes)) - for _, index := range tableSchema.Indexes { - if index.Visible { - visibleIndexes = append(visibleIndexes, index) - } - } - tableSchema.Indexes = visibleIndexes + // filter out unwanted indeces and columns + removeInvisibleIndexes(tableSchema) tableSchemas = append(tableSchemas, &TableSchema{ Table: tableSchema, @@ -448,7 +539,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro if err != nil { break } - + var binValue []byte switch v := val.(type) { case []byte: 
@@ -458,7 +549,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro default: err = fmt.Errorf("expected binary/string for max key, got %T", val) } - + if err == nil { result = NewBinaryKeyWithColumn(primaryKeyColumn.Name, binValue) } diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index c78c30846..d2e7acbd5 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -83,36 +83,6 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQuery() { this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col2`,`col3`) VALUES (1002,CAST('{\"val\": 42.0}' AS JSON),0)", q3) } -func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratesInsertQueryWithVirtualColumns() { - rowsEvent := &replication.RowsEvent{ - Table: this.tableMapEvent, - Rows: [][]interface{}{ - {1000, []byte("val1"), true}, - {1001, []byte("val2"), false}, - {1002, "{\"val\": 42.0}", false}, - }, - } - - // column 'col1' (#0) is generated so we should not insert into it. 
- this.targetTable.Columns[0].IsVirtual = true - - dmlEvents, err := ghostferry.NewBinlogInsertEvents(this.eventBase, rowsEvent) - this.Require().Nil(err) - this.Require().Equal(3, len(dmlEvents)) - - q1, err := dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name) - this.Require().Nil(err) - this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val1',1)", q1) - - q2, err := dmlEvents[1].AsSQLString(this.targetTable.Schema, this.targetTable.Name) - this.Require().Nil(err) - this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (_binary'val2',0)", q2) - - q3, err := dmlEvents[2].AsSQLString(this.targetTable.Schema, this.targetTable.Name) - this.Require().Nil(err) - this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col2`,`col3`) VALUES (CAST('{\"val\": 42.0}' AS JSON),0)", q3) -} - func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsError() { rowsEvent := &replication.RowsEvent{ Table: this.tableMapEvent, @@ -125,7 +95,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro _, err = dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name) this.Require().NotNil(err) - this.Require().Contains(err.Error(), "test_table has 3 columns but event has 1 column") + this.Require().Contains(err.Error(), "test_table has 3 columns but row has 1 column") } func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() { diff --git a/test/go/row_batch_test.go b/test/go/row_batch_test.go index 633402dea..62ffbf9dd 100644 --- a/test/go/row_batch_test.go +++ b/test/go/row_batch_test.go @@ -73,32 +73,6 @@ func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQuery() { this.Require().Equal(expected, v1) } -func (this *RowBatchTestSuite) TestRowBatchGeneratesInsertQueryWithVirtualColumns() { - vals := []ghostferry.RowData{ - ghostferry.RowData{1000, []byte("val1"), true}, - 
ghostferry.RowData{1001, []byte("val2"), true}, - ghostferry.RowData{1002, []byte("val3"), true}, - } - - // column 'col2' (#1) is generated so we should not insert into it. - this.targetTable.Columns[1].IsVirtual = true - - batch := ghostferry.NewRowBatch(this.sourceTable, vals, 0) - this.Require().Equal(vals, batch.Values()) - - q1, v1, err := batch.AsSQLQuery(this.targetTable.Schema, this.targetTable.Name) - this.Require().Nil(err) - this.Require().Equal("INSERT IGNORE INTO `target_schema`.`target_table` (`col1`,`col3`) VALUES (?,?),(?,?),(?,?)", q1) - - expected := []interface{}{ - 1000, true, - 1001, true, - 1002, true, - } - - this.Require().Equal(expected, v1) -} - func (this *RowBatchTestSuite) TestRowBatchWithWrongColumnsReturnsError() { vals := []ghostferry.RowData{ ghostferry.RowData{1000, []byte("val0"), true}, diff --git a/test/go/table_schema_cache_test.go b/test/go/table_schema_cache_test.go index 3f206be71..a8bad06c7 100644 --- a/test/go/table_schema_cache_test.go +++ b/test/go/table_schema_cache_test.go @@ -333,17 +333,32 @@ func (this *TableSchemaCacheTestSuite) TestTableRowMd5Query() { query := table.RowMd5Query() this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query) - table = tables[0] - table.Columns[0].IsVirtual = true - query = table.RowMd5Query() - this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query) - table = tables[1] table.CompressedColumnsForVerification = map[string]string{"data": "SNAPPY"} query = table.RowMd5Query() this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query) } +func (this *TableSchemaCacheTestSuite) TestTableRowMd5QueryWithVirtualField() { + tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil) + this.Require().Nil(err) + + tables 
:= tableSchemaCache.AsSlice() + table := tables[0] + table.Columns[0].IsVirtual = true + this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", table.RowMd5Query()) +} + +func (this *TableSchemaCacheTestSuite) TestTableRowMd5QueryWithStoredField() { + tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil) + this.Require().Nil(err) + + tables := tableSchemaCache.AsSlice() + table := tables[0] + table.Columns[1].IsStored = true + this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", table.RowMd5Query()) +} + func (this *TableSchemaCacheTestSuite) TestFingerprintQueryWithIgnoredColumns() { tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil) this.Require().Nil(err) diff --git a/test/helpers/db_helper.rb b/test/helpers/db_helper.rb index 405d7def0..e23aba78e 100644 --- a/test/helpers/db_helper.rb +++ b/test/helpers/db_helper.rb @@ -124,7 +124,16 @@ def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_ dbtable = full_table_name(database_name, table_name) connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}") - connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))") + connection.query(" + CREATE TABLE IF NOT EXISTS #{dbtable} ( + id BIGINT(20) NOT NULL AUTO_INCREMENT, + data TEXT, + /* generated columns should be ignored */ + length BIGINT(20) AS (LENGTH(data)) VIRTUAL, + summary VARCHAR(32) AS (MD5(data)) STORED, + PRIMARY KEY(id) + ) + ") return if number_of_rows == 0 diff --git a/test/integration/ddl_events_test.rb b/test/integration/ddl_events_test.rb index c6d608d45..fce053026 100644 --- a/test/integration/ddl_events_test.rb +++ b/test/integration/ddl_events_test.rb @@ -30,7 +30,7 @@ def test_ddl_event_handler_with_ddl_events ghostferry = 
new_ghostferry(DDL_GHOSTFERRY) ghostferry.on_status(GhostferryHelper::Ghostferry::Status::BINLOG_STREAMING_STARTED) do - source_db.query("INSERT INTO #{table_name} VALUES (9000, 'test')") + source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9000, 'test')") source_db.query("ALTER TABLE #{table_name} ADD INDEX (data(100))") source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9001, 'test')") end diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb index b70bccb84..a99830ce3 100644 --- a/test/integration/inline_verifier_test.rb +++ b/test/integration/inline_verifier_test.rb @@ -429,8 +429,8 @@ def test_positive_negative_zero # indeed running as the nominal case (comparing 0.0 and -0.0) should not # emit any error and thus we cannot say for certain if the InlineVerifier # ran or not. - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 0.0)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 1.0)") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 0.0)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 1.0)") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -468,8 +468,8 @@ def test_null_vs_null seed_random_data(source_db, number_of_rows: 0) seed_random_data(target_db, number_of_rows: 0) - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") verification_ran = false ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -486,8 +486,8 @@ def test_null_vs_empty_string seed_random_data(source_db, number_of_rows: 0) seed_random_data(target_db, number_of_rows: 0) - 
source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, '')") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, '')") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -512,8 +512,8 @@ def test_null_vs_null_string seed_random_data(source_db, number_of_rows: 0) seed_random_data(target_db, number_of_rows: 0) - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 'NULL')") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 'NULL')") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -542,8 +542,8 @@ def test_null_in_different_order source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN data2 VARCHAR(255) AFTER data") target_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN data2 VARCHAR(255) AFTER data") - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL, 'data')") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 'data', NULL)") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data, data2) VALUES (1, NULL, 'data')") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data, data2) VALUES (1, 'data', NULL)") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) From 9d6b51aadf4ef22f142a73727754e265921f5d5b Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Thu, 16 Apr 2026 15:03:29 +0200 Subject: [PATCH 3/3] Fix type and metadata handling bug when filtering generated columns on DML events. Includes a number of relevant unit tests proposed by @driv3r . 
--- dml_events.go | 34 ++++----- table_schema_cache.go | 14 ++++ test/go/dml_events_test.go | 142 +++++++++++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 17 deletions(-) diff --git a/dml_events.go b/dml_events.go index bde66777f..c0810e7b1 100644 --- a/dml_events.go +++ b/dml_events.go @@ -168,6 +168,7 @@ func (e *BinlogInsertEvent) NewValues() RowData { } func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error) { + filteredCols := e.table.NonGeneratedColumns() filteredNewValues, err := e.table.FilterGeneratedColumnsOnRowData(e.newValues) if err != nil { return "", err @@ -175,8 +176,8 @@ func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, e query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(quotedColumnNames(e.table), ",") + ")" + - " VALUES (" + buildStringListForValues(e.table, filteredNewValues) + ")" + " (" + strings.Join(quotedColumnNames(filteredCols), ",") + ")" + + " VALUES (" + buildStringListForValues(filteredCols, filteredNewValues) + ")" return query, nil } @@ -282,23 +283,21 @@ func (e *BinlogDeleteEvent) PaginationKey() (string, error) { func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error) { rowsEvent := ev.Event.(*replication.RowsEvent) - for _, rawRow := range rowsEvent.Rows { - if len(rawRow) != len(table.Columns) { + for _, row := range rowsEvent.Rows { + if len(row) != len(table.Columns) { return nil, fmt.Errorf( "table %s.%s has %d columns but event has %d columns instead", table.Schema, table.Name, len(table.Columns), - len(rawRow), + len(row), ) } - row, err := table.FilterGeneratedColumnsOnRowData(rawRow) - if err != nil { - return nil, err - } - for i, col := range table.Columns { + if table.IsColumnIndexGenerated(i) { + continue + } if col.IsUnsigned { switch v := row[i].(type) { case int64: @@ -330,13 +329,14 @@ func 
NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re } } -func quotedColumnNames(table *TableSchema) []string { - cols := make([]string, 0, len(table.Columns)) - for _, name := range table.NonGeneratedColumnNames() { - cols = append(cols, QuoteField(name)) +func quotedColumnNames(cols []schema.TableColumn) []string { + qnames := make([]string, len(cols)) + + for i, col := range cols { + qnames[i] = QuoteField(col.Name) } - return cols + return qnames } func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData) error { @@ -354,7 +354,7 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData return nil } -func buildStringListForValues(table *TableSchema, values []interface{}) string { +func buildStringListForValues(cols []schema.TableColumn, values []interface{}) string { var buffer []byte for i, value := range values { @@ -362,7 +362,7 @@ func buildStringListForValues(table *TableSchema, values []interface{}) string { buffer = append(buffer, ',') } - buffer = appendEscapedValue(buffer, value, table.Columns[i]) + buffer = appendEscapedValue(buffer, value, cols[i]) } return string(buffer) diff --git a/table_schema_cache.go b/table_schema_cache.go index d060aa047..358580c45 100644 --- a/table_schema_cache.go +++ b/table_schema_cache.go @@ -80,6 +80,20 @@ func (t *TableSchema) ColumnsCount() (int, int, int) { return len(t.Columns), generated, len(t.Columns) - generated } +// Returns a list of all non-generated columns for a TableSchema, in schema order. +func (t *TableSchema) NonGeneratedColumns() []schema.TableColumn { + res := make([]schema.TableColumn, 0, len(t.Columns)) + + for _, col := range t.Columns { + if IsColumnGenerated(&col) { + continue + } + res = append(res, col) + } + + return res +} + // Returns a list of all non-generated column names for a TableSchema, in schema order. 
func (t *TableSchema) NonGeneratedColumnNames() []string {
 	res := make([]string, 0, len(t.Columns))
 
diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go
index d2e7acbd5..3ade4da95 100644
--- a/test/go/dml_events_test.go
+++ b/test/go/dml_events_test.go
@@ -390,6 +390,148 @@ func (this *DMLEventsTestSuite) TestNoRowsQueryEvent() {
 	this.Require().Equal("", annotation)
 }
 
+// TestNewBinlogDMLEventsUnsignedConversionWithGeneratedColumn verifies the
+// sign conversion logic on tables with generated columns.
+func (this *DMLEventsTestSuite) TestNewBinlogDMLEventsUnsignedConversionWithGeneratedColumn() {
+	columns := []schema.TableColumn{
+		{Name: "id"},
+		{Name: "gen", IsVirtual: true},
+		{Name: "u8", IsUnsigned: true},
+	}
+	table := &ghostferry.TableSchema{
+		Table: &schema.Table{
+			Schema:  "test_schema",
+			Name:    "test_table",
+			Columns: columns,
+		},
+	}
+
+	ev := &replication.BinlogEvent{
+		Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2},
+		Event: &replication.RowsEvent{
+			Rows: [][]interface{}{
+				{int64(1000), "gen_val", int8(-1)},
+			},
+		},
+	}
+
+	dmlEvents, err := ghostferry.NewBinlogDMLEvents(table, ev, mysql.Position{}, mysql.Position{}, nil)
+	this.Require().Nil(err)
+	this.Require().Equal(1, len(dmlEvents))
+
+	q, err := dmlEvents[0].AsSQLString("test_schema", "test_table")
+	this.Require().Nil(err)
+	this.Require().Equal(
+		"INSERT IGNORE INTO `test_schema`.`test_table` (`id`,`u8`) VALUES (1000,255)",
+		q,
+	)
+}
+
+// TestBinlogInsertEventGeneratedColumnBeforeJSONPreservesJSONCasting verifies
+// JSON metadata alignment when processing events with generated columns.
+func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratedColumnBeforeJSONPreservesJSONCasting() { + columns := []schema.TableColumn{ + {Name: "gen", IsVirtual: true}, + {Name: "payload", Type: schema.TYPE_JSON}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + eventBase := ghostferry.NewDMLEventBase(table, mysql.Position{}, mysql.Position{}, nil, time.Unix(1618318965, 0)) + + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{{"gen_val", []byte("payload_data")}}, + } + + dmlEvents, err := ghostferry.NewBinlogInsertEvents(eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "INSERT IGNORE INTO `test_schema`.`test_table` (`payload`) VALUES (CAST('payload_data' AS JSON))", + q, + ) +} + +// TestBinlogUpdateEventExcludesGeneratedColumnFromSetAndWhere verifies that +// UPDATE events for tables with virtual columns emit SET and WHERE clauses that +// reference only real (non-generated) columns. 
+func (this *DMLEventsTestSuite) TestBinlogUpdateEventExcludesGeneratedColumnFromSetAndWhere() { + columns := []schema.TableColumn{ + {Name: "id"}, + {Name: "gen", IsVirtual: true}, + {Name: "data"}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + eventBase := ghostferry.NewDMLEventBase(table, mysql.Position{}, mysql.Position{}, nil, time.Unix(1618318965, 0)) + + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{ + {int64(1000), "gen_old", "old_data"}, + {int64(1000), "gen_new", "new_data"}, + }, + } + + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "UPDATE `test_schema`.`test_table` SET `id`=1000,`data`='new_data' WHERE `id`=1000 AND `data`='old_data'", + q, + ) +} + +// TestBinlogDeleteEventExcludesStoredGeneratedColumnFromWhere verifies that +// DELETE events skip both VIRTUAL and STORED generated columns in the WHERE clause. 
+func (this *DMLEventsTestSuite) TestBinlogDeleteEventExcludesStoredGeneratedColumnFromWhere() { + columns := []schema.TableColumn{ + {Name: "id"}, + {Name: "data"}, + {Name: "summary", IsStored: true}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + eventBase := ghostferry.NewDMLEventBase(table, mysql.Position{}, mysql.Position{}, nil, time.Unix(1618318965, 0)) + + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{{int64(1000), "hello", "abc123"}}, + } + + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "DELETE FROM `test_schema`.`test_table` WHERE `id`=1000 AND `data`='hello'", + q, + ) +} + func TestDMLEventsTestSuite(t *testing.T) { suite.Run(t, new(DMLEventsTestSuite)) }