diff --git a/ctrd/client.go b/ctrd/client.go index 2b27bf5ac..74166a385 100644 --- a/ctrd/client.go +++ b/ctrd/client.go @@ -14,6 +14,7 @@ import ( eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/services/introspection/v1" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/leases" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/snapshots" "github.com/containerd/typeurl" @@ -29,6 +30,9 @@ const ( PluginStatusOk = "ok" // PluginStatusError means plugin status is error PluginStatusError = "error" + + // a id for lease used by ctrd + pouchLeaseID = "pouchd.lease" ) // ErrGetCtrdClient is an error returned when failed to get a containerd grpc client from clients pool. @@ -86,8 +90,13 @@ func NewClient(opts ...ClientOpt) (APIClient, error) { insecureRegistries: copts.insecureRegistries, } + lease, err := client.preparePouchdLease(copts.rpcAddr, copts.defaultns) + if err != nil { + return nil, fmt.Errorf("failed to prepare a lease for pouchd") + } + for i := 0; i < copts.grpcClientPoolCapacity; i++ { - cli, err := newWrapperClient(copts.rpcAddr, copts.defaultns, copts.maxStreamsClient) + cli, err := newWrapperClient(copts.rpcAddr, copts.defaultns, copts.maxStreamsClient, lease) if err != nil { return nil, fmt.Errorf("failed to create containerd client: %v", err) } @@ -375,3 +384,56 @@ func (c *Client) checkSnapshotsExist(snapshotter string) (existSnapshot bool, er err = c.WalkSnapshot(context.Background(), snapshotter, fn) return } + +// preparePouchdLease is to prepare a lease for pouch client to containerd. +func (c *Client) preparePouchdLease(rpcAddr, defaultns string) (*leases.Lease, error) { + var lease leases.Lease + + options := []containerd.ClientOpt{ + containerd.WithDefaultNamespace(defaultns), + } + cli, err := containerd.New(rpcAddr, options...) + if err != nil { + return nil, errors.Wrap(err, "failed to connect containerd") + } + defer cli.Close() + + leaseSrv := cli.LeasesService() + leaseList, err := leaseSrv.List(context.TODO()) + if err != nil { + return nil, err + } + + for _, l := range leaseList { + if l.ID != pouchLeaseID { + continue + } + + foundExpireLabel := false + for k := range l.Labels { + if k == "containerd.io/gc.expire" { + foundExpireLabel = true + break + } + } + + // found a lease that matched the condition, just return + if !foundExpireLabel { + return &l, nil + } + + // found a lease with id is pouchd.lease and has expire time, + // then just delete it and wait to recreate a new lease. + if err := leaseSrv.Delete(context.TODO(), l); err != nil { + return nil, err + } + + } + + // not found a matched lease, just create it + if lease, err = leaseSrv.Create(context.TODO(), leases.WithID(pouchLeaseID)); err != nil { + return nil, err + } + + return &lease, nil +} diff --git a/ctrd/wrapper_client.go b/ctrd/wrapper_client.go index 6d621ede3..18c7cb96f 100644 --- a/ctrd/wrapper_client.go +++ b/ctrd/wrapper_client.go @@ -1,7 +1,6 @@ package ctrd import ( - "context" "fmt" "sync" @@ -26,34 +25,19 @@ type WrapperClient struct { streamQuota int } -func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int) (*WrapperClient, error) { +func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int, lease *leases.Lease) (*WrapperClient, error) { options := []containerd.ClientOpt{ containerd.WithDefaultNamespace(defaultns), } + cli, err := containerd.New(rpcAddr, options...) if err != nil { return nil, errors.Wrap(err, "failed to connect containerd") } - leaseSrv := cli.LeasesService() - - // create a new lease or reuse the existed. - var lease leases.Lease - - leases, err := leaseSrv.List(context.TODO()) - if err != nil { - return nil, err - } - if len(leases) != 0 { - lease = leases[0] - } else { - if lease, err = leaseSrv.Create(context.TODO()); err != nil { - return nil, err - } - } return &WrapperClient{ client: cli, - lease: &lease, + lease: lease, streamQuota: maxStreamsClient, }, nil }