Skip to content

Commit

Permalink
fix: Move definition/job -> storage/job
Browse files Browse the repository at this point in the history
  • Loading branch information
Matovidlo committed Nov 8, 2024
1 parent 1599163 commit 69affdf
Show file tree
Hide file tree
Showing 40 changed files with 434 additions and 451 deletions.
2 changes: 1 addition & 1 deletion internal/pkg/service/stream/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ storage:
fileImportTimeout: 15m0s
import:
# Specifies limit of sink that is representable by number
sinkLimit: 0
sinkLimit: 2
# Min duration from the last import to trigger the next, takes precedence over other settings. Validation rules: required,minDuration=30s,maxDuration=24h
minInterval: 1m0s
trigger:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
branch "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository/branch"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository/job"
sink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository/sink"
source "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository/source"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
Expand All @@ -21,15 +20,13 @@ type Repository struct {
branch *branch.Repository
source *source.Repository
sink *sink.Repository
job *job.Repository
}

func New(d dependencies) *Repository {
r := &Repository{}
r.branch = branch.NewRepository(d)
r.source = source.NewRepository(d, r.branch)
r.sink = sink.NewRepository(d, r.source)
r.job = job.NewRepository(d, r.sink)
return r
}

Expand All @@ -44,7 +41,3 @@ func (r *Repository) Source() *source.Repository {
func (r *Repository) Sink() *sink.Repository {
return r.sink
}

func (r *Repository) Job() *job.Repository {
return r.job
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/distlock"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/serde"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
definitionRepo "github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/repository"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/schema"
Expand All @@ -32,16 +31,15 @@ const (
)

type Bridge struct {
logger log.Logger
config keboolasink.Config
client etcd.KV
schema schema.Schema
plugins *plugin.Plugins
publicAPI *keboola.PublicAPI
apiProvider apiProvider
definitionRepository *definitionRepo.Repository
storageRepository *storageRepo.Repository
locks *distlock.Provider
logger log.Logger
config keboolasink.Config
client etcd.KV
schema schema.Schema
plugins *plugin.Plugins
publicAPI *keboola.PublicAPI
apiProvider apiProvider
storageRepository *storageRepo.Repository
locks *distlock.Provider

getBucketOnce *singleflight.Group
createBucketOnce *singleflight.Group
Expand All @@ -53,25 +51,23 @@ type dependencies interface {
EtcdSerde() *serde.Serde
Plugins() *plugin.Plugins
KeboolaPublicAPI() *keboola.PublicAPI
DefinitionRepository() *definitionRepo.Repository
StorageRepository() *storageRepo.Repository
DistributedLockProvider() *distlock.Provider
}

func New(d dependencies, apiProvider apiProvider, config keboolasink.Config) *Bridge {
b := &Bridge{
logger: d.Logger().WithComponent("keboola.bridge"),
config: config,
client: d.EtcdClient(),
schema: schema.New(d.EtcdSerde()),
plugins: d.Plugins(),
publicAPI: d.KeboolaPublicAPI(),
apiProvider: apiProvider,
definitionRepository: d.DefinitionRepository(),
storageRepository: d.StorageRepository(),
locks: d.DistributedLockProvider(),
getBucketOnce: &singleflight.Group{},
createBucketOnce: &singleflight.Group{},
logger: d.Logger().WithComponent("keboola.bridge"),
config: config,
client: d.EtcdClient(),
schema: schema.New(d.EtcdSerde()),
plugins: d.Plugins(),
publicAPI: d.KeboolaPublicAPI(),
apiProvider: apiProvider,
storageRepository: d.StorageRepository(),
locks: d.DistributedLockProvider(),
getBucketOnce: &singleflight.Group{},
createBucketOnce: &singleflight.Group{},
}

b.setupOnFileOpen()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestBridge_FullWorkflow(t *testing.T) {
bridgeTest.MockTokenStorageAPICalls(t, transport)
bridgeTest.MockBucketStorageAPICalls(t, transport)
bridgeTest.MockTableStorageAPICalls(t, transport)
bridgeTest.MockSucessJobStorageAPICalls(t, transport)
bridgeTest.MockSuccessJobStorageAPICalls(t, transport)
bridgeTest.MockFileStorageAPICalls(t, clk, transport)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist
if err != nil {
return err
}
fmt.Println("keboolafile", job.ID)

Check failure on line 217 in internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/file.go

View workflow job for this annotation

GitHub Actions / Lint / lint

use of `fmt.Println` forbidden because "Debug statements are forbidden, use a logger, not debug statements." (forbidigo)

b.logger.Info(ctx, "created staging file")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestBridge_ImportFile_EmptyFile(t *testing.T) {
require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), true).Do(ctx).Err())
require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())

// Switch file to the FileImporting state
// Switch empty file to the FileImporting state
clk.Add(time.Second)
require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), true).Do(ctx).Err())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"github.com/keboola/go-client/pkg/keboola"

