-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP - mysql stream #521
base: master
Are you sure you want to change the base?
WIP - mysql stream #521
Conversation
@@ -6,3 +6,4 @@ services: | |||
MYSQL_ROOT_PASSWORD: mysql | |||
ports: | |||
- 3306:3306 | |||
command: --binlog-row-metadata=FULL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes it so that in the binlog change events we get all the column names. Without this the column names are empty.
} | ||
|
||
func (s *StreamingSource) Run(ctx context.Context, writer writers.Writer) error { | ||
streamer, err := s.syncer.StartSync(mysql.Position{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want offsets this is where we would pass them in.
return fmt.Errorf("unable to cast event to replication.RowsEvent") | ||
} | ||
|
||
if string(rowsEvent.Table.Table) != "foo" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was testing with a local table named foo, this is where we need to filter for the tables we care about.
slog.Error("Failed to convert event to messages", slog.Any("err", err)) | ||
} else { | ||
for i, message := range messages { | ||
slog.Info("messages", slog.Int("index", i), slog.Any("event", message.Event())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emit the messages to Kafka.
} | ||
|
||
collationMap := event.Table.CollationMap() | ||
dataTypes := make([]schema.DataType, len(event.Table.ColumnType)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is mapping to the data types that we use for backfilling, we may not want to do this though and build our own list of converters.
@@ -88,7 +88,7 @@ func (m MySQLAdapter) PartitionKeys() []string { | |||
return m.table.PrimaryKeys | |||
} | |||
|
|||
func valueConverterForType(d schema.DataType, opts *schema.Opts) (converters.ValueConverter, error) { | |||
func ValueConverterForType(d schema.DataType, opts *schema.Opts) (converters.ValueConverter, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't want to share these.
No description provided.