Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Nov 3, 2023
2 parents 5c06f35 + 5cd8f7b commit 5c5a21b
Show file tree
Hide file tree
Showing 40 changed files with 2,740 additions and 1,421 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ replace (
go.mongodb.org/mongo-driver => go.mongodb.org/mongo-driver v1.12.1
golang.org/x/crypto => golang.org/x/crypto v0.13.0
golang.org/x/image => golang.org/x/image v0.12.0
golang.org/x/net => golang.org/x/net v0.15.0
golang.org/x/net => golang.org/x/net v0.17.0
golang.org/x/text => golang.org/x/text v0.13.0
gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 => gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -289,7 +289,7 @@ require (
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.12.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1200,8 +1200,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -1331,8 +1331,9 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
12 changes: 6 additions & 6 deletions warehouse/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
sqlmw "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -75,7 +75,7 @@ type Api struct {
bcConfig backendconfig.BackendConfig
tenantManager *multitenant.Manager
bcManager *bcm.BackendConfigManager
asyncManager *jobs.AsyncJobWh
sourceManager *source.Manager
stagingRepo *repo.StagingFiles
uploadRepo *repo.Uploads
schemaRepo *repo.WHSchema
Expand All @@ -100,7 +100,7 @@ func NewApi(
notifier *notifier.Notifier,
tenantManager *multitenant.Manager,
bcManager *bcm.BackendConfigManager,
asyncManager *jobs.AsyncJobWh,
sourceManager *source.Manager,
triggerStore *sync.Map,
) *Api {
a := &Api{
Expand All @@ -112,7 +112,7 @@ func NewApi(
statsFactory: statsFactory,
tenantManager: tenantManager,
bcManager: bcManager,
asyncManager: asyncManager,
sourceManager: sourceManager,
triggerStore: triggerStore,
stagingRepo: repo.NewStagingFiles(db),
uploadRepo: repo.NewUploads(db),
Expand Down Expand Up @@ -170,8 +170,8 @@ func (a *Api) addMasterEndpoints(ctx context.Context, r chi.Router) {
r.Post("/pending-events", a.logMiddleware(a.pendingEventsHandler))
r.Post("/trigger-upload", a.logMiddleware(a.triggerUploadHandler))

r.Post("/jobs", a.logMiddleware(a.asyncManager.InsertJobHandler)) // TODO: add degraded mode
r.Get("/jobs/status", a.logMiddleware(a.asyncManager.StatusJobHandler)) // TODO: add degraded mode
r.Post("/jobs", a.logMiddleware(a.sourceManager.InsertJobHandler)) // TODO: add degraded mode
r.Get("/jobs/status", a.logMiddleware(a.sourceManager.StatusJobHandler)) // TODO: add degraded mode

r.Get("/fetch-tables", a.logMiddleware(a.fetchTablesHandler)) // TODO: Remove this endpoint once sources change is released
})
Expand Down
13 changes: 7 additions & 6 deletions warehouse/api/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"

"github.com/golang/mock/gomock"
"github.com/ory/dockertest/v3"
Expand Down Expand Up @@ -188,12 +188,12 @@ func TestHTTPApi(t *testing.T) {
err = n.Setup(ctx, pgResource.DBDsn)
require.NoError(t, err)

sourcesManager := jobs.New(
ctx,
sourcesManager := source.New(
c,
logger.NOP,
db,
n,
)
jobs.WithConfig(sourcesManager, config.New())

g, gCtx := errgroup.WithContext(ctx)
g.Go(func() error {
Expand All @@ -205,7 +205,7 @@ func TestHTTPApi(t *testing.T) {
return nil
})
g.Go(func() error {
return sourcesManager.Run()
return sourcesManager.Run(gCtx)
})

setupCh := make(chan struct{})
Expand Down Expand Up @@ -906,7 +906,8 @@ func TestHTTPApi(t *testing.T) {
"source_id": "test_source_id",
"destination_id": "test_destination_id",
"job_run_id": "test_source_job_run_id",
"task_run_id": "test_source_task_run_id"
"task_run_id": "test_source_task_run_id",
"async_job_type": "deletebyjobrunid"
}
`)))
require.NoError(t, err)
Expand Down
12 changes: 6 additions & 6 deletions warehouse/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ import (
"github.com/rudderlabs/rudder-server/utils/types"
whadmin "github.com/rudderlabs/rudder-server/warehouse/admin"
"github.com/rudderlabs/rudder-server/warehouse/archive"
"github.com/rudderlabs/rudder-server/warehouse/jobs"
"github.com/rudderlabs/rudder-server/warehouse/multitenant"
"github.com/rudderlabs/rudder-server/warehouse/source"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand All @@ -67,7 +67,7 @@ type App struct {
constraintsManager *constraints.Manager
encodingFactory *encoding.Factory
fileManagerFactory filemanager.Factory
sourcesManager *jobs.AsyncJobWh
sourcesManager *source.Manager
admin *whadmin.Admin
triggerStore *sync.Map
createUploadAlways *atomic.Bool
Expand Down Expand Up @@ -174,12 +174,12 @@ func (a *App) Setup(ctx context.Context) error {
return fmt.Errorf("cannot setup notifier: %w", err)
}

a.sourcesManager = jobs.New(
ctx,
a.sourcesManager = source.New(
a.conf,
a.logger,
a.db,
a.notifier,
)
jobs.WithConfig(a.sourcesManager, a.conf)

a.grpcServer, err = api.NewGRPCServer(
a.conf,
Expand Down Expand Up @@ -413,7 +413,7 @@ func (a *App) Run(ctx context.Context) error {
return nil
})
g.Go(misc.WithBugsnagForWarehouse(func() error {
return a.sourcesManager.Run()
return a.sourcesManager.Run(gCtx)
}))
}

Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
skipModifiedEvents bool
prerequisite func(context.Context, testing.TB, *bigquery.Client)
enableMerge bool
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job-merge-mode",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
sourceID: sourcesSourceID,
destinationID: sourcesDestinationID,
Expand All @@ -192,7 +192,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
enableMerge: false,
prerequisite: func(ctx context.Context, t testing.TB, db *bigquery.Client) {
t.Helper()
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -359,7 +359,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/mssql/mssql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap testhelper.EventsCountMap
tableUploadsEventsMap testhelper.EventsCountMap
warehouseEventsMap testhelper.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -174,7 +174,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -184,7 +184,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: testhelper.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: testhelper.SourcesTableUploadsEventsMap(),
warehouseEventsMap: testhelper.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -257,7 +257,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: testhelper.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -204,7 +204,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -214,7 +214,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -287,7 +287,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-2.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
10 changes: 5 additions & 5 deletions warehouse/integrations/redshift/redshift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap whth.EventsCountMap
tableUploadsEventsMap whth.EventsCountMap
warehouseEventsMap whth.EventsCountMap
asyncJob bool
sourceJob bool
stagingFilePrefix string
}{
{
Expand All @@ -198,7 +198,7 @@ func TestIntegration(t *testing.T) {
stagingFilePrefix: "testdata/upload-job",
},
{
name: "Async Job",
name: "Source Job",
writeKey: sourcesWriteKey,
schema: sourcesNamespace,
tables: []string{"tracks", "google_sheet"},
Expand All @@ -208,7 +208,7 @@ func TestIntegration(t *testing.T) {
loadFilesEventsMap: whth.SourcesLoadFilesEventsMap(),
tableUploadsEventsMap: whth.SourcesTableUploadsEventsMap(),
warehouseEventsMap: whth.SourcesWarehouseEventsMap(),
asyncJob: true,
sourceJob: true,
stagingFilePrefix: "testdata/sources-job",
},
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestIntegration(t *testing.T) {
LoadFilesEventsMap: tc.loadFilesEventsMap,
TableUploadsEventsMap: tc.tableUploadsEventsMap,
WarehouseEventsMap: tc.warehouseEventsMap,
AsyncJob: tc.asyncJob,
SourceJob: tc.sourceJob,
Config: conf,
WorkspaceID: workspaceID,
DestinationType: destType,
Expand All @@ -292,7 +292,7 @@ func TestIntegration(t *testing.T) {
StagingFilePath: tc.stagingFilePrefix + ".staging-1.json",
UserID: whth.GetUserId(destType),
}
if tc.asyncJob {
if tc.sourceJob {
ts2.UserID = ts1.UserID
}
ts2.VerifyEvents(t)
Expand Down
Loading

0 comments on commit 5c5a21b

Please sign in to comment.