From 57008363ebda37a0798082387e8e589f25f26a14 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Tue, 12 Sep 2023 17:26:47 +0800 Subject: [PATCH 01/11] tarfs: remove blobTarFilePath from snapshotStatus The field `blobTarFilePath` is only used once, and it's easy to build it on demand. So remove it from struct snapshotStatus. Signed-off-by: Jiang Liu --- pkg/tarfs/tarfs.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 543dc5ceda..959c0c84cd 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -76,7 +76,6 @@ type snapshotStatus struct { mutex sync.Mutex status int blobID string - blobTarFilePath string erofsMountPoint string dataLoopdev *losetup.Device metaLoopdev *losetup.Device @@ -310,7 +309,7 @@ func (t *Manager) blobProcess(ctx context.Context, wg *sync.WaitGroup, snapshotI manifestDigest, layerDigest digest.Digest, upperDirPath string) error { layerBlobID := layerDigest.Hex() epilog := func(err error, msg string) { - st, err1 := t.getSnapshotStatus(snapshotID, true) + st, err1 := t.getSnapshotStatusWithLock(snapshotID, true) if err1 != nil { // return errors.Errorf("can not found status object for snapshot %s after prepare", snapshotID) err1 = errors.Wrapf(err1, "can not found status object for snapshot %s after prepare", snapshotID) @@ -320,7 +319,6 @@ func (t *Manager) blobProcess(ctx context.Context, wg *sync.WaitGroup, snapshotI defer st.mutex.Unlock() st.blobID = layerBlobID - st.blobTarFilePath = t.layerTarFilePath(layerBlobID) if err != nil { log.L.WithError(err).Errorf(msg) st.status = TarfsStatusFailed @@ -424,7 +422,7 @@ func (t *Manager) MergeLayers(s storage.Snapshot, storageLocater func(string) st return errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) } - st, err := t.getSnapshotStatus(snapshotID, false) + st, err := t.getSnapshotStatusWithLock(snapshotID, false) if err != nil { return err } @@ -486,7 +484,7 @@ func (t *Manager) 
ExportBlockData(s storage.Snapshot, perLayer bool, labels map[ if err != nil { return updateFields, errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) } - st, err := t.getSnapshotStatus(snapshotID, false) + st, err := t.getSnapshotStatusWithLock(snapshotID, false) if err != nil { return updateFields, err } @@ -592,7 +590,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m return errors.Wrapf(err, "wait for tarfs conversion task") } - st, err := t.getSnapshotStatus(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID, true) if err != nil { return err } @@ -604,10 +602,11 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m var blobMarker = "\"blob_id\":\"" + st.blobID + "\"" if strings.Contains(blobInfo, blobMarker) { if st.dataLoopdev == nil { - loopdev, err := t.attachLoopdev(st.blobTarFilePath) + blobTarFilePath := t.layerTarFilePath(st.blobID) + loopdev, err := t.attachLoopdev(blobTarFilePath) if err != nil { st.mutex.Unlock() - return errors.Wrapf(err, "attach layer tar file %s to loopdev", st.blobTarFilePath) + return errors.Wrapf(err, "attach layer tar file %s to loopdev", blobTarFilePath) } st.dataLoopdev = loopdev } @@ -618,7 +617,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } mountOpts := strings.Join(devices, ",") - st, err := t.getSnapshotStatus(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID, true) if err != nil { return err } @@ -656,7 +655,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } func (t *Manager) UmountTarErofs(snapshotID string) error { - st, err := t.getSnapshotStatus(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID, true) if err != nil { return errors.Wrapf(err, "umount a tarfs snapshot %s which is already removed", snapshotID) } @@ -674,7 +673,7 @@ func (t *Manager) UmountTarErofs(snapshotID string) error { } func 
(t *Manager) DetachLayer(snapshotID string) error { - st, err := t.getSnapshotStatus(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID, true) if err != nil { return os.ErrNotExist } @@ -716,7 +715,7 @@ func (t *Manager) DetachLayer(snapshotID string) error { return nil } -func (t *Manager) getSnapshotStatus(snapshotID string, lock bool) (*snapshotStatus, error) { +func (t *Manager) getSnapshotStatusWithLock(snapshotID string, lock bool) (*snapshotStatus, error) { t.mutex.Lock() defer t.mutex.Unlock() st, ok := t.snapshotMap[snapshotID] @@ -730,7 +729,7 @@ func (t *Manager) getSnapshotStat } func (t *Manager) waitLayerReady(snapshotID string) error { - st, err := t.getSnapshotStatus(snapshotID, false) + st, err := t.getSnapshotStatusWithLock(snapshotID, false) if err != nil { return err } From 07b9a9179d95d0bd3436ead670e903c2a21d1f01 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Tue, 12 Sep 2023 17:56:47 +0800 Subject: [PATCH 02/11] tarfs: remove redundant status check `waitLayerReady()` already ensures the layer is in ready state, so avoid redundant status check. 
Signed-off-by: Jiang Liu --- pkg/tarfs/tarfs.go | 90 +++++++++++++++++++++++----------------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 959c0c84cd..13bb1df963 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -309,7 +309,7 @@ func (t *Manager) blobProcess(ctx context.Context, wg *sync.WaitGroup, snapshotI manifestDigest, layerDigest digest.Digest, upperDirPath string) error { layerBlobID := layerDigest.Hex() epilog := func(err error, msg string) { - st, err1 := t.getSnapshotStatusWithLock(snapshotID, true) + st, err1 := t.getSnapshotStatusWithLock(snapshotID) if err1 != nil { // return errors.Errorf("can not found status object for snapshot %s after prepare", snapshotID) err1 = errors.Wrapf(err1, "can not found status object for snapshot %s after prepare", snapshotID) @@ -417,19 +417,11 @@ func (t *Manager) MergeLayers(s storage.Snapshot, storageLocater func(string) st // When merging bootstrap, we need to arrange layer bootstrap in order from low to high for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { snapshotID := s.ParentIDs[idx] - err := t.waitLayerReady(snapshotID) + _, err := t.waitLayerReady(snapshotID, false) if err != nil { return errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) } - st, err := t.getSnapshotStatusWithLock(snapshotID, false) - if err != nil { - return err - } - if st.status != TarfsStatusReady { - return errors.Errorf("tarfs snapshot %s is not ready, %d", snapshotID, st.status) - } - metaFilePath := t.layerMetaFilePath(storageLocater(snapshotID)) bootstraps = append(bootstraps, metaFilePath) } @@ -480,18 +472,6 @@ func (t *Manager) ExportBlockData(s storage.Snapshot, perLayer bool, labels map[ } snapshotID = s.ParentIDs[0] } - err := t.waitLayerReady(snapshotID) - if err != nil { - return updateFields, errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) - } - st, err := t.getSnapshotStatusWithLock(snapshotID, 
false) - if err != nil { - return updateFields, err - } - if st.status != TarfsStatusReady { - return updateFields, errors.Errorf("tarfs snapshot %s is not ready, %d", snapshotID, st.status) - } - blobID, ok := labels[label.NydusTarfsLayer] if !ok { return updateFields, errors.Errorf("Missing Nydus tarfs layer annotation for snapshot %s", s.ID) @@ -510,6 +490,13 @@ func (t *Manager) ExportBlockData(s storage.Snapshot, perLayer bool, labels map[ if _, err := os.Stat(diskFileName); err == nil { return updateFields, nil } + + st, err := t.waitLayerReady(snapshotID, true) + if err != nil { + return updateFields, errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) + } + defer st.mutex.Unlock() + diskFileNameTmp := diskFileName + ".tarfs.tmp" defer os.Remove(diskFileNameTmp) @@ -585,20 +572,11 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m // When merging bootstrap, we need to arrange layer bootstrap in order from low to high for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { snapshotID := s.ParentIDs[idx] - err := t.waitLayerReady(snapshotID) + st, err := t.waitLayerReady(snapshotID, true) if err != nil { return errors.Wrapf(err, "wait for tarfs conversion task") } - st, err := t.getSnapshotStatusWithLock(snapshotID, true) - if err != nil { - return err - } - if st.status != TarfsStatusReady { - st.mutex.Unlock() - return errors.Errorf("snapshot %s tarfs format error %d", snapshotID, st.status) - } - var blobMarker = "\"blob_id\":\"" + st.blobID + "\"" if strings.Contains(blobInfo, blobMarker) { if st.dataLoopdev == nil { @@ -617,7 +595,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } mountOpts := strings.Join(devices, ",") - st, err := t.getSnapshotStatusWithLock(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID) if err != nil { return err } @@ -655,7 +633,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } func (t 
*Manager) UmountTarErofs(snapshotID string) error { - st, err := t.getSnapshotStatusWithLock(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID) if err != nil { return errors.Wrapf(err, "umount a tarfs snapshot %s which is already removed", snapshotID) } @@ -673,7 +651,7 @@ func (t *Manager) UmountTarErofs(snapshotID string) error { } func (t *Manager) DetachLayer(snapshotID string) error { - st, err := t.getSnapshotStatusWithLock(snapshotID, true) + st, err := t.getSnapshotStatusWithLock(snapshotID) if err != nil { return os.ErrNotExist } @@ -715,32 +693,52 @@ func (t *Manager) DetachLayer(snapshotID string) error { return nil } -func (t *Manager) getSnapshotStatusWithLock(snapshotID string, lock bool) (*snapshotStatus, error) { +func (t *Manager) getSnapshotStatusWithLock(snapshotID string) (*snapshotStatus, error) { t.mutex.Lock() defer t.mutex.Unlock() st, ok := t.snapshotMap[snapshotID] if ok { - if lock { - st.mutex.Lock() - } + st.mutex.Lock() return st, nil } return nil, errors.Errorf("not found snapshot %s", snapshotID) } -func (t *Manager) waitLayerReady(snapshotID string) error { - st, err := t.getSnapshotStatusWithLock(snapshotID, false) +func (t *Manager) waitLayerReady(snapshotID string, lock bool) (*snapshotStatus, error) { + st, err := t.getSnapshotStatusWithLock(snapshotID) if err != nil { - return err + return nil, err } - if st.status != TarfsStatusReady { + + if st.wg != nil && st.status == TarfsStatusPrepare { + wg := st.wg + st.mutex.Unlock() log.L.Debugf("wait tarfs conversion task for snapshot %s", snapshotID) + wg.Wait() + st, err = t.getSnapshotStatusWithLock(snapshotID) + if err != nil { + return nil, err + } } - st.wg.Wait() + if st.status != TarfsStatusReady { - return errors.Errorf("snapshot %s is in state %d instead of ready state", snapshotID, st.status) + st.mutex.Unlock() + var state string + switch st.status { + case TarfsStatusPrepare: + state = "Prepare" + case TarfsStatusFailed: + state = "Failed" + default: 
+ state = "Unknown" + } + return nil, errors.Errorf("snapshot %s is in %s state instead of Ready", snapshotID, state) } - return nil + + if !lock { + st.mutex.Unlock() + } + return st, nil } func (t *Manager) attachLoopdev(blob string) (*losetup.Device, error) { From e20fb9d92059696b533f8d1f9a4a0b58ef12b0c0 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Tue, 12 Sep 2023 23:04:36 +0800 Subject: [PATCH 03/11] tarfs: refactor blobProcess() Refactor tarfs::blobProcess() for coming changes. Signed-off-by: Jiang Liu --- pkg/tarfs/tarfs.go | 79 ++++++++++++++++++++++++---------------------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 13bb1df963..7ba89759cd 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -305,83 +305,86 @@ func (t *Manager) getImageBlobInfo(metaFilePath string) (string, error) { } // download & uncompress an oci/docker blob, and then generate the tarfs bootstrap -func (t *Manager) blobProcess(ctx context.Context, wg *sync.WaitGroup, snapshotID, ref string, +func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, manifestDigest, layerDigest digest.Digest, upperDirPath string) error { layerBlobID := layerDigest.Hex() - epilog := func(err error, msg string) { + + epilog := func(err error, msg string) error { st, err1 := t.getSnapshotStatusWithLock(snapshotID) if err1 != nil { - // return errors.Errorf("can not found status object for snapshot %s after prepare", snapshotID) - err1 = errors.Wrapf(err1, "can not found status object for snapshot %s after prepare", snapshotID) - log.L.WithError(err1).Errorf("async prepare tarfs layer for snapshot ID %s", snapshotID) - return + return errors.Wrapf(err, "can not found status object for snapshot %s after prepare", snapshotID) } - defer st.mutex.Unlock() + defer func() { + if st.wg != nil { + st.wg.Done() + st.wg = nil + } + st.mutex.Unlock() + }() st.blobID = layerBlobID if err != nil { - log.L.WithError(err).Errorf(msg) 
st.status = TarfsStatusFailed } else { st.status = TarfsStatusReady } - log.L.Infof(msg) - } - keyChain, err := auth.GetKeyChainByRef(ref, nil) - if err != nil { - epilog(err, "create key chain for connection") - return err - } - remote := remote.New(keyChain, t.insecure) - rc, _, err := t.getBlobStream(ctx, remote, ref, layerDigest) - if err != nil && remote.RetryWithPlainHTTP(ref, err) { - rc, _, err = t.getBlobStream(ctx, remote, ref, layerDigest) - } - if err != nil { - epilog(err, "get blob stream for layer") - return errors.Wrapf(err, "get blob stream by digest") + return errors.Wrapf(err, msg) } - go func() { - defer wg.Done() + process := func(rc io.ReadCloser, remote *remote.Remote) error { defer rc.Close() ds, err := compression.DecompressStream(rc) if err != nil { - epilog(err, "unpack layer blob stream for tarfs") - return + return epilog(err, "unpack layer blob stream for tarfs") } defer ds.Close() if t.validateDiffID { diffID, err := t.getBlobDiffID(ctx, remote, ref, manifestDigest, layerDigest) if err != nil { - epilog(err, "get layer diffID") - return + return epilog(err, "get layer diffID") } digester := digest.Canonical.Digester() dr := io.TeeReader(ds, digester.Hash()) err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath) switch { case err != nil && !errdefs.IsAlreadyExists(err): - epilog(err, "generate tarfs from image layer blob") + return epilog(err, "generate tarfs from image layer blob") case err == nil && digester.Digest() != diffID: - epilog(err, "image layer diffID does not match") + return epilog(err, "image layer diffID does not match") default: - msg := fmt.Sprintf("nydus tarfs for snapshot %s is ready, digest %s", snapshotID, digester.Digest()) - epilog(nil, msg) + msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready, digest %s", snapshotID, digester.Digest()) + return epilog(nil, msg) } } else { err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) if err != nil && !errdefs.IsAlreadyExists(err) { - 
epilog(err, "generate tarfs data from image layer blob") + return epilog(err, "generate tarfs data from image layer blob") } else { - msg := fmt.Sprintf("nydus tarfs for snapshot %s is ready", snapshotID) - epilog(nil, msg) + msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID) + return epilog(nil, msg) } } - }() + } + + keyChain, err := auth.GetKeyChainByRef(ref, nil) + if err != nil { + epilog(err, "create key chain for connection") + return err + } + remote := remote.New(keyChain, t.insecure) + rc, _, err := t.getBlobStream(ctx, remote, ref, layerDigest) + if err != nil && remote.RetryWithPlainHTTP(ref, err) { + rc, _, err = t.getBlobStream(ctx, remote, ref, layerDigest) + } + if err != nil { + epilog(err, "get blob stream for layer") + return errors.Wrapf(err, "get blob stream by digest") + } + + go process(rc, remote) return err } @@ -403,7 +406,7 @@ func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDige } t.mutex.Unlock() - return t.blobProcess(ctx, wg, snapshotID, ref, manifestDigest, layerDigest, upperDirPath) + return t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, upperDirPath) } func (t *Manager) MergeLayers(s storage.Snapshot, storageLocater func(string) string) error { From 98990abb646793338d038c20d6714953a7fe6dd6 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Tue, 12 Sep 2023 21:36:50 +0800 Subject: [PATCH 04/11] tarfs: retry downloading and converting on failure Tarfs downloads and converts layer content on background, there's no flexible way to report failure from background tasks to containerd. So retry downloading and converting tasks when preparing rw layer for container. If it still fails, failure will be reported to the rw layer preparation request. 
Signed-off-by: Jiang Liu --- pkg/filesystem/tarfs_adaptor.go | 6 ++- pkg/tarfs/tarfs.go | 91 ++++++++++++++++++++++++++++----- snapshot/process.go | 2 +- snapshot/snapshot.go | 22 ++++++-- 4 files changed, 101 insertions(+), 20 deletions(-) diff --git a/pkg/filesystem/tarfs_adaptor.go b/pkg/filesystem/tarfs_adaptor.go index 3e47295b38..5634018081 100755 --- a/pkg/filesystem/tarfs_adaptor.go +++ b/pkg/filesystem/tarfs_adaptor.go @@ -11,6 +11,7 @@ import ( "github.com/containerd/containerd/log" snpkg "github.com/containerd/containerd/pkg/snapshotters" + "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/storage" "github.com/containerd/nydus-snapshotter/pkg/label" "github.com/opencontainers/go-digest" @@ -63,8 +64,9 @@ func (fs *Filesystem) PrepareTarfsLayer(ctx context.Context, labels map[string]s return nil } -func (fs *Filesystem) MergeTarfsLayers(s storage.Snapshot, storageLocater func(string) string) error { - return fs.tarfsMgr.MergeLayers(s, storageLocater) +func (fs *Filesystem) MergeTarfsLayers(ctx context.Context, s storage.Snapshot, storageLocater func(string) string, + infoGetter func(ctx context.Context, id string) (string, snapshots.Info, error)) error { + return fs.tarfsMgr.MergeLayers(ctx, s, storageLocater, infoGetter) } func (fs *Filesystem) DetachTarfsLayer(snapshotID string) error { diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 7ba89759cd..8285fbc635 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -23,6 +23,7 @@ import ( "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/snapshots" "github.com/containerd/containerd/snapshots/storage" "github.com/containerd/nydus-snapshotter/config" "github.com/containerd/nydus-snapshotter/pkg/auth" @@ -42,7 +43,6 @@ import ( ) const ( - TarfsStatusInit = 0 TarfsStatusPrepare = 1 TarfsStatusReady = 2 TarfsStatusFailed = 3 @@ -306,7 +306,7 @@ func (t *Manager) 
getImageBlobInfo(metaFilePath string) (string, error) { // download & uncompress an oci/docker blob, and then generate the tarfs bootstrap func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, - manifestDigest, layerDigest digest.Digest, upperDirPath string) error { + manifestDigest, layerDigest digest.Digest, upperDirPath string, retry bool) error { layerBlobID := layerDigest.Hex() epilog := func(err error, msg string) error { @@ -362,17 +362,15 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) if err != nil && !errdefs.IsAlreadyExists(err) { return epilog(err, "generate tarfs data from image layer blob") - } else { - msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID) - return epilog(nil, msg) } + msg := fmt.Sprintf("Nydus tarfs for snapshot %s is ready", snapshotID) + return epilog(nil, msg) } } keyChain, err := auth.GetKeyChainByRef(ref, nil) if err != nil { - epilog(err, "create key chain for connection") - return err + return epilog(err, "create key chain for connection") } remote := remote.New(keyChain, t.insecure) rc, _, err := t.getBlobStream(ctx, remote, ref, layerDigest) @@ -380,15 +378,69 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, rc, _, err = t.getBlobStream(ctx, remote, ref, layerDigest) } if err != nil { - epilog(err, "get blob stream for layer") - return errors.Wrapf(err, "get blob stream by digest") + return epilog(err, "get blob stream for layer") } - go process(rc, remote) + if retry { + // Download and convert layer content in synchronous mode when retry for error recovering + err = process(rc, remote) + } else { + // Download and convert layer content in background. + // Will retry when the content is actually needed if the background process failed. 
+ go func() { + _ = process(rc, remote) + }() + } return err } +func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[string]string) error { + ref, ok := labels[label.CRIImageRef] + if !ok { + return errors.Errorf("not found image reference label") + } + layerDigest := digest.Digest(labels[label.CRILayerDigest]) + if layerDigest.Validate() != nil { + return errors.Errorf("not found layer digest label") + } + manifestDigest := digest.Digest(labels[label.CRIManifestDigest]) + if manifestDigest.Validate() != nil { + return errors.Errorf("not found manifest digest label") + } + + st, err := t.getSnapshotStatus(snapshotID, true) + if err != nil { + return errors.Wrapf(err, "retry downloading content for snapshot %s", snapshotID) + } + switch st.status { + case TarfsStatusPrepare: + log.L.Infof("Another thread is retrying snapshot %s, wait for the result", snapshotID) + st.mutex.Unlock() + _, err = t.waitLayerReady(snapshotID, false) + return err + case TarfsStatusReady: + log.L.Infof("Another thread has retried snapshot %s and succeed", snapshotID) + st.mutex.Unlock() + return nil + case TarfsStatusFailed: + log.L.Infof("Snapshot %s is in FAILED state, retry downloading layer content", snapshotID) + if st.wg == nil { + st.wg = &sync.WaitGroup{} + st.wg.Add(1) + } + st.status = TarfsStatusPrepare + st.mutex.Unlock() + } + + ctx := context.Background() + if err := t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, upperDirPath, true); err != nil { + log.L.WithError(err).Errorf("async prepare tarfs layer of snapshot ID %s", snapshotID) + } + + return nil +} + func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDigest digest.Digest, upperDirPath string) error { t.mutex.Lock() if _, ok := t.snapshotMap[snapshotID]; ok { @@ -406,10 +458,11 @@ func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDige } t.mutex.Unlock() - return t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, 
upperDirPath) + return t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, upperDirPath, false) } -func (t *Manager) MergeLayers(s storage.Snapshot, storageLocater func(string) string) error { +func (t *Manager) MergeLayers(ctx context.Context, s storage.Snapshot, storageLocater func(string) string, + infoGetter func(ctx context.Context, id string) (string, snapshots.Info, error)) error { mergedBootstrap := t.imageMetaFilePath(storageLocater(s.ParentIDs[0])) if _, err := os.Stat(mergedBootstrap); err == nil { log.L.Debugf("tarfs snapshot %s already has merged bootstrap %s", s.ParentIDs[0], mergedBootstrap) @@ -421,6 +474,17 @@ func (t *Manager) MergeLayers(s storage.Snapshot, storageLocater func(string) st for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { snapshotID := s.ParentIDs[idx] _, err := t.waitLayerReady(snapshotID, false) + if err != nil { + upperDir, info, err1 := infoGetter(ctx, snapshotID) + if err1 == nil { + err1 = t.retryPrepareLayer(snapshotID, upperDir, info.Labels) + if err1 != nil { + log.L.Errorf("failed to retry downloading content for snapshot %s, %s", snapshotID, err1) + } else { + err = nil + } + } + } if err != nil { return errors.Wrapf(err, "wait for tarfs snapshot %s to get ready", snapshotID) } @@ -805,6 +869,9 @@ func (t *Manager) GetConcurrentLimiter(ref string) *semaphore.Weighted { func (t *Manager) copyTarfsAnnotations(labels map[string]string, rafs *rafs.Rafs) { keys := []string{ + label.CRIImageRef, + label.CRILayerDigest, + label.CRIManifestDigest, label.NydusTarfsLayer, label.NydusImageBlockInfo, label.NydusLayerBlockInfo, diff --git a/snapshot/process.go b/snapshot/process.go index ce90f5baf6..992015d7a8 100644 --- a/snapshot/process.go +++ b/snapshot/process.go @@ -157,7 +157,7 @@ func chooseProcessor(ctx context.Context, logger *logrus.Entry, // which have already been prepared by overlay snapshotter logger.Infof("Prepare active snapshot %s in Nydus tarfs mode", key) - err = sn.mergeTarfs(ctx, s, pID, pInfo) + 
err = sn.mergeTarfs(ctx, s, parent, pInfo) if err != nil { return nil, "", errors.Wrapf(err, "merge tarfs layers for snapshot %s", pID) } diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index 108ab055d3..a889312d7c 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -518,7 +518,7 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap if o.fs.TarfsEnabled() && label.IsTarfsDataLayer(pInfo.Labels) { log.L.Infof("Prepare view snapshot %s in Nydus tarfs mode", pID) - err = o.mergeTarfs(ctx, s, pID, pInfo) + err = o.mergeTarfs(ctx, s, parent, pInfo) if err != nil { return nil, errors.Wrapf(err, "merge tarfs layers for snapshot %s", pID) } @@ -558,7 +558,7 @@ func (o *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap return err } - log.L.Infof("[Commit] snapshot with key %q snapshot id %s", key, id) + log.L.Infof("[Commit] snapshot with key %q, name %s, snapshot id %s", key, name, id) // For OCI compatibility, we calculate disk usage of the snapshotDir and commit the usage to DB. // Nydus disk usage under the cacheDir will be delayed until containerd queries. 
@@ -795,9 +795,21 @@ func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, k return &base, s, nil } -func (o *snapshotter) mergeTarfs(ctx context.Context, s storage.Snapshot, pID string, pInfo snapshots.Info) error { - if err := o.fs.MergeTarfsLayers(s, func(id string) string { return o.upperPath(id) }); err != nil { - return errors.Wrapf(err, "tarfs merge fail %s", pID) +func (o *snapshotter) mergeTarfs(ctx context.Context, s storage.Snapshot, parent string, pInfo snapshots.Info) error { + infoGetter := func(ctx context.Context, id string) (string, snapshots.Info, error) { + for { + id2, info, _, err := snapshot.GetSnapshotInfo(ctx, o.ms, parent) + if err != nil { + return "", snapshots.Info{}, err + } else if id2 == id { + return o.upperPath(id), info, nil + } + parent = info.Parent + } + } + + if err := o.fs.MergeTarfsLayers(ctx, s, func(id string) string { return o.upperPath(id) }, infoGetter); err != nil { + return err } if config.GetTarfsExportEnabled() { updateFields, err := o.fs.ExportBlockData(s, false, pInfo.Labels, func(id string) string { return o.upperPath(id) }) From d5558af458739c5437ef52922904a6255a01dd3a Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Wed, 13 Sep 2023 20:12:48 +0800 Subject: [PATCH 05/11] tarfs: avoid redundant merge operation for images with only one layer Avoid redundant merge operation for images with only one layer. 
Signed-off-by: Jiang Liu --- pkg/tarfs/tarfs.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 8285fbc635..aa1d687b15 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -409,7 +409,7 @@ func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[ return errors.Errorf("not found manifest digest label") } - st, err := t.getSnapshotStatus(snapshotID, true) + st, err := t.getSnapshotStatus(snapshotID) if err != nil { return errors.Wrapf(err, "retry downloading content for snapshot %s", snapshotID) } @@ -493,6 +493,12 @@ func (t *Manager) MergeLayers(ctx context.Context, s storage.Snapshot, storageLo bootstraps = append(bootstraps, metaFilePath) } + // Merging image with only one layer is a noop, just copy the layer bootstrap as image bootstrap + if len(s.ParentIDs) == 1 { + metaFilePath := t.layerMetaFilePath(storageLocater(s.ParentIDs[0])) + return errors.Wrapf(os.Link(metaFilePath, mergedBootstrap), "create hard link from image bootstrap to layer bootstrap") + } + mergedBootstrapTmp := mergedBootstrap + ".tarfs.tmp" defer os.Remove(mergedBootstrapTmp) From 4c198ea3c45341219b471e34100316449e604d8e Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Thu, 14 Sep 2023 17:59:28 +0800 Subject: [PATCH 06/11] tarfs: recover tarfs information on restart When nydus snapshotter restarts, we need to recover information related to tarfs. Otherwise all tarfs instance will become unusable. 
Signed-off-by: Jiang Liu --- pkg/filesystem/fs.go | 2 +- pkg/manager/manager.go | 14 +++++++---- pkg/tarfs/tarfs.go | 53 ++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index 4a95c4a3c6..a58742ba9b 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -67,7 +67,7 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { recoveringDaemons := make(map[string]*daemon.Daemon, 0) liveDaemons := make(map[string]*daemon.Daemon, 0) for _, fsManager := range fs.enabledManagers { - err := fsManager.Recover(ctx, &recoveringDaemons, &liveDaemons) + err := fsManager.Recover(ctx, fs.tarfsMgr, &recoveringDaemons, &liveDaemons) if err != nil { return nil, errors.Wrap(err, "reconnect daemons and recover filesystem instance") } diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 27b6fad7d6..b655ae438d 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -27,6 +27,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/rafs" "github.com/containerd/nydus-snapshotter/pkg/store" "github.com/containerd/nydus-snapshotter/pkg/supervisor" + "github.com/containerd/nydus-snapshotter/pkg/tarfs" ) // Manage RAFS filesystem instances and nydusd daemons. @@ -121,12 +122,12 @@ func (m *Manager) CacheDir() string { // - Never ever delete any records from DB // - Only cache daemon information from DB, do not actually start/create daemons // - Only cache RAFS instance information from DB, do not actually recover RAFS runtime state. 
-func (m *Manager) Recover(ctx context.Context, +func (m *Manager) Recover(ctx context.Context, tarfsMgr *tarfs.Manager, recoveringDaemons *map[string]*daemon.Daemon, liveDaemons *map[string]*daemon.Daemon) error { if err := m.recoverDaemons(ctx, recoveringDaemons, liveDaemons); err != nil { return errors.Wrapf(err, "recover nydusd daemons") } - if err := m.recoverRafsInstances(ctx, recoveringDaemons, liveDaemons); err != nil { + if err := m.recoverRafsInstances(ctx, tarfsMgr, recoveringDaemons, liveDaemons); err != nil { return errors.Wrapf(err, "recover RAFS instances") } return nil @@ -150,7 +151,7 @@ func (m *Manager) RemoveRafsInstance(snapshotID string) error { return m.store.DeleteRafsInstance(snapshotID) } -func (m *Manager) recoverRafsInstances(ctx context.Context, +func (m *Manager) recoverRafsInstances(ctx context.Context, tarfsMgr *tarfs.Manager, recoveringDaemons *map[string]*daemon.Daemon, liveDaemons *map[string]*daemon.Daemon) error { if err := m.store.WalkRafsInstances(ctx, func(r *rafs.Rafs) error { if r.GetFsDriver() != m.FsDriver { @@ -169,7 +170,12 @@ func (m *Manager) recoverRafsInstances(ctx context.Context, } rafs.RafsGlobalCache.Add(r) } else if r.GetFsDriver() == config.FsDriverBlockdev { - rafs.RafsGlobalCache.Add(r) + err1 := tarfsMgr.RecoverRafsInstance(r) + if err1 != nil { + log.L.Errorf("failed to recover tarfs instance %s, %s", r.SnapshotID, err1) + } else { + rafs.RafsGlobalCache.Add(r) + } } return nil diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index aa1d687b15..959bb266b9 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -33,6 +33,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/remote" "github.com/containerd/nydus-snapshotter/pkg/remote/remotes" losetup "github.com/freddierice/go-losetup" + "github.com/moby/sys/mountinfo" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -81,6 +82,7 @@ type snapshotStatus struct { metaLoopdev 
*losetup.Device wg *sync.WaitGroup cancel context.CancelFunc + ctx context.Context } func NewManager(insecure, checkTarfsHint bool, cacheDirPath, nydusImagePath string, maxConcurrentProcess int64) *Manager { @@ -409,10 +411,11 @@ func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[ return errors.Errorf("not found manifest digest label") } - st, err := t.getSnapshotStatus(snapshotID) + st, err := t.getSnapshotStatusWithLock(snapshotID) if err != nil { return errors.Wrapf(err, "retry downloading content for snapshot %s", snapshotID) } + ctx := st.ctx switch st.status { case TarfsStatusPrepare: log.L.Infof("Another thread is retrying snapshot %s, wait for the result", snapshotID) @@ -433,7 +436,6 @@ func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[ st.mutex.Unlock() } - ctx := context.Background() if err := t.blobProcess(ctx, snapshotID, ref, manifestDigest, layerDigest, upperDirPath, true); err != nil { log.L.WithError(err).Errorf("async prepare tarfs layer of snapshot ID %s", snapshotID) } @@ -455,6 +457,7 @@ func (t *Manager) PrepareLayer(snapshotID, ref string, manifestDigest, layerDige status: TarfsStatusPrepare, wg: wg, cancel: cancel, + ctx: ctx, } t.mutex.Unlock() @@ -766,6 +769,52 @@ func (t *Manager) DetachLayer(snapshotID string) error { return nil } +// This method is called in single threaded mode during startup, so we do not lock `snapshotStatus` objects. 
+func (t *Manager) RecoverRafsInstance(r *rafs.Rafs) error { + t.mutex.Lock() + defer t.mutex.Unlock() + log.L.Infof("recover rafs instance %s for tarfs", r.SnapshotID) + + if _, ok := t.snapshotMap[r.SnapshotID]; ok { + return errors.Errorf("snapshot %s already exists", r.SnapshotID) + } + + layerDigest := digest.Digest(r.Annotations[label.CRILayerDigest]) + if layerDigest.Validate() != nil { + return errors.Errorf("invalid layer digest for snapshot %s", r.SnapshotID) + } + + ctx, cancel := context.WithCancel(context.Background()) + upperDir := path.Join(r.GetSnapshotDir(), "fs") + metaFilePath := t.layerMetaFilePath(upperDir) + + if _, err := os.Stat(metaFilePath); err == nil { + mountPoint := path.Join(r.GetSnapshotDir(), "mnt") + mounted, err := mountinfo.Mounted(mountPoint) + if !mounted || err != nil { + mountPoint = "" + } + t.snapshotMap[r.SnapshotID] = &snapshotStatus{ + status: TarfsStatusReady, + blobID: layerDigest.Hex(), + erofsMountPoint: mountPoint, + cancel: cancel, + ctx: ctx, + } + } else { + wg := &sync.WaitGroup{} + wg.Add(1) + t.snapshotMap[r.SnapshotID] = &snapshotStatus{ + status: TarfsStatusFailed, + wg: wg, + cancel: cancel, + ctx: ctx, + } + } + + return nil +} + func (t *Manager) getSnapshotStatusWithLock(snapshotID string) (*snapshotStatus, error) { t.mutex.Lock() defer t.mutex.Unlock() From cbe04b3ffc863fb4866f9eb35e9d5e42fdf33501 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 17 Sep 2023 15:38:40 +0800 Subject: [PATCH 07/11] tarfs: import losetup.go from containerd Import losetup.go from containerd, to replace go-losetup. 
Signed-off-by: Jiang Liu --- pkg/tarfs/losetup_linux.go | 198 ++++++++++++++++++++++++++++++++ pkg/tarfs/losetup_linux_test.go | 117 +++++++++++++++++++ 2 files changed, 315 insertions(+) create mode 100644 pkg/tarfs/losetup_linux.go create mode 100644 pkg/tarfs/losetup_linux_test.go diff --git a/pkg/tarfs/losetup_linux.go b/pkg/tarfs/losetup_linux.go new file mode 100644 index 0000000000..2b3d9afc75 --- /dev/null +++ b/pkg/tarfs/losetup_linux.go @@ -0,0 +1,198 @@ +/* + Copyright (c) 2023. Nydus Developers. All rights reserved. + This file is copied from https://github.com/containerd/containerd/blob/main/mount/losetup_linux.go with modifications. + + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package tarfs + +import ( + "errors" + "fmt" + "math/rand" + "os" + "strings" + "time" + + "golang.org/x/sys/unix" +) + +const ( + loopControlPath = "/dev/loop-control" + loopDevFormat = "/dev/loop%d" + + ebusyString = "device or resource busy" +) + +// LoopParams parameters to control loop device setup +type LoopParams struct { + // Loop device should forbid write + Readonly bool + // Loop device is automatically cleared by kernel when the last opener closes it + Autoclear bool + // Use direct IO to access the loop backing file + Direct bool +} + +func getFreeLoopDev() (uint32, error) { + ctrl, err := os.OpenFile(loopControlPath, os.O_RDWR, 0) + if err != nil { + return 0, fmt.Errorf("could not open %v: %v", loopControlPath, err) + } + defer ctrl.Close() + num, err := unix.IoctlRetInt(int(ctrl.Fd()), unix.LOOP_CTL_GET_FREE) + if err != nil { + return 0, fmt.Errorf("could not get free loop device: %w", err) + } + return uint32(num), nil +} + +// setupLoopDev attaches the backing file to the loop device and returns +// the file handle for the loop device. The caller is responsible for +// closing the file handle. +func setupLoopDev(backingFile, loopDev string, param LoopParams) (_ *os.File, retErr error) { + // 1. Open backing file and loop device + flags := os.O_RDWR + if param.Readonly { + flags = os.O_RDONLY + } + + back, err := os.OpenFile(backingFile, flags, 0) + if err != nil { + return nil, fmt.Errorf("could not open backing file: %s: %w", backingFile, err) + } + defer back.Close() + + loop, err := os.OpenFile(loopDev, flags, 0) + if err != nil { + return nil, fmt.Errorf("could not open loop device: %s: %w", loopDev, err) + } + defer func() { + if retErr != nil { + loop.Close() + } + }() + + // 2. Set FD + if err := unix.IoctlSetInt(int(loop.Fd()), unix.LOOP_SET_FD, int(back.Fd())); err != nil { + return nil, fmt.Errorf("could not set loop fd for device: %s: %w", loopDev, err) + } + + // 3. 
Set Info + info := unix.LoopInfo64{} + copy(info.File_name[:], backingFile) + if param.Readonly { + info.Flags |= unix.LO_FLAGS_READ_ONLY + } + + if param.Autoclear { + info.Flags |= unix.LO_FLAGS_AUTOCLEAR + } + + if param.Direct { + info.Flags |= unix.LO_FLAGS_DIRECT_IO + } + + err = unix.IoctlLoopSetStatus64(int(loop.Fd()), &info) + if err == nil { + return loop, nil + } + + if param.Direct { + // Retry w/o direct IO flag in case kernel does not support it. The downside is that + // it will suffer from double cache problem. + info.Flags &= ^(uint32(unix.LO_FLAGS_DIRECT_IO)) + err = unix.IoctlLoopSetStatus64(int(loop.Fd()), &info) + if err == nil { + return loop, nil + } + } + + _ = unix.IoctlSetInt(int(loop.Fd()), unix.LOOP_CLR_FD, 0) + return nil, fmt.Errorf("failed to set loop device info: %v", err) +} + +// setupLoop looks for (and possibly creates) a free loop device, and +// then attaches backingFile to it. +// +// When autoclear is true, caller should take care to close it when +// done with the loop device. The loop device file handle keeps +// loFlagsAutoclear in effect and we rely on it to clean up the loop +// device. If caller closes the file handle after mounting the device, +// kernel will clear the loop device after it is umounted. Otherwise +// the loop device is cleared when the file handle is closed. +// +// When autoclear is false, caller should be responsible to remove +// the loop device when done with it. +// +// Upon success, the file handle to the loop device is returned. +func setupLoop(backingFile string, param LoopParams) (*os.File, error) { + for retry := 1; retry < 100; retry++ { + num, err := getFreeLoopDev() + if err != nil { + return nil, err + } + + loopDev := fmt.Sprintf(loopDevFormat, num) + file, err := setupLoopDev(backingFile, loopDev, param) + if err != nil { + // Per util-linux/sys-utils/losetup.c:create_loop(), + // free loop device can race and we end up failing + // with EBUSY when trying to set it up. 
+ if strings.Contains(err.Error(), ebusyString) { + // Fallback a bit to avoid live lock + time.Sleep(time.Millisecond * time.Duration(rand.Intn(retry*10))) + continue + } + return nil, err + } + + return file, nil + } + + return nil, errors.New("timeout creating new loopback device") +} + +func removeLoop(loopdev string) error { + file, err := os.Open(loopdev) + if err != nil { + return err + } + defer file.Close() + + return unix.IoctlSetInt(int(file.Fd()), unix.LOOP_CLR_FD, 0) +} + +// AttachLoopDevice attaches a specified backing file to a loop device +func AttachLoopDevice(backingFile string) (string, error) { + file, err := setupLoop(backingFile, LoopParams{}) + if err != nil { + return "", err + } + defer file.Close() + return file.Name(), nil +} + +// DetachLoopDevice detaches the provided loop devices +func DetachLoopDevice(devices ...string) error { + for _, dev := range devices { + if err := removeLoop(dev); err != nil { + return fmt.Errorf("failed to remove loop device: %s: %w", dev, err) + } + } + + return nil +} diff --git a/pkg/tarfs/losetup_linux_test.go b/pkg/tarfs/losetup_linux_test.go new file mode 100644 index 0000000000..28ee047318 --- /dev/null +++ b/pkg/tarfs/losetup_linux_test.go @@ -0,0 +1,117 @@ +/* + Copyright (c) 2023. Nydus Developers. All rights reserved. + This file is copied from https://github.com/containerd/containerd/blob/main/mount/losetup_linux_test.go with modifications. + + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package tarfs + +import ( + "os" + "testing" + + "github.com/containerd/continuity/testutil" +) + +var randomData = []byte("randomdata") + +func createTempFile(t *testing.T) string { + t.Helper() + + f, err := os.CreateTemp("", "losetup") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + if err = f.Truncate(512); err != nil { + t.Fatal(err) + } + + return f.Name() +} + +func TestNonExistingLoop(t *testing.T) { + testutil.RequiresRoot(t) + + backingFile := "setup-loop-test-no-such-file" + _, err := setupLoop(backingFile, LoopParams{}) + if err == nil { + t.Fatalf("setupLoop with non-existing file should fail") + } +} + +func TestRoLoop(t *testing.T) { + testutil.RequiresRoot(t) + + backingFile := createTempFile(t) + defer func() { + if err := os.Remove(backingFile); err != nil { + t.Fatal(err) + } + }() + + file, err := setupLoop(backingFile, LoopParams{Readonly: true, Autoclear: true}) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + if _, err := file.Write(randomData); err == nil { + t.Fatalf("writing to readonly loop device should fail") + } +} + +func TestRwLoop(t *testing.T) { + testutil.RequiresRoot(t) + + backingFile := createTempFile(t) + defer func() { + if err := os.Remove(backingFile); err != nil { + t.Fatal(err) + } + }() + + file, err := setupLoop(backingFile, LoopParams{Autoclear: false}) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + if _, err := file.Write(randomData); err != nil { + t.Fatal(err) + } +} + +func TestAttachDetachLoopDevice(t *testing.T) { + testutil.RequiresRoot(t) + + path := createTempFile(t) + defer func() { + if err := os.Remove(path); err != nil { + t.Fatal(err) + } + }() + + dev, err := AttachLoopDevice(path) + if err != nil { + t.Fatal(err) + } + + if err = DetachLoopDevice(dev); err != nil { + t.Fatal(err) + } +} From 32f44c37cb1eb2d1d5deaf4fa1617660eaf65ce7 Mon Sep 17 00:00:00 2001 
From: Jiang Liu Date: Sun, 17 Sep 2023 15:55:04 +0800 Subject: [PATCH 08/11] tarfs: replace go-losetup with the version from containerd Replace go-losetup with the version from containerd, to support auto-clean unused loop devices. Signed-off-by: Jiang Liu --- pkg/tarfs/losetup_linux.go | 10 +++++++++- pkg/tarfs/tarfs.go | 22 ++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pkg/tarfs/losetup_linux.go b/pkg/tarfs/losetup_linux.go index 2b3d9afc75..63d10d083e 100644 --- a/pkg/tarfs/losetup_linux.go +++ b/pkg/tarfs/losetup_linux.go @@ -20,13 +20,13 @@ package tarfs import ( - "errors" "fmt" "math/rand" "os" "strings" "time" + "github.com/pkg/errors" "golang.org/x/sys/unix" ) @@ -166,6 +166,14 @@ func setupLoop(backingFile string, param LoopParams) (*os.File, error) { return nil, errors.New("timeout creating new loopback device") } +func deleteLoop(file *os.File) error { + err := unix.IoctlSetInt(int(file.Fd()), unix.LOOP_CLR_FD, 0) + if err != nil { + return errors.Wrapf(err, "delete loopdev %s", file.Name()) + } + return file.Close() +} + func removeLoop(loopdev string) error { file, err := os.Open(loopdev) if err != nil { diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 959bb266b9..c092096e20 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -32,7 +32,6 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/rafs" "github.com/containerd/nydus-snapshotter/pkg/remote" "github.com/containerd/nydus-snapshotter/pkg/remote/remotes" - losetup "github.com/freddierice/go-losetup" "github.com/moby/sys/mountinfo" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -78,8 +77,8 @@ type snapshotStatus struct { status int blobID string erofsMountPoint string - dataLoopdev *losetup.Device - metaLoopdev *losetup.Device + dataLoopdev *os.File + metaLoopdev *os.File wg *sync.WaitGroup cancel context.CancelFunc ctx context.Context @@ -664,7 +663,7 @@ func (t *Manager) 
MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } st.dataLoopdev = loopdev } - devices = append(devices, "device="+st.dataLoopdev.Path()) + devices = append(devices, "device="+st.dataLoopdev.Name()) } st.mutex.Unlock() @@ -693,7 +692,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } st.metaLoopdev = loopdev } - devName := st.metaLoopdev.Path() + devName := st.metaLoopdev.Name() if err = os.MkdirAll(mountPoint, 0750); err != nil { return errors.Wrapf(err, "create tarfs mount dir %s", mountPoint) @@ -742,7 +741,7 @@ func (t *Manager) DetachLayer(snapshotID string) error { } if st.metaLoopdev != nil { - err := st.metaLoopdev.Detach() + err := deleteLoop(st.metaLoopdev) if err != nil { st.mutex.Unlock() return errors.Wrapf(err, "detach merged bootstrap loopdev for tarfs snapshot %s", snapshotID) @@ -751,7 +750,7 @@ func (t *Manager) DetachLayer(snapshotID string) error { } if st.dataLoopdev != nil { - err := st.dataLoopdev.Detach() + err := deleteLoop(st.dataLoopdev) if err != nil { st.mutex.Unlock() return errors.Wrapf(err, "detach layer bootstrap loopdev for tarfs snapshot %s", snapshotID) @@ -863,12 +862,15 @@ func (t *Manager) waitLayerReady(snapshotID string, lock bool) (*snapshotStatus, return st, nil } -func (t *Manager) attachLoopdev(blob string) (*losetup.Device, error) { +func (t *Manager) attachLoopdev(blob string) (*os.File, error) { // losetup.Attach() is not thread-safe hold lock here t.mutexLoopDev.Lock() defer t.mutexLoopDev.Unlock() - dev, err := losetup.Attach(blob, 0, false) - return &dev, err + param := LoopParams{ + Readonly: true, + Autoclear: true, + } + return setupLoop(blob, param) } func (t *Manager) CheckTarfsHintAnnotation(ctx context.Context, ref string, manifestDigest digest.Digest) (bool, error) { From 4060f0113a762474f35f779d9b520cd08c5a8a8f Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 18 Sep 2023 22:57:33 +0800 Subject: [PATCH 09/11] tarfs: remount EROFS for existing tarfs 
instances on startup On startup, we need to recover information for all tarfs related snapshots, and remount EROFS filesystems. Signed-off-by: Jiang Liu --- pkg/filesystem/fs.go | 2 +- pkg/label/label.go | 2 + pkg/tarfs/tarfs.go | 158 +++++++++++++++++++++++++++++++++++++++---- snapshot/snapshot.go | 42 +++++++++++- 4 files changed, 186 insertions(+), 18 deletions(-) diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index a58742ba9b..fcc31488b6 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -351,7 +351,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s err = errors.Wrapf(err, "mount file system by daemon %s, snapshot %s", d.ID(), snapshotID) } case config.FsDriverBlockdev: - err = fs.tarfsMgr.MountTarErofs(snapshotID, s, labels, rafs) + err = fs.tarfsMgr.MountErofs(snapshotID, s, labels, rafs) if err != nil { err = errors.Wrapf(err, "mount tarfs for snapshot %s", snapshotID) } diff --git a/pkg/label/label.go b/pkg/label/label.go index f5392771d4..ccad8f567e 100644 --- a/pkg/label/label.go +++ b/pkg/label/label.go @@ -36,6 +36,8 @@ const ( NydusRefLayer = "containerd.io/snapshot/nydus-ref" // The blobID of associated layer, also marking the layer as a nydus tarfs, set by the snapshotter NydusTarfsLayer = "containerd.io/snapshot/nydus-tarfs" + // List of parent snapshot IDs, saved in `Rafs.Annotation`. 
+ NydusTarfsParents = "containerd.io/snapshot/nydus-tarfs-parent-snapshot-list" // Dm-verity information for image block device NydusImageBlockInfo = "containerd.io/snapshot/nydus-image-block" // Dm-verity information for layer block device diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index c092096e20..90382c1f68 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -57,6 +57,7 @@ const ( ) type Manager struct { + RemountMap map[string]*rafs.Rafs // Scratch space to store rafs instances needing remount on startup snapshotMap map[string]*snapshotStatus // tarfs snapshots status, indexed by snapshot ID mutex sync.Mutex mutexLoopDev sync.Mutex @@ -87,6 +88,7 @@ type snapshotStatus struct { func NewManager(insecure, checkTarfsHint bool, cacheDirPath, nydusImagePath string, maxConcurrentProcess int64) *Manager { return &Manager{ snapshotMap: map[string]*snapshotStatus{}, + RemountMap: map[string]*rafs.Rafs{}, cacheDirPath: cacheDirPath, nydusImagePath: nydusImagePath, insecure: insecure, @@ -402,12 +404,12 @@ func (t *Manager) retryPrepareLayer(snapshotID, upperDirPath string, labels map[ return errors.Errorf("not found image reference label") } layerDigest := digest.Digest(labels[label.CRILayerDigest]) - if layerDigest.Validate() != nil { - return errors.Errorf("not found layer digest label") + if err := layerDigest.Validate(); err != nil { + return errors.Wrapf(err, "invalid layer digest") } manifestDigest := digest.Digest(labels[label.CRIManifestDigest]) - if manifestDigest.Validate() != nil { - return errors.Errorf("not found manifest digest label") + if err := manifestDigest.Validate(); err != nil { + return errors.Wrapf(err, "invalid manifest digest") } st, err := t.getSnapshotStatusWithLock(snapshotID) @@ -623,7 +625,7 @@ func (t *Manager) ExportBlockData(s storage.Snapshot, perLayer bool, labels map[ return updateFields, nil } -func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels map[string]string, rafs *rafs.Rafs) error { 
+func (t *Manager) MountErofs(snapshotID string, s *storage.Snapshot, labels map[string]string, rafs *rafs.Rafs) error { if s == nil { return errors.New("snapshot object for MountTarErofs() is nil") } @@ -644,6 +646,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } var devices []string + var parents []string // When merging bootstrap, we need to arrange layer bootstrap in order from low to high for idx := len(s.ParentIDs) - 1; idx >= 0; idx-- { snapshotID := s.ParentIDs[idx] @@ -664,10 +667,89 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m st.dataLoopdev = loopdev } devices = append(devices, "device="+st.dataLoopdev.Name()) + parents = append(parents, snapshotID) + } + + st.mutex.Unlock() + } + parentList := strings.Join(parents, ",") + devices = append(devices, "ro") + mountOpts := strings.Join(devices, ",") + + st, err := t.getSnapshotStatusWithLock(snapshotID) + if err != nil { + return err + } + defer st.mutex.Unlock() + + mountPoint := path.Join(rafs.GetSnapshotDir(), "mnt") + if len(st.erofsMountPoint) > 0 { + if st.erofsMountPoint == mountPoint { + log.L.Debugf("tarfs for snapshot %s has already been mounted at %s", snapshotID, mountPoint) + return nil + } + return errors.Errorf("tarfs for snapshot %s has already been mounted at %s", snapshotID, st.erofsMountPoint) + } + + if st.metaLoopdev == nil { + loopdev, err := t.attachLoopdev(mergedBootstrap) + if err != nil { + return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap) + } + st.metaLoopdev = loopdev + } + devName := st.metaLoopdev.Name() + + if err = os.MkdirAll(mountPoint, 0750); err != nil { + return errors.Wrapf(err, "create tarfs mount dir %s", mountPoint) + } + + err = unix.Mount(devName, mountPoint, "erofs", 0, mountOpts) + if err != nil { + return errors.Wrapf(err, "mount erofs at %s with opts %s", mountPoint, mountOpts) + } + st.erofsMountPoint = mountPoint + rafs.SetMountpoint(mountPoint) + 
rafs.AddAnnotation(label.NydusTarfsParents, parentList) + return nil +} + +func (t *Manager) RemountErofs(snapshotID string, rafs *rafs.Rafs) error { + upperDirPath := path.Join(rafs.GetSnapshotDir(), "fs") + + log.L.Infof("remount EROFS for tarfs snapshot %s at %s", snapshotID, upperDirPath) + var parents []string + if parentList, ok := rafs.Annotations[label.NydusTarfsParents]; ok { + parents = strings.Split(parentList, ",") + } else { + if !config.GetTarfsMountOnHost() { + rafs.SetMountpoint(upperDirPath) + } + return nil + } + + var devices []string + for idx := 0; idx < len(parents); idx++ { + snapshotID := parents[idx] + st, err := t.waitLayerReady(snapshotID, true) + if err != nil { + return errors.Wrapf(err, "wait for tarfs conversion task") } + if st.dataLoopdev == nil { + blobTarFilePath := t.layerTarFilePath(st.blobID) + loopdev, err := t.attachLoopdev(blobTarFilePath) + if err != nil { + st.mutex.Unlock() + return errors.Wrapf(err, "attach layer tar file %s to loopdev", blobTarFilePath) + } + st.dataLoopdev = loopdev + } + devices = append(devices, "device="+st.dataLoopdev.Name()) + st.mutex.Unlock() } + devices = append(devices, "ro") mountOpts := strings.Join(devices, ",") st, err := t.getSnapshotStatusWithLock(snapshotID) @@ -686,6 +768,7 @@ func (t *Manager) MountTarErofs(snapshotID string, s *storage.Snapshot, labels m } if st.metaLoopdev == nil { + mergedBootstrap := t.imageMetaFilePath(upperDirPath) loopdev, err := t.attachLoopdev(mergedBootstrap) if err != nil { return errors.Wrapf(err, "attach merged bootstrap %s to loopdev", mergedBootstrap) @@ -768,6 +851,43 @@ func (t *Manager) DetachLayer(snapshotID string) error { return nil } +func (t *Manager) RecoverSnapshoInfo(ctx context.Context, id string, info snapshots.Info, upperPath string) error { + t.mutex.Lock() + defer t.mutex.Unlock() + log.L.Infof("recover tarfs snapshot %s with path %s", id, upperPath) + + if _, ok := t.snapshotMap[id]; ok { + // RecoverSnapshotInfo() is called after 
RecoverRafsInstance(), so there may be some snapshots already exist. + return nil + } + + layerMetaFilePath := t.layerMetaFilePath(upperPath) + if _, err := os.Stat(layerMetaFilePath); err == nil { + layerDigest := digest.Digest(info.Labels[label.CRILayerDigest]) + if err := layerDigest.Validate(); err != nil { + return errors.Wrapf(err, "fetch layer digest label") + } + ctx, cancel := context.WithCancel(context.Background()) + t.snapshotMap[id] = &snapshotStatus{ + status: TarfsStatusReady, + blobID: layerDigest.Hex(), + cancel: cancel, + ctx: ctx, + } + } else { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + t.snapshotMap[id] = &snapshotStatus{ + status: TarfsStatusFailed, + wg: wg, + cancel: cancel, + ctx: ctx, + } + } + return nil +} + // This method is called in single threaded mode during startup, so we do not lock `snapshotStatus` objects. func (t *Manager) RecoverRafsInstance(r *rafs.Rafs) error { t.mutex.Lock() @@ -793,6 +913,11 @@ func (t *Manager) RecoverRafsInstance(r *rafs.Rafs) error { if !mounted || err != nil { mountPoint = "" } + if !mounted && err == nil { + if _, ok := r.Annotations[label.NydusTarfsParents]; ok { + t.RemountMap[r.SnapshotID] = r + } + } t.snapshotMap[r.SnapshotID] = &snapshotStatus{ status: TarfsStatusReady, blobID: layerDigest.Hex(), @@ -843,16 +968,8 @@ func (t *Manager) waitLayerReady(snapshotID string, lock bool) (*snapshotStatus, } if st.status != TarfsStatusReady { + state := tarfsStatusString(st.status) st.mutex.Unlock() - var state string - switch st.status { - case TarfsStatusPrepare: - state = "Prepare" - case TarfsStatusFailed: - state = "Failed" - default: - state = "Unknown" - } return nil, errors.Errorf("snapshot %s is in %s state instead of Ready", snapshotID, state) } @@ -960,3 +1077,16 @@ func (t *Manager) layerMetaFilePath(upperDirPath string) string { func (t *Manager) imageMetaFilePath(upperDirPath string) string { return filepath.Join(upperDirPath, "image", 
TarfsImageBootstrapName) } + +func tarfsStatusString(status int) string { + switch status { + case TarfsStatusReady: + return "Ready" + case TarfsStatusPrepare: + return "Prepare" + case TarfsStatusFailed: + return "Failed" + default: + return "Unknown" + } +} diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index a889312d7c..640c83de3d 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -34,6 +34,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/metrics" "github.com/containerd/nydus-snapshotter/pkg/metrics/collector" "github.com/containerd/nydus-snapshotter/pkg/pprof" + "github.com/containerd/nydus-snapshotter/pkg/rafs" "github.com/containerd/nydus-snapshotter/pkg/referrer" "github.com/containerd/nydus-snapshotter/pkg/system" "github.com/containerd/nydus-snapshotter/pkg/tarfs" @@ -224,8 +225,9 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho opts = append(opts, filesystem.WithReferrerManager(referrerMgr)) } + var tarfsMgr *tarfs.Manager if cfg.Experimental.TarfsConfig.EnableTarfs { - tarfsMgr := tarfs.NewManager(skipSSLVerify, cfg.Experimental.TarfsConfig.TarfsHint, + tarfsMgr = tarfs.NewManager(skipSSLVerify, cfg.Experimental.TarfsConfig.TarfsHint, cacheConfig.CacheDir, cfg.DaemonConfig.NydusImagePath, int64(cfg.Experimental.TarfsConfig.MaxConcurrentProc)) opts = append(opts, filesystem.WithTarfsManager(tarfsMgr)) @@ -283,7 +285,7 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho syncRemove = true } - return &snapshotter{ + snapshotter := &snapshotter{ root: cfg.Root, nydusdPath: cfg.DaemonConfig.NydusdPath, ms: ms, @@ -293,7 +295,41 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho enableNydusOverlayFS: cfg.SnapshotsConfig.EnableNydusOverlayFS, enableKataVolume: cfg.SnapshotsConfig.EnableKataVolume, cleanupOnClose: cfg.CleanupOnClose, - }, nil + } + + // There's special requirement to recover tarfs instance because it depdens on 
`snapshotter.ms` + // so it can't be done in `Filesystem.Recover()` + if tarfsMgr != nil { + snapshotter.recoverTarfs(ctx, tarfsMgr) + } + + return snapshotter, nil +} + +func (o *snapshotter) recoverTarfs(ctx context.Context, tarfsMgr *tarfs.Manager) { + // First recover all snapshot information related to tarfs, mount operation depends on snapshots. + _ = o.Walk(ctx, func(ctx context.Context, i snapshots.Info) error { + if _, ok := i.Labels[label.NydusTarfsLayer]; ok { + id, _, _, err := snapshot.GetSnapshotInfo(ctx, o.ms, i.Name) + if err != nil { + return errors.Wrapf(err, "get id for snapshot %s", i.Name) + } + log.L.Infof("found tarfs snapshot %s with name %s", id, i.Name) + upperPath := o.upperPath(id) + if err = tarfsMgr.RecoverSnapshoInfo(ctx, id, i, upperPath); err != nil { + return errors.Wrapf(err, "get id for snapshot %s", i.Name) + } + } + return nil + }) + + for id, r := range tarfsMgr.RemountMap { + log.L.Infof("remount tarfs snapshot %s", id) + if err := tarfsMgr.RemountErofs(id, r); err != nil { + log.L.Warnf("failed to remount EROFS filesystem for tarfs, %s", err) + } + } + tarfsMgr.RemountMap = map[string]*rafs.Rafs{} } func (o *snapshotter) Cleanup(ctx context.Context) error { From 54e2afbbd691f3228c03d6916b8fb2b0d864fed2 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sun, 19 Nov 2023 18:06:35 +0800 Subject: [PATCH 10/11] tarfs: add unit test cases for tarfs Add unit test cases for tarfs. 
Signed-off-by: Jiang Liu --- .github/workflows/ci.yml | 16 +++++ pkg/daemon/config_test.go | 50 +++++++++++++++ pkg/daemon/idgen_test.go | 22 +++++++ pkg/rafs/rafs_test.go | 66 ++++++++++++++++++++ pkg/tarfs/tarfs_test.go | 127 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 281 insertions(+) create mode 100644 pkg/daemon/config_test.go create mode 100644 pkg/daemon/idgen_test.go create mode 100644 pkg/rafs/rafs_test.go create mode 100644 pkg/tarfs/tarfs_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5c9bd3874d..d7d0527cbc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,6 +28,14 @@ jobs: key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }} restore-keys: | ${{ runner.os }}-go + - name: Setup Nydus + run: | + # Download nydus components + NYDUS_VER=v$(curl -s "https://api.github.com/repos/dragonflyoss/nydus/releases/latest" | jq -r .tag_name | sed 's/^v//') + wget https://github.com/dragonflyoss/nydus/releases/download/$NYDUS_VER/nydus-static-$NYDUS_VER-linux-amd64.tgz + tar xzvf nydus-static-$NYDUS_VER-linux-amd64.tgz + mkdir -p /usr/bin + sudo mv nydus-static/nydus-image /usr/bin/ - name: Build run: | go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 @@ -135,6 +143,14 @@ jobs: key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }} restore-keys: | ${{ runner.os }}-go + - name: Setup Nydus + run: | + # Download nydus components + NYDUS_VER=v$(curl --header 'authorization: Bearer ${{ secrets.GITHUB_TOKEN }}' -s "https://api.github.com/repos/dragonflyoss/nydus/releases/latest" | jq -r .tag_name | sed 's/^v//') + wget https://github.com/dragonflyoss/nydus/releases/download/$NYDUS_VER/nydus-static-$NYDUS_VER-linux-amd64.tgz + tar xzvf nydus-static-$NYDUS_VER-linux-amd64.tgz + mkdir -p /usr/bin + sudo mv nydus-static/nydus-image /usr/bin/ - name: Run unit tests. 
run: make cover - name: Upload coverage to Codecov diff --git a/pkg/daemon/config_test.go b/pkg/daemon/config_test.go new file mode 100644 index 0000000000..7b36409f76 --- /dev/null +++ b/pkg/daemon/config_test.go @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package daemon + +import ( + "testing" + + "github.com/containerd/nydus-snapshotter/config" + "gotest.tools/assert" +) + +func TestConfigOptions(t *testing.T) { + tmpDir := t.TempDir() + opts := []NewDaemonOpt{ + WithSocketDir("/tmp/socket"), + WithRef(5), + WithLogDir(tmpDir), + WithLogToStdout(true), + WithLogLevel("Warning"), + WithLogRotationSize(1024), + WithConfigDir(tmpDir), + WithMountpoint("/tmp/mnt"), + WithNydusdThreadNum(4), + WithFsDriver("fscache"), + WithDaemonMode("dedicated"), + } + + daemon, err := NewDaemon(opts...) + assert.Assert(t, err) + assert.Equal(t, daemon.States.APISocket, "/tmp/socket/"+daemon.ID()+"/api.sock") + assert.Equal(t, daemon.ref, int32(5)) + assert.Equal(t, daemon.States.LogDir, tmpDir+"/"+daemon.ID()) + assert.Equal(t, daemon.States.LogToStdout, true) + assert.Equal(t, daemon.States.LogLevel, "Warning") + assert.Equal(t, daemon.States.LogRotationSize, 1024) + assert.Equal(t, daemon.States.ConfigDir, tmpDir+"/"+daemon.ID()) + assert.Equal(t, daemon.States.Mountpoint, "/tmp/mnt") + assert.Equal(t, daemon.States.ThreadNum, 4) + assert.Equal(t, daemon.States.FsDriver, "fscache") + assert.Equal(t, string(daemon.States.DaemonMode), "dedicated") + +} + +func String(daemonMode config.DaemonMode) { + panic("unimplemented") +} diff --git a/pkg/daemon/idgen_test.go b/pkg/daemon/idgen_test.go new file mode 100644 index 0000000000..a3b5f8e25a --- /dev/null +++ b/pkg/daemon/idgen_test.go @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. 
+ * + * SPDX-License-Identifier: Apache-2.0 + */ + +package daemon + +import ( + "testing" + + "gotest.tools/assert" +) + +func TestIdGenerate(t *testing.T) { + id1 := newID() + id2 := newID() + + assert.Assert(t, len(id1) > 0) + assert.Assert(t, len(id2) > 0) + assert.Assert(t, id1 != id2) +} diff --git a/pkg/rafs/rafs_test.go b/pkg/rafs/rafs_test.go new file mode 100644 index 0000000000..8e65161c9b --- /dev/null +++ b/pkg/rafs/rafs_test.go @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package rafs + +import ( + "testing" + + "github.com/containerd/nydus-snapshotter/config" + "github.com/containerd/nydus-snapshotter/internal/constant" + "gotest.tools/assert" +) + +func TestRafsSetEmpty(t *testing.T) { + cache := NewRafsCache() + + assert.Assert(t, cache.Get("rafs1") == nil) + assert.Equal(t, cache.Len(), 0) + assert.Assert(t, cache.Head() == nil) + instances := cache.List() + assert.Equal(t, len(instances), 0) +} + +func TestRafs(t *testing.T) { + tmpDir := t.TempDir() + snapshotterConfig := config.SnapshotterConfig{} + snapshotterConfig.Root = tmpDir + snapshotterConfig.DaemonMode = constant.DaemonModeDedicated + assert.Assert(t, config.ProcessConfigurations(&snapshotterConfig)) + + rafs, err := NewRafs("snapshot1", "image1", "fscache") + assert.Assert(t, err) + assert.Equal(t, rafs, RafsGlobalCache.Get("snapshot1")) + assert.Equal(t, RafsGlobalCache.Len(), 1) + assert.Equal(t, rafs, RafsGlobalCache.Head()) + instances := RafsGlobalCache.List() + assert.Equal(t, len(instances), 1) + assert.Equal(t, instances["snapshot1"].SnapshotID, "snapshot1") + + RafsGlobalCache.Lock() + instances2 := RafsGlobalCache.ListLocked() + RafsGlobalCache.Unlock() + assert.Equal(t, len(instances2), 1) + + RafsGlobalCache.SetIntances(instances) + assert.Equal(t, RafsGlobalCache.Len(), 1) + assert.Equal(t, RafsGlobalCache.Head().SnapshotID, "snapshot1") + + assert.Equal(t, len(rafs.Annotations), 0) 
+ rafs.AddAnnotation("key", "value") + assert.Equal(t, len(rafs.Annotations), 1) + assert.Equal(t, rafs.GetSnapshotDir(), tmpDir+"/snapshots/snapshot1") + assert.Equal(t, rafs.RelaMountpoint(), "/snapshot1") + assert.Equal(t, rafs.FscacheWorkDir(), tmpDir+"/snapshots/snapshot1/fs") + assert.Equal(t, rafs.GetFsDriver(), "fscache") + rafs.SetMountpoint("/tmp/mnt") + assert.Equal(t, rafs.GetMountpoint(), "/tmp/mnt") + _, err = rafs.BootstrapFile() + assert.Assert(t, err != nil) + + RafsGlobalCache.Remove("snapshot1") + assert.Equal(t, RafsGlobalCache.Len(), 0) +} diff --git a/pkg/tarfs/tarfs_test.go b/pkg/tarfs/tarfs_test.go new file mode 100644 index 0000000000..54c9309521 --- /dev/null +++ b/pkg/tarfs/tarfs_test.go @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2023. Nydus Developers. All rights reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package tarfs + +import ( + "context" + "testing" + + "github.com/containerd/continuity/testutil" + "github.com/containerd/nydus-snapshotter/pkg/label" + "github.com/containerd/nydus-snapshotter/pkg/rafs" + "github.com/opencontainers/go-digest" + "gotest.tools/assert" +) + +const ( + BusyboxRef = "quay.io/quay/busybox@sha256:92f3298bf80a1ba949140d77987f5de081f010337880cd771f7e7fc928f8c74d" + BusyboxManifestDigest = "sha256:92f3298bf80a1ba949140d77987f5de081f010337880cd771f7e7fc928f8c74d" + BusyboxLayerDigest = "sha256:ee780d08a5b4de5192a526d422987f451d9a065e6da42aefe8c3b20023a250c7" + NydusImagePath = "nydus-image" +) + +// TODO: add unit test for MergeLayers, ExportBlockData, MountErofs, RemountErofs, UmountTarErofs, DetachLayer, +// RecoverSnapshoInfo, RecoverRafsInstance, getImageBlobInfo + +func TestPrepareLayer(t *testing.T) { + manager := NewManager(true, true, t.TempDir(), NydusImagePath, 4) + manifestDigest, err := digest.Parse(BusyboxManifestDigest) + assert.Assert(t, err) + layerDigest, err := digest.Parse(BusyboxLayerDigest) + assert.Assert(t, err) + + err = manager.PrepareLayer("snapshot1", BusyboxRef, 
manifestDigest, layerDigest, t.TempDir()) + assert.Assert(t, err) + + snapshot, err := manager.waitLayerReady("snapshot1", true) + // tarfs_test.go:36: assertion failed: error is not nil: generate tarfs from image layer blob: converting OCIv1 layer blob to tarfs: exec: "nydus-image": executable file not found in $PATH + assert.Assert(t, err != nil) + if err == nil { + assert.Equal(t, snapshot.blobID, "ee780d08a5b4de5192a526d422987f451d9a065e6da42aefe8c3b20023a250c7") + } + + err = manager.PrepareLayer("snapshot1", BusyboxRef, manifestDigest, layerDigest, t.TempDir()) + assert.Assert(t, err != nil) +} + +func TestBlobProcess(t *testing.T) { + manager := NewManager(true, true, t.TempDir(), NydusImagePath, 4) + manifestDigest, err := digest.Parse(BusyboxManifestDigest) + assert.Assert(t, err) + layerDigest, err := digest.Parse(BusyboxLayerDigest) + assert.Assert(t, err) + + err = manager.blobProcess(context.Background(), "snapshot2", BusyboxRef, manifestDigest, layerDigest, t.TempDir(), true) + assert.Assert(t, err != nil) +} + +func TestCheckTarfsHintAnnotation(t *testing.T) { + manager := NewManager(true, true, t.TempDir(), NydusImagePath, 4) + ctx := context.Background() + hint, err := manager.CheckTarfsHintAnnotation(ctx, BusyboxRef, BusyboxManifestDigest) + assert.Assert(t, err) + assert.Equal(t, hint, false) +} + +func TestGetConcurrentLimiter(t *testing.T) { + manager := NewManager(false, false, t.TempDir(), NydusImagePath, 4) + limiter := manager.GetConcurrentLimiter("busybox") + assert.Assert(t, limiter != nil) + assert.Equal(t, manager.GetConcurrentLimiter("busybox"), limiter) + +} + +func TestCopyTarfsAnnotations(t *testing.T) { + manager := NewManager(false, false, t.TempDir(), NydusImagePath, 4) + rafs := &rafs.Rafs{ + Annotations: make(map[string]string), + } + + annotations := map[string]string{} + annotations[label.CRIImageRef] = "cri_image_ref" + annotations[label.CRILayerDigest] = "cri_layer_digest" + annotations[label.CRIManifestDigest] = 
"cri_manigest_digest" + annotations[label.NydusTarfsLayer] = "nydus_tarfs_layer" + annotations[label.NydusImageBlockInfo] = "nydus_image_block_info" + annotations[label.NydusLayerBlockInfo] = "nydus_layer_block_info" + annotations["unsupported_key"] = "error" + + manager.copyTarfsAnnotations(annotations, rafs) + assert.Equal(t, len(rafs.Annotations), 6) + assert.Equal(t, rafs.Annotations[label.CRIImageRef], annotations[label.CRIImageRef]) + assert.Equal(t, rafs.Annotations[label.CRILayerDigest], annotations[label.CRILayerDigest]) +} + +func TestTarfsFilePath(t *testing.T) { + manager := NewManager(false, false, "/tmp/tarfs", NydusImagePath, 4) + + assert.Equal(t, manager.layerTarFilePath("blob1"), "/tmp/tarfs/blob1") + assert.Equal(t, manager.layerDiskFilePath("blob1"), "/tmp/tarfs/blob1.layer.disk") + assert.Equal(t, manager.ImageDiskFilePath("blob1"), "/tmp/tarfs/blob1.image.disk") + assert.Equal(t, manager.layerMetaFilePath("/tarfs/fs"), "/tarfs/fs/image/layer.boot") + assert.Equal(t, manager.imageMetaFilePath("/tarfs/fs"), "/tarfs/fs/image/image.boot") +} + +func TestTarfsStatusString(t *testing.T) { + assert.Equal(t, tarfsStatusString(TarfsStatusReady), "Ready") + assert.Equal(t, tarfsStatusString(TarfsStatusPrepare), "Prepare") + assert.Equal(t, tarfsStatusString(TarfsStatusFailed), "Failed") + assert.Equal(t, tarfsStatusString(4), "Unknown") +} + +func TestAttachBlob(t *testing.T) { + testutil.RequiresRoot(t) + + manager := NewManager(false, false, t.TempDir(), NydusImagePath, 4) + blobFile := createTempFile(t) + loopdev, err := manager.attachLoopdev(blobFile) + assert.Assert(t, err) + err = deleteLoop(loopdev) + assert.Assert(t, err) + err = deleteLoop(loopdev) + assert.Assert(t, err != nil) +} From e0124e7e6098b788c1e2e6a9855be86b9fbd3fe6 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 20 Nov 2023 10:13:47 +0800 Subject: [PATCH 11/11] tarfs: fix a data race condition Fix a data race condition WARNING: DATA RACE Write at 0x00c000178428 by goroutine 27: 
github.com/containerd/nydus-snapshotter/pkg/remote/remotes/docker.(*httpReadSeeker).Close() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/remote/remotes/docker/httpreadseeker.go:87 +0x57 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func2.1() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:339 +0x48 runtime.deferreturn() /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/panic.go:476 +0x32 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func3() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:394 +0x71 Previous read at 0x00c000178428 by goroutine 40: github.com/containerd/nydus-snapshotter/pkg/remote/remotes/docker.(*httpReadSeeker).Read() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/remote/remotes/docker/httpreadseeker.go:48 +0x68 bufio.(*Reader).Read() /opt/hostedtoolcache/go/1.20.1/x64/src/bufio/bufio.go:223 +0x2c3 github.com/containerd/containerd/archive/compression.(*bufferedReader).Read() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:113 +0xa4 io.copyBuffer() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:427 +0x28d io.Copy() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:386 +0x88 os.genericReadFrom() /opt/hostedtoolcache/go/1.20.1/x64/src/os/file.go:161 +0x34 os.(*File).ReadFrom() /opt/hostedtoolcache/go/1.20.1/x64/src/os/file.go:155 +0x324 io.copyBuffer() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:413 +0x1c5 io.Copy() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:386 +0x84 os/exec.(*Cmd).childStdin.func1() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:511 +0x45 os/exec.(*Cmd).Start.func2() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:717 +0x42 os/exec.(*Cmd).Start.func3() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:729 +0x47 Goroutine 27 (running) created at: github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess() 
/home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:393 +0x9dd github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).PrepareLayer() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:465 +0x444 github.com/containerd/nydus-snapshotter/pkg/tarfs.TestPrepareLayer() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs_test.go:33 +0x188 testing.tRunner() /opt/hostedtoolcache/go/1.20.1/x64/src/testing/testing.go:1576 +0x216 testing.(*T).Run.func1() /opt/hostedtoolcache/go/1.20.1/x64/src/testing/testing.go:1629 +0x47 Goroutine 40 (finished) created at: os/exec.(*Cmd).Start() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:716 +0xf8e github.com/containerd/containerd/archive/compression.cmdStream() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:284 +0x36f github.com/containerd/containerd/archive/compression.gzipDecompress() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:272 +0x152 github.com/containerd/containerd/archive/compression.DecompressStream() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:203 +0x3e4 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func2() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:341 +0x1b1 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func3() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:394 +0x71 ================== testing.go:1446: race detected during execution of test Signed-off-by: Jiang Liu --- pkg/tarfs/tarfs.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 90382c1f68..a60a0b44af 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -20,6 +20,7 @@ import ( "strings" "sync" "syscall" + "time" 
"github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/log" @@ -213,7 +214,7 @@ func (t *Manager) getBlobStream(ctx context.Context, remote *remote.Remote, ref } // generate tar file and layer bootstrap, return if this blob is an empty blob -func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string) (err error) { +func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string, w *sync.WaitGroup) (err error) { snapshotImageDir := filepath.Join(upperDirPath, "image") if err := os.MkdirAll(snapshotImageDir, 0750); err != nil { return errors.Wrapf(err, "create data dir %s for tarfs snapshot", snapshotImageDir) @@ -235,20 +236,33 @@ func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID defer os.Remove(layerTarFileTmp) fifoName := filepath.Join(upperDirPath, "layer_"+snapshotID+"_"+"tar.fifo") - if err = syscall.Mkfifo(fifoName, 0644); err != nil { + if err = syscall.Mkfifo(fifoName, 0640); err != nil { return err } defer os.Remove(fifoName) + w.Add(1) go func() { - fifoFile, err := os.OpenFile(fifoName, os.O_WRONLY, os.ModeNamedPipe) - if err != nil { - log.L.Warnf("can not open fifo file, err %v", err) - return + defer w.Done() + + var fifoFile *os.File + for i := 1; i < 100 && fifoFile == nil; i++ { + file, err := os.OpenFile(fifoName, os.O_RDWR, os.ModeNamedPipe) + switch { + case err == nil: + fifoFile = file + case os.IsNotExist(err) || os.IsPermission(err): + log.L.Warnf("open fifo file, %v", err) + return + default: + log.L.Warnf("open fifo file, %v", err) + time.Sleep(time.Duration(i) * 10 * time.Millisecond) + } } defer fifoFile.Close() + if _, err := io.Copy(fifoFile, io.TeeReader(tarReader, tarFile)); err != nil { - log.L.Warnf("tar stream copy err %v", err) + log.L.Warnf("tar stream copy, %v", err) } }() @@ -338,6 +352,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, process := func(rc 
io.ReadCloser, remote *remote.Remote) error { defer rc.Close() + var w sync.WaitGroup + defer w.Wait() + ds, err := compression.DecompressStream(rc) if err != nil { return epilog(err, "unpack layer blob stream for tarfs") @@ -351,7 +368,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, } digester := digest.Canonical.Digester() dr := io.TeeReader(ds, digester.Hash()) - err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath) + err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath, &w) switch { case err != nil && !errdefs.IsAlreadyExists(err): return epilog(err, "generate tarfs from image layer blob") @@ -362,7 +379,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, return epilog(nil, msg) } } else { - err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) + err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath, &w) if err != nil && !errdefs.IsAlreadyExists(err) { return epilog(err, "generate tarfs data from image layer blob") }