Skip to content

Commit

Permalink
ctrd,cli,daemon,pkg: update to adapt with containerd@v1.2.5
Browse files Browse the repository at this point in the history
The containerd has been improved a lot in v1.2, which makes the service
stable before. Based on this, we need to do the upgrade.

Basically, both the contaienrd and shim API interfaces are stable so
that we don't need to worry about the remote API calls. However, the
package level interface has been changed a lot. The commit is used to
align with containerd@v1.2.5.

1. events part:
   using exchange event helper instead of raw grpc conn

2. image part:
  a. import:
     new Import interface makes the import easier without Importer.

  b. commit:
     content helper has been upgraded with less arguments for caller.
     and `diff` package separates the Applier and Comparer interfaces.
     The changes can make the code clear in PouchContainer

3. lease:
  containerd client doesn't provide ListLeases interface directly.
  PouchContainer needs to get LeasesService and make the call by itself.

4. container IO:
  remove codes which comes from containerd. The code was used to align
  with containerd v1.2, which can make upgrade work easier. For now, we
  can remove this.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid authored and Ace-Tang committed Mar 14, 2019
1 parent ab32f3d commit ffeaa43
Show file tree
Hide file tree
Showing 20 changed files with 126 additions and 255 deletions.
2 changes: 1 addition & 1 deletion cli/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/alibaba/pouch/pkg/jsonstream"
"github.com/alibaba/pouch/pkg/reference"

