Skip to content

Commit

Permalink
feat: Remove token from job and use exported function on top of bridge
Browse files Browse the repository at this point in the history
Rework metadata cleanup test to store token so it can test the workflow
of metadata cleanup correctly.
  • Loading branch information
Matovidlo committed Nov 19, 2024
1 parent da8707a commit 41c0053
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (b *Bridge) importFile(ctx context.Context, file plugin.File, stats statist
}

// Save job ID to etcd
err = b.createJob(ctx, token.TokenString(), file, job)
err = b.createJob(ctx, file, job)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ import (
"github.com/keboola/keboola-as-code/internal/pkg/service/common/ctxattr"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition/key"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/plugin"
keboolasink "github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola"
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/model"
)

func (b *Bridge) createJob(ctx context.Context, token string, file plugin.File, storageJob *keboola.StorageJob) error {
func (b *Bridge) SinkToken(ctx context.Context, sinkKey key.SinkKey) (keboolasink.Token, error) {
return b.schema.Token().ForSink(sinkKey).GetOrErr(b.client).Do(ctx).ResultOrErr()
}

func (b *Bridge) createJob(ctx context.Context, file plugin.File, storageJob *keboola.StorageJob) error {
keboolaJob := model.Job{
JobKey: key.JobKey{SinkKey: file.SinkKey, JobID: key.JobID(storageJob.ID.String())},
Token: token,
}
// Add context attributes
ctx = ctxattr.ContextWith(ctx, attribute.String("job.id", keboolaJob.String()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ import (
// At the end throttle the sink, so it does not overloads the service.
type Job struct {
key.JobKey
Deleted bool `json:"-"` // internal field to mark the entity for deletion, there is no soft delete
Token string `json:"token"`
Deleted bool `json:"-"` // internal field to mark the entity for deletion, there is no soft delete
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ storage/job/123/567/my-source/my-sink/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink",
"jobId": "321",
"token": "secret"
"jobId": "321"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ storage/job/123/567/my-source/my-sink/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink",
"jobId": "321",
"token": "secret"
"jobId": "321"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ storage/job/123/567/my-source/my-sink/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink",
"jobId": "321",
"token": "secret"
"jobId": "321"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ storage/job/123/567/my-source/my-sink/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink",
"jobId": "321",
"token": "secret1"
"jobId": "321"
}
>>>>>

Expand All @@ -19,7 +18,6 @@ storage/job/123/567/my-source/my-sink/322
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink",
"jobId": "322",
"token": "secret2"
"jobId": "322"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ storage/job/123/567/my-source/my-sink-1/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-1",
"jobId": "321",
"token": "secret1"
"jobId": "321"
}
>>>>>

Expand All @@ -19,7 +18,6 @@ storage/job/123/567/my-source/my-sink-2/322
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-2",
"jobId": "322",
"token": "secret2"
"jobId": "322"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ storage/job/123/567/my-source/my-sink-2/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-2",
"jobId": "321",
"token": "secret1"
"jobId": "321"
}
>>>>>

Expand All @@ -197,8 +196,7 @@ storage/job/123/567/my-source/my-sink-2/322
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-2",
"jobId": "322",
"token": "secret2"
"jobId": "322"
}
>>>>>

Expand All @@ -210,7 +208,6 @@ storage/job/123/567/my-source/my-sink-3/323
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-3",
"jobId": "323",
"token": "secret3"
"jobId": "323"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ storage/job/123/567/my-source/my-sink-2/321
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-2",
"jobId": "321",
"token": "secret1"
"jobId": "321"
}
>>>>>

Expand All @@ -197,8 +196,7 @@ storage/job/123/567/my-source/my-sink-2/322
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-2",
"jobId": "322",
"token": "secret2"
"jobId": "322"
}
>>>>>

Expand All @@ -210,7 +208,6 @@ storage/job/123/567/my-source/my-sink-3/323
"branchId": 567,
"sourceId": "my-source",
"sinkId": "my-sink-3",
"jobId": "323",
"token": "secret3"
"jobId": "323"
}
>>>>>
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestJobRepository_Create(t *testing.T) {
sink := dummy.NewSink(sinkKey)
require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())

job := model.Job{JobKey: jobKey, Token: "secret"}
job := model.Job{JobKey: jobKey}
result, err := repo.Create(&job).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job, result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestJobRepository_Exists(t *testing.T) {
sink := dummy.NewSink(sinkKey)
require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())

