Skip to content

Commit

Permalink
Started SchemaBuilder implementation and split out the schema model f…
Browse files Browse the repository at this point in the history
…rom the schema manager
  • Loading branch information
noctarius committed Jul 11, 2023
1 parent 4a26a8f commit f6f79f6
Show file tree
Hide file tree
Showing 18 changed files with 374 additions and 328 deletions.
58 changes: 30 additions & 28 deletions internal/eventing/eventemitting/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/noctarius/timescaledb-event-streamer/spi/eventhandlers"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"github.com/noctarius/timescaledb-event-streamer/spi/statestorage"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
Expand Down Expand Up @@ -88,28 +89,28 @@ func (ee *EventEmitter) NewEventHandler() eventhandlers.BaseReplicationEventHand
}
}

func (ee *EventEmitter) envelopeSchema(hypertable *systemcatalog.Hypertable) schema.Struct {
func (ee *EventEmitter) envelopeSchema(hypertable *systemcatalog.Hypertable) schemamodel.Struct {
schemaTopicName := ee.schemaManager.HypertableEnvelopeSchemaName(hypertable)
return ee.schemaManager.GetSchemaOrCreate(schemaTopicName, func() schema.Struct {
return ee.schemaManager.GetSchemaOrCreate(schemaTopicName, func() schemamodel.Struct {
return schema.EnvelopeSchema(ee.schemaManager, ee.schemaManager, hypertable)
})
}

func (ee *EventEmitter) envelopeMessageSchema() schema.Struct {
func (ee *EventEmitter) envelopeMessageSchema() schemamodel.Struct {
schemaTopicName := ee.schemaManager.MessageEnvelopeSchemaName()
return ee.schemaManager.GetSchemaOrCreate(schemaTopicName, func() schema.Struct {
return ee.schemaManager.GetSchemaOrCreate(schemaTopicName, func() schemamodel.Struct {
return schema.EnvelopeMessageSchema(ee.schemaManager, ee.schemaManager)
})
}

func (ee *EventEmitter) keySchema(hypertable *systemcatalog.Hypertable) schema.Struct {
func (ee *EventEmitter) keySchema(hypertable *systemcatalog.Hypertable) schemamodel.Struct {
schemaTopicName := ee.schemaManager.HypertableKeySchemaName(hypertable)
return ee.schemaManager.GetSchemaOrCreate(schemaTopicName, func() schema.Struct {
return ee.schemaManager.GetSchemaOrCreate(schemaTopicName, func() schemamodel.Struct {
return schema.KeySchema(hypertable, ee.schemaManager)
})
}

func (ee *EventEmitter) emit(xld pgtypes.XLogData, eventTopicName string, key, envelope schema.Struct) error {
func (ee *EventEmitter) emit(xld pgtypes.XLogData, eventTopicName string, key, envelope schemamodel.Struct) error {
// Retryable operation
operation := func() error {
ee.logger.Tracef("Publishing event: %+v", envelope)
Expand Down Expand Up @@ -145,10 +146,10 @@ func (e *eventEmitterEventHandler) OnReadEvent(lsn pgtypes.LSN, hypertable *syst
}

return e.emit0(xld, true, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.ReadEvent(cnValues, source)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return e.hypertableEventKey(hypertable, newValues)
},
)
Expand All @@ -163,10 +164,10 @@ func (e *eventEmitterEventHandler) OnInsertEvent(xld pgtypes.XLogData, hypertabl
}

return e.emit(xld, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.CreateEvent(cnValues, source)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return e.hypertableEventKey(hypertable, newValues)
},
)
Expand All @@ -185,10 +186,10 @@ func (e *eventEmitterEventHandler) OnUpdateEvent(xld pgtypes.XLogData, hypertabl
}

return e.emit(xld, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.UpdateEvent(coValues, cnValues, source)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return e.hypertableEventKey(hypertable, newValues)
},
)
Expand All @@ -203,28 +204,28 @@ func (e *eventEmitterEventHandler) OnDeleteEvent(xld pgtypes.XLogData, hypertabl
}

return e.emit(xld, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.DeleteEvent(coValues, source, tombstone)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return e.hypertableEventKey(hypertable, oldValues)
},
)
}

func (e *eventEmitterEventHandler) OnTruncateEvent(xld pgtypes.XLogData, hypertable *systemcatalog.Hypertable) error {
return e.emit(xld, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.TruncateEvent(source)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return nil, nil
},
)
}

func (e *eventEmitterEventHandler) OnMessageEvent(xld pgtypes.XLogData, msg *pgtypes.LogicalReplicationMessage) error {
return e.emitMessageEvent(xld, msg, func(source schema.Struct) schema.Struct {
return e.emitMessageEvent(xld, msg, func(source schemamodel.Struct) schemamodel.Struct {
content := base64.StdEncoding.EncodeToString(msg.Content)
return schema.MessageEvent(msg.Prefix, content, source)
})
Expand All @@ -234,10 +235,10 @@ func (e *eventEmitterEventHandler) OnChunkCompressedEvent(
xld pgtypes.XLogData, hypertable *systemcatalog.Hypertable, _ *systemcatalog.Chunk) error {

return e.emit(xld, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.CompressionEvent(source)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return e.timescaleEventKey(hypertable)
},
)
Expand All @@ -247,10 +248,10 @@ func (e *eventEmitterEventHandler) OnChunkDecompressedEvent(
xld pgtypes.XLogData, hypertable *systemcatalog.Hypertable, _ *systemcatalog.Chunk) error {

return e.emit(xld, hypertable,
func(source schema.Struct) schema.Struct {
func(source schemamodel.Struct) schemamodel.Struct {
return schema.DecompressionEvent(source)
},
func() (schema.Struct, error) {
func() (schemamodel.Struct, error) {
return e.timescaleEventKey(hypertable)
},
)
Expand Down Expand Up @@ -285,14 +286,15 @@ func (e *eventEmitterEventHandler) OnTransactionFinishedEvent(xld pgtypes.XLogDa
}

func (e *eventEmitterEventHandler) emit(xld pgtypes.XLogData, hypertable *systemcatalog.Hypertable,
eventProvider func(source schema.Struct) schema.Struct, keyProvider func() (schema.Struct, error)) error {
eventProvider func(source schemamodel.Struct) schemamodel.Struct, keyProvider func() (schemamodel.Struct, error),
) error {

return e.emit0(xld, false, hypertable, eventProvider, keyProvider)
}

func (e *eventEmitterEventHandler) emit0(xld pgtypes.XLogData, snapshot bool,
hypertable *systemcatalog.Hypertable, eventProvider func(source schema.Struct) schema.Struct,
keyProvider func() (schema.Struct, error)) error {
hypertable *systemcatalog.Hypertable, eventProvider func(source schemamodel.Struct) schemamodel.Struct,
keyProvider func() (schemamodel.Struct, error)) error {

envelopeSchema := e.eventEmitter.envelopeSchema(hypertable)
eventTopicName := e.eventEmitter.schemaManager.EventTopicName(hypertable)
Expand Down Expand Up @@ -322,7 +324,7 @@ func (e *eventEmitterEventHandler) emit0(xld pgtypes.XLogData, snapshot bool,
}

func (e *eventEmitterEventHandler) emitMessageEvent(xld pgtypes.XLogData,
msg *pgtypes.LogicalReplicationMessage, eventProvider func(source schema.Struct) schema.Struct) error {
msg *pgtypes.LogicalReplicationMessage, eventProvider func(source schemamodel.Struct) schemamodel.Struct) error {

timestamp := time.Now()
if msg.IsTransactional() {
Expand All @@ -349,7 +351,7 @@ func (e *eventEmitterEventHandler) emitMessageEvent(xld pgtypes.XLogData,
}

func (e *eventEmitterEventHandler) hypertableEventKey(
hypertable *systemcatalog.Hypertable, values map[string]any) (schema.Struct, error) {
hypertable *systemcatalog.Hypertable, values map[string]any) (schemamodel.Struct, error) {

columns := make([]systemcatalog.Column, 0)
for _, column := range hypertable.Columns() {
Expand All @@ -361,7 +363,7 @@ func (e *eventEmitterEventHandler) hypertableEventKey(
return e.convertColumnValues(columns, values)
}

func (e *eventEmitterEventHandler) timescaleEventKey(hypertable *systemcatalog.Hypertable) (schema.Struct, error) {
func (e *eventEmitterEventHandler) timescaleEventKey(hypertable *systemcatalog.Hypertable) (schemamodel.Struct, error) {
return schema.TimescaleKey(hypertable.SchemaName(), hypertable.TableName()), nil
}

Expand Down
24 changes: 12 additions & 12 deletions internal/eventing/eventfiltering/eventfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ import (
"github.com/go-errors/errors"
"github.com/noctarius/timescaledb-event-streamer/internal/systemcatalog/tablefiltering"
"github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
)

type EventFilter interface {
Evaluate(hypertable *systemcatalog.Hypertable, key, value schema.Struct) (bool, error)
Evaluate(hypertable *systemcatalog.Hypertable, key, value schemamodel.Struct) (bool, error)
}

type eventFilterFunc func(hypertable *systemcatalog.Hypertable, key, value schema.Struct) (bool, error)
type eventFilterFunc func(hypertable *systemcatalog.Hypertable, key, value schemamodel.Struct) (bool, error)

func (eff eventFilterFunc) Evaluate(hypertable *systemcatalog.Hypertable, key, value schema.Struct) (bool, error) {
func (eff eventFilterFunc) Evaluate(hypertable *systemcatalog.Hypertable, key, value schemamodel.Struct) (bool, error) {
return eff(hypertable, key, value)
}

Expand Down Expand Up @@ -75,12 +75,12 @@ func NewEventFilter(filterDefinitions map[string]config.EventFilterConfig) (Even
return compositeFilter(filters, tableFilters), nil
}

var acceptAllFilter eventFilterFunc = func(_ *systemcatalog.Hypertable, _, _ schema.Struct) (bool, error) {
var acceptAllFilter eventFilterFunc = func(_ *systemcatalog.Hypertable, _, _ schemamodel.Struct) (bool, error) {
return true, nil
}

var compositeFilter = func(filters []*eventFilter, tableFilters []tableFilter) EventFilter {
return eventFilterFunc(func(hypertable *systemcatalog.Hypertable, key, value schema.Struct) (bool, error) {
return eventFilterFunc(func(hypertable *systemcatalog.Hypertable, key, value schemamodel.Struct) (bool, error) {
for i, tableFilter := range tableFilters {
if hypertable == nil || tableFilter.Enabled(hypertable) {
success, err := filters[i].evaluate(key, value)
Expand All @@ -103,12 +103,12 @@ type eventFilter struct {
vm *vm.VM
}

func (f *eventFilter) evaluate(key, value schema.Struct) (bool, error) {
env := map[string]schema.Struct{
"key": key["payload"].(schema.Struct),
"keySchema": key["schema"].(schema.Struct),
"value": value["payload"].(schema.Struct),
"valueSchema": value["schema"].(schema.Struct),
func (f *eventFilter) evaluate(key, value schemamodel.Struct) (bool, error) {
env := map[string]schemamodel.Struct{
"key": key[schemamodel.FieldNamePayload].(schemamodel.Struct),
"keySchema": key[schemamodel.FieldNameSchema].(schemamodel.Struct),
"value": value[schemamodel.FieldNamePayload].(schemamodel.Struct),
"valueSchema": value[schemamodel.FieldNameSchema].(schemamodel.Struct),
}

result, err := f.vm.Run(f.prog, env)
Expand Down
11 changes: 6 additions & 5 deletions internal/eventing/schema/schemaregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,41 @@ package schema
import (
"fmt"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/noctarius/timescaledb-event-streamer/spi/topic/namegenerator"
"github.com/reugn/async"
)

type Registry struct {
topicNameGenerator namegenerator.NameGenerator
schemaRegistry map[string]schema.Struct
schemaRegistry map[string]schemamodel.Struct
mutex *async.ReentrantLock
}

func NewRegistry(topicNameGenerator namegenerator.NameGenerator) schema.Registry {
r := &Registry{
topicNameGenerator: topicNameGenerator,
schemaRegistry: make(map[string]schema.Struct),
schemaRegistry: make(map[string]schemamodel.Struct),
mutex: async.NewReentrantLock(),
}
initializeSourceSchemas(r)
return r
}

func (r *Registry) RegisterSchema(schemaName string, schema schema.Struct) {
func (r *Registry) RegisterSchema(schemaName string, schema schemamodel.Struct) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.schemaRegistry[schemaName] = schema
}

func (r *Registry) GetSchema(schemaName string) schema.Struct {
func (r *Registry) GetSchema(schemaName string) schemamodel.Struct {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.schemaRegistry[schemaName]
}

func (r *Registry) GetSchemaOrCreate(schemaName string, creator func() schema.Struct) schema.Struct {
func (r *Registry) GetSchemaOrCreate(schemaName string, creator func() schemamodel.Struct) schemamodel.Struct {
r.mutex.Lock()
defer r.mutex.Unlock()
if schema, ok := r.schemaRegistry[schemaName]; ok {
Expand Down
4 changes: 2 additions & 2 deletions internal/eventing/sinks/awskinesis/awskinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/go-errors/errors"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"log"
"time"
Expand Down Expand Up @@ -123,7 +123,7 @@ func (a *awsKinesisSink) Stop() error {
return nil
}

func (a *awsKinesisSink) Emit(_ sink.Context, _ time.Time, topicName string, _, envelope schema.Struct) error {
func (a *awsKinesisSink) Emit(_ sink.Context, _ time.Time, topicName string, _, envelope schemamodel.Struct) error {
envelopeData, err := json.Marshal(envelope)
if err != nil {
return err
Expand Down
12 changes: 6 additions & 6 deletions internal/eventing/sinks/awssqs/awssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/go-errors/errors"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"time"
)
Expand Down Expand Up @@ -83,16 +83,16 @@ func (a *awsSqsSink) Stop() error {
return nil
}

func (a *awsSqsSink) Emit(_ sink.Context, _ time.Time, topicName string, _, envelope schema.Struct) error {
func (a *awsSqsSink) Emit(_ sink.Context, _ time.Time, topicName string, _, envelope schemamodel.Struct) error {
envelopeData, err := json.Marshal(envelope)
if err != nil {
return err
}

payload := envelope["payload"].(schema.Struct)
source := payload["source"].(schema.Struct)
lsn := source["lsn"].(string)
txId, present := source["txId"]
payload := envelope[schemamodel.FieldNamePayload].(schemamodel.Struct)
source := payload[schemamodel.FieldNameSource].(schemamodel.Struct)
lsn := source[schemamodel.FieldNameLSN].(string)
txId, present := source[schemamodel.FieldNameTxId]

var msgDeduplicationIdContent string
if present {
Expand Down
6 changes: 4 additions & 2 deletions internal/eventing/sinks/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"encoding/json"
"github.com/Shopify/sarama"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"time"
)
Expand Down Expand Up @@ -90,7 +90,9 @@ func (k *kafkaSink) Stop() error {
return k.producer.Close()
}

func (k *kafkaSink) Emit(_ sink.Context, timestamp time.Time, topicName string, key, envelope schema.Struct) error {
func (k *kafkaSink) Emit(_ sink.Context, timestamp time.Time,
topicName string, key, envelope schemamodel.Struct) error {

keyData, err := json.Marshal(key)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/eventing/sinks/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"fmt"
"github.com/nats-io/nats.go"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"time"
)
Expand Down Expand Up @@ -104,7 +104,7 @@ func (n *natsSink) Stop() error {
return nil
}

func (n *natsSink) Emit(_ sink.Context, _ time.Time, topicName string, key, envelope schema.Struct) error {
func (n *natsSink) Emit(_ sink.Context, _ time.Time, topicName string, key, envelope schemamodel.Struct) error {
keyData, err := json.Marshal(key)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/eventing/sinks/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"encoding/json"
"github.com/go-redis/redis"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/schema/schemamodel"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"time"
)
Expand Down Expand Up @@ -102,7 +102,7 @@ func (r *redisSink) Stop() error {
return r.client.Close()
}

func (r *redisSink) Emit(_ sink.Context, _ time.Time, topicName string, key, envelope schema.Struct) error {
func (r *redisSink) Emit(_ sink.Context, _ time.Time, topicName string, key, envelope schemamodel.Struct) error {
keyData, err := json.Marshal(key)
if err != nil {
return err
Expand Down
Loading

0 comments on commit f6f79f6

Please sign in to comment.