"github.com/containerd/containerd/progress"
"github.com/containerd/containerd/pkg/progress"
"github.com/spf13/cobra"
"golang.org/x/crypto/ssh/terminal"
)
Expand Down
28 changes: 19 additions & 9 deletions ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/containerd/containerd"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/typeurl"
Expand Down Expand Up @@ -235,22 +236,31 @@ func (c *Client) Plugins(ctx context.Context, filters []string) ([]Plugin, error
// collectContainerdEvents collects events generated by containerd.
func (c *Client) collectContainerdEvents() {
ctx := context.Background()
topicsToHandle := []string{TaskOOMEventTopic, TaskExitEventTopic}

// set filters for subscribe containerd events,
// now we only care about task and container events.
ef := []string{"topic~=task.*", "topic~=container.*"}
events, err := c.Events(ctx, ef...)
// get client
wrapperCli, err := c.Get(ctx)
if err != nil {
logrus.Errorf("failed to connect containerd event service: %v", err)
logrus.Errorf("failed to get a containerd grpc client: %v", err)
return
}
eventsClient := wrapperCli.client.EventService()

// set filters for subscribe containerd events,
// now we only care about task and container events.
ef := []string{"topic~=task.*", "topic~=container.*"}
topicsToHandle := []string{TaskOOMEventTopic, TaskExitEventTopic}

eventCh, errCh := eventsClient.Subscribe(ctx, ef...)

for {
// TODO(ziren):need reconnect the event service
e, err := events.Recv()
if err != nil {
logrus.Errorf("failed to receive event: %v", err)
var e *events.Envelope
select {
case e = <-eventCh:
case err := <-errCh:
if err != nil {
logrus.Errorf("failed to receive event: %v", err)
}
return
}

Expand Down
32 changes: 16 additions & 16 deletions ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/runtime/linux/runctypes"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *Client) execContainer(ctx context.Context, process *Process) error {
},
).Debugf("creating cio (withStdin=%v, withTerminal=%v)", withStdin, withTerminal)

fifoset, err := containerio.NewCioFIFOSet(execID, withStdin, withTerminal)
fifoset, err := containerio.NewFIFOSet(execID, withStdin, withTerminal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (c *Client) destroyContainer(ctx context.Context, id string, timeout int64)
return nil, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
ctx = leases.WithLease(ctx, wrapperCli.lease.ID)

if !c.lock.TrylockWithRetry(ctx, id) {
return nil, errtypes.ErrLockfailed
Expand Down Expand Up @@ -618,7 +618,7 @@ func (c *Client) createTask(ctx context.Context, id, checkpointDir string, conta
task, err := container.NewTask(ctx, func(_ string) (cio.IO, error) {
logrus.WithField("container", cntrID).Debugf("creating cio (withStdin=%v, withTerminal=%v)", withStdin, withTerminal)

fifoset, err := containerio.NewCioFIFOSet(execID, withStdin, withTerminal)
fifoset, err := containerio.NewFIFOSet(execID, withStdin, withTerminal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -730,7 +730,7 @@ func (c *Client) waitContainer(ctx context.Context, id string) (types.ContainerW
return types.ContainerWaitOKBody{}, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
ctx = leases.WithLease(ctx, wrapperCli.lease.ID)

waitExit := func() *Message {
return c.ProbeContainer(ctx, id, -1*time.Second)
Expand Down Expand Up @@ -769,7 +769,7 @@ func (c *Client) CreateCheckpoint(ctx context.Context, id string, checkpointDir

var opts []containerd.CheckpointTaskOpts
if exit {
opts = append(opts, containerd.WithExit)
opts = append(opts, withExitShimV1CheckpointTaskOpts())
}
checkpoint, err := pack.task.Checkpoint(ctx, opts...)
if err != nil {
Expand All @@ -783,7 +783,7 @@ func (c *Client) CreateCheckpoint(ctx context.Context, id string, checkpointDir
}

func applyCheckpointImage(ctx context.Context, client *containerd.Client, checkpoint containerd.Image, checkpointDir string) error {
b, err := content.ReadBlob(ctx, client.ContentStore(), checkpoint.Target().Digest)
b, err := content.ReadBlob(ctx, client.ContentStore(), checkpoint.Target())
if err != nil {
return errors.Wrapf(err, "failed to retrieve checkpoint data")
}
Expand All @@ -803,7 +803,7 @@ func applyCheckpointImage(ctx context.Context, client *containerd.Client, checkp
return errors.Wrapf(err, "invalid checkpoint")
}

rat, err := client.ContentStore().ReaderAt(ctx, cpDesc.Digest)
rat, err := client.ContentStore().ReaderAt(ctx, *cpDesc)
if err != nil {
return errors.Wrapf(err, "failed to get checkpoint reader")
}
Expand All @@ -817,7 +817,7 @@ func applyCheckpointImage(ctx context.Context, client *containerd.Client, checkp
}

func writeContent(ctx context.Context, mediaType, ref string, r io.Reader, client *containerd.Client) (*containerdtypes.Descriptor, error) {
writer, err := client.ContentStore().Writer(ctx, ref, 0, "")
writer, err := client.ContentStore().Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -866,10 +866,10 @@ func withCheckpointOpt(checkpoint *containerdtypes.Descriptor) containerd.NewTas
}

// InitStdio allows caller to handle any initialize job.
type InitStdio func(dio *containerio.DirectIO) (cio.IO, error)
type InitStdio func(dio *cio.DirectIO) (cio.IO, error)

func (c *Client) createIO(fifoSet *containerio.CioFIFOSet, cntrID, procID string, closeStdinCh <-chan struct{}, initstdio InitStdio) (cio.IO, error) {
cdio, err := containerio.NewDirectIO(context.Background(), fifoSet)
func (c *Client) createIO(fifoSet *cio.FIFOSet, cntrID, procID string, closeStdinCh <-chan struct{}, initstdio InitStdio) (cio.IO, error) {
cdio, err := cio.NewDirectIO(context.Background(), fifoSet)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -918,12 +918,12 @@ func (c *Client) attachIO(fifoSet *cio.FIFOSet, initstdio InitStdio) (cio.IO, er
return nil, fmt.Errorf("cannot attach to existing fifos")
}

cdio, err := containerio.NewDirectIO(context.Background(), &containerio.CioFIFOSet{
cdio, err := cio.NewDirectIO(context.Background(), &cio.FIFOSet{
Config: cio.Config{
Terminal: fifoSet.Terminal,
Stdin: fifoSet.In,
Stdout: fifoSet.Out,
Stderr: fifoSet.Err,
Stdin: fifoSet.Stdin,
Stdout: fifoSet.Stdout,
Stderr: fifoSet.Stderr,
},
})
if err != nil {
Expand Down
17 changes: 0 additions & 17 deletions ctrd/events.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package ctrd

import (
"context"

eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/runtime"
"github.com/pkg/errors"
)

const (
Expand All @@ -23,16 +19,3 @@ const (
// TaskOOMEventTopic for task oom
TaskOOMEventTopic = runtime.TaskOOMEventTopic
)

// Events subscribe containerd events through an event subscribe client.
func (c *Client) Events(ctx context.Context, ef ...string) (eventsapi.Events_SubscribeClient, error) {
wrapperCli, err := c.Get(ctx)
if err != nil {
return nil, errors.Wrap(err, ErrGetCtrdClient.Error())
}

eventsClient := wrapperCli.client.EventService()
return eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: ef,
})
}
21 changes: 15 additions & 6 deletions ctrd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (c *Client) saveImage(ctx context.Context, exporter ctrdmetaimages.Exporter
// ImportImage creates a set of images by tarstream.
//
// NOTE: One tar may have several manifests.
func (c *Client) ImportImage(ctx context.Context, importer ctrdmetaimages.Importer, reader io.Reader) ([]containerd.Image, error) {
imgs, err := c.importImage(ctx, importer, reader)
func (c *Client) ImportImage(ctx context.Context, reader io.Reader, opts ...containerd.ImportOpt) ([]containerd.Image, error) {
imgs, err := c.importImage(ctx, reader, opts...)
if err != nil {
return imgs, convertCtrdErr(err)
}
Expand All @@ -154,26 +154,35 @@ func (c *Client) ImportImage(ctx context.Context, importer ctrdmetaimages.Import
// importImage creates a set of images by tarstream.
//
// NOTE: One tar may have several manifests.
func (c *Client) importImage(ctx context.Context, importer ctrdmetaimages.Importer, reader io.Reader) ([]containerd.Image, error) {
func (c *Client) importImage(ctx context.Context, reader io.Reader, opts ...containerd.ImportOpt) ([]containerd.Image, error) {
wrapperCli, err := c.Get(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

// NOTE: The import will store the data into boltdb. But the unpack may
// fail. It is not transaction.
imgs, err := wrapperCli.client.Import(ctx, importer, reader)
imgs, err := wrapperCli.client.Import(ctx, reader, opts...)
if err != nil {
return nil, err
}

var (
res = make([]containerd.Image, 0, len(imgs))
snaphotter = CurrentSnapshotterName(ctx)
)

for _, img := range imgs {
err = img.Unpack(ctx, CurrentSnapshotterName(ctx))
image := containerd.NewImage(wrapperCli.client, img)

err = image.Unpack(ctx, snaphotter)
if err != nil {
return nil, err
}

res = append(res, image)
}
return imgs, nil
return res, nil
}

// PushImage pushes image to registry
Expand Down
13 changes: 7 additions & 6 deletions ctrd/image_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig
if err != nil {
return "", errors.Wrapf(err, "failed to create lease for commit")
}
defer done()
defer done(ctx)

var (
sn = client.SnapshotService(CurrentSnapshotterName(ctx))
Expand Down Expand Up @@ -188,7 +188,7 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig

// write manifest content
ref := mfstDigest.String()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(mfstJSON), mfstDesc.Size, mfstDesc.Digest, content.WithLabels(labels)); err != nil {
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(mfstJSON), mfstDesc, content.WithLabels(labels)); err != nil {
return "", errors.Wrapf(err, "error writing manifest blob %s", mfstDigest)
}

Expand All @@ -197,7 +197,7 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig
labelOpt := content.WithLabels(map[string]string{
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", CurrentSnapshotterName(ctx)): rootfsID,
})
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(imgJSON), configDesc.Size, configDesc.Digest, labelOpt); err != nil {
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(imgJSON), configDesc, labelOpt); err != nil {
return "", errors.Wrap(err, "error writing config blob")
}

Expand All @@ -206,8 +206,9 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig
}

// export a new layer from a container
func exportLayer(ctx context.Context, name string, sn snapshots.Snapshotter, cs content.Store, differ diff.Differ) (ocispec.Descriptor, digest.Digest, error) {
rwDesc, err := rootfs.Diff(ctx, name, sn, differ)
func exportLayer(ctx context.Context, name string, sn snapshots.Snapshotter, cs content.Store, comparer diff.Comparer) (ocispec.Descriptor, digest.Digest, error) {
// export new layer
rwDesc, err := rootfs.CreateDiff(ctx, name, sn, comparer)
if err != nil {
return ocispec.Descriptor{}, digest.Digest(""), fmt.Errorf("failed to diff: %s", err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func newChildImage(ctx context.Context, config *CommitConfig, diffID digest.Dige
}

// create a new snapshot for exported layer
func newSnapshot(ctx context.Context, name string, pImg ocispec.Image, sn snapshots.Snapshotter, differ diff.Differ, layer ocispec.Descriptor) error {
func newSnapshot(ctx context.Context, name string, pImg ocispec.Image, sn snapshots.Snapshotter, differ diff.Applier, layer ocispec.Descriptor) error {
var (
key = randomid.Generate()
parent = identity.ChainID(pImg.RootFS.DiffIDs).String()
Expand Down
5 changes: 1 addition & 4 deletions ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/alibaba/pouch/pkg/jsonstream"

"github.com/containerd/containerd"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
containerdtypes "github.com/containerd/containerd/api/types"
ctrdmetaimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/mount"
Expand Down Expand Up @@ -66,8 +65,6 @@ type ContainerAPIClient interface {
SetExitHooks(hooks ...func(string, *Message, func() error) error)
// SetExecExitHooks specified the handlers of exec process exit.
SetExecExitHooks(hooks ...func(string, *Message) error)
// Events subscribe containerd events through an event subscribe client.
Events(ctx context.Context, ef ...string) (eventsapi.Events_SubscribeClient, error)
// SetEventsHooks specified the methods to handle the containerd events.
SetEventsHooks(hooks ...func(context.Context, string, string, map[string]string) error)
}
Expand All @@ -85,7 +82,7 @@ type ImageAPIClient interface {
// RemoveImage removes the image by the given reference.
RemoveImage(ctx context.Context, ref string) error
// ImportImage creates a set of images by tarstream.
ImportImage(ctx context.Context, importer ctrdmetaimages.Importer, reader io.Reader) ([]containerd.Image, error)
ImportImage(ctx context.Context, reader io.Reader, opts ...containerd.ImportOpt) ([]containerd.Image, error)
// SaveImage saves image to tarstream
SaveImage(ctx context.Context, exporter ctrdmetaimages.Exporter, ref string) (io.ReadCloser, error)
// Commit commits an image from a container.
Expand Down
2 changes: 1 addition & 1 deletion ctrd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Client) CreateSnapshot(ctx context.Context, id, ref string) error {
if err != nil {
return fmt.Errorf("failed to get a containerd grpc client: %v", err)
}
ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
ctx = leases.WithLease(ctx, wrapperCli.lease.ID)

image, err := wrapperCli.client.ImageService().Get(ctx, ref)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions ctrd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@ import (
"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/pkg/errtypes"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)

func withExitShimV1CheckpointTaskOpts() containerd.CheckpointTaskOpts {
return func(r *containerd.CheckpointTaskInfo) error {
r.Options = &runctypes.CheckpointOptions{
Exit: true,
}
return nil
}
}

func resolver(authConfig *types.AuthConfig, resolverOpt docker.ResolverOptions) (remotes.Resolver, error) {
var (
// TODO
Expand Down
10 changes: 6 additions & 4 deletions ctrd/wrapper_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/containerd/containerd"
"github.com/containerd/containerd/leases"
"github.com/pkg/errors"
)

Expand All @@ -18,7 +19,7 @@ type WrapperClient struct {
// Lease is a new feature of containerd, We use it to avoid that the images
// are removed by garbage collection. If no lease is defined, the downloaded images will
// be removed automatically when the container is removed.
lease *containerd.Lease
lease *leases.Lease

mux sync.Mutex
// streamQuota records the numbers of stream client without be using
Expand All @@ -33,18 +34,19 @@ func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int) (*
if err != nil {
return nil, errors.Wrap(err, "failed to connect containerd")
}
leaseSrv := cli.LeasesService()

// create a new lease or reuse the existed.
var lease containerd.Lease
var lease leases.Lease

leases, err := cli.ListLeases(context.TODO())
leases, err := leaseSrv.List(context.TODO())
if err != nil {
return nil, err
}
if len(leases) != 0 {
lease = leases[0]
} else {
if lease, err = cli.CreateLease(context.TODO()); err != nil {
if lease, err = leaseSrv.Create(context.TODO()); err != nil {
return nil, err
}
}
Expand Down
Loading

0 comments on commit ffeaa43

Please sign in to comment.