job := model.Job{JobKey: jobKey, Token: "secret"}
job := model.Job{JobKey: jobKey}
result, err := repo.Create(&job).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job, result)
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestJobRepository_Exists(t *testing.T) {
// Purge - ok
// -----------------------------------------------------------------------------------------------------------------
{
job := model.Job{JobKey: jobKey, Token: "secret"}
job := model.Job{JobKey: jobKey}
assert.NoError(t, repo.Purge(&job).Do(ctx).Err())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestJobRepository_Get(t *testing.T) {
sink := dummy.NewSink(sinkKey)
require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())

job := model.Job{JobKey: jobKey, Token: "secret"}
job := model.Job{JobKey: jobKey}
result, err := repo.Create(&job).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job, result)
Expand All @@ -74,6 +74,6 @@ func TestJobRepository_Get(t *testing.T) {
{
result, err := repo.Get(jobKey).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, model.Job{JobKey: jobKey, Token: "secret"}, result)
assert.Equal(t, model.Job{JobKey: jobKey}, result)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func TestJobRepository_List(t *testing.T) {
sink := dummy.NewSink(sinkKey)
require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())

job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
job1 = model.Job{JobKey: jobKey1}
result, err := repo.Create(&job1).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job1, result)

job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
job2 = model.Job{JobKey: jobKey2}
result, err = repo.Create(&job2).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job2, result)
Expand Down Expand Up @@ -133,12 +133,12 @@ func TestJobRepository_ListDeleted(t *testing.T) {
// -----------------------------------------------------------------------------------------------------------------
var job1, job2 model.Job
{
job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
job1 = model.Job{JobKey: jobKey1}
result, err := repo.Create(&job1).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job1, result)

job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
job2 = model.Job{JobKey: jobKey2}
result, err = repo.Create(&job2).Do(ctx).ResultOrErr()
require.NoError(t, err)
assert.Equal(t, job2, result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestJobRepository_Purge(t *testing.T) {
sink := dummy.NewSink(sinkKey)
require.NoError(t, d.DefinitionRepository().Sink().Create(&sink, now, by, "Create sink").Do(ctx).Err())

job := model.Job{JobKey: jobKey, Token: "secret"}
job := model.Job{JobKey: jobKey}
require.NoError(t, repo.Create(&job).Do(ctx).Err())
}

Expand Down Expand Up @@ -127,13 +127,13 @@ func TestJobRepository_PurgeJobsOnSinkDelete(t *testing.T) {
// Create three Jobs
// -----------------------------------------------------------------------------------------------------------------
{
job := model.Job{JobKey: jobKey1, Token: "secret1"}
job := model.Job{JobKey: jobKey1}
require.NoError(t, repo.Create(&job).Do(ctx).Err())

job = model.Job{JobKey: jobKey2, Token: "secret2"}
job = model.Job{JobKey: jobKey2}
require.NoError(t, repo.Create(&job).Do(ctx).Err())

job = model.Job{JobKey: jobKey3, Token: "secret3"}
job = model.Job{JobKey: jobKey3}
require.NoError(t, repo.Create(&job).Do(ctx).Err())
}

Expand Down Expand Up @@ -223,13 +223,13 @@ func TestJobRepository_PurgeJobsOnSourceDelete_DeleteSource(t *testing.T) {
serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
}

job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
job1 = model.Job{JobKey: jobKey1}
require.NoError(t, repo.Create(&job1).Do(ctx).Err())

job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
job2 = model.Job{JobKey: jobKey2}
require.NoError(t, repo.Create(&job2).Do(ctx).Err())

job3 = model.Job{JobKey: jobKey3, Token: "secret3"}
job3 = model.Job{JobKey: jobKey3}
require.NoError(t, repo.Create(&job3).Do(ctx).Err())
etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_003.txt", ignoredEtcdKeys)
}
Expand Down Expand Up @@ -325,13 +325,13 @@ func TestSinkRepository_DeleteSinksOnSourceDelete_DeleteBranch(t *testing.T) {
serviceErrors.AssertErrorStatusCode(t, http.StatusNotFound, err)
}

job1 = model.Job{JobKey: jobKey1, Token: "secret1"}
job1 = model.Job{JobKey: jobKey1}
require.NoError(t, repo.Create(&job1).Do(ctx).Err())

job2 = model.Job{JobKey: jobKey2, Token: "secret2"}
job2 = model.Job{JobKey: jobKey2}
require.NoError(t, repo.Create(&job2).Do(ctx).Err())

job3 = model.Job{JobKey: jobKey3, Token: "secret3"}
job3 = model.Job{JobKey: jobKey3}
require.NoError(t, repo.Create(&job3).Do(ctx).Err())
etcdhelper.AssertKVsFromFile(t, client, "fixtures/job_purge_snapshot_005.txt", ignoredEtcdKeys)
}
Expand Down
18 changes: 8 additions & 10 deletions internal/pkg/service/stream/storage/metacleanup/metacleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,8 @@ func (n *Node) cleanJob(ctx context.Context, job keboolaBridgeModel.Job) (err er
ctx, span := n.telemetry.Tracer().Start(ctx, "keboola.go.stream.model.cleanup.metadata.cleanJob")
defer span.End(&err)

var keboolaJob keboolaBridgeModel.Job
// Retrieve job on bridge level
if keboolaJob, err = n.keboolaBridgeRepository.Job().Get(job.JobKey).Do(ctx).ResultOrErr(); err != nil {
err = errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
n.logger.Error(ctx, err.Error())
return err, false
}

// Parse storage job ID from string
id64, err := strconv.ParseInt(string(keboolaJob.JobKey.JobID), 10, 64)
id64, err := strconv.ParseInt(string(job.JobKey.JobID), 10, 64)
if err != nil {
err = errors.PrefixErrorf(err, `cannot get keboola storage job "%s"`, job.JobKey)
n.logger.Error(ctx, err.Error())
Expand All @@ -288,9 +280,15 @@ func (n *Node) cleanJob(ctx context.Context, job keboolaBridgeModel.Job) (err er
return err, false
}

token, err := n.bridge.SinkToken(ctx, job.SinkKey)
if err != nil {
n.logger.Warnf(ctx, "cannot get token for sink, already deleted: %s", err.Error())
return nil, false
}

// Get job details from storage API
id := int(id64)
api := n.publicAPI.NewAuthorizedAPI(keboolaJob.Token, 1*time.Minute)
api := n.publicAPI.NewAuthorizedAPI(token.TokenString(), 1*time.Minute)
var jobStatus *keboola.StorageJob
if jobStatus, err = api.GetStorageJobRequest(keboola.StorageJobKey{ID: keboola.StorageJobID(id)}).Send(ctx); err != nil {
n.logger.Warnf(ctx, "cannot get information about storage job, probably already deleted: %s", err.Error())
Expand Down
Loading

0 comments on commit 41c0053

Please sign in to comment.