Skip to content

Commit

Permalink
Various optimizations, mostly preventing object allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
noctarius committed Jul 18, 2023
1 parent 332733c commit 7f63fa7
Show file tree
Hide file tree
Showing 16 changed files with 47 additions and 61 deletions.
10 changes: 7 additions & 3 deletions cmd/timescaledb-event-streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"github.com/urfave/cli"
"io"
"log"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"runtime/pprof"
"strings"
"syscall"
)
Expand Down Expand Up @@ -100,14 +101,17 @@ func start(*cli.Context) error {
}

if profiling {
cpuProfile, err := os.Create("cpu.prof")
/*cpuProfile, err := os.Create("cpu.prof")
if err != nil {
return err
}
if err := pprof.StartCPUProfile(cpuProfile); err != nil {
return err
}
defer pprof.StopCPUProfile()
defer pprof.StopCPUProfile()*/
go func() {
http.ListenAndServe("localhost:8080", nil)
}()
}

logging.WithCaller = withCaller
Expand Down
2 changes: 1 addition & 1 deletion internal/eventing/eventemitting/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (e *eventEmitterEventHandler) OnOriginEvent(_ pgtypes.XLogData, _ *pgtypes.

func (e *eventEmitterEventHandler) OnTransactionFinishedEvent(xld pgtypes.XLogData, msg *pgtypes.CommitMessage) error {
e.eventEmitter.logger.Debugf(
"Transaction xid=%d (LSN: %s) marked as processed", xld.Xid, msg.TransactionEndLSN.String(),
"Transaction xid=%d (LSN: %s) marked as processed", xld.Xid, msg.TransactionEndLSN,
)
transactionEndLSN := pgtypes.LSN(msg.TransactionEndLSN)
return e.eventEmitter.replicationContext.AcknowledgeProcessed(xld, &transactionEndLSN)
Expand Down
6 changes: 3 additions & 3 deletions internal/eventing/schema/schemaregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ import (
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/noctarius/timescaledb-event-streamer/spi/topic/namegenerator"
"github.com/reugn/async"
"sync"
)

type Registry struct {
topicNameGenerator namegenerator.NameGenerator
schemaRegistry map[string]schemamodel.Struct
mutex *async.ReentrantLock
mutex sync.Mutex
}

func NewRegistry(topicNameGenerator namegenerator.NameGenerator) schema.Registry {
r := &Registry{
topicNameGenerator: topicNameGenerator,
schemaRegistry: make(map[string]schemamodel.Struct),
mutex: async.NewReentrantLock(),
mutex: sync.Mutex{},
}
initializeSourceSchemas(r)
return r
Expand Down
2 changes: 1 addition & 1 deletion internal/replication/context/replicationconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (rc *ReplicationConnection) locateRestartLSN(readReplicationSlot readReplic
if restartLSN == confirmedFlushLSN && !rc.replicationSlotCreated {
addMsg := ""
if offset != nil {
addMsg = fmt.Sprintf(" (lower offset LSN: %s)", offset.LSN.String())
addMsg = fmt.Sprintf(" (lower offset LSN: %s)", offset.LSN)
}
rc.logger.Infof("Restarting replication at last confirmed flush LSN: %s%s", restartLSN, addMsg)
} else if offset != nil && restartLSN == offset.LSN {
Expand Down
2 changes: 1 addition & 1 deletion internal/replication/context/replicationcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (rc *replicationContext) AcknowledgeProcessed(xld pgtypes.XLogData, process

newLastProcessedLSN := pgtypes.LSN(xld.WALStart + pglogrepl.LSN(len(xld.WALData)))
if processedLSN != nil {
rc.dispatcher.logger.Debugf("Acknowledge transaction end: %s", processedLSN.String())
rc.dispatcher.logger.Debugf("Acknowledge transaction end: %s", processedLSN)
newLastProcessedLSN = *processedLSN
}

Expand Down
22 changes: 11 additions & 11 deletions internal/replication/replicationchannel/replicationhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (rh *replicationHandler) startReplicationHandler(
}
rh.logger.Tracef(
"Primary Keepalive Message => ServerWALEnd:%s ServerTime:%s ReplyRequested:%t",
pkm.ServerWALEnd.String(), pkm.ServerTime, pkm.ReplyRequested,
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested,
)

if pkm.ReplyRequested {
Expand Down Expand Up @@ -186,7 +186,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
switch logicalMsg := msg.(type) {
case *pglogrepl.RelationMessage:
intLogicalMsg := pgtypes.RelationMessage(*logicalMsg)
rh.logger.Debugf("EVENT: %s", intLogicalMsg.String())
rh.logger.Debugf("EVENT: %s", intLogicalMsg)
rh.relations[logicalMsg.RelationID] = &intLogicalMsg
return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyBaseReplicationEventHandler(
Expand All @@ -197,7 +197,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
})
case *pglogrepl.BeginMessage:
intLogicalMsg := pgtypes.BeginMessage(*logicalMsg)
rh.logger.Debugf("EVENT: %s", intLogicalMsg.String())
rh.logger.Debugf("EVENT: %s", intLogicalMsg)
rh.replicationContext.SetLastTransactionId(intLogicalMsg.Xid)
rh.lastTransactionId = &intLogicalMsg.Xid
xld.Xid = intLogicalMsg.Xid
Expand All @@ -213,7 +213,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
})
case *pglogrepl.CommitMessage:
intLogicalMsg := pgtypes.CommitMessage(*logicalMsg)
rh.logger.Debugf("EVENT: %s", intLogicalMsg.String())
rh.logger.Debugf("EVENT: %s", intLogicalMsg)
rh.lastTransactionId = nil
return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
Expand All @@ -230,7 +230,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
return rh.handleDeleteMessage(xld, logicalMsg)
case *pglogrepl.TruncateMessage:
intLogicalMsg := pgtypes.TruncateMessage(*logicalMsg)
rh.logger.Debugf("EVENT: %s", intLogicalMsg.String())
rh.logger.Debugf("EVENT: %s", intLogicalMsg)
return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
func(handler eventhandlers.LogicalReplicationEventHandler) error {
Expand All @@ -240,7 +240,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
})
case *pglogrepl.TypeMessage:
intLogicalMsg := pgtypes.TypeMessage(*logicalMsg)
rh.logger.Debugf("EVENT: %s", intLogicalMsg.String())
rh.logger.Debugf("EVENT: %s", intLogicalMsg)
return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
func(handler eventhandlers.LogicalReplicationEventHandler) error {
Expand All @@ -250,7 +250,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
})
case *pglogrepl.OriginMessage:
intLogicalMsg := pgtypes.OriginMessage(*logicalMsg)
rh.logger.Debugf("EVENT: %s", intLogicalMsg.String())
rh.logger.Debugf("EVENT: %s", intLogicalMsg)
return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
func(handler eventhandlers.LogicalReplicationEventHandler) error {
Expand All @@ -259,7 +259,7 @@ func (rh *replicationHandler) handleReplicationEvents(xld pgtypes.XLogData, msg
)
})
case *pgtypes.LogicalReplicationMessage:
rh.logger.Debugf("EVENT: %+v", logicalMsg.String())
rh.logger.Debugf("EVENT: %s", logicalMsg)
return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
func(handler eventhandlers.LogicalReplicationEventHandler) error {
Expand Down Expand Up @@ -292,7 +292,7 @@ func (rh *replicationHandler) handleDeleteMessage(xld pgtypes.XLogData, msg *pgl
DeleteMessage: msg,
OldValues: oldValues,
}
rh.logger.Debugf("EVENT: %+v", internalMsg)
rh.logger.Debugf("EVENT: %s", internalMsg)

return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
Expand Down Expand Up @@ -329,7 +329,7 @@ func (rh *replicationHandler) handleUpdateMessage(xld pgtypes.XLogData, msg *pgl
OldValues: oldValues,
NewValues: newValues,
}
rh.logger.Debugf("EVENT: %+v", internalMsg)
rh.logger.Debugf("EVENT: %s", internalMsg)

return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
Expand Down Expand Up @@ -360,7 +360,7 @@ func (rh *replicationHandler) handleInsertMessage(xld pgtypes.XLogData, msg *pgl
InsertMessage: msg,
NewValues: newValues,
}
rh.logger.Debugf("EVENT: %+v", internalMsg)
rh.logger.Debugf("EVENT: %s", internalMsg)

return rh.taskManager.EnqueueTask(func(notificator context.Notificator) {
notificator.NotifyLogicalReplicationEventHandler(
Expand Down
4 changes: 2 additions & 2 deletions internal/replication/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (r *Replicator) StartReplication() *cli.ExitError {

// Log system information
r.logger.Infof("Discovered System Information:")
r.logger.Infof(" * PostgreSQL version %s", replicationContext.PostgresVersion().String())
r.logger.Infof(" * TimescaleDB version %s", replicationContext.TimescaleVersion().String())
r.logger.Infof(" * PostgreSQL version %s", replicationContext.PostgresVersion())
r.logger.Infof(" * TimescaleDB version %s", replicationContext.TimescaleVersion())
r.logger.Infof(" * PostgreSQL System Identity %s", replicationContext.SystemId())
r.logger.Infof(" * PostgreSQL Timeline %d", replicationContext.Timeline())
r.logger.Infof(" * PostgreSQL Database %s", replicationContext.DatabaseName())
Expand Down
2 changes: 1 addition & 1 deletion internal/statestorages/file/filestatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (f *fileStateStorage) Stop() error {
f.logger.Infof("Stopping FileStateStorage at %s", f.path)
f.logger.Debugln("Last processed LSNs:")
for name, offset := range f.offsets {
f.logger.Debugf(" * %s: %s", name, offset.LSN.String())
f.logger.Debugf(" * %s: %s", name, offset.LSN)
}
f.shutdownWaiter.SignalShutdown()
if err := f.shutdownWaiter.AwaitDone(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/systemcatalog/catalogeventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *systemCatalogReplicationEventHandler) OnHypertableAddedEvent(
if err := s.systemCatalog.RegisterHypertable(h); err != nil {
return errors.Errorf("registering hypertable failed: %v (error: %+v)", h, err)
}
s.systemCatalog.logger.Verbosef("Entry Added: Hypertable %d => %s", h.Id(), h.String())
s.systemCatalog.logger.Verbosef("Entry Added: Hypertable %d => %s", h.Id(), h)

return s.systemCatalog.replicationContext.ReadHypertableSchema(s.systemCatalog.ApplySchemaUpdate, h)
},
Expand Down Expand Up @@ -120,7 +120,7 @@ func (s *systemCatalogReplicationEventHandler) OnHypertableDeletedEvent(
hypertableId := oldValues["id"].(int32)
if hypertable, present := s.systemCatalog.FindHypertableById(hypertableId); present {
if err := s.systemCatalog.UnregisterHypertable(hypertable); err != nil {
s.systemCatalog.logger.Fatalf("unregistering hypertable failed: %s", hypertable.String())
s.systemCatalog.logger.Fatalf("unregistering hypertable failed: %s", hypertable)
}
}
return nil
Expand All @@ -141,7 +141,7 @@ func (s *systemCatalogReplicationEventHandler) OnChunkAddedEvent(
if h, present := s.systemCatalog.FindHypertableById(hypertableId); present {
s.systemCatalog.logger.Verbosef(
"Entry Added: Chunk %d for Hypertable %s => %s",
c.Id(), h.CanonicalName(), c.String(),
c.Id(), h.CanonicalName(), c,
)

if !c.IsCompressed() &&
Expand Down
4 changes: 2 additions & 2 deletions internal/systemcatalog/systemcatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func initializeSystemCatalog(sc *SystemCatalog) (*SystemCatalog, error) {
if err := sc.RegisterHypertable(hypertable); err != nil {
return errors.Errorf("registering hypertable failed: %s (error: %+v)", hypertable, err)
}
sc.logger.Verbosef("Entry Added: Hypertable %d => %s", hypertable.Id(), hypertable.String())
sc.logger.Verbosef("Entry Added: Hypertable %d => %s", hypertable.Id(), hypertable)
return nil
}); err != nil {
return nil, errors.Wrap(err, 0)
Expand All @@ -320,7 +320,7 @@ func initializeSystemCatalog(sc *SystemCatalog) (*SystemCatalog, error) {
if h, present := sc.FindHypertableById(chunk.HypertableId()); present {
sc.logger.Verbosef(
"Entry Added: Chunk %d for Hypertable %s => %s",
chunk.Id(), h.CanonicalName(), chunk.String(),
chunk.Id(), h.CanonicalName(), chunk,
)
}
return nil
Expand Down
5 changes: 2 additions & 3 deletions internal/tests/integration/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ func (its *IntegrationTestSuite) Test_Acknowledge_To_PG_With_Only_Begin_Commit()
func(context testrunner.Context) error {
pgVersion := context.PostgresqlVersion()
if pgVersion >= version.PG_15_VERSION {
fmt.Printf("Skipped test, because of PostgreSQL version <15.0 (%s)", pgVersion.String())
fmt.Printf("Skipped test, because of PostgreSQL version <15.0 (%s)", pgVersion)
return nil
}

Expand Down Expand Up @@ -1341,8 +1341,7 @@ func (its *IntegrationTestSuite) Test_Acknowledge_To_PG_With_Only_Begin_Commit()

if lsn2 <= lsn1 {
its.T().Errorf(
"LSN2 must be larger than LSN1 - LSN1: %s, LSN2: %s",
lsn1.String(), lsn2.String(),
"LSN2 must be larger than LSN1 - LSN1: %s, LSN2: %s", lsn1, lsn2,
)
}

Expand Down
2 changes: 1 addition & 1 deletion spi/pgtypes/logicalreplicationmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (m *LogicalReplicationMessage) String() string {
builder.WriteString(fmt.Sprintf("prefix:%s ", m.Prefix))
builder.WriteString(fmt.Sprintf("content:%x ", m.Content))
builder.WriteString(fmt.Sprintf("flags:%d ", m.Flags))
builder.WriteString(fmt.Sprintf("lsn:%s ", m.LSN.String()))
builder.WriteString(fmt.Sprintf("lsn:%s ", m.LSN))
builder.WriteString(fmt.Sprintf("xid:%d ", m.Xid))
builder.WriteString("}")
return builder.String()
Expand Down
8 changes: 4 additions & 4 deletions spi/pgtypes/pgtypeadapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m BeginMessage) String() string {
builder := strings.Builder{}
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("messageType:%s ", m.Type().String()))
builder.WriteString(fmt.Sprintf("finalLSN:%s ", m.FinalLSN.String()))
builder.WriteString(fmt.Sprintf("finalLSN:%s ", m.FinalLSN))
builder.WriteString(fmt.Sprintf("commitTime:%s ", m.CommitTime.String()))
builder.WriteString(fmt.Sprintf("xid:%d", m.Xid))
builder.WriteString("}")
Expand All @@ -59,8 +59,8 @@ func (m CommitMessage) String() string {
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("messageType:%s ", m.Type().String()))
builder.WriteString(fmt.Sprintf("flags:%d ", m.Flags))
builder.WriteString(fmt.Sprintf("commitLSN:%s ", m.CommitLSN.String()))
builder.WriteString(fmt.Sprintf("transactionEndLSN:%s ", m.TransactionEndLSN.String()))
builder.WriteString(fmt.Sprintf("commitLSN:%s ", m.CommitLSN))
builder.WriteString(fmt.Sprintf("transactionEndLSN:%s ", m.TransactionEndLSN))
builder.WriteString(fmt.Sprintf("commitTime:%s", m.CommitTime.String()))
builder.WriteString("}")
return builder.String()
Expand All @@ -73,7 +73,7 @@ func (m OriginMessage) String() string {
builder.WriteString("{")
builder.WriteString(fmt.Sprintf("messageType:%s ", m.Type().String()))
builder.WriteString(fmt.Sprintf("name:%s ", m.Name))
builder.WriteString(fmt.Sprintf("commitLSN:%s", m.CommitLSN.String()))
builder.WriteString(fmt.Sprintf("commitLSN:%s", m.CommitLSN))
builder.WriteString("}")
return builder.String()
}
Expand Down
18 changes: 0 additions & 18 deletions spi/schema/schemamodel/schema.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,5 @@
package schemamodel

type Schema interface {
SchemaType() Type
Schema(column ColumnDescriptor) Struct
}

type ColumnDescriptor interface {
Name() string
DataType() uint32
IsNullable() bool
IsPrimaryKey() bool
IsReplicaIdent() bool
DefaultValue() *string
IsDimension() bool
IsDimensionAligned() bool
DimensionType() *string
String() string
}

func Int8() SchemaBuilder {
return NewSchemaBuilder(INT8)
}
Expand Down
4 changes: 0 additions & 4 deletions spi/systemcatalog/hypertable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,6 @@ func (t *testPgType) SchemaType() schemamodel.Type {
return schemamodel.INT16
}

func (t *testPgType) Schema() schemamodel.Schema {
return nil
}

func (t *testPgType) SchemaBuilder() schemamodel.SchemaBuilder {
return nil
}
Expand Down
11 changes: 8 additions & 3 deletions spi/systemcatalog/systementity.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type SystemEntity interface {
}

type baseSystemEntity struct {
schemaName string
tableName string
schemaName string
tableName string
resolvedCanonicalName *string
}

// NewSystemEntity instantiates a new basic SystemEntity
Expand All @@ -50,5 +51,9 @@ func (bse *baseSystemEntity) TableName() string {
}

func (bse *baseSystemEntity) CanonicalName() string {
return MakeRelationKey(bse.schemaName, bse.tableName)
if bse.resolvedCanonicalName == nil {
relationKey := MakeRelationKey(bse.schemaName, bse.tableName)
bse.resolvedCanonicalName = &relationKey
}
return *bse.resolvedCanonicalName
}

0 comments on commit 7f63fa7

Please sign in to comment.