Continue to push assignment updates to nodes that were removed from the list #419

Merged · 2 commits · Dec 29, 2023
45 changes: 33 additions & 12 deletions coordinator/impl/coordinator.go
@@ -68,11 +68,16 @@ type coordinator struct {

shardControllers map[int64]ShardController
nodeControllers map[string]NodeController
clusterStatus *model.ClusterStatus
assignments *proto.ShardAssignments
metadataVersion Version
rpc RpcProvider
log *slog.Logger

// Draining nodes are nodes that were removed from the
// nodes list. We keep sending them assignment updates
// because clients might still be able to reach them.
drainingNodes map[string]NodeController
clusterStatus *model.ClusterStatus
assignments *proto.ShardAssignments
metadataVersion Version
rpc RpcProvider
log *slog.Logger

ctx context.Context
cancel context.CancelFunc
@@ -98,6 +103,7 @@ func NewCoordinator(metadataProvider MetadataProvider,
clusterConfigRefreshTime: clusterConfigRefreshTime,
shardControllers: make(map[int64]ShardController),
nodeControllers: make(map[string]NodeController),
drainingNodes: make(map[string]NodeController),
rpc: rpc,
log: slog.With(
slog.String("component", "coordinator"),
@@ -222,11 +228,22 @@ func (c *coordinator) Close() error {
for _, nc := range c.nodeControllers {
err = multierr.Append(err, nc.Close())
}

for _, nc := range c.drainingNodes {
err = multierr.Append(err, nc.Close())
}
return err
}

func (c *coordinator) NodeBecameUnavailable(node model.ServerAddress) {
c.Lock()

if nc, ok := c.drainingNodes[node.Internal]; ok {
// The draining node became unavailable. Let's remove it
delete(c.drainingNodes, node.Internal)
_ = nc.Close()
}

ctrls := make(map[int64]ShardController)
for k, sc := range c.shardControllers {
ctrls[k] = sc
@@ -418,9 +435,7 @@ func (c *coordinator) handleClusterConfigUpdated() error {
slog.Any("metadataVersion", c.metadataVersion),
)

if err = c.checkClusterNodeChanges(newClusterConfig); err != nil {
return err
}
c.checkClusterNodeChanges(newClusterConfig)

clusterStatus, shardsToAdd, shardsToDelete := applyClusterChanges(&newClusterConfig, c.clusterStatus)

@@ -484,13 +499,19 @@ func (c *coordinator) rebalanceCluster() error {
return nil
}

func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConfig) (err error) {
func (c *coordinator) checkClusterNodeChanges(newClusterConfig model.ClusterConfig) {
// Check for nodes to add
for _, sa := range newClusterConfig.Servers {
if _, ok := c.nodeControllers[sa.Internal]; !ok {
// The node is present in the config, though we don't know it yet,
// therefore it must be a newly added node
c.log.Info("Detected new node", slog.Any("addr", sa))
if nc, ok := c.drainingNodes[sa.Internal]; ok {
// If there was still a controller for this node in the draining list,
// close it and recreate the node as a fresh one
_ = nc.Close()
delete(c.drainingNodes, sa.Internal)
}
c.nodeControllers[sa.Internal] = NewNodeController(sa, c, c, c.rpc)
}
}
@@ -508,12 +529,12 @@ func (c *coordinator) checkClusterNodeChanges(newClusterConf

if !found {
c.log.Info("Detected a removed node", slog.Any("addr", ia))
err = multierr.Append(err, nc.Close())
// Move the node to the draining list
delete(c.nodeControllers, ia)
nc.SetStatus(Draining)
c.drainingNodes[ia] = nc
}
}

return err
}

func (c *coordinator) getNodeControllers() map[string]NodeController {
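As the drainingNodes comment above explains, a node that disappears from the cluster config is no longer closed immediately: its controller is marked Draining and kept in a separate map so the node keeps receiving shard assignment updates, and it is only forgotten once it becomes unreachable (or is re-added as a fresh node). The following is a minimal, self-contained sketch of that bookkeeping; the `coordinatorView` type and its method names are illustrative only and not part of the Oxia codebase.

```go
package main

import "fmt"

// coordinatorView is a hypothetical, minimal model of the bookkeeping this PR adds:
// active nodes are managed normally, draining nodes only keep receiving shard
// assignment updates until they become unreachable.
type coordinatorView struct {
	active   map[string]bool
	draining map[string]bool
}

// removeFromConfig mirrors checkClusterNodeChanges(): a node dropped from the
// cluster config is moved to the draining set instead of being closed.
func (c *coordinatorView) removeFromConfig(addr string) {
	if c.active[addr] {
		delete(c.active, addr)
		c.draining[addr] = true
	}
}

// becameUnavailable mirrors NodeBecameUnavailable(): a draining node that can
// no longer be reached is finally forgotten.
func (c *coordinatorView) becameUnavailable(addr string) {
	delete(c.draining, addr)
}

// reAdded mirrors the "new node" branch: if the node reappears in the config,
// the draining entry is discarded and the node becomes active again.
func (c *coordinatorView) reAdded(addr string) {
	delete(c.draining, addr)
	c.active[addr] = true
}

func main() {
	c := &coordinatorView{
		active:   map[string]bool{"node-1": true},
		draining: map[string]bool{},
	}
	c.removeFromConfig("node-1")
	fmt.Println("draining:", c.draining["node-1"]) // true: still gets assignment updates
	c.becameUnavailable("node-1")
	fmt.Println("draining:", c.draining["node-1"]) // false: fully removed
}
```

Keeping the controller alive is what the e2e test below exercises: a client still connected to the removed node can learn the updated leader assignments from it and keep operating.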
83 changes: 83 additions & 0 deletions coordinator/impl/coordinator_e2e_test.go
@@ -640,6 +640,89 @@ func TestCoordinator_AddRemoveNodes(t *testing.T) {
}
}

func TestCoordinator_ShrinkCluster(t *testing.T) {
s1, sa1 := newServer(t)
s2, sa2 := newServer(t)
s3, sa3 := newServer(t)
s4, sa4 := newServer(t)
servers := map[model.ServerAddress]*server.Server{
sa1: s1,
sa2: s2,
sa3: s3,
sa4: s4,
}

metadataProvider := NewMetadataProviderMemory()
clusterConfig := model.ClusterConfig{
Namespaces: []model.NamespaceConfig{{
Name: "my-ns-1",
ReplicationFactor: 1,
InitialShardCount: 1,
}},
Servers: []model.ServerAddress{sa1, sa2, sa3, sa4},
}
clientPool := common.NewClientPool()

configProvider := func() (model.ClusterConfig, error) {
return clusterConfig, nil
}

c, err := NewCoordinator(metadataProvider, configProvider, 1*time.Second, NewRpcProvider(clientPool))
assert.NoError(t, err)

// Wait for all shards to be ready
assert.Eventually(t, func() bool {
for _, ns := range c.ClusterStatus().Namespaces {
for _, shard := range ns.Shards {
if shard.Status != model.ShardStatusSteadyState {
return false
}
}
}
return true
}, 10*time.Second, 10*time.Millisecond)

assert.Equal(t, 4, len(c.(*coordinator).getNodeControllers()))

shard0 := c.ClusterStatus().Namespaces["my-ns-1"].Shards[0]
assert.Equal(t, sa1.Public, shard0.Leader.Public)

// Remove s1
clusterConfig.Servers = clusterConfig.Servers[1:]

assert.Eventually(t, func() bool {
return len(c.(*coordinator).getNodeControllers()) == 3
}, 10*time.Second, 10*time.Millisecond)

// Wait for the shard to have a term > 0
assert.Eventually(t, func() bool {
for _, ns := range c.ClusterStatus().Namespaces {
for _, shard := range ns.Shards {
return shard.Term > 0
}
}
return true
}, 10*time.Second, 10*time.Millisecond)

// S1 should receive the updated leader info
shard0 = c.ClusterStatus().Namespaces["my-ns-1"].Shards[0]
assert.NotEqual(t, sa1.Public, shard0.Leader.Public)

client, err := oxia.NewSyncClient(sa1.Public, oxia.WithNamespace("my-ns-1"))
assert.NoError(t, err)

_, err = client.Put(context.Background(), "test", []byte("value"))
assert.NoError(t, err)

assert.NoError(t, client.Close())
assert.NoError(t, c.Close())
assert.NoError(t, clientPool.Close())

for _, serverObj := range servers {
assert.NoError(t, serverObj.Close())
}
}

func checkServerLists(t *testing.T, expected, actual []model.ServerAddress) {
t.Helper()

24 changes: 24 additions & 0 deletions coordinator/impl/node_controller.go
@@ -36,6 +36,7 @@ type NodeStatus uint32
const (
Running NodeStatus = iota
NotRunning
Draining // Removed from the cluster config, but still receiving assignment updates
)

const (
@@ -50,6 +51,8 @@ type NodeController interface {
io.Closer

Status() NodeStatus

SetStatus(status NodeStatus)
}

type nodeController struct {
@@ -138,11 +141,25 @@ func (n *nodeController) Status() NodeStatus {
return n.status
}

func (n *nodeController) SetStatus(status NodeStatus) {
n.Lock()
defer n.Unlock()
n.status = status
n.log.Info("Changed status", slog.Any("status", status))
}

func (n *nodeController) healthCheckWithRetries() {
backOff := common.NewBackOffWithInitialInterval(n.ctx, n.initialRetryBackoff)
_ = backoff.RetryNotify(func() error {
return n.healthCheck(backOff)
}, backOff, func(err error, duration time.Duration) {
if n.Status() == Draining {
// Stop the health check and close
_ = n.Close()
n.nodeAvailabilityListener.NodeBecameUnavailable(n.addr)
return
}

n.log.Warn(
"Storage node health check failed",
slog.Any("error", err),
@@ -244,6 +261,13 @@ func (n *nodeController) sendAssignmentsUpdatesWithRetries() {
_ = backoff.RetryNotify(func() error {
return n.sendAssignmentsUpdates(backOff)
}, backOff, func(err error, duration time.Duration) {
if n.Status() == Draining {
// Stop sending assignment updates and close
_ = n.Close()
n.nodeAvailabilityListener.NodeBecameUnavailable(n.addr)
return
}

n.log.Warn(
"Failed to send assignments updates to storage node",
slog.Duration("retry-after", duration),
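Both retry loops in node_controller.go follow the same shape: the backoff notify callback checks the controller status and, once a Draining node fails a health check or an assignment-update stream, the controller is closed and the node is reported as unavailable instead of being retried forever. Below is a standalone sketch of that pattern using github.com/cenkalti/backoff/v4's RetryNotify and WithContext; the status flag, timings, and messages are hypothetical, and the real code uses Oxia's common.NewBackOffWithInitialInterval helper and n.Close() to cancel the controller context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Illustrative status flag: 0 = running, 1 = draining.
	var status atomic.Int32

	// The controller context; cancelling it stops the retry loop,
	// standing in for what closing the controller does in the real code.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Simulate the coordinator marking the node as draining shortly after start.
	go func() {
		time.Sleep(300 * time.Millisecond)
		status.Store(1)
	}()

	b := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)

	// Same shape as sendAssignmentsUpdatesWithRetries(): keep retrying the RPC,
	// but once the node is draining, treat the failure as "the node is gone".
	err := backoff.RetryNotify(func() error {
		return errors.New("node unreachable") // the RPC keeps failing in this sketch
	}, b, func(err error, d time.Duration) {
		if status.Load() == 1 {
			fmt.Println("draining node is unreachable, giving up:", err)
			cancel() // stands in for closing the controller and notifying the listener
			return
		}
		fmt.Println("will retry after", d)
	})
	fmt.Println("retry loop exited:", err)
}
```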