Skip to content

Commit

Permalink
Enhanced logging of model objects using stringer functions
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Jul 5, 2023
1 parent bdd7b2a commit c4a8dd4
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 131 deletions.
18 changes: 9 additions & 9 deletions internal/replication/context/sidechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,23 +467,23 @@ func (sc *sideChannelImpl) fetchHypertableSnapshotBatch(
return sc.replicationContext.SnapshotContextTransaction(
snapshotName, false,
func(snapshotContext *watermark.SnapshotContext) error {
watermark, present := snapshotContext.GetWatermark(hypertable)
hypertableWatermark, present := snapshotContext.GetWatermark(hypertable)
if !present {
return errors.Errorf("illegal watermark state for hypertable '%s'", hypertable.CanonicalName())
}

comparison, success := index.WhereTupleLE(watermark.HighWatermark())
comparison, success := index.WhereTupleLE(hypertableWatermark.HighWatermark())
if !success {
return errors.Errorf("failed encoding watermark: %+v", watermark.HighWatermark())
return errors.Errorf("failed encoding watermark: %+v", hypertableWatermark.HighWatermark())
}

if watermark.HasValidLowWatermark() {
lowWatermarkComparison, success := index.WhereTupleGT(watermark.LowWatermark())
if hypertableWatermark.HasValidLowWatermark() {
lowWatermarkComparison, success := index.WhereTupleGT(hypertableWatermark.LowWatermark())
if !success {
return errors.Errorf("failed encoding watermark: %+v", watermark.LowWatermark())
return errors.Errorf("failed encoding watermark: %+v", hypertableWatermark.LowWatermark())
}

sc.logger.Debugf(
sc.logger.Verbosef(
"Resuming snapshotting of hypertable '%s' at <<%s>> up to <<%s>>",
hypertable.CanonicalName(), lowWatermarkComparison, comparison,
)
Expand All @@ -493,7 +493,7 @@ func (sc *sideChannelImpl) fetchHypertableSnapshotBatch(
comparison,
)
} else {
sc.logger.Debugf(
sc.logger.Verbosef(
"Starting snapshotting of hypertable '%s' up to <<%s>>",
hypertable.CanonicalName(), comparison,
)
Expand All @@ -516,7 +516,7 @@ func (sc *sideChannelImpl) fetchHypertableSnapshotBatch(
return false
})

watermark.SetLowWatermark(indexValues)
hypertableWatermark.SetLowWatermark(indexValues)
return cb(lsn, values)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/systemcatalog/catalogeventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *systemCatalogReplicationEventHandler) OnHypertableAddedEvent(
if err := s.systemCatalog.RegisterHypertable(h); err != nil {
return errors.Errorf("registering hypertable failed: %v (error: %+v)", h, err)
}
s.systemCatalog.logger.Verbosef("ADDED CATALOG ENTRY: HYPERTABLE %d => %+v", h.Id(), h)
s.systemCatalog.logger.Verbosef("ADDED CATALOG ENTRY: HYPERTABLE %d => %s", h.Id(), h.String())

return s.systemCatalog.replicationContext.ReadHypertableSchema(s.systemCatalog.ApplySchemaUpdate, h)
},
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *systemCatalogReplicationEventHandler) OnHypertableDeletedEvent(
hypertableId := oldValues["id"].(int32)
if hypertable, present := s.systemCatalog.FindHypertableById(hypertableId); present {
if err := s.systemCatalog.UnregisterHypertable(hypertable); err != nil {
s.systemCatalog.logger.Fatalf("unregistering hypertable failed: %v", hypertable)
s.systemCatalog.logger.Fatalf("unregistering hypertable failed: %s", hypertable.String())
}
}
return nil
Expand All @@ -140,8 +140,8 @@ func (s *systemCatalogReplicationEventHandler) OnChunkAddedEvent(

if h, present := s.systemCatalog.FindHypertableById(hypertableId); present {
s.systemCatalog.logger.Verbosef(
"ADDED CATALOG ENTRY: CHUNK %d FOR HYPERTABLE %s => %+v",
c.Id(), h.CanonicalName(), *c,
"ADDED CATALOG ENTRY: CHUNK %d FOR HYPERTABLE %s => %s",
c.Id(), h.CanonicalName(), c.String(),
)

if !c.IsCompressed() &&
Expand Down
10 changes: 5 additions & 5 deletions internal/systemcatalog/systemcatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,22 @@ func initializeSystemCatalog(sc *SystemCatalog) (*SystemCatalog, error) {
}

if err := sc.RegisterHypertable(hypertable); err != nil {
return errors.Errorf("registering hypertable failed: %v (error: %+v)", hypertable, err)
return errors.Errorf("registering hypertable failed: %s (error: %+v)", hypertable, err)
}
sc.logger.Verbosef("ADDED CATALOG ENTRY: HYPERTABLE %d => %+v", hypertable.Id(), hypertable)
sc.logger.Verbosef("ADDED CATALOG ENTRY: HYPERTABLE %d => %s", hypertable.Id(), hypertable.String())
return nil
}); err != nil {
return nil, errors.Wrap(err, 0)
}

if err := sc.replicationContext.LoadChunks(func(chunk *systemcatalog.Chunk) error {
if err := sc.RegisterChunk(chunk); err != nil {
return errors.Errorf("registering chunk failed: %v (error: %+v)", chunk, err)
return errors.Errorf("registering chunk failed: %s (error: %+v)", chunk, err)
}
if h, present := sc.FindHypertableById(chunk.HypertableId()); present {
sc.logger.Verbosef(
"ADDED CATALOG ENTRY: CHUNK %d FOR HYPERTABLE %s => %+v",
chunk.Id(), h.CanonicalName(), *chunk,
"ADDED CATALOG ENTRY: CHUNK %d FOR HYPERTABLE %s => %s",
chunk.Id(), h.CanonicalName(), chunk.String(),
)
}
return nil
Expand Down
110 changes: 0 additions & 110 deletions internal/tests/integration/streamer_loading_large_hypertable_test.go

This file was deleted.

18 changes: 18 additions & 0 deletions spi/systemcatalog/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ func (c *Chunk) IsCompressed() bool {
return c.compressed
}

func (c *Chunk) String() string {
builder := strings.Builder{}
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("id:%d ", c.id))
builder.WriteString(fmt.Sprintf("hypertableId:%d ", c.hypertableId))
builder.WriteString(fmt.Sprintf("schemaName:%s ", c.schemaName))
builder.WriteString(fmt.Sprintf("tableName:%s ", c.tableName))
if c.compressedChunkId == nil {
builder.WriteString("compressedChunkId:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("compressedChunkId:%d ", *c.compressedChunkId))
}
builder.WriteString(fmt.Sprintf("dropped:%t ", c.dropped))
builder.WriteString(fmt.Sprintf("status:%d", c.status))
builder.WriteString("}")
return builder.String()
}

func (c *Chunk) ApplyChanges(schemaName, tableName string, dropped bool,
status int32, compressedChunkId *int32) (*Chunk, map[string]string) {

Expand Down
41 changes: 41 additions & 0 deletions spi/systemcatalog/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package systemcatalog
import (
"fmt"
"github.com/noctarius/timescaledb-event-streamer/internal/supporting"
"strings"
)

// Columns represents a collection of columns which
Expand Down Expand Up @@ -222,6 +223,46 @@ func (c Column) DimensionType() *string {
return c.dimType
}

func (c Column) String() string {
builder := strings.Builder{}
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("name:%s ", c.name))
builder.WriteString(fmt.Sprintf("dataType:%d ", c.dataType))
builder.WriteString(fmt.Sprintf("typeName:%s ", c.typeName))
builder.WriteString(fmt.Sprintf("nullable:%t ", c.nullable))
builder.WriteString(fmt.Sprintf("primaryKey:%t ", c.primaryKey))
if c.keySeq == nil {
builder.WriteString("keySeq:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("keySeq:%d ", *c.keySeq))
}
if c.indexName == nil {
builder.WriteString("indexName:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("indexName:%s ", *c.indexName))
}
builder.WriteString(fmt.Sprintf("replicaIdent:%t ", c.replicaIdent))
if c.defaultValue == nil {
builder.WriteString("defaultValue:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("defaultValue:%s ", *c.defaultValue))
}
builder.WriteString(fmt.Sprintf("dimension:%t ", c.dimension))
builder.WriteString(fmt.Sprintf("dimAligned:%t ", c.dimAligned))
if c.dimType == nil {
builder.WriteString("dimType:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("dimType:%s ", *c.dimType))
}
if c.dimSeq == nil {
builder.WriteString("dimSeq:<nil>")
} else {
builder.WriteString(fmt.Sprintf("dimSeq:%d", *c.dimSeq))
}
builder.WriteString("}")
return builder.String()
}

func (c Column) equals(other Column) bool {
return c.name == other.name &&
c.typeName == other.typeName &&
Expand Down
37 changes: 37 additions & 0 deletions spi/systemcatalog/hypertable.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package systemcatalog
import (
"fmt"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"strings"
)

// Hypertable represents a TimescaleDB hypertable definition
Expand Down Expand Up @@ -154,6 +155,42 @@ func (h *Hypertable) ReplicaIdentity() pgtypes.ReplicaIdentity {
return h.replicaIdentity
}

func (h *Hypertable) String() string {
builder := strings.Builder{}
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("id:%d ", h.id))
builder.WriteString(fmt.Sprintf("schemaName:%s ", h.schemaName))
builder.WriteString(fmt.Sprintf("tableName:%s ", h.tableName))
builder.WriteString(fmt.Sprintf("associatedSchemaName:%s ", h.associatedSchemaName))
builder.WriteString(fmt.Sprintf("associatedTablePrefix:%s ", h.associatedTablePrefix))
if h.compressedHypertableId == nil {
builder.WriteString("compressedHypertableId:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("compressedHypertableId:%d ", *h.compressedHypertableId))
}
builder.WriteString(fmt.Sprintf("compressionState:%d ", h.compressionState))
builder.WriteString(fmt.Sprintf("replicaIdentity:%s", h.replicaIdentity))
if h.viewSchema == nil {
builder.WriteString("viewSchema:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("viewSchema:%s ", *h.viewSchema))
}
if h.viewName == nil {
builder.WriteString("viewName:<nil> ")
} else {
builder.WriteString(fmt.Sprintf("viewName:%s ", *h.viewName))
}
builder.WriteString("columns:[")
for i, column := range h.columns {
builder.WriteString(column.String())
if i < len(h.columns)-1 {
builder.WriteString(" ")
}
}
builder.WriteString("]}")
return builder.String()
}

// ApplyTableSchema applies a new hypertable schema to this
// hypertable instance and returns changes to the previously
// known schema layout.
Expand Down
6 changes: 3 additions & 3 deletions spi/systemcatalog/hypertable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func TestSchemaDifferences_Added_Column(t *testing.T) {
expected := "added: {name:test4 dataType:10 typeName:foo nullable:false primaryKey:false keySeq:<nil> defaultValue:<nil> replicaIdent:false indexName:<nil> sortOrder:ASC nullsOrder:NULLS LAST dimension:false dimAligned:false dimType:<nil> dimSeq:<nil>}"
expected := "added: {name:test4 dataType:10 typeName:foo nullable:false primaryKey:false keySeq:<nil> indexName:<nil> replicaIdent:false defaultValue:<nil> dimension:false dimAligned:false dimType:<nil> dimSeq:<nil>}"
oldColumns := []Column{
NewColumn("test1", 10, "foo", false, nil),
NewColumn("test2", 10, "foo", false, nil),
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestSchemaDifferences_Renamed_Last_Column(t *testing.T) {
}

func TestSchemaDifferences_Dropped_Column(t *testing.T) {
expected := "dropped: {name:test2 dataType:11 typeName:foo nullable:false primaryKey:false keySeq:<nil> defaultValue:<nil> replicaIdent:false indexName:<nil> sortOrder:ASC nullsOrder:NULLS LAST dimension:false dimAligned:false dimType:<nil> dimSeq:<nil>}"
expected := "dropped: {name:test2 dataType:11 typeName:foo nullable:false primaryKey:false keySeq:<nil> indexName:<nil> replicaIdent:false defaultValue:<nil> dimension:false dimAligned:false dimType:<nil> dimSeq:<nil>}"
oldColumns := []Column{
NewColumn("test1", 10, "foo", false, nil),
NewColumn("test2", 11, "foo", false, nil),
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSchemaDifferences_Dropped_Column(t *testing.T) {
}

func TestSchemaDifferences_Dropped_Last_Column(t *testing.T) {
expected := "dropped: {name:test3 dataType:10 typeName:foo nullable:false primaryKey:false keySeq:<nil> defaultValue:<nil> replicaIdent:false indexName:<nil> sortOrder:ASC nullsOrder:NULLS LAST dimension:false dimAligned:false dimType:<nil> dimSeq:<nil>}"
expected := "dropped: {name:test3 dataType:10 typeName:foo nullable:false primaryKey:false keySeq:<nil> indexName:<nil> replicaIdent:false defaultValue:<nil> dimension:false dimAligned:false dimType:<nil> dimSeq:<nil>}"
oldColumns := []Column{
NewColumn("test1", 10, "foo", false, nil),
NewColumn("test2", 10, "foo", false, nil),
Expand Down
17 changes: 17 additions & 0 deletions spi/systemcatalog/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,23 @@ func (i *Index) Columns() []Column {
return i.columns
}

func (i *Index) String() string {
builder := strings.Builder{}
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("name:%s ", i.name))
builder.WriteString(fmt.Sprintf("primaryKey:%t ", i.primaryKey))
builder.WriteString(fmt.Sprintf("replicaIdentity:%t ", i.replicaIdentity))
builder.WriteString("columns:[")
for c, column := range i.columns {
builder.WriteString(column.String())
if c < len(i.columns)-1 {
builder.WriteString(" ")
}
}
builder.WriteString("]}")
return builder.String()
}

// AsSqlTuple creates a string to be used as a tuple definition
// in a WHERE-clause:
// (col1, col2, col3, ...)
Expand Down

0 comments on commit c4a8dd4

Please sign in to comment.