Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix double close in stream client #2693

Merged
merged 10 commits into from
Feb 15, 2024
26 changes: 16 additions & 10 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("unsupported Content-Type: %s", contentType)
}

func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
func (r *rpcClient) call(
ctx context.Context,
node *registry.Node,
req Request,
resp interface{},
opts CallOptions,
) error {
address := node.Address
logger := r.Options().Logger

Expand Down Expand Up @@ -292,12 +298,6 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
r.codec = codec
}

releaseFunc := func(_ error) {
if err = c.Close(); err != nil {
logger.Log(log.ErrorLevel, err)
}
}

stream := &rpcStream{
id: id,
context: ctx,
Expand All @@ -308,7 +308,7 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
closed: make(chan bool),
// signal the end of stream,
sendEOS: true,
release: releaseFunc,
release: func(_ error) {},
}

// wait for error response
Expand Down Expand Up @@ -490,7 +490,10 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}

return merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
return merrors.InternalServerError("go.micro.client",
"error getting next %s node: %s",
service,
err.Error())
}

// make the call
Expand Down Expand Up @@ -586,7 +589,10 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
}

return nil, merrors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
return nil, merrors.InternalServerError("go.micro.client",
"error getting next %s node: %s",
service,
err.Error())
}

stream, err := r.stream(ctx, node, request, callOpts)
Expand Down
Loading