Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ group :development do
end

gem "mutex_m", "~> 0.3.0"
gem "logger"
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ GEM
reline (>= 0.6.0)
coderay (1.1.3)
io-console (0.8.2)
logger (1.7.0)
method_source (1.1.0)
minitest (5.27.0)
minitest-fail-fast (0.1.0)
Expand Down Expand Up @@ -42,6 +43,7 @@ PLATFORMS
ruby

DEPENDENCIES
logger
minitest
minitest-fail-fast (~> 0.1.0)
minitest-hooks
Expand Down
7 changes: 1 addition & 6 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,7 @@ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos Pagina
}
}

batch = &RowBatch{
values: batchData,
paginationKeyIndex: paginationKeyIndex,
table: c.Table,
columns: columns,
}
batch = NewRowBatchWithColumns(c.Table, batchData, columns, paginationKeyIndex)

logger.Debugf("found %d rows", batch.Size())

Expand Down
108 changes: 67 additions & 41 deletions dml_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ type RowData []interface{}
// https://github.com/Shopify/ghostferry/issues/165.
//
// In summary:
// - This code receives values from both go-sql-driver/mysql and
// go-mysql-org/go-mysql.
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
// slice for unsigned integer.
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
// unsigned integer.
// - We currently make this function deal with both cases. In the future we can
// investigate alternative solutions.
// - This code receives values from both go-sql-driver/mysql and
// go-mysql-org/go-mysql.
// - go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte
// slice for unsigned integer.
// - go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for
// unsigned integer.
// - We currently make this function deal with both cases. In the future we can
// investigate alternative solutions.
func (r RowData) GetUint64(colIdx int) (uint64, error) {
u64, ok := Uint64Value(r[colIdx])
if ok {
Expand Down Expand Up @@ -168,14 +168,15 @@ func (e *BinlogInsertEvent) NewValues() RowData {
}

func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error) {
if err := verifyValuesHasTheSameLengthAsColumns(e.table, e.newValues); err != nil {
filteredNewValues, err := e.table.FilterGeneratedColumnsOnRowData(e.newValues)
if err != nil {
return "", err
}

query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(quotedColumnNames(e.table), ",") + ")" +
" VALUES (" + buildStringListForValues(e.table.Columns, e.newValues) + ")"
" VALUES (" + buildStringListForValues(e.table, filteredNewValues) + ")"

return query, nil
}
Expand Down Expand Up @@ -227,8 +228,8 @@ func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, e
}

query := "UPDATE " + QuotedTableNameFromString(schemaName, tableName) +
" SET " + buildStringMapForSet(e.table.Columns, e.newValues) +
" WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues)
" SET " + buildStringMapForSet(e.table, e.newValues) +
" WHERE " + buildStringMapForWhere(e.table, e.oldValues)

return query, nil
}
Expand Down Expand Up @@ -269,7 +270,7 @@ func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, e
}

query := "DELETE FROM " + QuotedTableNameFromString(schemaName, tableName) +
" WHERE " + buildStringMapForWhere(e.table.Columns, e.oldValues)
" WHERE " + buildStringMapForWhere(e.table, e.oldValues)

