Skip to content

Commit

Permalink
Started reimplementing snapshotting with support for pg vanilla tables
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Sep 26, 2023
1 parent 79847be commit e86c659
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ func (l *logicalReplicationResolver) PostConstruct() error {
return nil
}

func (l *logicalReplicationResolver) OnHypertableSnapshotStartedEvent(
_ string, _ *spicatalog.Hypertable,
func (l *logicalReplicationResolver) OnTableSnapshotStartedEvent(
_ string, _ spicatalog.BaseTable,
) error {

return nil
}

func (l *logicalReplicationResolver) OnHypertableSnapshotFinishedEvent(
_ string, _ *spicatalog.Hypertable,
func (l *logicalReplicationResolver) OnTableSnapshotFinishedEvent(
_ string, _ spicatalog.BaseTable, _ pgtypes.LSN,
) error {

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ func (tt *transactionTracker) PostConstruct() error {
return nil
}

func (tt *transactionTracker) OnHypertableSnapshotStartedEvent(
snapshotName string, hypertable *spicatalog.Hypertable,
func (tt *transactionTracker) OnTableSnapshotStartedEvent(
snapshotName string, table spicatalog.BaseTable,
) error {

return tt.resolver.OnHypertableSnapshotStartedEvent(snapshotName, hypertable)
return tt.resolver.OnTableSnapshotStartedEvent(snapshotName, table)
}

func (tt *transactionTracker) OnHypertableSnapshotFinishedEvent(
snapshotName string, hypertable *spicatalog.Hypertable,
func (tt *transactionTracker) OnTableSnapshotFinishedEvent(
snapshotName string, table spicatalog.BaseTable, lsn pgtypes.LSN,
) error {

return tt.resolver.OnHypertableSnapshotFinishedEvent(snapshotName, hypertable)
return tt.resolver.OnTableSnapshotFinishedEvent(snapshotName, table, lsn)
}

func (tt *transactionTracker) OnSnapshottingStartedEvent(
Expand Down
8 changes: 4 additions & 4 deletions internal/replication/replicationchannel/replicationchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ func (s *snapshottingEventHandler) OnChunkSnapshotFinishedEvent(
return nil
}

func (s *snapshottingEventHandler) OnHypertableSnapshotStartedEvent(
_ string, _ *systemcatalog.Hypertable,
func (s *snapshottingEventHandler) OnTableSnapshotStartedEvent(
_ string, _ systemcatalog.BaseTable,
) error {

return nil
}

func (s *snapshottingEventHandler) OnHypertableSnapshotFinishedEvent(
_ string, _ *systemcatalog.Hypertable,
func (s *snapshottingEventHandler) OnTableSnapshotFinishedEvent(
_ string, _ systemcatalog.BaseTable, _ pgtypes.LSN,
) error {

return nil
Expand Down
31 changes: 19 additions & 12 deletions internal/sidechannel/sidechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,13 @@ func (sc *sideChannel) DetachTablesFromPublication(
})
}

func (sc *sideChannel) SnapshotVanillaTable(
rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable,
snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
) (lsn pgtypes.LSN, err error) {

}

func (sc *sideChannel) SnapshotChunkTable(
rowDecoderFactory pgtypes.RowDecoderFactory, chunk *systemcatalog.Chunk,
snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
Expand Down Expand Up @@ -393,21 +400,21 @@ func (sc *sideChannel) SnapshotChunkTable(
}

func (sc *sideChannel) FetchHypertableSnapshotBatch(
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable,
rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable,
snapshotName string, snapshotBatchSize int, cb sidechannel.SnapshotRowCallback,
) error {

index, present := hypertable.Columns().SnapshotIndex()
index, present := table.Columns().SnapshotIndex()
if !present {
return errors.Errorf("missing snapshotting index for hypertable '%s'", hypertable.CanonicalName())
return errors.Errorf("missing snapshotting index for hypertable '%s'", table.CanonicalName())
}

return sc.stateStorageManager.SnapshotContextTransaction(
snapshotName, false,
func(snapshotContext *watermark.SnapshotContext) error {
hypertableWatermark, present := snapshotContext.GetWatermark(hypertable)
hypertableWatermark, present := snapshotContext.GetWatermark(table)
if !present {
return errors.Errorf("illegal watermark state for hypertable '%s'", hypertable.CanonicalName())
return errors.Errorf("illegal watermark state for hypertable '%s'", table.CanonicalName())
}

comparison, success := index.WhereTupleLE(hypertableWatermark.HighWatermark())
Expand All @@ -423,7 +430,7 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch(

sc.logger.Verbosef(
"Resuming snapshotting of hypertable '%s' at <<%s>> up to <<%s>>",
hypertable.CanonicalName(), lowWatermarkComparison, comparison,
table.CanonicalName(), lowWatermarkComparison, comparison,
)

comparison = fmt.Sprintf("%s AND %s",
Expand All @@ -433,14 +440,14 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch(
} else {
sc.logger.Verbosef(
"Starting snapshotting of hypertable '%s' up to <<%s>>",
hypertable.CanonicalName(), comparison,
table.CanonicalName(), comparison,
)
}

cursorName := lo.RandomString(15, lo.LowerCaseLettersCharset)
cursorQuery := fmt.Sprintf(
`DECLARE %s SCROLL CURSOR FOR SELECT * FROM %s WHERE %s ORDER BY %s LIMIT %d`,
cursorName, hypertable.CanonicalName(), comparison,
cursorName, table.CanonicalName(), comparison,
index.AsSqlOrderBy(false), snapshotBatchSize*10,
)

Expand All @@ -466,19 +473,19 @@ func (sc *sideChannel) FetchHypertableSnapshotBatch(
}

func (sc *sideChannel) ReadSnapshotHighWatermark(
rowDecoderFactory pgtypes.RowDecoderFactory, hypertable *systemcatalog.Hypertable, snapshotName string,
rowDecoderFactory pgtypes.RowDecoderFactory, table systemcatalog.BaseTable, snapshotName string,
) (values map[string]any, err error) {

index, present := hypertable.Columns().SnapshotIndex()
index, present := table.Columns().SnapshotIndex()
if !present {
return nil, errors.Errorf(
"missing snapshotting index for hypertable '%s'", hypertable.CanonicalName(),
"missing snapshotting index for hypertable '%s'", table.CanonicalName(),
)
}

query := fmt.Sprintf(
queryTemplateSnapshotHighWatermark, index.AsSqlTuple(),
hypertable.CanonicalName(), index.AsSqlOrderBy(true),
table.CanonicalName(), index.AsSqlOrderBy(true),
)
if err := sc.newSession(time.Second*10, func(session *session) error {
if _, err := session.exec("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); err != nil {
Expand Down
68 changes: 54 additions & 14 deletions internal/systemcatalog/snapshotting/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type snapshotterStats struct {

type snapshotterPartitionStats struct {
snapshots struct {
pgtables uint `metric:"pgtables" type:"gauge"`
hypertables uint `metric:"hypertable" type:"gauge"`
chunks uint `metric:"chunks" type:"gauge"`
} `metric:"snapshots"`
Expand All @@ -53,7 +54,7 @@ type snapshotterPartitionStats struct {
}

type SnapshotTask struct {
Hypertable *systemcatalog.Hypertable
Table systemcatalog.BaseTable
Chunk *systemcatalog.Chunk
Xld *pgtypes.XLogData
SnapshotName *string
Expand Down Expand Up @@ -139,7 +140,7 @@ func (s *Snapshotter) EnqueueSnapshot(

// Partition calculation
hasher := fnv.New64a()
if _, err := hasher.Write([]byte(t.Hypertable.CanonicalName())); err != nil {
if _, err := hasher.Write([]byte(t.Table.CanonicalName())); err != nil {
// If we cannot hash, the system will break. That means, we harshly kill the process.
panic(err)
}
Expand All @@ -160,7 +161,7 @@ func (s *Snapshotter) EnqueueSnapshot(
if t.Chunk != nil {
err := s.taskManager.EnqueueTask(func(notificator task.Notificator) {
notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error {
return handler.OnChunkSnapshotStartedEvent(t.Hypertable, t.Chunk)
return handler.OnChunkSnapshotStartedEvent(t.Table.(*systemcatalog.Hypertable), t.Chunk)
})
enqueueSnapshotTask()
})
Expand Down Expand Up @@ -208,9 +209,48 @@ func (s *Snapshotter) snapshot(
if task.Chunk != nil {
return s.snapshotChunk(task, partition)
}

if _, ok := task.Table.(*systemcatalog.PgTable); ok {
return s.snapshotVanillaTable(task, partition)
}

return s.snapshotHypertable(task, partition)
}

func (s *Snapshotter) snapshotVanillaTable(
t SnapshotTask, partition int,
) error {

defer s.statsReporter.Report(s.partitionStats[partition])
s.partitionStats[partition].snapshots.pgtables++

alreadyPublished, err := s.publicationManager.ExistsTableInPublication(t.Table)
if err != nil {
return errors.Wrap(err, 0)
}
if !alreadyPublished {
if err := s.publicationManager.AttachTablesToPublication(t.Table); err != nil {
return errors.Wrap(err, 0)
}
}

lsn, err := s.sideChannel.SnapshotVanillaTable(
s.typeManager.GetOrPlanRowDecoder, t.Table, s.snapshotBatchSize,
func(lsn pgtypes.LSN, values map[string]any) error {

},
)
if err != nil {
return errors.Wrap(err, 0)
}

return s.taskManager.EnqueueTaskAndWait(func(notificator task.Notificator) {
notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error {
return handler.OnTableSnapshotFinishedEvent(*t.SnapshotName, t.Table)
})
})
}

func (s *Snapshotter) snapshotChunk(
t SnapshotTask, partition int,
) error {
Expand All @@ -234,11 +274,11 @@ func (s *Snapshotter) snapshotChunk(
s.partitionStats[partition].records.total++
return s.taskManager.EnqueueTask(func(notificator task.Notificator) {
callback := func(handler eventhandlers.RecordReplicationEventHandler) error {
return handler.OnReadEvent(lsn, t.Hypertable, t.Chunk, values)
return handler.OnReadEvent(lsn, t.Table, t.Chunk, values)
}
if t.Xld != nil {
callback = func(handler eventhandlers.RecordReplicationEventHandler) error {
return handler.OnInsertEvent(*t.Xld, t.Hypertable, t.Chunk, values)
return handler.OnInsertEvent(*t.Xld, t.Table, t.Chunk, values)
}
}
notificator.NotifyRecordReplicationEventHandler(callback)
Expand All @@ -251,7 +291,7 @@ func (s *Snapshotter) snapshotChunk(

return s.taskManager.EnqueueTaskAndWait(func(notificator task.Notificator) {
notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error {
return handler.OnChunkSnapshotFinishedEvent(t.Hypertable, t.Chunk, lsn)
return handler.OnChunkSnapshotFinishedEvent(t.Table.(*systemcatalog.Hypertable), t.Chunk, lsn)
})
})
}
Expand All @@ -267,12 +307,12 @@ func (s *Snapshotter) snapshotHypertable(
if err := s.stateStorageManager.SnapshotContextTransaction(
*t.SnapshotName, true,
func(snapshotContext *watermark.SnapshotContext) error {
hypertableWatermark, created := snapshotContext.GetOrCreateWatermark(t.Hypertable)
hypertableWatermark, created := snapshotContext.GetOrCreateWatermark(t.Table)

// Initialize the watermark or update the high watermark after a restart
if created || t.nextSnapshotFetch {
highWatermark, err := s.sideChannel.ReadSnapshotHighWatermark(
s.typeManager.GetOrPlanRowDecoder, t.Hypertable, *t.SnapshotName,
s.typeManager.GetOrPlanRowDecoder, t.Table, *t.SnapshotName,
)
if err != nil {
return errors.Wrap(err, 0)
Expand All @@ -295,23 +335,23 @@ func (s *Snapshotter) snapshotHypertable(
return s.stateStorageManager.SnapshotContextTransaction(
*t.SnapshotName, false,
func(snapshotContext *watermark.SnapshotContext) error {
hypertableWatermark, present := snapshotContext.GetWatermark(t.Hypertable)
hypertableWatermark, present := snapshotContext.GetWatermark(t.Table)
if !present {
return errors.Errorf(
"illegal watermark state for hypertable '%s'", t.Hypertable.CanonicalName(),
"illegal watermark state for hypertable '%s'", t.Table.CanonicalName(),
)
}

if hypertableWatermark.Complete() {
return s.taskManager.EnqueueTaskAndWait(func(notificator task.Notificator) {
notificator.NotifySnapshottingEventHandler(func(handler eventhandlers.SnapshottingEventHandler) error {
return handler.OnHypertableSnapshotFinishedEvent(*t.SnapshotName, t.Hypertable)
return handler.OnTableSnapshotFinishedEvent(*t.SnapshotName, t.Table)
})
})
}

return s.EnqueueSnapshot(SnapshotTask{
Hypertable: t.Hypertable,
Table: t.Table,
SnapshotName: t.SnapshotName,
nextSnapshotFetch: true,
})
Expand All @@ -325,7 +365,7 @@ func (s *Snapshotter) runSnapshotFetchBatch(

iteration := 0
return s.sideChannel.FetchHypertableSnapshotBatch(
s.typeManager.GetOrPlanRowDecoder, t.Hypertable, *t.SnapshotName, s.snapshotBatchSize,
s.typeManager.GetOrPlanRowDecoder, t.Table, *t.SnapshotName, s.snapshotBatchSize,
func(lsn pgtypes.LSN, values map[string]any) error {
s.partitionStats[partition].records.total++
iteration++
Expand All @@ -336,7 +376,7 @@ func (s *Snapshotter) runSnapshotFetchBatch(
return s.taskManager.EnqueueTask(func(notificator task.Notificator) {
notificator.NotifyRecordReplicationEventHandler(
func(handler eventhandlers.RecordReplicationEventHandler) error {
return handler.OnReadEvent(lsn, t.Hypertable, t.Chunk, values)
return handler.OnReadEvent(lsn, t.Table, t.Chunk, values)
},
)
})
Expand Down
Loading

0 comments on commit e86c659

Please sign in to comment.