diff --git a/dml_events.go b/dml_events.go index 7b8a00f1d..c0810e7b1 100644 --- a/dml_events.go +++ b/dml_events.go @@ -42,14 +42,14 @@ type RowData []interface{} // https://github.com/Shopify/ghostferry/issues/165. // // In summary: -// - This code receives values from both go-sql-driver/mysql and -// go-mysql-org/go-mysql. -// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte -// slice for unsigned integer. -// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for -// unsigned integer. -// - We currently make this function deal with both cases. In the future we can -// investigate alternative solutions. +// - This code receives values from both go-sql-driver/mysql and +// go-mysql-org/go-mysql. +// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte +// slice for unsigned integer. +// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for +// unsigned integer. +// - We currently make this function deal with both cases. In the future we can +// investigate alternative solutions. 
func (r RowData) GetUint64(colIdx int) (uint64, error) { u64, ok := Uint64Value(r[colIdx]) if ok { @@ -168,14 +168,16 @@ func (e *BinlogInsertEvent) NewValues() RowData { } func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error) { - if err := verifyValuesHasTheSameLengthAsColumns(e.table, e.newValues); err != nil { + filteredCols := e.table.NonGeneratedColumns() + filteredNewValues, err := e.table.FilterGeneratedColumnsOnRowData(e.newValues) + if err != nil { return "", err } query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(quotedColumnNames(e.table), ",") + ")" + - " VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")" + " (" + strings.Join(quotedColumnNames(filteredCols), ",") + ")" + + " VALUES (" + buildStringListForValues(filteredCols, filteredNewValues) + ")" return query, nil } @@ -227,8 +229,8 @@ func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, e } query := "UPDATE " + QuotedTableNameFromString(schemaName, tableName) + - " SET " + buildStringMapForSet(e.table.Columns, e.newValues) + - " WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues) + " SET " + buildStringMapForSet(e.table, e.newValues) + + " WHERE " + buildStringMapForWhere(e.table, e.oldValues) return query, nil } @@ -269,7 +271,7 @@ func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, e } query := "DELETE FROM " + QuotedTableNameFromString(schemaName, tableName) + - " WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues) + " WHERE " + buildStringMapForWhere(e.table, e.oldValues) return query, nil } @@ -291,7 +293,11 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re len(row), ) } + for i, col := range table.Columns { + if table.IsColumnIndexGenerated(i) { + continue + } if col.IsUnsigned { switch v := row[i].(type) { case int64: @@ -323,13 +329,14 @@ func NewBinlogDMLEvents(table 
*TableSchema, ev *replication.BinlogEvent, pos, re } } -func quotedColumnNames(table *TableSchema) []string { - cols := make([]string, len(table.Columns)) - for i, column := range table.Columns { - cols[i] = QuoteField(column.Name) +func quotedColumnNames(cols []schema.TableColumn) []string { + qnames := make([]string, len(cols)) + + for i, col := range cols { + qnames[i] = QuoteField(col.Name) } - return cols + return qnames } func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData) error { @@ -347,53 +354,59 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData return nil } -func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string { +func buildStringListForValues(cols []schema.TableColumn, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { + if len(buffer) > 0 { buffer = append(buffer, ',') } - buffer = appendEscapedValue(buffer, value, columns[i]) + buffer = appendEscapedValue(buffer, value, cols[i]) } return string(buffer) } -func buildStringMapForWhere(columns []schema.TableColumn, values []interface{}) string { +func buildStringMapForWhere(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { + if table.IsColumnIndexGenerated(i) { + continue + } + if len(buffer) > 0 { buffer = append(buffer, " AND "...) } - buffer = append(buffer, QuoteField(columns[i].Name)...) + buffer = append(buffer, QuoteField(table.Columns[i].Name)...) if isNilValue(value) { // "WHERE value = NULL" will never match rows. buffer = append(buffer, " IS NULL"...) 
} else { buffer = append(buffer, '=') - buffer = appendEscapedValue(buffer, value, columns[i]) + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } } return string(buffer) } -func buildStringMapForSet(columns []schema.TableColumn, values []interface{}) string { +func buildStringMapForSet(table *TableSchema, values []interface{}) string { var buffer []byte for i, value := range values { - if i > 0 { + if table.IsColumnIndexGenerated(i) { + continue + } + if len(buffer) > 0 { buffer = append(buffer, ',') } - buffer = append(buffer, QuoteField(columns[i].Name)...) + buffer = append(buffer, QuoteField(table.Columns[i].Name)...) buffer = append(buffer, '=') - buffer = appendEscapedValue(buffer, value, columns[i]) + buffer = appendEscapedValue(buffer, value, table.Columns[i]) } return string(buffer) @@ -504,10 +517,10 @@ func Int64Value(value interface{}) (int64, bool) { // // This is specifically mentioned in the the below link: // -// When BINARY values are stored, they are right-padded with the pad value -// to the specified length. The pad value is 0x00 (the zero byte). Values -// are right-padded with 0x00 for inserts, and no trailing bytes are removed -// for retrievals. +// When BINARY values are stored, they are right-padded with the pad value +// to the specified length. The pad value is 0x00 (the zero byte). Values +// are right-padded with 0x00 for inserts, and no trailing bytes are removed +// for retrievals. 
// // ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte { diff --git a/row_batch.go b/row_batch.go index 7a865a6f6..3d10dd244 100644 --- a/row_batch.go +++ b/row_batch.go @@ -64,23 +64,27 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface return "", nil, err } - valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)" + filteredColumns := e.table.NonGeneratedColumnNames() + + valuesStr := "(" + strings.Repeat("?,", len(filteredColumns)-1) + "?)" valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr query := "INSERT IGNORE INTO " + QuotedTableNameFromString(schemaName, tableName) + - " (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr + " (" + strings.Join(QuoteFields(filteredColumns), ",") + ") VALUES " + valuesStr return query, e.flattenRowData(), nil } func (e *RowBatch) flattenRowData() []interface{} { - rowSize := len(e.values[0]) - flattened := make([]interface{}, rowSize*len(e.values)) + flattened := make([]interface{}, 0, len(e.values)) - for rowIdx, row := range e.values { + for _, row := range e.values { for colIdx, col := range row { - flattened[rowIdx*rowSize+colIdx] = col + if e.table.IsColumnIndexGenerated(colIdx) { + continue + } + flattened = append(flattened, col) } } diff --git a/table_schema_cache.go b/table_schema_cache.go index 32f6d0418..358580c45 100644 --- a/table_schema_cache.go +++ b/table_schema_cache.go @@ -46,6 +46,103 @@ type TableSchema struct { rowMd5Query string } +// IsColumnGenerated evaluates whether a go_mysql.schema.TableColumn is generated or not. +func IsColumnGenerated(tc *schema.TableColumn) bool { + return tc.IsVirtual || tc.IsStored +} + +// IsColumnIndexGenerated evaluates whether a TableSchema column is generated, by index. 
+func (t *TableSchema) IsColumnIndexGenerated(idx int) bool { + return IsColumnGenerated(&t.Columns[idx]) +} + +// IsColumnNameGenerated evaluates whether a TableSchema column is generated, by name. +func (t *TableSchema) IsColumnNameGenerated(name string) bool { + for _, col := range t.Columns { + if name == col.Name && IsColumnGenerated(&col) { + return true + } + } + + return false +} + +// ColumnsCount returns a count of total, generated and non-generated columns for a TableSchema. +func (t *TableSchema) ColumnsCount() (int, int, int) { + var generated int + + for _, col := range t.Columns { + if IsColumnGenerated(&col) { + generated += 1 + } + } + + return len(t.Columns), generated, len(t.Columns) - generated +} + +// NonGeneratedColumns returns a list of all non-generated columns for a TableSchema, in schema order. +func (t *TableSchema) NonGeneratedColumns() []schema.TableColumn { + res := make([]schema.TableColumn, 0, len(t.Columns)) + + for _, col := range t.Columns { + if IsColumnGenerated(&col) { + continue + } + res = append(res, col) + } + + return res +} + +// NonGeneratedColumnNames returns a list of all non-generated column names for a TableSchema, in schema order. +func (t *TableSchema) NonGeneratedColumnNames() []string { + res := make([]string, 0, len(t.Columns)) + + for _, col := range t.Columns { + if IsColumnGenerated(&col) { + continue + } + res = append(res, col.Name) + } + + return res +} + +// FilterGeneratedColumnsOnRowData takes a row (as slice of RowData elements) and returns +// a copy with elements for generated columns removed. 
+func (t *TableSchema) FilterGeneratedColumnsOnRowData(row []interface{}) ([]interface{}, error) { + columnsCount, _, nonGeneratedColumnsCount := t.ColumnsCount() + + if len(row) != columnsCount { + return nil, fmt.Errorf( + "table %s.%s has %d columns but row has %d columns instead", + t.Schema, + t.Name, + columnsCount, + len(row), + ) + } + + res := make([]interface{}, 0, len(row)) + for i, val := range row { + if t.IsColumnIndexGenerated(i) { + continue + } + res = append(res, val) + } + + if len(res) != nonGeneratedColumnsCount { + return nil, fmt.Errorf( + "table %s.%s has %d updatable columns but processed row has %d updatable columns instead", + t.Schema, + t.Name, + nonGeneratedColumnsCount, + len(res), + ) + } + return res, nil +} + // This query returns the MD5 hash for a row on this table. This query is valid // for both the source and the target shard. // @@ -90,11 +187,12 @@ func (t *TableSchema) RowMd5Query() string { } columns := make([]schema.TableColumn, 0, len(t.Columns)) - for _, column := range t.Columns { + for i, column := range t.Columns { _, isCompressed := t.CompressedColumnsForVerification[column.Name] _, isIgnored := t.IgnoredColumnsForVerification[column.Name] + isGenerated := t.IsColumnIndexGenerated(i) - if isCompressed || isIgnored { + if isCompressed || isIgnored || isGenerated { continue } @@ -151,6 +249,19 @@ func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry) return tablesWithData, emptyTables, nil } +// removeInvisibleIndexes removes all invisible index references from a go_mysql.schema.Table. 
+func removeInvisibleIndexes(ts *schema.Table) { + j := 0 + for i, index := range ts.Indexes { + if !index.Visible { + continue + } + ts.Indexes[j] = ts.Indexes[i] + j++ + } + ts.Indexes = ts.Indexes[:j] +} + func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, forceIndexConfig ForceIndexConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error) { logger := logrus.WithField("tag", "table_schema_cache") @@ -189,14 +300,8 @@ func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig Col return tableSchemaCache, err } - // Filter out invisible indexes - visibleIndexes := make([]*schema.Index, 0, len(tableSchema.Indexes)) - for _, index := range tableSchema.Indexes { - if index.Visible { - visibleIndexes = append(visibleIndexes, index) - } - } - tableSchema.Indexes = visibleIndexes + // filter out unwanted indexes and columns + removeInvisibleIndexes(tableSchema) tableSchemas = append(tableSchemas, &TableSchema{ Table: tableSchema, @@ -448,7 +553,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro if err != nil { break } - + var binValue []byte switch v := val.(type) { case []byte: @@ -458,7 +563,7 @@ func maxPaginationKey(db *sql.DB, table *TableSchema) (PaginationKey, bool, erro default: err = fmt.Errorf("expected binary/string for max key, got %T", val) } - + if err == nil { result = NewBinaryKeyWithColumn(primaryKeyColumn.Name, binValue) } diff --git a/test/go/dml_events_test.go b/test/go/dml_events_test.go index d0b2cec48..3ade4da95 100644 --- a/test/go/dml_events_test.go +++ b/test/go/dml_events_test.go @@ -95,7 +95,7 @@ func (this *DMLEventsTestSuite) TestBinlogInsertEventWithWrongColumnsReturnsErro _, err = dmlEvents[0].AsSQLString(this.targetTable.Schema, this.targetTable.Name) this.Require().NotNil(err) - this.Require().Contains(err.Error(), "test_table has 3 columns but event has 
1 column") + this.Require().Contains(err.Error(), "test_table has 3 columns but row has 1 column") } func (this *DMLEventsTestSuite) TestBinlogInsertEventMetadata() { @@ -390,6 +390,148 @@ func (this *DMLEventsTestSuite) TestNoRowsQueryEvent() { this.Require().Equal("", annotation) } +// TestNewBinlogDMLEventsUnsignedConversionWithGeneratedColumn verifies the +// sign conversion logic on tables with generated columns. +func (this *DMLEventsTestSuite) TestNewBinlogDMLEventsUnsignedConversionWithGeneratedColumn() { + columns := []schema.TableColumn{ + {Name: "id"}, + {Name: "gen", IsVirtual: true}, + {Name: "u8", IsUnsigned: true}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + + ev := &replication.BinlogEvent{ + Header: &replication.EventHeader{EventType: replication.WRITE_ROWS_EVENTv2}, + Event: &replication.RowsEvent{ + Rows: [][]interface{}{ + {int64(1000), "gen_val", int8(-1)}, + }, + }, + } + + dmlEvents, err := ghostferry.NewBinlogDMLEvents(table, ev, mysql.Position{}, mysql.Position{}, nil) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "INSERT IGNORE INTO `test_schema`.`test_table` (`id`,`u8`) VALUES (1000,255)", + q, + ) +} + +// TestBinlogInsertEventGeneratedColumnBeforeJSONPreservesJSONCasting verifies +// JSON metadata alignment when processing events with generated columns. 
+func (this *DMLEventsTestSuite) TestBinlogInsertEventGeneratedColumnBeforeJSONPreservesJSONCasting() { + columns := []schema.TableColumn{ + {Name: "gen", IsVirtual: true}, + {Name: "payload", Type: schema.TYPE_JSON}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + eventBase := ghostferry.NewDMLEventBase(table, mysql.Position{}, mysql.Position{}, nil, time.Unix(1618318965, 0)) + + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{{"gen_val", []byte("payload_data")}}, + } + + dmlEvents, err := ghostferry.NewBinlogInsertEvents(eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "INSERT IGNORE INTO `test_schema`.`test_table` (`payload`) VALUES (CAST('payload_data' AS JSON))", + q, + ) +} + +// TestBinlogUpdateEventExcludesGeneratedColumnFromSetAndWhere verifies that +// UPDATE events for tables with virtual columns emit SET and WHERE clauses that +// reference only real (non-generated) columns. 
+func (this *DMLEventsTestSuite) TestBinlogUpdateEventExcludesGeneratedColumnFromSetAndWhere() { + columns := []schema.TableColumn{ + {Name: "id"}, + {Name: "gen", IsVirtual: true}, + {Name: "data"}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + eventBase := ghostferry.NewDMLEventBase(table, mysql.Position{}, mysql.Position{}, nil, time.Unix(1618318965, 0)) + + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{ + {int64(1000), "gen_old", "old_data"}, + {int64(1000), "gen_new", "new_data"}, + }, + } + + dmlEvents, err := ghostferry.NewBinlogUpdateEvents(eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "UPDATE `test_schema`.`test_table` SET `id`=1000,`data`='new_data' WHERE `id`=1000 AND `data`='old_data'", + q, + ) +} + +// TestBinlogDeleteEventExcludesStoredGeneratedColumnFromWhere verifies that +// DELETE events skip both VIRTUAL and STORED generated columns in the WHERE clause. 
+func (this *DMLEventsTestSuite) TestBinlogDeleteEventExcludesStoredGeneratedColumnFromWhere() { + columns := []schema.TableColumn{ + {Name: "id"}, + {Name: "data"}, + {Name: "summary", IsStored: true}, + } + table := &ghostferry.TableSchema{ + Table: &schema.Table{ + Schema: "test_schema", + Name: "test_table", + Columns: columns, + }, + } + eventBase := ghostferry.NewDMLEventBase(table, mysql.Position{}, mysql.Position{}, nil, time.Unix(1618318965, 0)) + + rowsEvent := &replication.RowsEvent{ + Table: this.tableMapEvent, + Rows: [][]interface{}{{int64(1000), "hello", "abc123"}}, + } + + dmlEvents, err := ghostferry.NewBinlogDeleteEvents(eventBase, rowsEvent) + this.Require().Nil(err) + this.Require().Equal(1, len(dmlEvents)) + + q, err := dmlEvents[0].AsSQLString("test_schema", "test_table") + this.Require().Nil(err) + this.Require().Equal( + "DELETE FROM `test_schema`.`test_table` WHERE `id`=1000 AND `data`='hello'", + q, + ) +} + func TestDMLEventsTestSuite(t *testing.T) { suite.Run(t, new(DMLEventsTestSuite)) } diff --git a/test/go/table_schema_cache_test.go b/test/go/table_schema_cache_test.go index fc2bd18fc..a8bad06c7 100644 --- a/test/go/table_schema_cache_test.go +++ b/test/go/table_schema_cache_test.go @@ -339,6 +339,26 @@ func (this *TableSchemaCacheTestSuite) TestTableRowMd5Query() { this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", query) } +func (this *TableSchemaCacheTestSuite) TestTableRowMd5QueryWithVirtualField() { + tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil) + this.Require().Nil(err) + + tables := tableSchemaCache.AsSlice() + table := tables[0] + table.Columns[0].IsVirtual = true + this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`data`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", table.RowMd5Query()) +} + +func (this *TableSchemaCacheTestSuite) TestTableRowMd5QueryWithStoredField() { + tableSchemaCache, err := 
ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil) + this.Require().Nil(err) + + tables := tableSchemaCache.AsSlice() + table := tables[0] + table.Columns[1].IsStored = true + this.Require().Equal("MD5(CONCAT(MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5", table.RowMd5Query()) +} + func (this *TableSchemaCacheTestSuite) TestFingerprintQueryWithIgnoredColumns() { tableSchemaCache, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.tableFilter, nil, nil, nil, nil) this.Require().Nil(err) diff --git a/test/helpers/db_helper.rb b/test/helpers/db_helper.rb index 405d7def0..e23aba78e 100644 --- a/test/helpers/db_helper.rb +++ b/test/helpers/db_helper.rb @@ -124,7 +124,16 @@ def seed_random_data(connection, database_name: DEFAULT_DB, table_name: DEFAULT_ dbtable = full_table_name(database_name, table_name) connection.query("CREATE DATABASE IF NOT EXISTS #{database_name}") - connection.query("CREATE TABLE IF NOT EXISTS #{dbtable} (id bigint(20) not null auto_increment, data TEXT, primary key(id))") + connection.query(" + CREATE TABLE IF NOT EXISTS #{dbtable} ( + id BIGINT(20) NOT NULL AUTO_INCREMENT, + data TEXT, + /* generated columns should be ignored */ + length BIGINT(20) AS (LENGTH(data)) VIRTUAL, + summary VARCHAR(32) AS (MD5(data)) STORED, + PRIMARY KEY(id) + ) + ") return if number_of_rows == 0 diff --git a/test/integration/ddl_events_test.rb b/test/integration/ddl_events_test.rb index c6d608d45..fce053026 100644 --- a/test/integration/ddl_events_test.rb +++ b/test/integration/ddl_events_test.rb @@ -30,7 +30,7 @@ def test_ddl_event_handler_with_ddl_events ghostferry = new_ghostferry(DDL_GHOSTFERRY) ghostferry.on_status(GhostferryHelper::Ghostferry::Status::BINLOG_STREAMING_STARTED) do - source_db.query("INSERT INTO #{table_name} VALUES (9000, 'test')") + source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9000, 'test')") source_db.query("ALTER TABLE #{table_name} ADD INDEX (data(100))") 
source_db.query("INSERT INTO #{table_name} (id, data) VALUES (9001, 'test')") end diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb index b70bccb84..a99830ce3 100644 --- a/test/integration/inline_verifier_test.rb +++ b/test/integration/inline_verifier_test.rb @@ -429,8 +429,8 @@ def test_positive_negative_zero # indeed running as the nominal case (comparing 0.0 and -0.0) should not # emit any error and thus we cannot say for certain if the InlineVerifier # ran or not. - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 0.0)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 1.0)") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 0.0)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 1.0)") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -468,8 +468,8 @@ def test_null_vs_null seed_random_data(source_db, number_of_rows: 0) seed_random_data(target_db, number_of_rows: 0) - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") verification_ran = false ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -486,8 +486,8 @@ def test_null_vs_empty_string seed_random_data(source_db, number_of_rows: 0) seed_random_data(target_db, number_of_rows: 0) - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, '')") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, '')") ghostferry = 
new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -512,8 +512,8 @@ def test_null_vs_null_string seed_random_data(source_db, number_of_rows: 0) seed_random_data(target_db, number_of_rows: 0) - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL)") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 'NULL')") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, NULL)") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data) VALUES (1, 'NULL')") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) @@ -542,8 +542,8 @@ def test_null_in_different_order source_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN data2 VARCHAR(255) AFTER data") target_db.query("ALTER TABLE #{DEFAULT_FULL_TABLE_NAME} ADD COLUMN data2 VARCHAR(255) AFTER data") - source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, NULL, 'data')") - target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES (1, 'data', NULL)") + source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data, data2) VALUES (1, NULL, 'data')") + target_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} (id, data, data2) VALUES (1, 'data', NULL)") ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY, config: { verifier_type: "Inline" }) diff --git a/testhelpers/integration_test_case.go b/testhelpers/integration_test_case.go index 5f4f767ff..34d8f32fc 100644 --- a/testhelpers/integration_test_case.go +++ b/testhelpers/integration_test_case.go @@ -122,7 +122,7 @@ func (this *IntegrationTestCase) VerifyData() { } if !verificationResult.DataCorrect { - this.T.Fatalf(verificationResult.Message) + this.T.Fatal(verificationResult.Message) } }