Skip to content

Commit

Permalink
bugfix: fix lease used by pouchd may be expired
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Wan <zirenwan@gmail.com>
  • Loading branch information
HusterWan authored and fuweid committed Apr 30, 2019
1 parent 33a9d82 commit f297432
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 20 deletions.
64 changes: 63 additions & 1 deletion ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/typeurl"
Expand All @@ -29,6 +30,9 @@ const (
PluginStatusOk = "ok"
// PluginStatusError means plugin status is error
PluginStatusError = "error"

// a id for lease used by ctrd
pouchLeaseID = "pouchd.lease"
)

// ErrGetCtrdClient is an error returned when failed to get a containerd grpc client from clients pool.
Expand Down Expand Up @@ -86,8 +90,13 @@ func NewClient(opts ...ClientOpt) (APIClient, error) {
insecureRegistries: copts.insecureRegistries,
}

lease, err := client.preparePouchdLease(copts.rpcAddr, copts.defaultns)
if err != nil {
return nil, fmt.Errorf("failed to prepare a lease for pouchd")
}

for i := 0; i < copts.grpcClientPoolCapacity; i++ {
cli, err := newWrapperClient(copts.rpcAddr, copts.defaultns, copts.maxStreamsClient)
cli, err := newWrapperClient(copts.rpcAddr, copts.defaultns, copts.maxStreamsClient, lease)
if err != nil {
return nil, fmt.Errorf("failed to create containerd client: %v", err)
}
Expand Down Expand Up @@ -375,3 +384,56 @@ func (c *Client) checkSnapshotsExist(snapshotter string) (existSnapshot bool, er
err = c.WalkSnapshot(context.Background(), snapshotter, fn)
return
}

// preparePouchdLease is to prepare a lease for pouch client to containerd.
func (c *Client) preparePouchdLease(rpcAddr, defaultns string) (*leases.Lease, error) {
var lease leases.Lease

options := []containerd.ClientOpt{
containerd.WithDefaultNamespace(defaultns),
}
cli, err := containerd.New(rpcAddr, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to connect containerd")
}
defer cli.Close()

leaseSrv := cli.LeasesService()
leaseList, err := leaseSrv.List(context.TODO())
if err != nil {
return nil, err
}

for _, l := range leaseList {
if l.ID != pouchLeaseID {
continue
}

foundExpireLabel := false
for k := range l.Labels {
if k == "containerd.io/gc.expire" {
foundExpireLabel = true
break
}
}

// found a lease that matched the condition, just return
if !foundExpireLabel {
return &l, nil
}

// found a lease with id is pouchd.lease and has expire time,
// then just delete it and wait to recreate a new lease.
if err := leaseSrv.Delete(context.TODO(), l); err != nil {
return nil, err
}

}

// not found a matched lease, just create it
if lease, err = leaseSrv.Create(context.TODO(), leases.WithID(pouchLeaseID)); err != nil {
return nil, err
}

return &lease, nil
}
22 changes: 3 additions & 19 deletions ctrd/wrapper_client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ctrd

import (
"context"
"fmt"
"sync"

Expand All @@ -26,34 +25,19 @@ type WrapperClient struct {
streamQuota int
}

func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int) (*WrapperClient, error) {
func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int, lease *leases.Lease) (*WrapperClient, error) {
options := []containerd.ClientOpt{
containerd.WithDefaultNamespace(defaultns),
}

cli, err := containerd.New(rpcAddr, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to connect containerd")
}
leaseSrv := cli.LeasesService()

// create a new lease or reuse the existed.
var lease leases.Lease

leases, err := leaseSrv.List(context.TODO())
if err != nil {
return nil, err
}
if len(leases) != 0 {
lease = leases[0]
} else {
if lease, err = leaseSrv.Create(context.TODO()); err != nil {
return nil, err
}
}

return &WrapperClient{
client: cli,
lease: &lease,
lease: lease,
streamQuota: maxStreamsClient,
}, nil
}
Expand Down

0 comments on commit f297432

Please sign in to comment.