Skip to content

Commit

Permalink
Merge pull request #106 from fangyincheng/services
Browse files Browse the repository at this point in the history
Fix:Lock bug
  • Loading branch information
AlexStocks authored Jun 27, 2019
2 parents 10d0f0f + 4833a6f commit 569562c
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 23 deletions.
24 changes: 6 additions & 18 deletions protocol/dubbo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,13 @@ type Client struct {
pool *gettyRPCClientPool
sequence atomic.Uint64

pendingLock sync.RWMutex
pendingResponses map[SequenceType]*PendingResponse
pendingResponses *sync.Map
}

func NewClient() *Client {

c := &Client{
pendingResponses: make(map[SequenceType]*PendingResponse),
pendingResponses: new(sync.Map),
conf: *clientConf,
}
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
Expand Down Expand Up @@ -201,13 +200,6 @@ func (c *Client) AsyncCall(addr string, svcUrl common.URL, method string, args i
return perrors.WithStack(c.call(CT_TwoWay, addr, svcUrl, method, args, reply, callback, copts))
}

func (c *Client) GetPendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.RLock()
defer c.pendingLock.RUnlock()

return c.pendingResponses[SequenceType(seq)]
}

func (c *Client) call(ct CallType, addr string, svcUrl common.URL, method string,
args, reply interface{}, callback AsyncCallback, opts CallOptions) error {

Expand Down Expand Up @@ -330,20 +322,16 @@ func (c *Client) transfer(session getty.Session, pkg *DubboPackage,
}

func (c *Client) addPendingResponse(pr *PendingResponse) {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
c.pendingResponses[SequenceType(pr.seq)] = pr
c.pendingResponses.Store(SequenceType(pr.seq), pr)
}

func (c *Client) removePendingResponse(seq SequenceType) *PendingResponse {
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if c.pendingResponses == nil {
return nil
}
if presp, ok := c.pendingResponses[seq]; ok {
delete(c.pendingResponses, seq)
return presp
if presp, ok := c.pendingResponses.Load(seq); ok {
c.pendingResponses.Delete(seq)
return presp.(*PendingResponse)
}
return nil
}
6 changes: 3 additions & 3 deletions protocol/dubbo/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func (p *DubboPackage) Unmarshal(buf *bytes.Buffer, opts ...interface{}) error {
return perrors.Errorf("opts[0] is not of type *Client")
}

pendingRsp := client.GetPendingResponse(SequenceType(p.Header.ID))
if pendingRsp == nil {
pendingRsp, ok := client.pendingResponses.Load(SequenceType(p.Header.ID))
if !ok {
return perrors.Errorf("client.GetPendingResponse(%v) = nil", p.Header.ID)
} else {
p.Body = &hessian.Response{RspObj: pendingRsp.reply}
p.Body = &hessian.Response{RspObj: pendingRsp.(*PendingResponse).reply}
}
}

Expand Down
2 changes: 0 additions & 2 deletions protocol/dubbo/readwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ func NewRpcClientPackageHandler(client *Client) *RpcClientPackageHandler {
}

func (p *RpcClientPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
p.client.pendingLock.RLock()
defer p.client.pendingLock.RUnlock()
pkg := &DubboPackage{}

buf := bytes.NewBuffer(data)
Expand Down

0 comments on commit 569562c

Please sign in to comment.