Added support for enum types. Enums use schema type STRING.
noctarius committed Jul 30, 2023
1 parent f77871c commit 48ed86e
Showing 21 changed files with 662 additions and 536 deletions.
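The gist of the change: PostgreSQL enum columns are now mapped to the STRING schema type, and their labels are forwarded unchanged via the enum2string converter added in internal/typemanager/builtin_converters.go below. As a minimal sketch (not code from this commit), the mapping has the same shape as the typeRegistration entries in internal/typemanager/coretypes.go; the actual registration of user-defined enum OIDs happens at runtime in the type manager:

// Hedged sketch inside internal/typemanager — illustrates the shape of the
// enum mapping only; the real registration code for dynamically discovered
// enum OIDs lives in the type manager and is not reproduced here.
var enumRegistration = typeRegistration{
	schemaType: schema.STRING, // enums use schema type STRING
	converter:  enum2string,   // forwards the enum label as-is
}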

@@ -527,21 +527,22 @@ func (t testReplicationContext) ReadHypertableSchema(
}

func (t testReplicationContext) SnapshotChunkTable(
chunk *systemcatalog.Chunk, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, chunk *systemcatalog.Chunk, cb sidechannel.SnapshotRowCallback,
) (pgtypes.LSN, error) {

return 0, nil
}

func (t testReplicationContext) FetchHypertableSnapshotBatch(
hypertable *systemcatalog.Hypertable, snapshotName string, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable,
snapshotName string, cb sidechannel.SnapshotRowCallback,
) error {

return nil
}

func (t testReplicationContext) ReadSnapshotHighWatermark(
hypertable *systemcatalog.Hypertable, snapshotName string,
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, snapshotName string,
) (map[string]any, error) {

return nil, nil

18 changes: 12 additions & 6 deletions internal/replication/replicationcontext/replicationcontext.go
@@ -377,24 +377,30 @@ func (rc *replicationContext) ReadHypertableSchema(
}

func (rc *replicationContext) SnapshotChunkTable(
chunk *systemcatalog.Chunk, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, chunk *systemcatalog.Chunk, cb sidechannel.SnapshotRowCallback,
) (pgtypes.LSN, error) {

return rc.sideChannel.SnapshotChunkTable(chunk, rc.snapshotBatchSize, cb)
// FIXME: remove the intermediate function?
return rc.sideChannel.SnapshotChunkTable(rowDecoderFactory, chunk, rc.snapshotBatchSize, cb)
}

func (rc *replicationContext) FetchHypertableSnapshotBatch(
hypertable *systemcatalog.Hypertable, snapshotName string, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable,
snapshotName string, cb sidechannel.SnapshotRowCallback,
) error {

return rc.sideChannel.FetchHypertableSnapshotBatch(hypertable, snapshotName, rc.snapshotBatchSize, cb)
// FIXME: remove the intermediate function?
return rc.sideChannel.FetchHypertableSnapshotBatch(
rowDecoderFactory, hypertable, snapshotName, rc.snapshotBatchSize, cb,
)
}

func (rc *replicationContext) ReadSnapshotHighWatermark(
hypertable *systemcatalog.Hypertable, snapshotName string,
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, snapshotName string,
) (map[string]any, error) {

return rc.sideChannel.ReadSnapshotHighWatermark(hypertable, snapshotName)
// FIXME: remove the intermediate function?
return rc.sideChannel.ReadSnapshotHighWatermark(rowDecoderFactory, hypertable, snapshotName)
}

func (rc *replicationContext) ReadReplicaIdentity(

38 changes: 21 additions & 17 deletions internal/replication/sidechannel/sidechannel.go
@@ -328,7 +328,8 @@ func (sc *sideChannel) DetachTablesFromPublication(
}

func (sc *sideChannel) SnapshotChunkTable(
chunk *systemcatalog.Chunk, snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, chunk *systemcatalog.Chunk,
snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
) (pgtypes.LSN, error) {

var currentLSN pgtypes.LSN = 0
@@ -338,14 +339,15 @@ func (sc *sideChannel) SnapshotChunkTable(
"DECLARE %s SCROLL CURSOR FOR SELECT * FROM %s", cursorName, chunk.CanonicalName(),
)

callback := func(lsn pgtypes.LSN, values map[string]any) error {
if currentLSN == 0 {
currentLSN = lsn
}
return cb(lsn, values)
}

if err := sc.snapshotTableWithCursor(
cursorQuery, cursorName, nil, snapshotBatchSize,
func(lsn pgtypes.LSN, values map[string]any) error {
if currentLSN == 0 {
currentLSN = lsn
}
return cb(lsn, values)
},
rowDecoderFactory, cursorQuery, cursorName, nil, snapshotBatchSize, callback,
); err != nil {
return 0, errors.Wrap(err, 0)
}
Expand All @@ -354,8 +356,8 @@ func (sc *sideChannel) SnapshotChunkTable(
}

func (sc *sideChannel) FetchHypertableSnapshotBatch(
hypertable *systemcatalog.Hypertable, snapshotName string,
snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable,
snapshotName string, snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
) error {

index, present := hypertable.Columns().SnapshotIndex()
@@ -419,13 +421,15 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch(
return cb(lsn, values)
}

return sc.snapshotTableWithCursor(cursorQuery, cursorName, &snapshotName, snapshotBatchSize, hook)
return sc.snapshotTableWithCursor(
rowDecoderFactory, cursorQuery, cursorName, &snapshotName, snapshotBatchSize, hook,
)
},
)
}

func (sc *sideChannel) ReadSnapshotHighWatermark(
hypertable *systemcatalog.Hypertable, snapshotName string,
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, snapshotName string,
) (values map[string]any, err error) {

index, present := hypertable.Columns().SnapshotIndex()
@@ -451,7 +455,7 @@ func (sc *sideChannel) ReadSnapshotHighWatermark(
return session.queryFunc(func(row pgx.Row) error {
rows := row.(pgx.Rows)

rowDecoder, err := pgtypes.NewRowDecoder(rows.FieldDescriptions())
rowDecoder, err := rowDecoderFactory(rows.FieldDescriptions())
if err != nil {
return errors.Wrap(err, 0)
}
@@ -699,8 +703,8 @@ func (sc *sideChannel) readHypertableSchema0(
}

func (sc *sideChannel) snapshotTableWithCursor(
cursorQuery, cursorName string, snapshotName *string,
snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
rowDecoderFactory pgtypes.RowDecoderFactory, cursorQuery, cursorName string,
snapshotName *string, snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
) error {

return sc.newSession(time.Minute*60, func(session *session) error {
@@ -726,15 +730,15 @@ func (sc *sideChannel) snapshotTableWithCursor(
return errors.Wrap(err, 0)
}

var rowDecoder *pgtypes.RowDecoder
var rowDecoder pgtypes.RowDecoder
for {
count := 0
if err := session.queryFunc(func(row pgx.Row) error {
rows := row.(pgx.Rows)

if rowDecoder == nil {
// Initialize the row decoder based on the returned field descriptions
rd, err := pgtypes.NewRowDecoder(rows.FieldDescriptions())
rd, err := rowDecoderFactory(rows.FieldDescriptions())
if err != nil {
return errors.Wrap(err, 0)
}

3 changes: 2 additions & 1 deletion internal/sysconfig/providers.go
@@ -79,7 +79,8 @@ type SinkManagerProvider = func(
) sink.Manager

type SnapshotterProvider = func(
*config.Config, replicationcontext.ReplicationContext, task.TaskManager, publication.PublicationManager,
*config.Config, replicationcontext.ReplicationContext,
task.TaskManager, publication.PublicationManager, pgtypes.TypeManager,
) (*snapshotting.Snapshotter, error)

type ReplicationChannelProvider = func(
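Because the Snapshotter now needs the type manager to plan row decoders, every SnapshotterProvider implementation has to accept and forward a pgtypes.TypeManager as well (the matching constructor change is in internal/systemcatalog/snapshotting/snapshotter.go below). A hedged sketch of a conforming provider inside the sysconfig package; only the signature comes from the diff, while the variable name and the direct delegation are assumptions:

// Hedged sketch: a provider matching the new SnapshotterProvider signature.
// The name defaultSnapshotterProvider and the straight pass-through to
// NewSnapshotterFromConfig are assumptions for illustration.
var defaultSnapshotterProvider SnapshotterProvider = func(
	c *config.Config, replicationContext replicationcontext.ReplicationContext,
	taskManager task.TaskManager, publicationManager publication.PublicationManager,
	typeManager pgtypes.TypeManager,
) (*snapshotting.Snapshotter, error) {
	return snapshotting.NewSnapshotterFromConfig(
		c, replicationContext, taskManager, publicationManager, typeManager,
	)
}
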
15 changes: 11 additions & 4 deletions internal/systemcatalog/snapshotting/snapshotter.go
@@ -45,6 +45,7 @@ type Snapshotter struct {
partitionCount uint64
replicationContext replicationcontext.ReplicationContext
taskManager task.TaskManager
typeManager pgtypes.TypeManager
publicationManager publication.PublicationManager
snapshotQueues []chan SnapshotTask
shutdownAwaiter *waiting.MultiShutdownAwaiter
@@ -54,15 +55,17 @@
func NewSnapshotterFromConfig(
c *config.Config, replicationContext replicationcontext.ReplicationContext,
taskManager task.TaskManager, publicationManager publication.PublicationManager,
typeManager pgtypes.TypeManager,
) (*Snapshotter, error) {

parallelism := config.GetOrDefault(c, config.PropertySnapshotterParallelism, uint8(5))
return NewSnapshotter(parallelism, replicationContext, taskManager, publicationManager)
return NewSnapshotter(parallelism, replicationContext, taskManager, publicationManager, typeManager)
}

func NewSnapshotter(
partitionCount uint8, replicationContext replicationcontext.ReplicationContext,
taskManager task.TaskManager, publicationManager publication.PublicationManager,
typeManager pgtypes.TypeManager,
) (*Snapshotter, error) {

snapshotQueues := make([]chan SnapshotTask, partitionCount)
@@ -79,6 +82,7 @@ func NewSnapshotter(
partitionCount: uint64(partitionCount),
replicationContext: replicationContext,
taskManager: taskManager,
typeManager: typeManager,
publicationManager: publicationManager,
snapshotQueues: snapshotQueues,
logger: logger,
@@ -179,7 +183,8 @@ func (s *Snapshotter) snapshotChunk(
}

lsn, err := s.replicationContext.SnapshotChunkTable(
t.Chunk, func(lsn pgtypes.LSN, values map[string]any) error {
s.typeManager.GetOrPlanRowDecoder, t.Chunk,
func(lsn pgtypes.LSN, values map[string]any) error {
return s.taskManager.EnqueueTask(func(notificator task.Notificator) {
callback := func(handler eventhandlers.HypertableReplicationEventHandler) error {
return handler.OnReadEvent(lsn, t.Hypertable, t.Chunk, values)
@@ -218,7 +223,9 @@ func (s *Snapshotter) snapshotHypertable(

// Initialize the watermark or update the high watermark after a restart
if created || t.nextSnapshotFetch {
highWatermark, err := s.replicationContext.ReadSnapshotHighWatermark(t.Hypertable, *t.SnapshotName)
highWatermark, err := s.replicationContext.ReadSnapshotHighWatermark(
s.typeManager.GetOrPlanRowDecoder, t.Hypertable, *t.SnapshotName,
)
if err != nil {
return errors.Wrap(err, 0)
}
@@ -269,7 +276,7 @@ func (s *Snapshotter) runSnapshotFetchBatch(
) error {

return s.replicationContext.FetchHypertableSnapshotBatch(
t.Hypertable, *t.SnapshotName,
s.typeManager.GetOrPlanRowDecoder, t.Hypertable, *t.SnapshotName,
func(lsn pgtypes.LSN, values map[string]any) error {
return s.taskManager.EnqueueTask(func(notificator task.Notificator) {
notificator.NotifyHypertableReplicationEventHandler(

11 changes: 11 additions & 0 deletions internal/typemanager/builtin_converters.go
@@ -95,6 +95,17 @@ func reflectiveArrayConverter(
}
}

func enum2string(
_ uint32, value any,
) (any, error) {

switch v := value.(type) {
case string:
return v, nil
}
return nil, errIllegalValue
}

func float42float(
_ uint32, value any,
) (any, error) {

9 changes: 9 additions & 0 deletions internal/typemanager/coretypes.go
@@ -201,6 +201,9 @@ var coreTypes = map[uint32]typeRegistration{
pgtypes.MacAddr8OID: {
schemaType: schema.STRING,
converter: macaddr2text,
typeMapTypeFactory: func(_ *pgtype.Map, typ pgtypes.PgType) *pgtype.Type {
return &pgtype.Type{Name: "macaddr8", OID: pgtypes.MacAddr8OID, Codec: pgtype.MacaddrCodec{}}
},
},
pgtypes.MacAddrArray8OID: {
schemaType: schema.ARRAY,
@@ -318,6 +321,9 @@ var coreTypes = map[uint32]typeRegistration{
pgtypes.TimeTZOID: {
schemaType: schema.STRING,
converter: time2text,
typeMapTypeFactory: func(_ *pgtype.Map, typ pgtypes.PgType) *pgtype.Type {
return &pgtype.Type{Name: "timetz", OID: pgtypes.TimeTZOID, Codec: &pgtypes.TimetzCodec{}}
},
},
pgtypes.TimeTZArrayOID: {
schemaType: schema.ARRAY,
@@ -326,6 +332,9 @@ var coreTypes = map[uint32]typeRegistration{
},
pgtypes.XmlOID: {
schemaType: schema.STRING,
typeMapTypeFactory: func(_ *pgtype.Map, typ pgtypes.PgType) *pgtype.Type {
return &pgtype.Type{Name: "xml", OID: pgtypes.XmlOID, Codec: pgtypes.XmlCodec{}}
},
},
pgtypes.XmlArrayOID: {
schemaType: schema.ARRAY,

86 changes: 86 additions & 0 deletions internal/typemanager/decoderplan.go
@@ -0,0 +1,86 @@
package typemanager

import (
"github.com/go-errors/errors"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgtype"
"github.com/noctarius/timescaledb-event-streamer/internal/functional"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
)

type tupleDecoder func(column *pglogrepl.TupleDataColumn, values map[string]any) error

type tupleCodec func(data []byte, binary bool) (any, error)

func planTupleDecoder(
typeManager *typeManager, relation *pgtypes.RelationMessage,
) (pgtypes.TupleDecoderPlan, error) {

decoders := make([]tupleDecoder, 0)

for _, column := range relation.Columns {
codec := func(data []byte, binary bool) (any, error) {
return string(data), nil
}
if pgxType, ok := typeManager.typeMap.TypeForOID(column.DataType); ok {
codec = func(data []byte, binary bool) (any, error) {
dataformat := int16(pgtype.TextFormatCode)
if binary {
dataformat = pgtype.BinaryFormatCode
}
return pgxType.Codec.DecodeValue(typeManager.typeMap, column.DataType, dataformat, data)
}
}

decoders = append(decoders, func(dataType uint32, name string, codec tupleCodec) tupleDecoder {
return func(column *pglogrepl.TupleDataColumn, values map[string]any) error {
switch column.DataType {
case 'n': // null
values[name] = nil
case 'u': // unchanged toast
// This TOAST value was not changed. TOAST values are not stored in the tuple, and
// logical replication doesn't want to spend a disk read to fetch its value for you.
case 't': // text (basically anything other than the two above)
val, err := codec(column.Data, false)
if err != nil {
return errors.Errorf("error decoding column data: %s", err)
}
values[name] = val
case 'b': // binary data
val, err := codec(column.Data, true)
if err != nil {
return errors.Errorf("error decoding column data: %s", err)
}
values[name] = val
}
return nil
}
}(column.DataType, column.Name, codec))
}

return &tupleDecoderPlan{
decoders: decoders,
}, nil
}

type tupleDecoderPlan struct {
decoders []tupleDecoder
}

func (tdp *tupleDecoderPlan) Decode(
tupleData *pglogrepl.TupleData,
) (map[string]any, error) {

if tupleData == nil {
return functional.Zero[map[string]any](), nil
}

values := map[string]any{}
for i, decoder := range tdp.decoders {
column := tupleData.Columns[i]
if err := decoder(column, values); err != nil {
return nil, err
}
}
return values, nil
}
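
For context, a hedged usage sketch of the new decoder plan applied to an incoming logical replication INSERT message. planTupleDecoder and Decode are taken from the file above; the surrounding helper, its name, and how the RELATION message is obtained are assumptions:

// Hedged usage sketch (not part of the commit), inside the typemanager package:
// decode the column values of an INSERT message with a plan built from the
// previously received RELATION message for the same table.
func decodeInsert(
	tm *typeManager, relation *pgtypes.RelationMessage, msg *pglogrepl.InsertMessage,
) (map[string]any, error) {

	plan, err := planTupleDecoder(tm, relation)
	if err != nil {
		return nil, err
	}
	// One map entry per column; NULL columns map to nil, unchanged TOAST
	// columns are omitted entirely.
	return plan.Decode(msg.Tuple)
}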