From e0124e7e6098b788c1e2e6a9855be86b9fbd3fe6 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 20 Nov 2023 10:13:47 +0800 Subject: [PATCH] tarfs: fix a data race condition Fix a data race condition WARNING: DATA RACE Write at 0x00c000178428 by goroutine 27: github.com/containerd/nydus-snapshotter/pkg/remote/remotes/docker.(*httpReadSeeker).Close() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/remote/remotes/docker/httpreadseeker.go:87 +0x57 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func2.1() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:339 +0x48 runtime.deferreturn() /opt/hostedtoolcache/go/1.20.1/x64/src/runtime/panic.go:476 +0x32 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func3() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:394 +0x71 Previous read at 0x00c000178428 by goroutine 40: github.com/containerd/nydus-snapshotter/pkg/remote/remotes/docker.(*httpReadSeeker).Read() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/remote/remotes/docker/httpreadseeker.go:48 +0x68 bufio.(*Reader).Read() /opt/hostedtoolcache/go/1.20.1/x64/src/bufio/bufio.go:223 +0x2c3 github.com/containerd/containerd/archive/compression.(*bufferedReader).Read() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:113 +0xa4 io.copyBuffer() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:427 +0x28d io.Copy() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:386 +0x88 os.genericReadFrom() /opt/hostedtoolcache/go/1.20.1/x64/src/os/file.go:161 +0x34 os.(*File).ReadFrom() /opt/hostedtoolcache/go/1.20.1/x64/src/os/file.go:155 +0x324 io.copyBuffer() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:413 +0x1c5 io.Copy() /opt/hostedtoolcache/go/1.20.1/x64/src/io/io.go:386 +0x84 os/exec.(*Cmd).childStdin.func1() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:511 +0x45 os/exec.(*Cmd).Start.func2() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:717 +0x42 os/exec.(*Cmd).Start.func3() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:729 +0x47 Goroutine 27 (running) created at: github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:393 +0x9dd github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).PrepareLayer() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:465 +0x444 github.com/containerd/nydus-snapshotter/pkg/tarfs.TestPrepareLayer() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs_test.go:33 +0x188 testing.tRunner() /opt/hostedtoolcache/go/1.20.1/x64/src/testing/testing.go:1576 +0x216 testing.(*T).Run.func1() /opt/hostedtoolcache/go/1.20.1/x64/src/testing/testing.go:1629 +0x47 Goroutine 40 (finished) created at: os/exec.(*Cmd).Start() /opt/hostedtoolcache/go/1.20.1/x64/src/os/exec/exec.go:716 +0xf8e github.com/containerd/containerd/archive/compression.cmdStream() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:284 +0x36f github.com/containerd/containerd/archive/compression.gzipDecompress() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:272 +0x152 github.com/containerd/containerd/archive/compression.DecompressStream() /home/runner/go/pkg/mod/github.com/containerd/containerd@v1.7.0/archive/compression/compression.go:203 +0x3e4 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func2() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:341 +0x1b1 github.com/containerd/nydus-snapshotter/pkg/tarfs.(*Manager).blobProcess.func3() /home/runner/work/nydus-snapshotter/nydus-snapshotter/pkg/tarfs/tarfs.go:394 +0x71 ================== testing.go:1446: race detected during execution of test Signed-off-by: Jiang Liu --- pkg/tarfs/tarfs.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/pkg/tarfs/tarfs.go b/pkg/tarfs/tarfs.go index 90382c1f68..a60a0b44af 100755 --- a/pkg/tarfs/tarfs.go +++ b/pkg/tarfs/tarfs.go @@ -20,6 +20,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/log" @@ -213,7 +214,7 @@ func (t *Manager) getBlobStream(ctx context.Context, remote *remote.Remote, ref } // generate tar file and layer bootstrap, return if this blob is an empty blob -func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string) (err error) { +func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID, upperDirPath string, w *sync.WaitGroup) (err error) { snapshotImageDir := filepath.Join(upperDirPath, "image") if err := os.MkdirAll(snapshotImageDir, 0750); err != nil { return errors.Wrapf(err, "create data dir %s for tarfs snapshot", snapshotImageDir) @@ -235,20 +236,33 @@ func (t *Manager) generateBootstrap(tarReader io.Reader, snapshotID, layerBlobID defer os.Remove(layerTarFileTmp) fifoName := filepath.Join(upperDirPath, "layer_"+snapshotID+"_"+"tar.fifo") - if err = syscall.Mkfifo(fifoName, 0644); err != nil { + if err = syscall.Mkfifo(fifoName, 0640); err != nil { return err } defer os.Remove(fifoName) + w.Add(1) go func() { - fifoFile, err := os.OpenFile(fifoName, os.O_WRONLY, os.ModeNamedPipe) - if err != nil { - log.L.Warnf("can not open fifo file, err %v", err) - return + defer w.Done() + + var fifoFile *os.File + for i := 1; i < 100 && fifoFile == nil; i++ { + file, err := os.OpenFile(fifoName, os.O_RDWR, os.ModeNamedPipe) + switch { + case err == nil: + fifoFile = file + case os.IsNotExist(err) || os.IsPermission(err): + log.L.Warnf("open fifo file, %v", err) + return + default: + log.L.Warnf("open fifo file, %v", err) + time.Sleep(time.Duration(i) * 10 * time.Millisecond) + } } defer fifoFile.Close() + if _, err := io.Copy(fifoFile, io.TeeReader(tarReader, tarFile)); err != nil { - log.L.Warnf("tar stream copy err %v", err) + log.L.Warnf("tar stream copy, %v", err) } }() @@ -338,6 +352,9 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, process := func(rc io.ReadCloser, remote *remote.Remote) error { defer rc.Close() + var w sync.WaitGroup + defer w.Wait() + ds, err := compression.DecompressStream(rc) if err != nil { return epilog(err, "unpack layer blob stream for tarfs") @@ -351,7 +368,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, } digester := digest.Canonical.Digester() dr := io.TeeReader(ds, digester.Hash()) - err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath) + err = t.generateBootstrap(dr, snapshotID, layerBlobID, upperDirPath, &w) switch { case err != nil && !errdefs.IsAlreadyExists(err): return epilog(err, "generate tarfs from image layer blob") @@ -362,7 +379,7 @@ func (t *Manager) blobProcess(ctx context.Context, snapshotID, ref string, return epilog(nil, msg) } } else { - err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath) + err = t.generateBootstrap(ds, snapshotID, layerBlobID, upperDirPath, &w) if err != nil && !errdefs.IsAlreadyExists(err) { return epilog(err, "generate tarfs data from image layer blob") }