Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed RemoteWeavelet UpdateRoutingInfo bugs. #529

Merged
merged 2 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,6 @@ func TestUpdateExistingComponents(t *testing.T) {
}

func TestUpdateNilRoutingInfo(t *testing.T) {
t.Skip("TODO(mwhittaker): Make this test pass.")

ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
Expand Down Expand Up @@ -571,8 +569,6 @@ func TestUpdateRoutingInfoNotStartedComponent(t *testing.T) {
}

func TestUpdateLocalRoutingInfoWithNonLocal(t *testing.T) {
t.Skip("TODO(mwhittaker): Make this test pass.")

ctx := context.Background()
d := deploy(t, ctx)
go d.env.Serve(d)
Expand Down
60 changes: 42 additions & 18 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,42 +429,66 @@ func (w *RemoteWeavelet) UpdateComponents(req *protos.UpdateComponentsRequest) (

// UpdateRoutingInfo implements the conn.WeaverHandler interface.
func (w *RemoteWeavelet) UpdateRoutingInfo(req *protos.UpdateRoutingInfoRequest) (reply *protos.UpdateRoutingInfoReply, err error) {
if req.RoutingInfo == nil {
w.syslogger.Error("Failed to update nil routing info")
return nil, fmt.Errorf("nil RoutingInfo")
}
info := req.RoutingInfo

defer func() {
name := logging.ShortenComponent(req.RoutingInfo.Component)
routing := fmt.Sprint(req.RoutingInfo.Replicas)
if req.RoutingInfo.Local {
name := logging.ShortenComponent(info.Component)
routing := fmt.Sprint(info.Replicas)
if info.Local {
routing = "local"
}
if err != nil {
w.syslogger.Debug(fmt.Sprintf("Failed to update routing info for %q to %s", name, routing), "err", err)
w.syslogger.Error(fmt.Sprintf("Failed to update routing info for %q to %s", name, routing), "err", err)
} else {
w.syslogger.Debug(fmt.Sprintf("Updated routing info for %q to %s", name, routing))
}
}()

// Update load collector.
for _, c := range w.componentsByName {
// TODO(mwhittaker): Double check this.
if c.load != nil && req.RoutingInfo.Assignment != nil {
c.load.updateAssignment(req.RoutingInfo.Assignment)
}
}

c, err := w.getComponent(req.RoutingInfo.Component)
c, err := w.getComponent(info.Component)
if err != nil {
return nil, err
}

// Update resolver and balancer.
endpoints, err := parseEndpoints(req.RoutingInfo.Replicas, c.clientTLS)
// Record whether the component is local or remote. Currently, a component
// must always be local or always be remote. It cannot change.
c.local.TryWrite(info.Local)
if got, want := c.local.Read(), info.Local; got != want {
return nil, fmt.Errorf("RoutingInfo.Local for %q: got %t, want %t", info.Component, got, want)
}

// If the component is local, we don't have to update anything. The routing
// info shouldn't contain any replicas or assignment.
if info.Local {
if len(info.Replicas) > 0 {
w.syslogger.Error(fmt.Sprintf("Local routing info for %q has replicas: %v", info.Component, info.Replicas))
}
if info.Assignment != nil {
w.syslogger.Error(fmt.Sprintf("Local routing info for %q has assignment: %v", info.Component, info.Assignment))
}
return
}

// Update resolver.
endpoints, err := parseEndpoints(info.Replicas, c.clientTLS)
if err != nil {
return nil, err
}
c.resolver.update(endpoints)
c.balancer.update(req.RoutingInfo.Assignment)

// Update local.
c.local.TryWrite(req.RoutingInfo.Local)
// Update balancer.
if info.Assignment != nil {
c.balancer.update(info.Assignment)
}

// Update load collector.
if c.load != nil && info.Assignment != nil {
c.load.updateAssignment(info.Assignment)
}

return &protos.UpdateRoutingInfoReply{}, nil
}

Expand Down