diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go index d2b6764701..9cad541612 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go @@ -14,7 +14,7 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin" - keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola" + keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/statistics" ) @@ -70,7 +70,7 @@ func (b *Bridge) setupOnFileOpen() { }) } -func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedAPI, sink definition.Sink, file *model.File) (keboolasink.File, error) { +func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedAPI, sink definition.Sink, file *model.File) (keboolaModel.File, error) { name := fmt.Sprintf(`%s_%s_%s`, file.SourceID, file.SinkID, file.OpenedAt().Time().Format(fileNameDateFormat)) attributes := file.Telemetry() attributes = append(attributes, attribute.String("file.name", name)) @@ -88,7 +88,7 @@ func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedA ), ).Send(ctx) if err != nil { - return keboolasink.File{}, err + return keboolaModel.File{}, err } // Register rollback @@ -99,7 +99,7 @@ func (b *Bridge) createStagingFile(ctx context.Context, api *keboola.AuthorizedA // Save credentials to database ctx = ctxattr.ContextWith(ctx, attribute.String("file.resourceID", stagingFile.FileID.String())) - keboolaFile := keboolasink.File{ + keboolaFile := keboolaModel.File{ FileKey: &file.FileKey, SinkKey: file.SinkKey, TableID: sink.Table.Keboola.TableID, diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/file.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/file.go similarity index 97% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/file.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/file.go index b7bf960164..17cfdc0c0b 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/file.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/file.go @@ -1,4 +1,4 @@ -package keboola +package model import ( "github.com/keboola/go-client/pkg/keboola" diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema/file.go similarity index 54% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema/file.go index a757a6bfda..f2608dd5a9 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema/file.go @@ -3,21 +3,21 @@ package schema import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" - keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola" + keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model" ) type ( // File is an etcd prefix that stores all Keboola-specific data we need for upload and import. File struct { - etcdop.PrefixT[keboolasink.File] + etcdop.PrefixT[keboolaModel.File] } ) -func forFile(s *serde.Serde) File { - return File{PrefixT: etcdop.NewTypedPrefix[keboolasink.File]("storage/keboola/file", s)} +func New(s *serde.Serde) File { + return File{PrefixT: etcdop.NewTypedPrefix[keboolaModel.File]("storage/keboola/file", s)} } -func (v File) ForFile(k model.FileKey) etcdop.KeyT[keboolasink.File] { +func (v File) ForFile(k model.FileKey) etcdop.KeyT[keboolaModel.File] { return v.PrefixT.Key(k.String()) } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema/file_test.go similarity index 92% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file_test.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema/file_test.go index e2f6f1eb8a..83995e4655 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/file_test.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema/file_test.go @@ -11,7 +11,7 @@ import ( func TestFileSchema(t *testing.T) { t.Parallel() - s := forFile(serde.NewJSON(serde.NoValidation)) + s := New(serde.NewJSON(serde.NoValidation)) fileKey := test.NewFileKey() diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema/token.go similarity index 52% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema/token.go index 9e2a0b3bc7..147f5baa0c 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema/token.go @@ -4,20 +4,20 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop" "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" - keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola" + keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" ) type ( // Token is an etcd prefix that stores all Keboola Storage API token entities. Token struct { - etcdop.PrefixT[keboolasink.Token] + etcdop.PrefixT[keboolaModel.Token] } ) -func forToken(s *serde.Serde) Token { - return Token{PrefixT: etcdop.NewTypedPrefix[keboolasink.Token]("storage/keboola/secret/token", s)} +func New(s *serde.Serde) Token { + return Token{PrefixT: etcdop.NewTypedPrefix[keboolaModel.Token]("storage/keboola/secret/token", s)} } -func (v Token) ForSink(k key.SinkKey) etcdop.KeyT[keboolasink.Token] { +func (v Token) ForSink(k key.SinkKey) etcdop.KeyT[keboolaModel.Token] { return v.PrefixT.Key(k.String()) } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema/token_test.go similarity index 91% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token_test.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema/token_test.go index 14ef17c05d..873366e284 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/token_test.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema/token_test.go @@ -11,7 +11,7 @@ import ( func TestTokenSchema(t *testing.T) { t.Parallel() - s := forToken(serde.NewJSON(serde.NoValidation)) + s := New(serde.NewJSON(serde.NoValidation)) sinkKey := test.NewSinkKey() diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/schema.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/schema.go index 043cfad4c8..e09b91da92 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/schema.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/schema/schema.go @@ -1,23 +1,27 @@ package schema -import "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" +import ( + "github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde" + fileSchema "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/file/schema" + tokenSchema "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/repository/token/schema" +) type Schema struct { - token Token - file File + token tokenSchema.Token + file fileSchema.File } func New(s *serde.Serde) Schema { return Schema{ - token: forToken(s), - file: forFile(s), + token: tokenSchema.New(s), + file: fileSchema.New(s), } } -func (s Schema) Token() Token { +func (s Schema) Token() tokenSchema.Token { return s.token } -func (s Schema) File() File { +func (s Schema) File() fileSchema.File { return s.file } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/token.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/token.go similarity index 96% rename from internal/pkg/service/stream/sink/type/tablesink/keboola/token.go rename to internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/token.go index da0bcd3256..005a84c498 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/token.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model/token.go @@ -1,4 +1,4 @@ -package keboola +package model import ( "github.com/keboola/go-client/pkg/keboola" diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go index 9ca3943ce7..00257bc4ac 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/token.go @@ -14,11 +14,11 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/rollback" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" - keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola" + keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" ) -func (b *Bridge) GetToken(k key.SinkKey) op.WithResult[keboolasink.Token] { +func (b *Bridge) GetToken(k key.SinkKey) op.WithResult[keboolaModel.Token] { return b.schema.Token().ForSink(k).GetOrErr(b.client). WithEmptyResultAsError(func() error { return serviceError.NewResourceNotFoundError("sink token", k.String(), "database") @@ -40,7 +40,7 @@ func (b *Bridge) deleteTokenOnSinkDeactivation() { } func (b *Bridge) deleteToken(api *keboola.AuthorizedAPI, sinkKey key.SinkKey) *op.AtomicOp[op.NoResult] { - var oldToken keboolasink.Token + var oldToken keboolaModel.Token return op.Atomic(b.client, &op.NoResult{}). Read(func(ctx context.Context) op.Op { return b.schema.Token().ForSink(sinkKey).GetOrErr(b.client).WithResultTo(&oldToken) @@ -64,16 +64,16 @@ func (b *Bridge) deleteToken(api *keboola.AuthorizedAPI, sinkKey key.SinkKey) *o }) } -func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definition.Sink) (keboolasink.Token, error) { +func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definition.Sink) (keboolaModel.Token, error) { // Get token from database, if any. // The token is scoped to the sink bucket, // but an API operation can modify the target bucket, // then a new token must be generated. - var existingToken *keboolasink.Token + var existingToken *keboolaModel.Token if !sink.CreatedAt().Time().Equal(now) { err := b.schema.Token().ForSink(sink.SinkKey).GetOrNil(b.client).WithResultTo(&existingToken).Do(ctx).Err() if err != nil { - return keboolasink.Token{}, err + return keboolaModel.Token{}, err } } @@ -83,7 +83,7 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio if err != nil { if existingToken == nil { // Operation is not called from the API and there is no token in the database. - return keboolasink.Token{}, serviceError.NewResourceNotFoundError("sink token", sink.SinkKey.String(), "database") + return keboolaModel.Token{}, serviceError.NewResourceNotFoundError("sink token", sink.SinkKey.String(), "database") } // Operation is not called from the API and there is a token in the database, so we are using the token. @@ -101,7 +101,7 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio ) // Create new token based on the token from API authorization. - var newToken keboolasink.Token + var newToken keboolaModel.Token b.logger.Info(ctx, "creating token") result, err := api. CreateTokenRequest( @@ -118,7 +118,7 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio }). Send(ctx) if err != nil { - return keboolasink.Token{}, err + return keboolaModel.Token{}, err } // Register rollback @@ -128,14 +128,14 @@ func (b *Bridge) tokenForSink(ctx context.Context, now time.Time, sink definitio }) // Update atomic operation - newToken = keboolasink.Token{SinkKey: sink.SinkKey, Token: *result} + newToken = keboolaModel.Token{SinkKey: sink.SinkKey, Token: *result} op.AtomicOpCtxFrom(ctx).AddFrom(op.Atomic(b.client, &newToken). // Save token to database Write(func(ctx context.Context) op.Op { return b.schema.Token().ForSink(sink.SinkKey).Put(b.client, newToken) }). // Delete old token after the new token is saved - AddProcessor(func(ctx context.Context, r *op.Result[keboolasink.Token]) { + AddProcessor(func(ctx context.Context, r *op.Result[keboolaModel.Token]) { // Skip if the operation failed, or there is no old token if r.Err() != nil || existingToken == nil { return diff --git a/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go b/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go index f45a5e0b53..232f3efad3 100644 --- a/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go +++ b/internal/pkg/service/stream/storage/metacleanup/metacleanup_test.go @@ -17,7 +17,6 @@ import ( "github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies" - keboolaSink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola" keboolaModel "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model" bridgeTest "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test" "github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/metacleanup" @@ -31,15 +30,15 @@ import ( type ( // token is an etcd prefix that stores all Keboola Storage API token entities. testToken struct { - etcdop.PrefixT[keboolaSink.Token] + etcdop.PrefixT[keboolaModel.Token] } ) func forToken(s *serde.Serde) testToken { - return testToken{PrefixT: etcdop.NewTypedPrefix[keboolaSink.Token]("storage/keboola/secret/token", s)} + return testToken{PrefixT: etcdop.NewTypedPrefix[keboolaModel.Token]("storage/keboola/secret/token", s)} } -func (v testToken) ForSink(k key.SinkKey) etcdop.KeyT[keboolaSink.Token] { +func (v testToken) ForSink(k key.SinkKey) etcdop.KeyT[keboolaModel.Token] { return v.PrefixT.Key(k.String()) } @@ -123,7 +122,7 @@ func TestMetadataCleanup(t *testing.T) { require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err()) sink := dummy.NewSinkWithLocalStorage(sinkKey) require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err()) - token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}} + token := keboolaModel.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}} require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err()) } @@ -320,7 +319,7 @@ func TestMetadataProcessingJobCleanup(t *testing.T) { require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(ctx).Err()) sink := dummy.NewSinkWithLocalStorage(sinkKey) require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(ctx).Err()) - token := keboolaSink.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}} + token := keboolaModel.Token{SinkKey: sinkKey, Token: keboola.Token{ID: "secret"}} require.NoError(t, forToken(d.EtcdSerde()).ForSink(sinkKey).Put(client, token).Do(ctx).Err()) }