Skip to content

Commit

Permalink
chore: remove workspace ack in multitenant setup [PIPE-474] (#4066)
Browse files Browse the repository at this point in the history
remove workspace ack in multitenant setup since server serves all the workspaces not just workspaces from scheduler
  • Loading branch information
BonapartePC authored Nov 6, 2023
1 parent 680ff8c commit 338a8e9
Show file tree
Hide file tree
Showing 10 changed files with 4 additions and 765 deletions.
173 changes: 0 additions & 173 deletions app/cluster/configlifecycle_mock_test.go

This file was deleted.

45 changes: 1 addition & 44 deletions app/cluster/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package cluster
import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
"github.com/rudderlabs/rudder-server/utils/types/workspace"
)

var (
Expand All @@ -21,21 +18,13 @@ var (

type ChangeEventProvider interface {
ServerMode(ctx context.Context) <-chan servermode.ChangeEvent
WorkspaceIDs(ctx context.Context) <-chan workspace.ChangeEvent
}

type lifecycle interface {
Start() error
Stop()
}

//go:generate mockgen -destination=./configlifecycle_mock_test.go -package=cluster_test -source=./dynamic.go configLifecycle
type configLifecycle interface {
Stop()
StartWithIDs(ctx context.Context, workspaces string)
WaitForConfig(ctx context.Context)
}

type Dynamic struct {
Provider ChangeEventProvider

Expand All @@ -54,14 +43,12 @@ type Dynamic struct {
SchemaForwarder lifecycle
Archiver lifecycle

currentMode servermode.Mode
currentWorkspaceIDs string
currentMode servermode.Mode

serverStartTimeStat stats.Measurement
serverStopTimeStat stats.Measurement
serverStartCountStat stats.Measurement
serverStopCountStat stats.Measurement
BackendConfig configLifecycle

logger logger.Logger

Expand All @@ -79,10 +66,6 @@ func (d *Dynamic) init() {
d.serverStopTimeStat = stats.Default.NewTaggedStat("cluster.server_stop_time", stats.TimerType, tag)
d.serverStartCountStat = stats.Default.NewTaggedStat("cluster.server_start_count", stats.CountType, tag)
d.serverStopCountStat = stats.Default.NewTaggedStat("cluster.server_stop_count", stats.CountType, tag)

if d.BackendConfig == nil {
d.BackendConfig = backendconfig.DefaultBackendConfig
}
}

func (d *Dynamic) Run(ctx context.Context) error {
Expand All @@ -92,7 +75,6 @@ func (d *Dynamic) Run(ctx context.Context) error {
defer cancel()

serverModeChan := d.Provider.ServerMode(ctx)
workspaceIDsChan := d.Provider.WorkspaceIDs(ctx)
if d.GatewayComponent {
d.currentMode = servermode.NormalMode
}
Expand Down Expand Up @@ -130,23 +112,6 @@ func (d *Dynamic) Run(ctx context.Context) error {
if err := req.Ack(ctx); err != nil {
return fmt.Errorf("ack mode change: %w", err)
}
case req := <-workspaceIDsChan:
if req.Err() != nil {
return req.Err()
}
ids := strings.Join(req.WorkspaceIDs(), ",")

d.logger.Infof("Got trigger to change workspaceIDs: %q", ids)
err := d.handleWorkspaceChange(ctx, ids)
if ackErr := req.Ack(ctx, err); ackErr != nil {
return fmt.Errorf("ack workspaceIDs change with error: %v: %w", err, ackErr)
}
if err != nil {
d.logger.Debugf("Could not handle workspaceIDs change: %v", err)
return err
}

d.logger.Debug("WorkspaceIDs changed")
}
}
}
Expand Down Expand Up @@ -224,14 +189,6 @@ func (d *Dynamic) stop() {
d.serverStopCountStat.Increment()
}

func (d *Dynamic) handleWorkspaceChange(ctx context.Context, workspaces string) error {
d.BackendConfig.Stop()
d.BackendConfig.StartWithIDs(ctx, workspaces)
d.currentWorkspaceIDs = workspaces
d.BackendConfig.WaitForConfig(ctx)
return nil
}

func (d *Dynamic) handleModeChange(newMode servermode.Mode) error {
if d.GatewayComponent {
d.logger.Info("Not transiting the server because this is only Gateway App")
Expand Down
65 changes: 2 additions & 63 deletions app/cluster/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,20 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
"github.com/rudderlabs/rudder-server/utils/types/workspace"
)

type mockModeProvider struct {
modeCh chan servermode.ChangeEvent
workspaceCh chan workspace.ChangeEvent
modeCh chan servermode.ChangeEvent
}

func (m *mockModeProvider) ServerMode(context.Context) <-chan servermode.ChangeEvent {
return m.modeCh
}

func (m *mockModeProvider) WorkspaceIDs(_ context.Context) <-chan workspace.ChangeEvent {
return m.workspaceCh
}

func (m *mockModeProvider) sendMode(newMode servermode.ChangeEvent) {
m.modeCh <- newMode
}

func (m *mockModeProvider) sendWorkspaceIDs(ws workspace.ChangeEvent) {
m.workspaceCh <- ws
}

type mockLifecycle struct {
status string
callOrder uint64
Expand All @@ -65,8 +55,7 @@ func TestDynamicCluster(t *testing.T) {
Init()

provider := &mockModeProvider{
modeCh: make(chan servermode.ChangeEvent),
workspaceCh: make(chan workspace.ChangeEvent),
modeCh: make(chan servermode.ChangeEvent),
}

callCount := uint64(0)
Expand All @@ -82,9 +71,6 @@ func TestDynamicCluster(t *testing.T) {

processor := &mockLifecycle{status: "", callCount: &callCount}
router := &mockLifecycle{status: "", callCount: &callCount}

ctrl := gomock.NewController(t)
backendConfig := NewMockconfigLifecycle(ctrl)
dc := cluster.Dynamic{
Provider: provider,

Expand All @@ -99,8 +85,6 @@ func TestDynamicCluster(t *testing.T) {
Router: router,
SchemaForwarder: schemaForwarder,
Archiver: archiver,

BackendConfig: backendConfig,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -205,51 +189,6 @@ func TestDynamicCluster(t *testing.T) {
require.True(t, errorDB.callOrder > router.callOrder)
})

t.Run("Update workspaceIDs", func(t *testing.T) {
chACK := make(chan struct{})
backendConfig.EXPECT().Stop().Times(1)
backendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)
backendConfig.EXPECT().StartWithIDs(gomock.Any(), "a,b,c").Times(1)

provider.sendWorkspaceIDs(
workspace.NewWorkspacesRequest([]string{"a", "b", "c"},
func(_ context.Context, err error) error {
close(chACK)
require.NoError(t, err)
return nil
},
),
)

select {
case <-chACK:
case <-time.After(time.Second):
t.Fatal("Did not get acknowledgement within 1 second")
}
})

t.Run("Empty workspaces triggers a reload", func(t *testing.T) {
chACK := make(chan struct{})
backendConfig.EXPECT().Stop().Times(1)
backendConfig.EXPECT().StartWithIDs(gomock.Any(), gomock.Any()).Times(1)
backendConfig.EXPECT().WaitForConfig(gomock.Any()).Times(1)

provider.sendWorkspaceIDs(
workspace.NewWorkspacesRequest([]string{},
func(ctx context.Context, err error) error {
require.NoError(t, err)
close(chACK)
return nil
}),
)

select {
case <-chACK:
case <-time.After(time.Second):
t.Fatal("Did not get acknowledgement error within 1 second")
}
})

t.Run("Shutdown from Normal", func(t *testing.T) {
cancel()
<-wait
Expand Down
Loading

0 comments on commit 338a8e9

Please sign in to comment.