Skip to content

Commit

Permalink
refactor: Move File and Token schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jachym-tousek-keboola committed Nov 28, 2024
1 parent 70b75fe commit ffd3ac4
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics"
)
Expand Down Expand Up @@ -70,7 +70,7 @@ func (b *Bridge) setupOnFileOpen() {
})
}

func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedAPI, sink definition.Sink, file *model.File) (keboolasink.File, error) {
func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedAPI, sink definition.Sink, file *model.File) (keboolaModel.File, error) {
name := fmt.Sprintf(`%s_%s_%s`, file.SourceID, file.SinkID, file.OpenedAt().Time().Format(fileNameDateFormat))
attributes := file.Telemetry()
attributes = append(attributes, attribute.String("file.name", name))
Expand All @@ -88,7 +88,7 @@ func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedA
),
).Send(ctx)
if err != nil {
return keboolasink.File{}, err
return keboolaModel.File{}, err
}

// Register rollback
Expand All @@ -99,7 +99,7 @@ func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedA

// Save credentials to database
ctx = ctxattr.ContextWith(ctx, attribute.String("file.resourceID", stagingFile.FileID.String()))
keboolaFile := keboolasink.File{
keboolaFile := keboolaModel.File{
FileKey: &file.FileKey,
SinkKey: file.SinkKey,
TableID: sink.Table.Keboola.TableID,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package keboola
package model

import (
"github.com/keboola/go-client/pkg/keboola"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package schema
import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
)

type (
// File is an etcd prefix that stores all Keboola-specific data we need for upload and import.
File struct {
etcdop.PrefixT[keboolasink.File]
etcdop.PrefixT[keboolaModel.File]
}
)

func forFile(s *serde.Serde) File {
return File{PrefixT: etcdop.NewTypedPrefix[keboolasink.File]("storage/keboola/file", s)}
func New(s *serde.Serde) File {
return File{PrefixT: etcdop.NewTypedPrefix[keboolaModel.File]("storage/keboola/file", s)}
}

func (v File) ForFile(k model.FileKey) etcdop.KeyT[keboolasink.File] {
func (v File) ForFile(k model.FileKey) etcdop.KeyT[keboolaModel.File] {
return v.PrefixT.Key(k.String())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestFileSchema(t *testing.T) {
t.Parallel()
s := forFile(serde.NewJSON(serde.NoValidation))
s := New(serde.NewJSON(serde.NoValidation))

fileKey := test.NewFileKey()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
)

type (
// Token is an etcd prefix that stores all Keboola Storage API token entities.
Token struct {
etcdop.PrefixT[keboolasink.Token]
etcdop.PrefixT[keboolaModel.Token]
}
)

func forToken(s *serde.Serde) Token {
return Token{PrefixT: etcdop.NewTypedPrefix[keboolasink.Token]("storage/keboola/secret/token", s)}
func New(s *serde.Serde) Token {
return Token{PrefixT: etcdop.NewTypedPrefix[keboolaModel.Token]("storage/keboola/secret/token", s)}
}

func (v Token) ForSink(k key.SinkKey) etcdop.KeyT[keboolasink.Token] {
func (v Token) ForSink(k key.SinkKey) etcdop.KeyT[keboolaModel.Token] {
return v.PrefixT.Key(k.String())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestTokenSchema(t *testing.T) {
t.Parallel()
s := forToken(serde.NewJSON(serde.NoValidation))
s := New(serde.NewJSON(serde.NoValidation))

sinkKey := test.NewSinkKey()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
package schema

import "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
fileSchema "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema"
tokenSchema "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema"
)

type Schema struct {
token Token
file File
token tokenSchema.Token
file fileSchema.File
}

func New(s *serde.Serde) Schema {
return Schema{
token: forToken(s),
file: forFile(s),
token: tokenSchema.New(s),
file: fileSchema.New(s),
}
}

func (s Schema) Token() Token {
func (s Schema) Token() tokenSchema.Token {
return s.token
}

func (s Schema) File() File {
func (s Schema) File() fileSchema.File {
return s.file
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package keboola
package model

import (
"github.com/keboola/go-client/pkg/keboola"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/rollback"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
)

func (b *Bridge) GetToken(k key.SinkKey) op.WithResult[keboolasink.Token] {
func (b *Bridge) GetToken(k key.SinkKey) op.WithResult[keboolaModel.Token] {
return b.schema.Token().ForSink(k).GetOrErr(b.client).
WithEmptyResultAsError(func() error {
return serviceError.NewResourceNotFoundError("sink token", k.String(), "database")
Expand All @@ -40,7 +40,7 @@ func (b *Bridge) deleteTokenOnSinkDeactivation() {
}

func (b *Bridge) deleteToken(api *keboola.AuthorizedAPI, sinkKey key.SinkKey) *op.AtomicOp[op.NoResult] {
var oldToken keboolasink.Token
var oldToken keboolaModel.Token
return op.Atomic(b.client, &op.NoResult{}).
Read(func(ctx context.Context) op.Op {
return b.schema.Token().ForSink(sinkKey).GetOrErr(b.client).WithResultTo(&oldToken)
Expand All @@ -64,16 +64,16 @@ func (b *Bridge) deleteToken(api *keboola.AuthorizedAPI, sinkKey key.SinkKey) *o
})
}

func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definition.Sink) (keboolasink.Token, error) {
func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definition.Sink) (keboolaModel.Token, error) {
// Get token from database, if any.
// The token is scoped to the sink bucket,
// but an API operation can modify the target bucket,
// then a new token must be generated.
var existingToken *keboolasink.Token
var existingToken *keboolaModel.Token
if !sink.CreatedAt().Time().Equal(now) {
err := b.schema.Token().ForSink(sink.SinkKey).GetOrNil(b.client).WithResultTo(&existingToken).Do(ctx).Err()
if err != nil {
return keboolasink.Token{}, err
return keboolaModel.Token{}, err
}
}

Expand All @@ -83,7 +83,7 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio
if err != nil {
if existingToken == nil {
// Operation is not called from the API and there is no token in the database.
return keboolasink.Token{}, serviceError.NewResourceNotFoundError("sink token", sink.SinkKey.String(), "database")
return keboolaModel.Token{}, serviceError.NewResourceNotFoundError("sink token", sink.SinkKey.String(), "database")
}

// Operation is not called from the API and there is a token in the database, so we are using the token.
Expand All @@ -101,7 +101,7 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio
)

// Create new token based on the token from API authorization.
var newToken keboolasink.Token
var newToken keboolaModel.Token
b.logger.Info(ctx, "creating token")
result, err := api.
CreateTokenRequest(
Expand All @@ -118,7 +118,7 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio
}).
Send(ctx)
if err != nil {
return keboolasink.Token{}, err
return keboolaModel.Token{}, err
}

// Register rollback
Expand All @@ -128,14 +128,14 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio
})

// Update atomic operation
newToken = keboolasink.Token{SinkKey: sink.SinkKey, Token: *result}
newToken = keboolaModel.Token{SinkKey: sink.SinkKey, Token: *result}
op.AtomicOpCtxFrom(ctx).AddFrom(op.Atomic(b.client, &newToken).
// Save token to database
Write(func(ctx context.Context) op.Op {
return b.schema.Token().ForSink(sink.SinkKey).Put(b.client, newToken)
}).
// Delete old token after the new token is saved
AddProcessor(func(ctx context.Context, r *op.Result[keboolasink.Token]) {
AddProcessor(func(ctx context.Context, r *op.Result[keboolaModel.Token]) {
// Skip if the operation failed, or there is no old token
if r.Err() != nil || existingToken == nil {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
keboolaSink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
bridgeTest "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/metacleanup"
Expand All @@ -31,15 +30,15 @@ import (
type (
// token is an etcd prefix that stores all Keboola Storage API token entities.
testToken struct {
etcdop.PrefixT[keboolaSink.Token]
etcdop.PrefixT[keboolaModel.Token]
}
)

func forToken(s *serde.Serde) testToken {
return testToken{PrefixT: etcdop.NewTypedPrefix[keboolaSink.Token]("storage/keboola/secret/token", s)}
return testToken{PrefixT: etcdop.NewTypedPrefix[keboolaModel.Token]("storage/keboola/secret/token", s)}
}

func (v testToken) ForSink(k key.SinkKey) etcdop.KeyT[keboolaSink.Token] {
func (v testToken) ForSink(k key.SinkKey) etcdop.KeyT[keboolaModel.Token] {
return v.PrefixT.Key(k.String())
}

Expand Down Expand Up @@ -123,7 +122,7 @@ func TestMetadataCleanup(t *testing.T) {
require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err())
sink := dummy.NewSinkWithLocalStorage(sinkKey)
require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err())
token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}}
token := keboolaModel.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}}
require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err())
}

Expand Down Expand Up @@ -320,7 +319,7 @@ func TestMetadataProcessingJobCleanup(t *testing.T) {
require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err())
sink := dummy.NewSinkWithLocalStorage(sinkKey)
require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err())
token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}}
token := keboolaModel.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}}
require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err())
}

Expand Down

0 comments on commit ffd3ac4

Please sign in to comment.