Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions internal/integration-tests/worker/workflow/datasync-workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,13 @@ import (
type Option func(*TestWorkflowEnv)

type TestWorkflowEnv struct {
neosyncApi *tcneosyncapi.NeosyncApiTestClient
redisconfig *neosync_redis.RedisConfig
fakeEELicense *testutil.FakeEELicense
pageLimit int
maxIterations int
postgresSchemaDrift bool
TestEnv *testsuite.TestWorkflowEnvironment
Redisclient redis.UniversalClient
neosyncApi *tcneosyncapi.NeosyncApiTestClient
redisconfig *neosync_redis.RedisConfig
fakeEELicense *testutil.FakeEELicense
pageLimit int
maxIterations int
TestEnv *testsuite.TestWorkflowEnvironment
Redisclient redis.UniversalClient
}

// WithRedis creates redis client with provided URL
Expand Down Expand Up @@ -78,12 +77,6 @@ func WithMaxIterations(maxIterations int) Option {
}
}

func WithPostgresSchemaDrift() Option {
return func(c *TestWorkflowEnv) {
c.postgresSchemaDrift = true
}
}

// NewTestDataSyncWorkflowEnv creates and configures a new test datasync workflow environment
func NewTestDataSyncWorkflowEnv(
t testing.TB,
Expand All @@ -94,11 +87,10 @@ func NewTestDataSyncWorkflowEnv(
t.Helper()

workflowEnv := &TestWorkflowEnv{
neosyncApi: neosyncApi,
fakeEELicense: testutil.NewFakeEELicense(),
pageLimit: 10,
maxIterations: 5,
postgresSchemaDrift: false,
neosyncApi: neosyncApi,
fakeEELicense: testutil.NewFakeEELicense(),
pageLimit: 10,
maxIterations: 5,
}

for _, opt := range opts {
Expand Down Expand Up @@ -134,7 +126,6 @@ func NewTestDataSyncWorkflowEnv(
workflowEnv.Redisclient,
false,
workflowEnv.pageLimit,
workflowEnv.postgresSchemaDrift,
)

schemainit_workflow_register.Register(
Expand Down
4 changes: 2 additions & 2 deletions internal/integration-tests/worker/workflow/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,7 +1423,7 @@ func test_postgres_schema_reconciliation(
})
destinationId := job.GetDestinations()[0].GetId()

testworkflow := NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithPostgresSchemaDrift(), WithMaxIterations(100), WithPageLimit(10000))
testworkflow := NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithMaxIterations(100), WithPageLimit(10000))
testworkflow.RequireActivitiesCompletedSuccessfully(t)
testworkflow.ExecuteTestDataSyncWorkflow(job.GetId())
require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. Test: schema_drift")
Expand Down Expand Up @@ -1470,7 +1470,7 @@ func test_postgres_schema_reconciliation(
updatedMappings = append(updatedMappings, pg_schema_init.GetAlteredSyncJobMappings(schema)...)
job = updateJobMappings(t, ctx, jobclient, job.GetId(), updatedMappings, job.GetSource())

testworkflow = NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithPostgresSchemaDrift(), WithMaxIterations(100), WithPageLimit(1000))
testworkflow = NewTestDataSyncWorkflowEnv(t, neosyncApi, dbManagers, WithMaxIterations(100), WithPageLimit(1000))
testworkflow.RequireActivitiesCompletedSuccessfully(t)
testworkflow.ExecuteTestDataSyncWorkflow(job.GetId())
require.Truef(t, testworkflow.TestEnv.IsWorkflowCompleted(), "Workflow did not complete. Test: postgres-schema-reconciliation-run-2")
Expand Down
2 changes: 0 additions & 2 deletions worker/internal/cmds/worker/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,14 +406,12 @@ func serve(ctx context.Context) error {
cascadelicense,
)

postgresSchemaDrift := false
datasync_workflow_register.Register(
w,
userclient, jobclient, connclient, transformerclient,
sqlmanager, cascadelicense, redisclient,
otelconfig.IsEnabled,
pageLimit,
postgresSchemaDrift,
)

if cascadelicense.IsValid() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ import (
)

type Activity struct {
jobclient mgmtv1alpha1connect.JobServiceClient
postgresSchemaDrift bool
jobclient mgmtv1alpha1connect.JobServiceClient
}

func New(
jobclient mgmtv1alpha1connect.JobServiceClient,
postgresSchemaDrift bool,
) *Activity {
return &Activity{
jobclient: jobclient,
postgresSchemaDrift: postgresSchemaDrift,
jobclient: jobclient,
}
}

Expand All @@ -37,7 +34,6 @@ type RetrieveActivityOptionsResponse struct {
AccountId string
RequestedRecordCount *uint64
Destinations []*mgmtv1alpha1.JobDestination
PostgresSchemaDrift bool
}

func (a *Activity) RetrieveActivityOptions(
Expand Down Expand Up @@ -66,7 +62,6 @@ func (a *Activity) RetrieveActivityOptions(
AccountId: job.GetAccountId(),
RequestedRecordCount: getRequestedRecordCount(job),
Destinations: job.GetDestinations(),
PostgresSchemaDrift: a.postgresSchemaDrift,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func Test_New(t *testing.T) {
a := New(mgmtv1alpha1connect.NewMockJobServiceClient(t), false)
a := New(mgmtv1alpha1connect.NewMockJobServiceClient(t))
require.NotNil(t, a)
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func Test_Activity(t *testing.T) {
srv := startHTTPServer(t, mux)

jobclient := mgmtv1alpha1connect.NewJobServiceClient(srv.Client(), srv.URL)
activity := New(jobclient, false)
activity := New(jobclient)
env.RegisterActivity(activity.RetrieveActivityOptions)

t.Run("sync activity options", func(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions worker/pkg/workflows/datasync/workflow/register/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func Register(
redisclient redis.UniversalClient,
isOtelEnabled bool,
pageLimit int,
postgresSchemaDrift bool,
) {
genbenthosActivity := genbenthosconfigs_activity.New(
jobclient,
Expand All @@ -41,7 +40,7 @@ func Register(
pageLimit,
)

retrieveActivityOpts := syncactivityopts_activity.New(jobclient, postgresSchemaDrift)
retrieveActivityOpts := syncactivityopts_activity.New(jobclient)
accountStatusActivity := accountstatus_activity.New(userclient)
runPostTableSyncActivity := posttablesync_activity.New(jobclient, sqlmanager, connclient)
jobhookByTimingActivity := jobhooks_by_timing_activity.New(
Expand Down
13 changes: 4 additions & 9 deletions worker/pkg/workflows/datasync/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ func executeWorkflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowRes
req.JobId,
info.WorkflowExecution.ID,
actOptResp.Destinations,
actOptResp.PostgresSchemaDrift,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -487,7 +486,6 @@ func runSchemaInitWorkflowByDestination(
logger log.Logger,
accountId, jobId, jobRunId string,
destinations []*mgmtv1alpha1.JobDestination,
postgresSchemaDrift bool,
) error {
initSchemaActivityOptions := &workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
Expand All @@ -497,8 +495,8 @@ func runSchemaInitWorkflowByDestination(
HeartbeatTimeout: 1 * time.Minute,
}
for _, destination := range destinations {
// right now only mysql supports schema drift
schemaDrift := shouldUseSchemaDrift(destination, postgresSchemaDrift)
// right now only mysql and postgres supports schema drift
schemaDrift := shouldUseSchemaDrift(destination)
logger.Info(
"scheduling Schema Initialization workflow for execution.",
"destinationId",
Expand Down Expand Up @@ -534,11 +532,8 @@ func runSchemaInitWorkflowByDestination(
return nil
}

func shouldUseSchemaDrift(destination *mgmtv1alpha1.JobDestination, postgresSchemaDrift bool) bool {
if destination.GetOptions().GetPostgresOptions() != nil && postgresSchemaDrift {
return true
}
return destination.GetOptions().GetMysqlOptions() != nil
func shouldUseSchemaDrift(destination *mgmtv1alpha1.JobDestination) bool {
return destination.GetOptions().GetMysqlOptions() != nil || destination.GetOptions().GetPostgresOptions() != nil
}

func retrieveActivityOptions(
Expand Down