return query, nil
}
Expand All @@ -281,32 +282,39 @@ func (e *BinlogDeleteEvent) PaginationKey() (string, error) {
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error) {
rowsEvent := ev.Event.(*replication.RowsEvent)

for _, row := range rowsEvent.Rows {
if len(row) != len(table.Columns) {
for i, rawRow := range rowsEvent.Rows {
if len(rawRow) != len(table.Columns) {
return nil, fmt.Errorf(
"table %s.%s has %d columns but event has %d columns instead",
table.Schema,
table.Name,
len(table.Columns),
len(row),
len(rawRow),
)
}
for i, col := range table.Columns {

// Normalize signed-to-unsigned integer values in place using
// full-schema column indexes. go-mysql always decodes rows to the
// full column width (RowsEvent.decodeImage allocates make([]any,
// ColumnCount) and leaves omitted positions as nil), so rawRow is
// always len(table.Columns) here and indexing is safe.
for j, col := range table.Columns {
if col.IsUnsigned {
switch v := row[i].(type) {
switch v := rawRow[j].(type) {
case int64:
row[i] = uint64(v)
rawRow[j] = uint64(v)
case int32:
row[i] = uint32(v)
rawRow[j] = uint32(v)
case int16:
row[i] = uint16(v)
rawRow[j] = uint16(v)
case int8:
row[i] = uint8(v)
rawRow[j] = uint8(v)
case int:
row[i] = uint(v)
rawRow[j] = uint(v)
}
}
}
rowsEvent.Rows[i] = rawRow
}

timestamp := time.Unix(int64(ev.Header.Timestamp), 0)
Expand All @@ -324,9 +332,9 @@ func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, re
}

func quotedColumnNames(table *TableSchema) []string {
cols := make([]string, len(table.Columns))
for i, column := range table.Columns {
cols[i] = QuoteField(column.Name)
cols := make([]string, 0, len(table.Columns))
for _, name := range table.NonGeneratedColumnNames() {
cols = append(cols, QuoteField(name))
}

return cols
Expand All @@ -347,53 +355,71 @@ func verifyValuesHasTheSameLengthAsColumns(table *TableSchema, values ...RowData
return nil
}

func buildStringListForValues(columns []schema.TableColumn, values []interface{}) string {
func buildStringListForValues(table *TableSchema, values []interface{}) string {
var buffer []byte

// values contains only non-generated columns (already filtered by the
// caller via FilterGeneratedColumnsOnRowData). Build a matching list of
// non-generated column descriptors so that value[i] is paired with the
// correct column metadata regardless of where generated columns sit in the
// full schema.
nonGenerated := make([]schema.TableColumn, 0, len(table.Columns))
for _, col := range table.Columns {
if !IsColumnGenerated(&col) {
nonGenerated = append(nonGenerated, col)
}
}

for i, value := range values {
if i > 0 {
if len(buffer) > 0 {
buffer = append(buffer, ',')
}

buffer = appendEscapedValue(buffer, value, columns[i])
buffer = appendEscapedValue(buffer, value, nonGenerated[i])
}

return string(buffer)
}

func buildStringMapForWhere(columns []schema.TableColumn, values []interface{}) string {
func buildStringMapForWhere(table *TableSchema, values []interface{}) string {
var buffer []byte

for i, value := range values {
if i > 0 {
if table.IsColumnIndexGenerated(i) {
continue
}
if len(buffer) > 0 {
buffer = append(buffer, " AND "...)
}

buffer = append(buffer, QuoteField(columns[i].Name)...)
buffer = append(buffer, QuoteField(table.Columns[i].Name)...)

if isNilValue(value) {
// "WHERE value = NULL" will never match rows.
buffer = append(buffer, " IS NULL"...)
} else {
buffer = append(buffer, '=')
buffer = appendEscapedValue(buffer, value, columns[i])
buffer = appendEscapedValue(buffer, value, table.Columns[i])
}
}

return string(buffer)
}

func buildStringMapForSet(columns []schema.TableColumn, values []interface{}) string {
func buildStringMapForSet(table *TableSchema, values []interface{}) string {
var buffer []byte

for i, value := range values {
if i > 0 {
if table.IsColumnIndexGenerated(i) {
continue
}
if len(buffer) > 0 {
buffer = append(buffer, ',')
}

buffer = append(buffer, QuoteField(columns[i].Name)...)
buffer = append(buffer, QuoteField(table.Columns[i].Name)...)
buffer = append(buffer, '=')
buffer = appendEscapedValue(buffer, value, columns[i])
buffer = appendEscapedValue(buffer, value, table.Columns[i])
}

return string(buffer)
Expand Down Expand Up @@ -504,10 +530,10 @@ func Int64Value(value interface{}) (int64, bool) {
//
// This is specifically mentioned in the below link:
//
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
// When BINARY values are stored, they are right-padded with the pad value
// to the specified length. The pad value is 0x00 (the zero byte). Values
// are right-padded with 0x00 for inserts, and no trailing bytes are removed
// for retrievals.
//
// ref: https://dev.mysql.com/doc/refman/5.7/en/binary-varbinary.html
func appendEscapedString(buffer []byte, value string, rightPadToLengthWithZeroBytes int) []byte {
Expand Down
7 changes: 6 additions & 1 deletion ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,12 @@ func (f *Ferry) FlushBinlogAndStopStreaming() {
func (f *Ferry) StopTargetVerifier() {
if !f.Config.SkipTargetVerification {
f.TargetVerifier.BinlogStreamer.FlushAndStop()
f.targetVerifierWg.Wait()
// targetVerifierWg is only allocated inside Run(). If the ferry exits
// before Run() is reached (e.g. due to an earlier error), the pointer
// is still nil and calling Wait() would panic.
if f.targetVerifierWg != nil {
f.targetVerifierWg.Wait()
}
}
}

Expand Down
13 changes: 8 additions & 5 deletions iterative_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,18 @@ func (v *IterativeVerifier) tableIsIgnored(table *TableSchema) bool {

func (v *IterativeVerifier) columnsToVerify(table *TableSchema) []schema.TableColumn {
ignoredColsSet, containsIgnoredColumns := v.IgnoredColumns[table.Name]
if !containsIgnoredColumns {
return table.Columns
}

// Generated columns (VIRTUAL / STORED) are intentionally included so that
// any divergence in computed output between source and target is caught.
// Explicitly ignored columns still take priority over this inclusion.
var columns []schema.TableColumn
for _, column := range table.Columns {
if _, isIgnored := ignoredColsSet[column.Name]; !isIgnored {
columns = append(columns, column)
if containsIgnoredColumns {
if _, isIgnored := ignoredColsSet[column.Name]; isIgnored {
continue
}
}
columns = append(columns, column)
}

return columns
Expand Down
47 changes: 38 additions & 9 deletions row_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ type RowBatch struct {
}

func NewRowBatch(table *TableSchema, values []RowData, paginationKeyIndex int) *RowBatch {
return NewRowBatchWithColumns(table, values, ConvertTableColumnsToStrings(table.Columns), paginationKeyIndex)
}

// NewRowBatchWithColumns creates a RowBatch with an explicit ordered list of
// selected column names. Use this when the query that produced the row data
// returns columns in a different order from the schema β€” for example, the
// sharding copy filter issues SELECT * … JOIN … USING(id) which moves 'id'
// to the front of the result set. The selectedColumns slice must match the
// order and count of values in each RowData entry.
func NewRowBatchWithColumns(table *TableSchema, values []RowData, selectedColumns []string, paginationKeyIndex int) *RowBatch {
return &RowBatch{
values: values,
paginationKeyIndex: paginationKeyIndex,
table: table,
columns: ConvertTableColumnsToStrings(table.Columns),
columns: selectedColumns,
}
}

Expand Down Expand Up @@ -64,23 +74,42 @@ func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface
return "", nil, err
}

valuesStr := "(" + strings.Repeat("?,", len(e.columns)-1) + "?)"
// Build the INSERT column list from e.columns β€” the actual query-result
// order β€” skipping generated columns by name.
//
// We must NOT use table.NonGeneratedColumnNames() here because that
// always returns schema order. When the SELECT query returns columns in a
// different order (for example, the sharding copy filter uses
// SELECT * FROM t JOIN (SELECT id …) AS batch USING(id)
// which moves 'id' to the front), the column names and row values would
// be misaligned, corrupting every row written to the target.
insertColumns := make([]string, 0, len(e.columns))
for _, col := range e.columns {
if e.table.IsColumnNameGenerated(col) {
continue
}
insertColumns = append(insertColumns, col)
}

valuesStr := "(" + strings.Repeat("?,", len(insertColumns)-1) + "?)"
valuesStr = strings.Repeat(valuesStr+",", len(e.values)-1) + valuesStr

query := "INSERT IGNORE INTO " +
QuotedTableNameFromString(schemaName, tableName) +
" (" + strings.Join(QuoteFields(e.columns), ",") + ") VALUES " + valuesStr
" (" + strings.Join(QuoteFields(insertColumns), ",") + ") VALUES " + valuesStr

return query, e.flattenRowData(), nil
}

func (e *RowBatch) flattenRowData() []interface{} {
rowSize := len(e.values[0])
flattened := make([]interface{}, rowSize*len(e.values))

for rowIdx, row := range e.values {
for colIdx, col := range row {
flattened[rowIdx*rowSize+colIdx] = col
flattened := make([]interface{}, 0, len(e.values)*len(e.columns))

for _, row := range e.values {
for colIdx, col := range e.columns {
if e.table.IsColumnNameGenerated(col) {
continue
}
flattened = append(flattened, row[colIdx])
}
}

Expand Down
Loading
Loading