Skip to content

Commit

Permalink
Extracted SideChannel into it's own package, breaking the dependency …
Browse files Browse the repository at this point in the history
…on ReplicationContext
  • Loading branch information
noctarius committed Jul 24, 2023
1 parent fc4e34e commit 2095493
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 54 deletions.
1 change: 1 addition & 0 deletions internal/di/modules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package di
48 changes: 12 additions & 36 deletions internal/replication/context/replicationcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/sidechannel"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"github.com/noctarius/timescaledb-event-streamer/spi/statestorage"
Expand All @@ -35,23 +36,12 @@ import (

type ReplicationContextProvider func(
config *spiconfig.Config, pgxConfig *pgx.ConnConfig,
stateStorageManager statestorage.Manager, sideChannelProvider SideChannelProvider,
stateStorageManager statestorage.Manager, sideChannel sidechannel.SideChannel,
) (ReplicationContext, error)

type HypertableSchemaCallback = func(
hypertable *systemcatalog.Hypertable, columns []systemcatalog.Column,
) bool

type SnapshotRowCallback = func(
lsn pgtypes.LSN, values map[string]any,
) error

type ReplicationContext interface {
StartReplicationContext() error
StopReplicationContext() error
NewSideChannelConnection(
ctx context.Context,
) (*pgx.Conn, error)
NewReplicationChannelConnection(
ctx context.Context,
) (*pgconn.PgConn, error)
Expand Down Expand Up @@ -105,7 +95,7 @@ type ReplicationContext interface {
IsLogicalReplicationEnabled() bool

HasTablePrivilege(
entity systemcatalog.SystemEntity, grant Grant,
entity systemcatalog.SystemEntity, grant sidechannel.Grant,
) (access bool, err error)
LoadHypertables(
cb func(hypertable *systemcatalog.Hypertable) error,
Expand All @@ -114,15 +104,15 @@ type ReplicationContext interface {
cb func(chunk *systemcatalog.Chunk) error,
) error
ReadHypertableSchema(
cb HypertableSchemaCallback,
cb sidechannel.HypertableSchemaCallback,
pgTypeResolver func(oid uint32) (pgtypes.PgType, error),
hypertables ...*systemcatalog.Hypertable,
) error
SnapshotChunkTable(
chunk *systemcatalog.Chunk, cb SnapshotRowCallback,
chunk *systemcatalog.Chunk, cb sidechannel.SnapshotRowCallback,
) (pgtypes.LSN, error)
FetchHypertableSnapshotBatch(
hypertable *systemcatalog.Hypertable, snapshotName string, cb SnapshotRowCallback,
hypertable *systemcatalog.Hypertable, snapshotName string, cb sidechannel.SnapshotRowCallback,
) error
ReadSnapshotHighWatermark(
hypertable *systemcatalog.Hypertable, snapshotName string,
Expand All @@ -144,7 +134,7 @@ type ReplicationContext interface {
type replicationContext struct {
pgxConfig *pgx.ConnConfig

sideChannel SideChannel
sideChannel sidechannel.SideChannel

// internal manager classes
publicationManager *publicationManager
Expand Down Expand Up @@ -177,7 +167,7 @@ type replicationContext struct {

func NewReplicationContext(
config *spiconfig.Config, pgxConfig *pgx.ConnConfig,
stateStorageManager statestorage.Manager, sideChannelProvider SideChannelProvider,
stateStorageManager statestorage.Manager, sideChannel sidechannel.SideChannel,
) (ReplicationContext, error) {

publicationName := spiconfig.GetOrDefault(
Expand Down Expand Up @@ -216,6 +206,7 @@ func NewReplicationContext(
pgxConfig: pgxConfig,

taskManager: taskManager,
sideChannel: sideChannel,
stateStorageManager: stateStorageManager,

snapshotInitialMode: snapshotInitialMode,
Expand All @@ -228,14 +219,6 @@ func NewReplicationContext(
replicationSlotAutoDrop: replicationSlotAutoDrop,
}

// Instantiate the actual side channel implementation
// which handles queries against the database
sideChannel, err := sideChannelProvider(replicationContext)
if err != nil {
return nil, err
}
replicationContext.sideChannel = sideChannel

pgVersion, err := sideChannel.GetPostgresVersion()
if err != nil {
return nil, err
Expand Down Expand Up @@ -499,7 +482,7 @@ func (rc *replicationContext) IsLogicalReplicationEnabled() bool {
// ----> SideChannel functions

func (rc *replicationContext) HasTablePrivilege(
entity systemcatalog.SystemEntity, grant Grant,
entity systemcatalog.SystemEntity, grant sidechannel.Grant,
) (access bool, err error) {

return rc.sideChannel.HasTablePrivilege(rc.pgxConfig.User, entity, grant)
Expand Down Expand Up @@ -528,14 +511,14 @@ func (rc *replicationContext) ReadHypertableSchema(
}

func (rc *replicationContext) SnapshotChunkTable(
chunk *systemcatalog.Chunk, cb SnapshotRowCallback,
chunk *systemcatalog.Chunk, cb sidechannel.SnapshotRowCallback,
) (pgtypes.LSN, error) {

return rc.sideChannel.SnapshotChunkTable(chunk, rc.snapshotBatchSize, cb)
}

func (rc *replicationContext) FetchHypertableSnapshotBatch(
hypertable *systemcatalog.Hypertable, snapshotName string, cb SnapshotRowCallback,
hypertable *systemcatalog.Hypertable, snapshotName string, cb sidechannel.SnapshotRowCallback,
) error {

return rc.sideChannel.FetchHypertableSnapshotBatch(hypertable, snapshotName, rc.snapshotBatchSize, cb)
Expand Down Expand Up @@ -576,13 +559,6 @@ func (rc *replicationContext) ReadReplicationSlot(
return rc.sideChannel.ReadReplicationSlot(slotName)
}

func (rc *replicationContext) NewSideChannelConnection(
ctx context.Context,
) (*pgx.Conn, error) {

return pgx.ConnectConfig(ctx, rc.pgxConfig)
}

func (rc *replicationContext) NewReplicationChannelConnection(
ctx context.Context,
) (*pgconn.PgConn, error) {
Expand Down
9 changes: 8 additions & 1 deletion internal/replication/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,16 @@ func (r *Replicator) StartReplication() *cli.ExitError {
return supporting.AdaptErrorWithMessage(err, "failed to instantiate state storage", 23)
}

// Instantiate the actual side channel implementation
// which handles queries against the database
sideChannel, err := r.config.SideChannelProvider(stateStorageManager, r.config.PgxConfig)
if err != nil {
return supporting.AdaptErrorWithMessage(err, "failed to instantiate side channel", 26)
}

// Create the side channels and replication context
replicationContext, err := r.config.ReplicationContextProvider(
r.config.Config, r.config.PgxConfig, stateStorageManager, r.config.SideChannelProvider,
r.config.Config, r.config.PgxConfig, stateStorageManager, sideChannel,
)
if err != nil {
return supporting.AdaptErrorWithMessage(err, "failed to initialize replication context", 17)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package context
package sidechannel

import (
"context"
Expand All @@ -27,6 +27,7 @@ import (
"github.com/jackc/pgx/v5/pgconn"
"github.com/noctarius/timescaledb-event-streamer/internal/logging"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"github.com/noctarius/timescaledb-event-streamer/spi/statestorage"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/noctarius/timescaledb-event-streamer/spi/version"
"github.com/noctarius/timescaledb-event-streamer/spi/watermark"
Expand Down Expand Up @@ -215,7 +216,15 @@ const walLevelQuery = `SHOW WAL_LEVEL`

const checkTablePrivilegeByUserQuery = `SELECT HAS_TABLE_PRIVILEGE($1, $2, $3)`

type SideChannelProvider func(replicationContext ReplicationContext) (SideChannel, error)
type SideChannelProvider func(stateStorageManager statestorage.Manager, pgxConfig *pgx.ConnConfig) (SideChannel, error)

type HypertableSchemaCallback = func(
hypertable *systemcatalog.Hypertable, columns []systemcatalog.Column,
) bool

type SnapshotRowCallback = func(
lsn pgtypes.LSN, values map[string]any,
) error

type SideChannel interface {
pgtypes.TypeResolver
Expand Down Expand Up @@ -284,12 +293,13 @@ type SideChannel interface {
}

type sideChannelImpl struct {
logger *logging.Logger
replicationContext ReplicationContext
logger *logging.Logger
pgxConfig *pgx.ConnConfig
stateStorageManager statestorage.Manager
}

func NewSideChannel(
replicationContext ReplicationContext,
stateStorageManager statestorage.Manager, pgxConfig *pgx.ConnConfig,
) (SideChannel, error) {

logger, err := logging.NewLogger("SideChannel")
Expand All @@ -298,8 +308,9 @@ func NewSideChannel(
}

return &sideChannelImpl{
logger: logger,
replicationContext: replicationContext,
logger: logger,
pgxConfig: pgxConfig,
stateStorageManager: stateStorageManager,
}, nil
}

Expand All @@ -326,7 +337,7 @@ func (sc *sideChannelImpl) CreatePublication(
err = session.queryRow(
createPublicationQuery,
publicationName,
sc.replicationContext.DatabaseUsername(),
sc.pgxConfig.User,
).Scan(&success)
if err != nil {
return err
Expand Down Expand Up @@ -606,7 +617,7 @@ func (sc *sideChannelImpl) FetchHypertableSnapshotBatch(
return errors.Errorf("missing snapshotting index for hypertable '%s'", hypertable.CanonicalName())
}

return sc.replicationContext.StateStorageManager().SnapshotContextTransaction(
return sc.stateStorageManager.SnapshotContextTransaction(
snapshotName, false,
func(snapshotContext *watermark.SnapshotContext) error {
hypertableWatermark, present := snapshotContext.GetWatermark(hypertable)
Expand Down Expand Up @@ -1032,7 +1043,7 @@ func (sc *sideChannelImpl) newSession(
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

connection, err := sc.replicationContext.NewSideChannelConnection(ctx)
connection, err := sc.newSideChannelConnection(ctx)
if err != nil {
return fmt.Errorf("unable to connect to database: %v", err)
}
Expand All @@ -1051,6 +1062,13 @@ func (sc *sideChannelImpl) newSession(
return fn(s)
}

func (sc *sideChannelImpl) newSideChannelConnection(
ctx context.Context,
) (*pgx.Conn, error) {

return pgx.ConnectConfig(ctx, sc.pgxConfig)
}

type rowFunction = func(
row pgx.Row,
) error
Expand Down
11 changes: 6 additions & 5 deletions internal/sysconfig/defaultproviders/defaultproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/noctarius/timescaledb-event-streamer/internal/eventing/eventemitting"
"github.com/noctarius/timescaledb-event-streamer/internal/eventing/eventfiltering"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/context"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/sidechannel"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/namingstrategy"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
Expand All @@ -38,10 +39,10 @@ type EventEmitterProvider = func(
) (*eventemitting.EventEmitter, error)

func DefaultSideChannelProvider(
replicationContext context.ReplicationContext,
) (context.SideChannel, error) {
stateStorageManager statestorage.Manager, pgxConfig *pgx.ConnConfig,
) (sidechannel.SideChannel, error) {

return context.NewSideChannel(replicationContext)
return sidechannel.NewSideChannel(stateStorageManager, pgxConfig)
}

func DefaultSinkManagerProvider(
Expand Down Expand Up @@ -90,10 +91,10 @@ func DefaultStateStorageManagerProvider(
}
func DefaultReplicationContextProvider(
config *spiconfig.Config, pgxConfig *pgx.ConnConfig,
stateStorageManager statestorage.Manager, sideChannelProvider context.SideChannelProvider,
stateStorageManager statestorage.Manager, sideChannel sidechannel.SideChannel,
) (context.ReplicationContext, error) {

return context.NewReplicationContext(config, pgxConfig, stateStorageManager, sideChannelProvider)
return context.NewReplicationContext(config, pgxConfig, stateStorageManager, sideChannel)
}

func DefaultStreamManagerProvider(
Expand Down
3 changes: 2 additions & 1 deletion internal/sysconfig/systemconfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package sysconfig
import (
"github.com/jackc/pgx/v5"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/context"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/sidechannel"
"github.com/noctarius/timescaledb-event-streamer/internal/sysconfig/defaultproviders"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/namingstrategy"
Expand All @@ -38,7 +39,7 @@ type SystemConfig struct {
NamingStrategyProvider namingstrategy.Provider
StateStorageManagerProvider statestorage.Provider
ReplicationContextProvider context.ReplicationContextProvider
SideChannelProvider context.SideChannelProvider
SideChannelProvider sidechannel.SideChannelProvider
}

func NewSystemConfig(
Expand Down
3 changes: 2 additions & 1 deletion internal/systemcatalog/systemcatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/go-errors/errors"
"github.com/noctarius/timescaledb-event-streamer/internal/logging"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/context"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/sidechannel"
"github.com/noctarius/timescaledb-event-streamer/internal/systemcatalog/snapshotting"
"github.com/noctarius/timescaledb-event-streamer/internal/systemcatalog/tablefiltering"
"github.com/noctarius/timescaledb-event-streamer/spi/config"
Expand Down Expand Up @@ -340,7 +341,7 @@ func initializeSystemCatalog(
}

// Run basic access check based on user permissions
access, err := sc.replicationContext.HasTablePrivilege(hypertable, context.Select)
access, err := sc.replicationContext.HasTablePrivilege(hypertable, sidechannel.Select)
if err != nil {
return errors.Wrap(err, 0)
}
Expand Down

0 comments on commit 2095493

Please sign in to comment.