Skip to content

Commit

Permalink
Merge a7ce39f into 1d099ab
Browse files Browse the repository at this point in the history
  • Loading branch information
igotcha authored Oct 10, 2023
2 parents 1d099ab + a7ce39f commit 14ff177
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions database/kv/etcd/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ import (
"strings"
"sync"
"time"
)

import (
perrors "github.com/pkg/errors"

"go.etcd.io/etcd/client/v3"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"

"google.golang.org/grpc"
)

Expand Down Expand Up @@ -420,10 +417,30 @@ func (c *Client) keepAliveKV(k string, v string) error {
return perrors.New("keep alive lease")
}

// listen keepAlive to avoid useless warning:
// 'lease keepalive response queue is full; dropping response send'
go c.listenKeepAliveRsp(k, keepAlive)

_, err = rawClient.Put(c.ctx, k, v, clientv3.WithLease(lease.ID))
return perrors.WithMessage(err, "put k/v with lease")
}

// listenKeepAliveRsp listens to `keepAliveRspCh` channel.
func (c *Client) listenKeepAliveRsp(k string, keepAliveRspCh <-chan *clientv3.LeaseKeepAliveResponse) {
for {
select {
case <-c.ctx.Done():
log.Printf("listenKeepAliveRsp canceled: %v: %s", c.ctx.Err(), k)
return
case _, ok := <-keepAliveRspCh:
if !ok {
log.Printf("listenKeepAliveRsp canceled: unexpected lease expired: %s", k)
return
}
}
}
}

// Done return exit chan
func (c *Client) Done() <-chan struct{} {
return c.exit
Expand Down

0 comments on commit 14ff177

Please sign in to comment.