From e86c659793e0bc5ca44f449a41bf0da8670adb4a Mon Sep 17 00:00:00 2001 From: "Christoph Engelbert (noctarius)" Date: Thu, 21 Sep 2023 16:53:04 +0200 Subject: [PATCH] Started reimplementing snapshotting with support for pg vanilla tables --- .../replicationresolver.go | 8 +-- .../transactiontracker.go | 12 ++-- .../replicationchannel/replicationchannel.go | 8 +-- internal/sidechannel/sidechannel.go | 31 +++++---- .../systemcatalog/snapshotting/snapshotter.go | 68 +++++++++++++++---- internal/systemcatalog/systemcatalog.go | 38 +++++------ spi/eventhandlers/eventhandlers.go | 8 +-- spi/sidechannel/sidechannel.go | 8 ++- spi/systemcatalog/basetable.go | 32 ++++++--- spi/systemcatalog/hypertable.go | 30 ++++---- spi/systemcatalog/pgtable.go | 30 ++++---- spi/watermark/watermarks.go | 16 ++--- 12 files changed, 175 insertions(+), 114 deletions(-) diff --git a/internal/replication/logicalreplicationresolver/replicationresolver.go b/internal/replication/logicalreplicationresolver/replicationresolver.go index 3c8ab20c..432ddb7d 100644 --- a/internal/replication/logicalreplicationresolver/replicationresolver.go +++ b/internal/replication/logicalreplicationresolver/replicationresolver.go @@ -138,15 +138,15 @@ func (l *logicalReplicationResolver) PostConstruct() error { return nil } -func (l *logicalReplicationResolver) OnHypertableSnapshotStartedEvent( - _ string, _ *spicatalog.Hypertable, +func (l *logicalReplicationResolver) OnTableSnapshotStartedEvent( + _ string, _ spicatalog.BaseTable, ) error { return nil } -func (l *logicalReplicationResolver) OnHypertableSnapshotFinishedEvent( - _ string, _ *spicatalog.Hypertable, +func (l *logicalReplicationResolver) OnTableSnapshotFinishedEvent( + _ string, _ spicatalog.BaseTable, _ pgtypes.LSN, ) error { return nil diff --git a/internal/replication/logicalreplicationresolver/transactiontracker.go b/internal/replication/logicalreplicationresolver/transactiontracker.go index 633d95db..3c84be7b 100644 --- a/internal/replication/logicalreplicationresolver/transactiontracker.go +++ b/internal/replication/logicalreplicationresolver/transactiontracker.go @@ -79,18 +79,18 @@ func (tt *transactionTracker) PostConstruct() error { return nil } -func (tt *transactionTracker) OnHypertableSnapshotStartedEvent( - snapshotName string, hypertable *spicatalog.Hypertable, +func (tt *transactionTracker) OnTableSnapshotStartedEvent( + snapshotName string, table spicatalog.BaseTable, ) error { - return tt.resolver.OnHypertableSnapshotStartedEvent(snapshotName, hypertable) + return tt.resolver.OnTableSnapshotStartedEvent(snapshotName, table) } -func (tt *transactionTracker) OnHypertableSnapshotFinishedEvent( - snapshotName string, hypertable *spicatalog.Hypertable, +func (tt *transactionTracker) OnTableSnapshotFinishedEvent( + snapshotName string, table spicatalog.BaseTable, lsn pgtypes.LSN, ) error { - return tt.resolver.OnHypertableSnapshotFinishedEvent(snapshotName, hypertable) + return tt.resolver.OnTableSnapshotFinishedEvent(snapshotName, table, lsn) } func (tt *transactionTracker) OnSnapshottingStartedEvent( diff --git a/internal/replication/replicationchannel/replicationchannel.go b/internal/replication/replicationchannel/replicationchannel.go index f72fd4a6..6d3a1f8c 100644 --- a/internal/replication/replicationchannel/replicationchannel.go +++ b/internal/replication/replicationchannel/replicationchannel.go @@ -297,15 +297,15 @@ func (s *snapshottingEventHandler) OnChunkSnapshotFinishedEvent( return nil } -func (s *snapshottingEventHandler) OnHypertableSnapshotStartedEvent( - _ string, _ *systemcatalog.Hypertable, +func (s *snapshottingEventHandler) OnTableSnapshotStartedEvent( + _ string, _ systemcatalog.BaseTable, ) error { return nil } -func (s *snapshottingEventHandler) OnHypertableSnapshotFinishedEvent( - _ string, _ *systemcatalog.Hypertable, +func (s *snapshottingEventHandler) OnTableSnapshotFinishedEvent( + _ string, _ systemcatalog.BaseTable, _ pgtypes.LSN, ) error { return nil diff --git a/internal/sidechannel/sidechannel.go b/internal/sidechannel/sidechannel.go index bbe746f7..40a4c98b 100644 --- a/internal/sidechannel/sidechannel.go +++ b/internal/sidechannel/sidechannel.go @@ -364,6 +364,13 @@ func (sc *sideChannel) DetachTablesFromPublication( }) } +func (sc *sideChannel) SnapshotVanillaTable( + rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, + snapshotBatchSize int, cb sidechannel.SnapshotRowCallback, +) (lsn pgtypes.LSN, err error) { + +} + func (sc *sideChannel) SnapshotChunkTable( rowDecoderFactory pgtypes.RowDecoderFactory, chunk *systemcatalog.Chunk, snapshotBatchSize int, cb sidechannel.SnapshotRowCallback, @@ -393,21 +400,21 @@ func (sc *sideChannel) SnapshotChunkTable( } func (sc *sideChannel) FetchHypertableSnapshotBatch( - rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, + rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, snapshotName string, snapshotBatchSize int, cb sidechannel.SnapshotRowCallback, ) error { - index, present := hypertable.Columns().SnapshotIndex() + index, present := table.Columns().SnapshotIndex() if !present { - return errors.Errorf("missing snapshotting index for hypertable '%s'", hypertable.CanonicalName()) + return errors.Errorf("missing snapshotting index for hypertable '%s'", table.CanonicalName()) } return sc.stateStorageManager.SnapshotContextTransaction( snapshotName, false, func(snapshotContext *watermark.SnapshotContext) error { - hypertableWatermark, present := snapshotContext.GetWatermark(hypertable) + hypertableWatermark, present := snapshotContext.GetWatermark(table) if !present { - return errors.Errorf("illegal watermark state for hypertable '%s'", hypertable.CanonicalName()) + return errors.Errorf("illegal watermark state for hypertable '%s'", table.CanonicalName()) } comparison, success := index.WhereTupleLE(hypertableWatermark.HighWatermark()) @@ -423,7 +430,7 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch( sc.logger.Verbosef( "Resuming snapshotting of hypertable '%s' at <<%s>> up to <<%s>>", - hypertable.CanonicalName(), lowWatermarkComparison, comparison, + table.CanonicalName(), lowWatermarkComparison, comparison, ) comparison = fmt.Sprintf("%s AND %s", @@ -433,14 +440,14 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch( } else { sc.logger.Verbosef( "Starting snapshotting of hypertable '%s' up to <<%s>>", - hypertable.CanonicalName(), comparison, + table.CanonicalName(), comparison, ) } cursorName := lo.RandomString(15, lo.LowerCaseLettersCharset) cursorQuery := fmt.Sprintf( `DECLARE %s SCROLL CURSOR FOR SELECT * FROM %s WHERE %s ORDER BY %s LIMIT %d`, - cursorName, hypertable.CanonicalName(), comparison, + cursorName, table.CanonicalName(), comparison, index.AsSqlOrderBy(false), snapshotBatchSize*10, ) @@ -466,19 +473,19 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch( } func (sc *sideChannel) ReadSnapshotHighWatermark( - rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, snapshotName string, + rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, snapshotName string, ) (values map[string]any, err error) { - index, present := hypertable.Columns().SnapshotIndex() + index, present := table.Columns().SnapshotIndex() if !present { return nil, errors.Errorf( - "missing snapshotting index for hypertable '%s'", hypertable.CanonicalName(), + "missing snapshotting index for hypertable '%s'", table.CanonicalName(), ) } query := fmt.Sprintf( queryTemplateSnapshotHighWatermark, index.AsSqlTuple(), - hypertable.CanonicalName(), index.AsSqlOrderBy(true), + table.CanonicalName(), index.AsSqlOrderBy(true), ) if err := sc.newSession(time.Second*10, func(session *session) error { if _, err := session.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); err != nil { diff --git a/internal/systemcatalog/snapshotting/snapshotter.go b/internal/systemcatalog/snapshotting/snapshotter.go index c61e0783..0ff501c8 100644 --- a/internal/systemcatalog/snapshotting/snapshotter.go +++ b/internal/systemcatalog/snapshotting/snapshotter.go @@ -44,6 +44,7 @@ type snapshotterStats struct { type snapshotterPartitionStats struct { snapshots struct { + pgtables uint `metric:"pgtables" type:"gauge"` hypertables uint `metric:"hypertable" type:"gauge"` chunks uint `metric:"chunks" type:"gauge"` } `metric:"snapshots"` @@ -53,7 +54,7 @@ type snapshotterPartitionStats struct { } type SnapshotTask struct { - Hypertable *systemcatalog.Hypertable + Table systemcatalog.BaseTable Chunk *systemcatalog.Chunk Xld *pgtypes.XLogData SnapshotName *string @@ -139,7 +140,7 @@ func (s *Snapshotter) EnqueueSnapshot( // Partition calculation hasher := fnv.New64a() - if _, err := hasher.Write([]byte(t.Hypertable.CanonicalName())); err != nil { + if _, err := hasher.Write([]byte(t.Table.CanonicalName())); err != nil { // If we cannot hash, the system will break. That means, we harshly kill the process. panic(err) } @@ -160,7 +161,7 @@ func (s *Snapshotter) EnqueueSnapshot( if t.Chunk != nil { err := s.taskManager.EnqueueTask(func(notificator task.Notificator) { notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error { - return handler.OnChunkSnapshotStartedEvent(t.Hypertable, t.Chunk) + return handler.OnChunkSnapshotStartedEvent(t.Table.(*systemcatalog.Hypertable), t.Chunk) }) enqueueSnapshotTask() }) @@ -208,9 +209,48 @@ func (s *Snapshotter) snapshot( if task.Chunk != nil { return s.snapshotChunk(task, partition) } + + if _, ok := task.Table.(*systemcatalog.PgTable); ok { + return s.snapshotVanillaTable(task, partition) + } + return s.snapshotHypertable(task, partition) } +func (s *Snapshotter) snapshotVanillaTable( + t SnapshotTask, partition int, +) error { + + defer s.statsReporter.Report(s.partitionStats[partition]) + s.partitionStats[partition].snapshots.pgtables++ + + alreadyPublished, err := s.publicationManager.ExistsTableInPublication(t.Table) + if err != nil { + return errors.Wrap(err, 0) + } + if !alreadyPublished { + if err := s.publicationManager.AttachTablesToPublication(t.Table); err != nil { + return errors.Wrap(err, 0) + } + } + + lsn, err := s.sideChannel.SnapshotVanillaTable( + s.typeManager.GetOrPlanRowDecoder, t.Table, s.snapshotBatchSize, + func(lsn pgtypes.LSN, values map[string]any) error { + + }, + ) + if err != nil { + return errors.Wrap(err, 0) + } + + return s.taskManager.EnqueueTaskAndWait(func(notificator task.Notificator) { + notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error { + return handler.OnTableSnapshotFinishedEvent(*t.SnapshotName, t.Table) + }) + }) +} + func (s *Snapshotter) snapshotChunk( t SnapshotTask, partition int, ) error { @@ -234,11 +274,11 @@ func (s *Snapshotter) snapshotChunk( s.partitionStats[partition].records.total++ return s.taskManager.EnqueueTask(func(notificator task.Notificator) { callback := func(handler eventhandlers.RecordReplicationEventHandler) error { - return handler.OnReadEvent(lsn, t.Hypertable, t.Chunk, values) + return handler.OnReadEvent(lsn, t.Table, t.Chunk, values) } if t.Xld != nil { callback = func(handler eventhandlers.RecordReplicationEventHandler) error { - return handler.OnInsertEvent(*t.Xld, t.Hypertable, t.Chunk, values) + return handler.OnInsertEvent(*t.Xld, t.Table, t.Chunk, values) } } notificator.NotifyRecordReplicationEventHandler(callback) @@ -251,7 +291,7 @@ func (s *Snapshotter) snapshotChunk( return s.taskManager.EnqueueTaskAndWait(func(notificator task.Notificator) { notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error { - return handler.OnChunkSnapshotFinishedEvent(t.Hypertable, t.Chunk, lsn) + return handler.OnChunkSnapshotFinishedEvent(t.Table.(*systemcatalog.Hypertable), t.Chunk, lsn) }) }) } @@ -267,12 +307,12 @@ func (s *Snapshotter) snapshotHypertable( if err := s.stateStorageManager.SnapshotContextTransaction( *t.SnapshotName, true, func(snapshotContext *watermark.SnapshotContext) error { - hypertableWatermark, created := snapshotContext.GetOrCreateWatermark(t.Hypertable) + hypertableWatermark, created := snapshotContext.GetOrCreateWatermark(t.Table) // Initialize the watermark or update the high watermark after a restart if created || t.nextSnapshotFetch { highWatermark, err := s.sideChannel.ReadSnapshotHighWatermark( - s.typeManager.GetOrPlanRowDecoder, t.Hypertable, *t.SnapshotName, + s.typeManager.GetOrPlanRowDecoder, t.Table, *t.SnapshotName, ) if err != nil { return errors.Wrap(err, 0) @@ -295,23 +335,23 @@ func (s *Snapshotter) snapshotHypertable( return s.stateStorageManager.SnapshotContextTransaction( *t.SnapshotName, false, func(snapshotContext *watermark.SnapshotContext) error { - hypertableWatermark, present := snapshotContext.GetWatermark(t.Hypertable) + hypertableWatermark, present := snapshotContext.GetWatermark(t.Table) if !present { return errors.Errorf( - "illegal watermark state for hypertable '%s'", t.Hypertable.CanonicalName(), + "illegal watermark state for hypertable '%s'", t.Table.CanonicalName(), ) } if hypertableWatermark.Complete() { return s.taskManager.EnqueueTaskAndWait(func(notificator task.Notificator) { notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error { - return handler.OnHypertableSnapshotFinishedEvent(*t.SnapshotName, t.Hypertable) + return handler.OnTableSnapshotFinishedEvent(*t.SnapshotName, t.Table) }) }) } return s.EnqueueSnapshot(SnapshotTask{ - Hypertable: t.Hypertable, + Table: t.Table, SnapshotName: t.SnapshotName, nextSnapshotFetch: true, }) @@ -325,7 +365,7 @@ func (s *Snapshotter) runSnapshotFetchBatch( iteration := 0 return s.sideChannel.FetchHypertableSnapshotBatch( - s.typeManager.GetOrPlanRowDecoder, t.Hypertable, *t.SnapshotName, s.snapshotBatchSize, + s.typeManager.GetOrPlanRowDecoder, t.Table, *t.SnapshotName, s.snapshotBatchSize, func(lsn pgtypes.LSN, values map[string]any) error { s.partitionStats[partition].records.total++ iteration++ @@ -336,7 +376,7 @@ func (s *Snapshotter) runSnapshotFetchBatch( return s.taskManager.EnqueueTask(func(notificator task.Notificator) { notificator.NotifyRecordReplicationEventHandler( func(handler eventhandlers.RecordReplicationEventHandler) error { - return handler.OnReadEvent(lsn, t.Hypertable, t.Chunk, values) + return handler.OnReadEvent(lsn, t.Table, t.Chunk, values) }, ) }) diff --git a/internal/systemcatalog/systemcatalog.go b/internal/systemcatalog/systemcatalog.go index 349366aa..ae9779bc 100644 --- a/internal/systemcatalog/systemcatalog.go +++ b/internal/systemcatalog/systemcatalog.go @@ -395,9 +395,9 @@ func (sc *systemCatalog) snapshotChunkWithXld( if hypertable, present := sc.FindHypertableById(chunk.HypertableId()); present { return sc.snapshotter.EnqueueSnapshot(snapshotting.SnapshotTask{ - Hypertable: hypertable, - Chunk: chunk, - Xld: xld, + Table: hypertable, + Chunk: chunk, + Xld: xld, }) } return nil @@ -408,7 +408,7 @@ func (sc *systemCatalog) snapshotHypertable( ) error { return sc.snapshotter.EnqueueSnapshot(snapshotting.SnapshotTask{ - Hypertable: hypertable, + Table: hypertable, SnapshotName: &snapshotName, }) } @@ -568,8 +568,8 @@ func (s *snapshottingEventHandler) OnChunkSnapshotFinishedEvent( return nil } -func (s *snapshottingEventHandler) OnHypertableSnapshotStartedEvent( - snapshotName string, hypertable *systemcatalog.Hypertable, +func (s *snapshottingEventHandler) OnTableSnapshotStartedEvent( + snapshotName string, table systemcatalog.BaseTable, ) error { stateManager := s.systemCatalog.stateStorageManager @@ -579,39 +579,39 @@ func (s *snapshottingEventHandler) OnHypertableSnapshotStartedEvent( } if snapshotContext != nil { - hypertableWatermark, present := snapshotContext.GetWatermark(hypertable) + hypertableWatermark, present := snapshotContext.GetWatermark(table) if !present { - s.systemCatalog.logger.Infof("Start snapshotting of hypertable '%s'", hypertable.CanonicalName()) + s.systemCatalog.logger.Infof("Start snapshotting of hypertable '%s'", table.CanonicalName()) } else if hypertableWatermark.Complete() { s.systemCatalog.logger.Infof( - "Snapshotting for hypertable '%s' already completed, skipping", hypertable.CanonicalName(), + "Snapshotting for hypertable '%s' already completed, skipping", table.CanonicalName(), ) return s.scheduleNextSnapshotHypertableOrFinish(snapshotName) } else { - s.systemCatalog.logger.Infof("Resuming snapshotting of hypertable '%s'", hypertable.CanonicalName()) + s.systemCatalog.logger.Infof("Resuming snapshotting of hypertable '%s'", table.CanonicalName()) } } else { - s.systemCatalog.logger.Infof("Start snapshotting of hypertable '%s'", hypertable.CanonicalName()) + s.systemCatalog.logger.Infof("Start snapshotting of hypertable '%s'", table.CanonicalName()) } - if err := s.systemCatalog.snapshotHypertable(snapshotName, hypertable); err != nil { + if err := s.systemCatalog.snapshotHypertable(snapshotName, table.(*systemcatalog.Hypertable)); err != nil { return err } return nil } -func (s *snapshottingEventHandler) OnHypertableSnapshotFinishedEvent( - snapshotName string, hypertable *systemcatalog.Hypertable, +func (s *snapshottingEventHandler) OnTableSnapshotFinishedEvent( + snapshotName string, table systemcatalog.BaseTable, lsn pgtypes.LSN, ) error { stateManager := s.systemCatalog.stateStorageManager if err := stateManager.SnapshotContextTransaction( snapshotName, false, func(snapshotContext *watermark.SnapshotContext) error { - s.handledHypertables[hypertable.CanonicalName()] = true + s.handledHypertables[table.CanonicalName()] = true - hypertableWatermark, _ := snapshotContext.GetOrCreateWatermark(hypertable) + hypertableWatermark, _ := snapshotContext.GetOrCreateWatermark(table) hypertableWatermark.MarkComplete() return nil @@ -620,7 +620,7 @@ func (s *snapshottingEventHandler) OnHypertableSnapshotFinishedEvent( return errors.WrapPrefix(err, "illegal snapshot context state after snapshotting hypertable", 0) } - s.systemCatalog.logger.Infof("Finished snapshotting for hypertable '%s'", hypertable.CanonicalName()) + s.systemCatalog.logger.Infof("Finished snapshotting for hypertable '%s'", table.CanonicalName()) return s.scheduleNextSnapshotHypertableOrFinish(snapshotName) } @@ -649,7 +649,7 @@ func (s *snapshottingEventHandler) OnSnapshottingStartedEvent( return s.taskManager.EnqueueTask(func(notificator task.Notificator) { notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error { - return handler.OnHypertableSnapshotStartedEvent(snapshotName, hypertable) + return handler.OnTableSnapshotStartedEvent(snapshotName, hypertable) }) }) } @@ -677,7 +677,7 @@ func (s *snapshottingEventHandler) scheduleNextSnapshotHypertableOrFinish( if nextHypertable := s.nextSnapshotHypertable(); nextHypertable != nil { return s.taskManager.EnqueueTask(func(notificator task.Notificator) { notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error { - return handler.OnHypertableSnapshotStartedEvent(snapshotName, nextHypertable) + return handler.OnTableSnapshotStartedEvent(snapshotName, nextHypertable) }) }) } diff --git a/spi/eventhandlers/eventhandlers.go b/spi/eventhandlers/eventhandlers.go index 312419c1..cccc54d2 100644 --- a/spi/eventhandlers/eventhandlers.go +++ b/spi/eventhandlers/eventhandlers.go @@ -109,11 +109,11 @@ type SnapshottingEventHandler interface { OnChunkSnapshotFinishedEvent( hypertable *systemcatalog.Hypertable, chunk *systemcatalog.Chunk, snapshot pgtypes.LSN, ) error - OnHypertableSnapshotStartedEvent( - snapshotName string, hypertable *systemcatalog.Hypertable, + OnTableSnapshotStartedEvent( + snapshotName string, table systemcatalog.BaseTable, ) error - OnHypertableSnapshotFinishedEvent( - snapshotName string, hypertable *systemcatalog.Hypertable, + OnTableSnapshotFinishedEvent( + snapshotName string, table systemcatalog.BaseTable, lsn pgtypes.LSN, ) error OnSnapshottingStartedEvent( snapshotName string, diff --git a/spi/sidechannel/sidechannel.go b/spi/sidechannel/sidechannel.go index 433aa27c..81f9a88b 100644 --- a/spi/sidechannel/sidechannel.go +++ b/spi/sidechannel/sidechannel.go @@ -90,16 +90,20 @@ type SideChannel interface { DetachTablesFromPublication( publicationName string, entities ...systemcatalog.SystemEntity, ) error + SnapshotVanillaTable( + rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, + snapshotBatchSize int, cb SnapshotRowCallback, + ) (lsn pgtypes.LSN, err error) SnapshotChunkTable( rowDecoderFactory pgtypes.RowDecoderFactory, chunk *systemcatalog.Chunk, snapshotBatchSize int, cb SnapshotRowCallback, ) (lsn pgtypes.LSN, err error) FetchHypertableSnapshotBatch( - rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, + rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, snapshotName string, snapshotBatchSize int, cb SnapshotRowCallback, ) error ReadSnapshotHighWatermark( - rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, snapshotName string, + rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, snapshotName string, ) (values map[string]any, err error) ReadReplicaIdentity( schemaName, tableName string, diff --git a/spi/systemcatalog/basetable.go b/spi/systemcatalog/basetable.go index 529cd808..2b1a251c 100644 --- a/spi/systemcatalog/basetable.go +++ b/spi/systemcatalog/basetable.go @@ -24,7 +24,17 @@ import ( "github.com/samber/lo" ) -type BaseTable struct { +type BaseTable interface { + SystemEntity + Columns() Columns + TableColumns() []schema.ColumnAlike + KeyIndexColumns() []schema.ColumnAlike + ReplicaIdentity() pgtypes.ReplicaIdentity + SchemaBuilder() schema.Builder + ApplyTableSchema(newColumns []Column) (changes map[string]string) +} + +type baseTable struct { *baseSystemEntity tableColumns []schema.ColumnAlike replicaIdentity pgtypes.ReplicaIdentity @@ -33,9 +43,9 @@ type BaseTable struct { func newBaseTable( schemaName, tableName string, replicaIdentity pgtypes.ReplicaIdentity, -) *BaseTable { +) *baseTable { - return &BaseTable{ + return &baseTable{ baseSystemEntity: &baseSystemEntity{ schemaName: schemaName, tableName: tableName, @@ -48,25 +58,25 @@ func newBaseTable( // Columns returns a slice with the column definitions // of the table -func (bt *BaseTable) Columns() Columns { +func (bt *baseTable) Columns() Columns { return bt.columns } // TableColumns returns a slice of ColumnAlike entries // representing the columns of this table -func (bt *BaseTable) TableColumns() []schema.ColumnAlike { +func (bt *baseTable) TableColumns() []schema.ColumnAlike { return bt.tableColumns } // ReplicaIdentity returns the replica identity (if available), // otherwise a pgtypes.UNKNOWN is returned -func (bt *BaseTable) ReplicaIdentity() pgtypes.ReplicaIdentity { +func (bt *baseTable) ReplicaIdentity() pgtypes.ReplicaIdentity { return bt.replicaIdentity } // SchemaBuilder returns a SchemaBuilder instance, preconfigured // for this table -func (bt *BaseTable) SchemaBuilder() schema.Builder { +func (bt *baseTable) SchemaBuilder() schema.Builder { schemaBuilder := schema.NewSchemaBuilder(schema.STRUCT). FieldName(bt.CanonicalName()) @@ -79,7 +89,7 @@ func (bt *BaseTable) SchemaBuilder() schema.Builder { // ApplyTableSchema applies a new table schema to this // table and returns changes to the previously // known schema layout. -func (bt *BaseTable) ApplyTableSchema( +func (bt *baseTable) ApplyTableSchema( newColumns []Column, ) (changes map[string]string) { @@ -148,11 +158,11 @@ func (bt *BaseTable) ApplyTableSchema( return differences } -func (bt *BaseTable) ApplyChanges( +func (bt *baseTable) ApplyChanges( schemaName, tableName string, replicaIdentity pgtypes.ReplicaIdentity, -) *BaseTable { +) *baseTable { - return &BaseTable{ + return &baseTable{ baseSystemEntity: &baseSystemEntity{ schemaName: schemaName, tableName: tableName, diff --git a/spi/systemcatalog/hypertable.go b/spi/systemcatalog/hypertable.go index a35c1659..75019cd9 100644 --- a/spi/systemcatalog/hypertable.go +++ b/spi/systemcatalog/hypertable.go @@ -27,7 +27,7 @@ import ( // Hypertable represents a TimescaleDB hypertable definition // in the system catalog type Hypertable struct { - *BaseTable + *baseTable id int32 associatedSchemaName string associatedTablePrefix string @@ -47,7 +47,7 @@ func NewHypertable( ) *Hypertable { return &Hypertable{ - BaseTable: newBaseTable(schemaName, tableName, replicaIdentity), + baseTable: newBaseTable(schemaName, tableName, replicaIdentity), id: id, associatedSchemaName: associatedSchemaName, associatedTablePrefix: associatedTablePrefix, @@ -127,7 +127,7 @@ func (h *Hypertable) IsContinuousAggregate() bool { // A snapshot index is either the (composite) primary key or // a "virtual" index built from the hypertable's dimensions func (h *Hypertable) KeyIndexColumns() []schema.ColumnAlike { - index, present := (Columns(h.columns)).SnapshotIndex() + index, present := (h.Columns()).SnapshotIndex() if !present { return nil } @@ -151,8 +151,8 @@ func (h *Hypertable) String() string { builder := strings.Builder{} builder.WriteString("{") builder.WriteString(fmt.Sprintf("id:%d ", h.id)) - builder.WriteString(fmt.Sprintf("schemaName:%s ", h.schemaName)) - builder.WriteString(fmt.Sprintf("tableName:%s ", h.tableName)) + builder.WriteString(fmt.Sprintf("schemaName:%s ", h.SchemaName())) + builder.WriteString(fmt.Sprintf("tableName:%s ", h.TableName())) builder.WriteString(fmt.Sprintf("associatedSchemaName:%s ", h.associatedSchemaName)) builder.WriteString(fmt.Sprintf("associatedTablePrefix:%s ", h.associatedTablePrefix)) if h.compressedHypertableId == nil { @@ -161,7 +161,7 @@ func (h *Hypertable) String() string { builder.WriteString(fmt.Sprintf("compressedHypertableId:%d ", *h.compressedHypertableId)) } builder.WriteString(fmt.Sprintf("compressionState:%d ", h.compressionState)) - builder.WriteString(fmt.Sprintf("replicaIdentity:%s", h.replicaIdentity)) + builder.WriteString(fmt.Sprintf("replicaIdentity:%s", h.ReplicaIdentity())) if h.viewSchema == nil { builder.WriteString("viewSchema: ") } else { @@ -173,9 +173,9 @@ func (h *Hypertable) String() string { builder.WriteString(fmt.Sprintf("viewName:%s ", *h.viewName)) } builder.WriteString("columns:[") - for i, column := range h.columns { + for i, column := range h.Columns() { builder.WriteString(column.String()) - if i < len(h.columns)-1 { + if i < len(h.Columns())-1 { builder.WriteString(" ") } } @@ -194,7 +194,7 @@ func (h *Hypertable) ApplyChanges( ) (applied *Hypertable, changes map[string]string) { h2 := &Hypertable{ - BaseTable: h.BaseTable.ApplyChanges(schemaName, tableName, replicaIdentity), + baseTable: h.baseTable.ApplyChanges(schemaName, tableName, replicaIdentity), id: h.id, associatedSchemaName: associatedSchemaName, associatedTablePrefix: associatedTablePrefix, @@ -213,11 +213,11 @@ func (h *Hypertable) differences( if h.id != new.id { differences["id"] = fmt.Sprintf("%d=>%d", h.id, new.id) } - if h.schemaName != new.schemaName { - differences["schemaName"] = fmt.Sprintf("%s=>%s", h.schemaName, new.schemaName) + if h.SchemaName() != new.SchemaName() { + differences["schemaName"] = fmt.Sprintf("%s=>%s", h.SchemaName(), new.SchemaName()) } - if h.tableName != new.tableName { - differences["hypertableName"] = fmt.Sprintf("%s=>%s", h.tableName, new.tableName) + if h.TableName() != new.TableName() { + differences["hypertableName"] = fmt.Sprintf("%s=>%s", h.TableName(), new.TableName()) } if h.associatedSchemaName != new.associatedSchemaName { differences["associatedSchemaName"] = fmt.Sprintf("%s=>%s", h.associatedSchemaName, new.associatedSchemaName) @@ -240,8 +240,8 @@ func (h *Hypertable) differences( if h.compressionState != new.compressionState { differences["compressionState"] = fmt.Sprintf("%d=>%d", h.compressionState, new.compressionState) } - if h.replicaIdentity != new.replicaIdentity { - differences["replicaIdentity"] = fmt.Sprintf("%s=>%s", h.replicaIdentity, new.replicaIdentity) + if h.ReplicaIdentity() != new.ReplicaIdentity() { + differences["replicaIdentity"] = fmt.Sprintf("%s=>%s", h.ReplicaIdentity(), new.ReplicaIdentity()) } return differences } diff --git a/spi/systemcatalog/pgtable.go b/spi/systemcatalog/pgtable.go index a10c6421..0be70bf1 100644 --- a/spi/systemcatalog/pgtable.go +++ b/spi/systemcatalog/pgtable.go @@ -27,7 +27,7 @@ import ( // PgTable represents a vanilla PostgreSQL table definition // in the system catalog type PgTable struct { - *BaseTable + *baseTable relId uint32 } @@ -37,7 +37,7 @@ func NewPgTable( ) *PgTable { return &PgTable{ - BaseTable: newBaseTable(schemaName, tableName, replicaIdentity), + baseTable: newBaseTable(schemaName, tableName, replicaIdentity), relId: relId, } } @@ -52,7 +52,7 @@ func (t *PgTable) RelId() uint32 { // A snapshot index must be the (composite) primary key of // the table, since no dimensions exists (as with hypertables), func (t *PgTable) KeyIndexColumns() []schema.ColumnAlike { - index, present := (Columns(t.columns)).PrimaryKeyIndex() + index, present := (t.Columns()).PrimaryKeyIndex() if !present { return nil } @@ -67,13 +67,13 @@ func (t *PgTable) String() string { builder := strings.Builder{} builder.WriteString("{") builder.WriteString(fmt.Sprintf("relId:%d ", t.relId)) - builder.WriteString(fmt.Sprintf("schemaName:%s ", t.schemaName)) - builder.WriteString(fmt.Sprintf("tableName:%s ", t.tableName)) - builder.WriteString(fmt.Sprintf("replicaIdentity:%s", t.replicaIdentity)) + builder.WriteString(fmt.Sprintf("schemaName:%s ", t.SchemaName())) + builder.WriteString(fmt.Sprintf("tableName:%s ", t.TableName())) + builder.WriteString(fmt.Sprintf("replicaIdentity:%s", t.ReplicaIdentity())) builder.WriteString("columns:[") - for i, column := range t.columns { + for i, column := range t.Columns() { builder.WriteString(column.String()) - if i < len(t.columns)-1 { + if i < len(t.Columns())-1 { builder.WriteString(" ") } } @@ -89,7 +89,7 @@ func (t *PgTable) ApplyChanges( ) (applied *PgTable, changes map[string]string) { t2 := &PgTable{ - BaseTable: t.BaseTable.ApplyChanges(schemaName, tableName, replicaIdentity), + baseTable: t.baseTable.ApplyChanges(schemaName, tableName, replicaIdentity), relId: t.relId, } return t2, t.differences(t2) @@ -103,14 +103,14 @@ func (t *PgTable) differences( if t.relId != new.relId { differences["relId"] = fmt.Sprintf("%d=>%d", t.relId, new.relId) } - if t.schemaName != new.schemaName { - differences["schemaName"] = fmt.Sprintf("%s=>%s", t.schemaName, new.schemaName) + if t.SchemaName() != new.SchemaName() { + differences["schemaName"] = fmt.Sprintf("%s=>%s", t.SchemaName(), new.SchemaName()) } - if t.tableName != new.tableName { - differences["hypertableName"] = fmt.Sprintf("%s=>%s", t.tableName, new.tableName) + if t.TableName() != new.TableName() { + differences["hypertableName"] = fmt.Sprintf("%s=>%s", t.TableName(), new.TableName()) } - if t.replicaIdentity != new.replicaIdentity { - differences["replicaIdentity"] = fmt.Sprintf("%s=>%s", t.replicaIdentity, new.replicaIdentity) + if t.ReplicaIdentity() != new.ReplicaIdentity() { + differences["replicaIdentity"] = fmt.Sprintf("%s=>%s", t.ReplicaIdentity(), new.ReplicaIdentity()) } return differences } diff --git a/spi/watermark/watermarks.go b/spi/watermark/watermarks.go index f46c51cd..4c0b56a3 100644 --- a/spi/watermark/watermarks.go +++ b/spi/watermark/watermarks.go @@ -43,10 +43,10 @@ func NewSnapshotContext( } func (sc *SnapshotContext) GetWatermark( - hypertable *systemcatalog.Hypertable, + table systemcatalog.BaseTable, ) (watermark *Watermark, present bool) { - w, present := sc.watermarks[hypertable.CanonicalName()] + w, present := sc.watermarks[table.CanonicalName()] if !present { return nil, false } @@ -54,13 +54,13 @@ func (sc *SnapshotContext) GetWatermark( } func (sc *SnapshotContext) GetOrCreateWatermark( - hypertable *systemcatalog.Hypertable, + table systemcatalog.BaseTable, ) (watermark *Watermark, created bool) { - w, present := sc.watermarks[hypertable.CanonicalName()] + w, present := sc.watermarks[table.CanonicalName()] if !present { - w = newWatermark(hypertable) - sc.watermarks[hypertable.CanonicalName()] = w + w = newWatermark(table) + sc.watermarks[table.CanonicalName()] = w } return w, !present } @@ -261,11 +261,11 @@ type Watermark struct { } func newWatermark( - hypertable *systemcatalog.Hypertable, + table systemcatalog.BaseTable, ) *Watermark { dataTypes := make(map[string]uint32) - if index, ok := hypertable.Columns().SnapshotIndex(); ok { + if index, ok := table.Columns().SnapshotIndex(); ok { for _, column := range index.Columns() { dataTypes[column.Name()] = column.DataType() }