"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop/op"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
)

func (b *Bridge) Job(k key.JobKey) op.WithResult[keboolasink.Job] {
Expand All @@ -31,19 +31,21 @@ func (b *Bridge) DeleteJob(k key.JobKey) *op.AtomicOp[keboolasink.Job] {
}

func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File, storageJob *keboola.StorageJob) error {
definitionJob := definition.Job{
modelJob := model.Job{
JobKey: key.JobKey{
SinkKey: file.SinkKey,
JobID: key.JobID(storageJob.ID.String()),
},
}
b.logger.Debugf(ctx, "creating job: %v", modelJob)

keboolaJob := keboolasink.Job{
JobKey: definitionJob.JobKey,
JobKey: modelJob.JobKey,
StorageJobKey: storageJob.StorageJobKey,
Token: token,
}
b.logger.Infof(ctx, "creating job: %v", definitionJob)
b.logger.Infof(ctx, "creating storage job: %v", keboolaJob)

lock := b.locks.NewMutex(fmt.Sprintf("api.source.sink.jobs.%s", file.SinkKey))
if err := lock.Lock(ctx); err != nil {
return err
Expand All @@ -54,10 +56,11 @@ func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File,
}
}()

operation := b.definitionRepository.Job().Create(&definitionJob).RequireLock(lock)
operation := b.storageRepository.Job().Create(&modelJob).RequireLock(lock)
if err := operation.Do(ctx).Err(); err != nil {
return err
}
b.logger.Debugf(ctx, "job created: %v", modelJob)

resultOp := b.schema.Job().ForJob(keboolaJob.JobKey).Put(b.client, keboolaJob)
if err := resultOp.Do(ctx).Err(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,145 @@
package bridge_test

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/keboola/go-client/pkg/keboola"
commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/duration"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/rollback"
"github.com/keboola/keboola-as-code/internal/pkg/service/common/utctime"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/config"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/dependencies"
bridgeTest "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/test"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/model"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/node/coordinator/fileimport"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/storage/test"
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/v3/concurrency"
)

// TestBridge_CreateJob verifies that when a non-empty file is switched to the
// FileImporting state and the file import operator runs, the Keboola bridge
// creates a storage job (logged as "creating job" / "job created") and the
// corresponding Keboola storage job record, and that each expected Storage API
// endpoint is called exactly once.
func TestBridge_CreateJob(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Mocked clock: the import operator is driven deterministically via clk.Add.
	clk := clock.NewMock()
	clk.Set(utctime.MustParse("2000-01-01T01:00:00.000Z").Time())
	by := test.ByUser()

	// Shorten the file-import check interval so a single clk.Add(checkInterval)
	// triggers the operator tick.
	checkInterval := time.Minute
	d, mock := dependencies.NewMockedCoordinatorScopeWithConfig(t, ctx, func(cfg *config.Config) {
		cfg.Storage.Level.Target.Operator.FileImportCheckInterval = duration.From(checkInterval)
	}, commonDeps.WithClock(clk))
	logger := mock.DebugLogger()
	client := mock.TestEtcdClient()
	defRepo := d.DefinitionRepository()
	storageRepo := d.StorageRepository()
	volumeRepo := storageRepo.Volume()

	// API context carries a rollback container and the mocked project API,
	// as expected by the definition repository Create operations below.
	apiCtx := rollback.ContextWith(ctx, rollback.New(d.Logger()))
	apiCtx = context.WithValue(apiCtx, dependencies.KeboolaProjectAPICtxKey, mock.KeboolaProjectAPI())

	// Register mocked responses
	// -----------------------------------------------------------------------------------------------------------------
	transport := mock.MockedHTTPTransport()
	{
		bridgeTest.MockTokenStorageAPICalls(t, transport)
		bridgeTest.MockBucketStorageAPICalls(t, transport)
		bridgeTest.MockTableStorageAPICalls(t, transport)
		bridgeTest.MockSuccessJobStorageAPICalls(t, transport)
		bridgeTest.MockImportAsyncAPICalls(t, transport)
		bridgeTest.MockFileStorageAPICalls(t, clk, transport)
	}

	// Register active volumes
	// -----------------------------------------------------------------------------------------------------------------
	{
		session, err := concurrency.NewSession(client)
		require.NoError(t, err)
		defer func() { require.NoError(t, session.Close()) }()
		test.RegisterWriterVolumes(t, ctx, volumeRepo, session, 1)
	}

	// Create sink
	// -----------------------------------------------------------------------------------------------------------------
	projectID := keboola.ProjectID(123)
	branchKey := key.BranchKey{ProjectID: projectID, BranchID: 456}
	sourceKey := key.SourceKey{BranchKey: branchKey, SourceID: "my-source"}
	sinkKey := key.SinkKey{SourceKey: sourceKey, SinkID: "my-sink"}
	{
		branch := test.NewBranch(branchKey)
		require.NoError(t, defRepo.Branch().Create(&branch, clk.Now(), by).Do(apiCtx).Err())
		source := test.NewSource(sourceKey)
		require.NoError(t, defRepo.Source().Create(&source, clk.Now(), by, "Create source").Do(apiCtx).Err())
		sink := test.NewKeboolaTableSink(sinkKey)
		require.NoError(t, defRepo.Sink().Create(&sink, clk.Now(), by, "Create sink").Do(apiCtx).Err())
	}

	// Switch file to the FileImporting state
	// -----------------------------------------------------------------------------------------------------------------
	{
		// Rotate file: closes the current file/slice and opens a new one,
		// so two files exist afterwards.
		clk.Add(time.Second)
		require.NoError(t, storageRepo.File().Rotate(sinkKey, clk.Now()).Do(apiCtx).Err())
		files, err := storageRepo.File().ListAll().Do(ctx).All()
		require.NoError(t, err)
		require.Len(t, files, 2)
		slices, err := storageRepo.Slice().ListIn(files[0].FileKey).Do(ctx).All()
		require.NoError(t, err)
		require.Len(t, slices, 1)
		require.Equal(t, model.FileClosing, files[0].State)
		require.Equal(t, model.FileWriting, files[1].State)
		require.Equal(t, model.SliceClosing, slices[0].State)
		file := files[0]
		slice := slices[0]

		// Upload of non-empty slice (isEmpty=false), so the import path
		// creates a real storage job rather than skipping the file.
		clk.Add(time.Second)
		require.NoError(t, storageRepo.Slice().SwitchToUploading(slice.SliceKey, clk.Now(), false).Do(ctx).Err())
		require.NoError(t, storageRepo.Slice().SwitchToUploaded(slice.SliceKey, clk.Now()).Do(ctx).Err())

		// Switch the non-empty file to the FileImporting state
		clk.Add(time.Second)
		require.NoError(t, storageRepo.File().SwitchToImporting(file.FileKey, clk.Now(), false).Do(ctx).Err())
	}

	// Start file import operator
	// -----------------------------------------------------------------------------------------------------------------
	require.NoError(t, fileimport.Start(d, mock.TestConfig().Storage.Level.Target.Operator))

	// Wait for the import of the file and assert the bridge's job-creation log messages
	// -----------------------------------------------------------------------------------------------------------------
	logger.Truncate()
	clk.Add(checkInterval)
	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		logger.AssertJSONMessages(c, `
{"level":"info","message":"importing file","component":"storage.node.operator.file.import"}
{"level":"debug","message":"creating job: 123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"info","message":"creating storage job: 123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"debug","message":"job created: 123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"info","message":"storage job created: 123/456/my-source/my-sink/321","component":"keboola.bridge"}
{"level":"info","message":"imported file","component":"storage.node.operator.file.import"}
`)
	}, 15*time.Second, 50*time.Millisecond)

	// Each Storage API endpoint (table import-async, job detail poll) was called exactly once
	// -----------------------------------------------------------------------------------------------------------------
	expectedImportAsyncAPICall := "POST https://connection.keboola.local/v2/storage/branch/456/tables/in.c-bucket.my-table/import-async"
	expectedStorageJobsCall := "GET https://connection.keboola.local/v2/storage/jobs/321"
	assert.Equal(t, 1, transport.GetCallCountInfo()[expectedImportAsyncAPICall])
	assert.Equal(t, 1, transport.GetCallCountInfo()[expectedStorageJobsCall])

	// Shutdown
	// -----------------------------------------------------------------------------------------------------------------
	d.Process().Shutdown(ctx, errors.New("bye bye"))
	d.Process().WaitForShutdown()
	logger.AssertNoErrorMessage(t)
}
Loading

0 comments on commit 69affdf

Please sign in to comment.