Skip to content

Commit

Permalink
Upgraded to Go 1.21.x, exchanged lo./functional. for slices. function…
Browse files Browse the repository at this point in the history
…ality
  • Loading branch information
noctarius committed Nov 12, 2023
1 parent 63abbe2 commit 57ab5e8
Show file tree
Hide file tree
Showing 17 changed files with 71 additions and 99 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ on:
branches: [ "**" ]

env:
golang-version: 1.20.2
golangci-lint-version: v1.51.2
golang-version: 1.21.4
golangci-lint-version: v1.54.2
docker-registry: ghcr.io
docker-image-name: ${{ github.repository }}

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ on:
types: [created]

env:
golang-version: 1.20.2
golangci-lint-version: v1.51.2
golang-version: 1.21.4
golangci-lint-version: v1.54.2
docker-registry: ghcr.io
docker-image-name: ${{ github.repository }}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/noctarius/timescaledb-event-streamer

go 1.20
go 1.21

require (
github.com/BurntSushi/toml v1.3.2
Expand Down
39 changes: 0 additions & 39 deletions internal/functional/functional.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,6 @@

package functional

import "sort"

func Zero[T any]() (t T) {
return
}

func MappingTransformer[T, V any](
transformer func(T) V,
) func(T, int) V {

return func(value T, _ int) V {
return transformer(value)
}
}

func Sort[I any](
collection []I, less func(this, other I) bool,
) []I {

sort.Slice(collection, func(i, j int) bool {
return less(collection[i], collection[j])
})
return collection
}

func ArrayEqual[T comparable](
this, that []T,
) bool {

if (this == nil && that != nil) || (this != nil && that == nil) {
return false
}
if len(this) != len(that) {
return false
}
for i := 0; i < len(this); i++ {
if this[i] != that[i] {
return false
}
}
return true
}
11 changes: 6 additions & 5 deletions internal/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/inhies/go-bytesize"
"github.com/noctarius/timescaledb-event-streamer/internal/erroring"
spiconfig "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/samber/lo"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -82,8 +83,7 @@ func InitializeLogging(

defaultConsoleHandler = newConsoleHandler(logToStdErr)

defaultConsoleHandlerEnabled =
loggingConfig.Outputs.Console.Enabled == nil || *loggingConfig.Outputs.Console.Enabled
defaultConsoleHandlerEnabled = lo.FromPtrOr(loggingConfig.Outputs.Console.Enabled, true)

if _, fileHandler, err := newFileHandler(loggingConfig.Outputs.File); err != nil {
return erroring.AdaptError(err, 1)
Expand Down Expand Up @@ -145,7 +145,7 @@ func NewLogger(
if config, found := loggingConfig.Loggers[name]; found {
// Found specific config
handlers := make([]slog.Handler, 0)
if config.Outputs.Console.Enabled == nil || *config.Outputs.Console.Enabled {
if !lo.FromPtrOr(config.Outputs.Console.Enabled, true) {
handlers = append(handlers, defaultConsoleHandler)
}

Expand Down Expand Up @@ -397,7 +397,7 @@ func newFileHandler(
config spiconfig.LoggerFileConfig,
) (bool, *handler.SyncCloseHandler, error) {

if config.Enabled == nil || !*config.Enabled {
if !lo.FromPtrOr(config.Enabled, false) {
return false, nil, nil
}

Expand All @@ -413,7 +413,8 @@ func newFileHandler(
}

var fileHandler *handler.SyncCloseHandler
if config.Rotate == nil || !*config.Rotate {

if !lo.FromPtrOr(config.Rotate, false) {
if h, err := handler.NewBuffFileHandler(config.Path, 1024, configurator); err != nil {
return false, nil, errors.Errorf(
fmt.Sprintf("Failed to initialize logfile handler => %s", err.Error()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
spicatalog "github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/noctarius/timescaledb-event-streamer/spi/task"
"github.com/samber/lo"
"slices"
)

type snapshotCallback func(snapshot pgtypes.LSN) error
Expand Down Expand Up @@ -469,15 +470,15 @@ func (l *logicalReplicationResolver) OnTruncateEvent(
return false
})
affectedTablesHypertables := lo.Filter(msg.RelationIDs, func(relId uint32, _ int) bool {
return !lo.Contains(affectedTablesVanilla, relId) && !lo.Contains(unknownRelations, relId)
return !slices.Contains(affectedTablesVanilla, relId) && !lo.Contains(unknownRelations, relId)
})

// FIXME: Truncate support for vanilla tables missing!

truncatedTables := make([]schema.TableAlike, 0)
for i := 0; i < int(msg.RelationNum); i++ {
relId := msg.RelationIDs[i]
if lo.Contains(affectedTablesHypertables, relId) {
if slices.Contains(affectedTablesHypertables, relId) {
if !l.genHypertableTruncateEvent {
continue
}
Expand All @@ -500,7 +501,7 @@ func (l *logicalReplicationResolver) OnTruncateEvent(
}
}

if lo.Contains(affectedTablesVanilla, relId) {
if slices.Contains(affectedTablesVanilla, relId) {
if !l.genPostgresqlTruncateEvent {
continue
}
Expand Down
29 changes: 18 additions & 11 deletions internal/replication/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/noctarius/timescaledb-event-streamer/internal/erroring"
"github.com/noctarius/timescaledb-event-streamer/internal/eventing/eventemitting"
"github.com/noctarius/timescaledb-event-streamer/internal/functional"
"github.com/noctarius/timescaledb-event-streamer/internal/logging"
"github.com/noctarius/timescaledb-event-streamer/internal/replication/replicationchannel"
"github.com/noctarius/timescaledb-event-streamer/internal/stats"
Expand All @@ -41,6 +40,7 @@ import (
"github.com/noctarius/timescaledb-event-streamer/spi/wiring"
"github.com/samber/lo"
"github.com/urfave/cli"
"slices"
)

const esPreviouslyKnownChunks = "::previously::known::chunks"
Expand Down Expand Up @@ -260,7 +260,7 @@ func (r *Replicator) collectVanillaTablesForPublication(

r.logger.Debugf(
"All interesting tables: %+v",
lo.Map(allKnownTables, functional.MappingTransformer(systemcatalog.SystemEntity.CanonicalName)),
lo.Map(allKnownTables, extractCanonicalNameMapper),
)

// Filter out published chunks, we're only interested in non TimescaleDB tables
Expand All @@ -270,17 +270,17 @@ func (r *Replicator) collectVanillaTablesForPublication(

r.logger.Debugf(
"Tables already in publication: %+v",
lo.Map(publishedTables, functional.MappingTransformer(systemcatalog.SystemEntity.CanonicalName)),
lo.Map(publishedTables, extractCanonicalNameMapper),
)

initialTables := lo.Filter(allKnownTables, func(item systemcatalog.SystemEntity, _ int) bool {
return !lo.ContainsBy(publishedTables, func(other systemcatalog.SystemEntity) bool {
return !slices.ContainsFunc(publishedTables, func(other systemcatalog.SystemEntity) bool {
return item.CanonicalName() == other.CanonicalName()
})
})
r.logger.Debugf(
"Tables to be added publication: %+v",
lo.Map(initialTables, functional.MappingTransformer(systemcatalog.SystemEntity.CanonicalName)),
lo.Map(initialTables, extractCanonicalNameMapper),
)
return initialTables, nil
}
Expand All @@ -299,7 +299,7 @@ func (r *Replicator) collectChunksForPublication(

r.logger.Debugf(
"All interesting chunks: %+v",
lo.Map(allKnownTables, functional.MappingTransformer(systemcatalog.SystemEntity.CanonicalName)),
lo.Map(allKnownTables, extractCanonicalNameMapper),
)

// Filter published chunks to only add new chunks
Expand All @@ -309,17 +309,17 @@ func (r *Replicator) collectChunksForPublication(

r.logger.Debugf(
"Chunks already in publication: %+v",
lo.Map(publishedTables, functional.MappingTransformer(systemcatalog.SystemEntity.CanonicalName)),
lo.Map(publishedTables, extractCanonicalNameMapper),
)

initialChunkTables := lo.Filter(allKnownTables, func(item systemcatalog.SystemEntity, _ int) bool {
return !lo.ContainsBy(publishedTables, func(other systemcatalog.SystemEntity) bool {
return !slices.ContainsFunc(publishedTables, func(other systemcatalog.SystemEntity) bool {
return item.CanonicalName() == other.CanonicalName()
})
})
r.logger.Debugf(
"Chunks to be added publication: %+v",
lo.Map(initialChunkTables, functional.MappingTransformer(systemcatalog.SystemEntity.CanonicalName)),
lo.Map(initialChunkTables, extractCanonicalNameMapper),
)
return initialChunkTables, nil
}
Expand All @@ -338,7 +338,7 @@ func getKnownVanillaTables(

// Filter potentially deleted chunks
return lo.Filter(candidates, func(item systemcatalog.SystemEntity, index int) bool {
return lo.ContainsBy(allTables, func(other systemcatalog.SystemEntity) bool {
return slices.ContainsFunc(allTables, func(other systemcatalog.SystemEntity) bool {
return item.CanonicalName() == other.CanonicalName()
})
}), nil
Expand All @@ -361,7 +361,7 @@ func getKnownChunks(

// Filter potentially deleted chunks
return lo.Filter(candidates, func(item systemcatalog.SystemEntity, index int) bool {
return lo.ContainsBy(allChunks, func(other systemcatalog.SystemEntity) bool {
return slices.ContainsFunc(allChunks, func(other systemcatalog.SystemEntity) bool {
return item.CanonicalName() == other.CanonicalName()
})
}), nil
Expand Down Expand Up @@ -418,3 +418,10 @@ func encodeKnownTables(
}
return buffer.Bytes(), nil
}

func extractCanonicalNameMapper(
item systemcatalog.SystemEntity, _ int,
) string {

return item.CanonicalName()
}
8 changes: 4 additions & 4 deletions internal/replication/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/noctarius/timescaledb-event-streamer/internal/logging"
"github.com/noctarius/timescaledb-event-streamer/spi/systemcatalog"
"github.com/noctarius/timescaledb-event-streamer/testsupport"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slices"
"testing"
)

Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_Replicator_Select_Missing_Tables_Random_Selection(
index := testsupport.RandomNumber(0, 1000)
chunk := knownTables[index]

if lo.ContainsBy(publishedChunkTables, func(other systemcatalog.SystemEntity) bool {
if slices.ContainsFunc(publishedChunkTables, func(other systemcatalog.SystemEntity) bool {
return other.CanonicalName() == chunk.CanonicalName()
}) {
goto retry
Expand Down Expand Up @@ -138,13 +138,13 @@ func Test_Replicator_Select_Missing_Tables_Random_Selection(
for i := 0; i < 1000; i++ {
chunk := knownTables[i]

if lo.ContainsBy(publishedChunkTables, func(other systemcatalog.SystemEntity) bool {
if slices.ContainsFunc(publishedChunkTables, func(other systemcatalog.SystemEntity) bool {
return other.CanonicalName() == chunk.CanonicalName()
}) {
mergeChunkTables = append(mergeChunkTables, chunk)
}

if lo.ContainsBy(neededChunkTables, func(other systemcatalog.SystemEntity) bool {
if slices.ContainsFunc(neededChunkTables, func(other systemcatalog.SystemEntity) bool {
return other.CanonicalName() == chunk.CanonicalName()
}) {
mergeChunkTables = append(mergeChunkTables, chunk)
Expand Down
8 changes: 4 additions & 4 deletions internal/systemcatalog/systemcatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package systemcatalog

import (
"cmp"
"fmt"
"github.com/go-errors/errors"
"github.com/jackc/pgx/v5"
"github.com/noctarius/timescaledb-event-streamer/internal/functional"
"github.com/noctarius/timescaledb-event-streamer/internal/logging"
"github.com/noctarius/timescaledb-event-streamer/internal/systemcatalog/snapshotting"
"github.com/noctarius/timescaledb-event-streamer/internal/systemcatalog/tablefiltering"
Expand All @@ -35,7 +35,7 @@ import (
"github.com/noctarius/timescaledb-event-streamer/spi/task"
"github.com/noctarius/timescaledb-event-streamer/spi/watermark"
"github.com/samber/lo"
"strings"
"slices"
"sync"
)

Expand Down Expand Up @@ -510,8 +510,8 @@ func initializeSystemCatalog(
}

// Sorting by canonical name
tables = functional.Sort(tables, func(this, other systemcatalog.BaseTable) bool {
return strings.Compare(this.CanonicalName(), other.CanonicalName()) < 0
slices.SortStableFunc(tables, func(this, other systemcatalog.BaseTable) int {
return cmp.Compare(this.CanonicalName(), other.CanonicalName())
})

if err := sc.sideChannel.ReadHypertableSchema(
Expand Down
2 changes: 1 addition & 1 deletion internal/typemanager/builtin_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func bits2string(

remaining := v.Len
for _, b := range v.Bytes {
length := lo.Min([]int32{remaining, 8})
length := min(remaining, 8)
for i := int32(0); i < length; i++ {
zeroOrOne := b >> (7 - i) & 1
builder.WriteString(fmt.Sprintf("%c", '0'+zeroOrOne))
Expand Down
4 changes: 2 additions & 2 deletions internal/typemanager/pgtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package typemanager
import (
"fmt"
"github.com/go-errors/errors"
"github.com/noctarius/timescaledb-event-streamer/internal/functional"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"slices"
)

type pgType struct {
Expand Down Expand Up @@ -221,5 +221,5 @@ func (t *pgType) Equal(
t.modifiers == other.Modifiers() &&
t.delimiter == other.Delimiter() &&
t.schemaType == other.SchemaType() &&
functional.ArrayEqual(t.enumValues, other.EnumValues())
slices.Equal(t.enumValues, other.EnumValues())
}
8 changes: 5 additions & 3 deletions spi/schema/schemabuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package schema

import (
"github.com/noctarius/timescaledb-event-streamer/internal/functional"
"cmp"
"github.com/samber/lo"
"slices"
"strconv"
)

Expand Down Expand Up @@ -420,8 +421,9 @@ func (s *schemaBuilderImpl) Build() Struct {
schemaStruct[FieldNameKeySchema] = s.keySchemaBuilder.Build()
schemaStruct[FieldNameValueSchema] = s.valueSchemaBuilder.Build()
case STRUCT:
fields := functional.Sort(lo.Values(s.fields), func(this, other Field) bool {
return this.Index() < other.Index()
fields := lo.Values(s.fields)
slices.SortStableFunc(fields, func(this, other Field) int {
return cmp.Compare(this.Index(), other.Index())
})

fieldSchemas := make([]Struct, 0)
Expand Down
3 changes: 1 addition & 2 deletions spi/systemcatalog/basetable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"github.com/noctarius/timescaledb-event-streamer/spi/pgtypes"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/samber/lo"
)

type BaseTable interface {
Expand Down Expand Up @@ -128,7 +127,7 @@ func (bt *baseTable) ApplyTableSchema(
} else {
// potentially renamed, run look ahead
lookAheadSuccessful := false
for o := i; o < lo.Min([]int{len(oldColumns), len(newColumns)}); o++ {
for o := i; o < min(len(oldColumns), len(newColumns)); o++ {
if oldColumns[o].equals(newColumns[o]) {
lookAheadSuccessful = true
}
Expand Down
Loading

0 comments on commit 57ab5e8

Please sign in to comment.