diff --git a/Gopkg.lock b/Gopkg.lock index 8fc30309..bf5ded3d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -285,6 +285,22 @@ revision = "30f82fa23fd844bd5bb1e5f216db87fd77b5eb43" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:5870202748fcf0dc2ea07455327848cddb130acc612edcdf5c57a46dd3539b89" + name = "github.com/gammazero/deque" + packages = ["."] + pruneopts = "UT" + revision = "46e4ffb7a622d58ccd5c73a7401afac7d5a8edee" + +[[projects]] + branch = "master" + digest = "1:6f1f36f253c393c3df2b393bc3e6f2741698f8622e88e82af19eefaa9f79862e" + name = "github.com/gammazero/workerpool" + packages = ["."] + pruneopts = "UT" + revision = "971bc780f6d7f265ea1e2aaff23f5644134e27c4" + [[projects]] digest = "1:eceec69de4581189a65244bf958b2c4b3a121ed80700f92ae25c03ab2c8f61a2" name = "github.com/getlantern/deepcopy" @@ -1006,6 +1022,7 @@ "github.com/docker/docker/api/types/filters", "github.com/docker/docker/client", "github.com/elazarl/go-bindata-assetfs", + "github.com/gammazero/workerpool", "github.com/getlantern/deepcopy", "github.com/ghodss/yaml", "github.com/gizak/termui", diff --git a/config/config.go b/config/config.go index 1d958587..4bb8eda8 100644 --- a/config/config.go +++ b/config/config.go @@ -147,6 +147,8 @@ type Worker struct { // Normally the worker cleans up its working directory after executing. // This option disables that behavior. LeaveWorkDir bool + // Limit the number of concurrent downloads/uploads + MaxParallelTransfers int } // HPCBackend describes the configuration for a HPC scheduler backend such as diff --git a/config/default-config.yaml b/config/default-config.yaml index 645e247e..bc96938d 100644 --- a/config/default-config.yaml +++ b/config/default-config.yaml @@ -115,6 +115,9 @@ Worker: # This option disables that behavior. LeaveWorkDir: false + # Limit the number of concurrent downloads/uploads + MaxParallelTransfers: 10 + #------------------------------------------------------------------------------- # Databases and/or Event Writers/Handlers #------------------------------------------------------------------------------- diff --git a/config/default.go b/config/default.go index 38ea8735..09ece71e 100644 --- a/config/default.go +++ b/config/default.go @@ -56,10 +56,11 @@ func DefaultConfig() Config { Metadata: map[string]string{}, }, Worker: Worker{ - WorkDir: workDir, - PollingRate: Duration(time.Second * 5), - LogUpdateRate: Duration(time.Second * 5), - LogTailSize: 10000, + WorkDir: workDir, + PollingRate: Duration(time.Second * 5), + LogUpdateRate: Duration(time.Second * 5), + LogTailSize: 10000, + MaxParallelTransfers: 10, }, Logger: logger.DefaultConfig(), // databases / event handlers diff --git a/storage/transfer.go b/storage/transfer.go index 87f68b25..1fd7674c 100644 --- a/storage/transfer.go +++ b/storage/transfer.go @@ -2,8 +2,8 @@ package storage import ( "context" - "sync" + "github.com/gammazero/workerpool" "github.com/ohsu-comp-bio/funnel/util/fsutil" ) @@ -23,52 +23,44 @@ type Transfer interface { // // Transfer events (started, failed, finished, etc) are communicated // via the Transfer interface. -func Download(ctx context.Context, store Storage, transfers []Transfer) { - wg := &sync.WaitGroup{} - wg.Add(len(transfers)) - +func Download(ctx context.Context, store Storage, transfers []Transfer, parallelLimit int) { + wp := workerpool.New(parallelLimit) for _, x := range transfers { - go func(x Transfer) { - defer wg.Done() + x := x + wp.Submit(func() { x.Started() - var obj *Object err := fsutil.EnsurePath(x.Path()) if err == nil { obj, err = store.Get(ctx, x.URL(), x.Path()) } - if err != nil { x.Failed(err) } else { x.Finished(obj) } - }(x) + }) } - wg.Wait() + wp.StopWait() } // Upload uploads a list of transfers to storage, in parallel. // // Transfer events (started, failed, finished, etc) are communicated // via the Transfer interface. -func Upload(ctx context.Context, store Storage, transfers []Transfer) { - wg := &sync.WaitGroup{} - wg.Add(len(transfers)) - +func Upload(ctx context.Context, store Storage, transfers []Transfer, parallelLimit int) { + wp := workerpool.New(parallelLimit) for _, x := range transfers { - go func(x Transfer) { - defer wg.Done() - + x := x + wp.Submit(func() { x.Started() obj, err := store.Put(ctx, x.URL(), x.Path()) - if err != nil { x.Failed(err) } else { x.Finished(obj) } - }(x) + }) } - wg.Wait() + wp.StopWait() } diff --git a/tests/storage/amazon_s3_test.go b/tests/storage/amazon_s3_test.go index 151f7c39..4de5da0a 100644 --- a/tests/storage/amazon_s3_test.go +++ b/tests/storage/amazon_s3_test.go @@ -29,6 +29,7 @@ func TestAmazonS3Storage(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "funnel-e2e-tests-" + tests.RandomString(6) ctx := context.Background() + parallelXfer := 10 client, err := newS3Test(conf.AmazonS3) if err != nil { @@ -54,7 +55,7 @@ func TestAmazonS3Storage(t *testing.T) { inFileURL := protocol + testBucket + "/" + fPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inFileURL, Path: fPath}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -64,7 +65,7 @@ func TestAmazonS3Storage(t *testing.T) { inDirURL := protocol + testBucket + "/" + dPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inDirURL, Path: dPath, Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test directory:", err) } @@ -129,7 +130,7 @@ func TestAmazonS3Storage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } @@ -148,7 +149,7 @@ func TestAmazonS3Storage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outDirURL, Path: "./test_tmp/test-s3-directory", Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download directory:", err) } diff --git a/tests/storage/ftp_test.go b/tests/storage/ftp_test.go index ee346f51..50c25cea 100644 --- a/tests/storage/ftp_test.go +++ b/tests/storage/ftp_test.go @@ -28,7 +28,7 @@ func TestFTPStorage(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "bob:bob@localhost:8021" ctx := context.Background() - + parallelXfer := 10 protocol := "ftp://" store, err := storage.NewMux(conf) @@ -40,7 +40,7 @@ func TestFTPStorage(t *testing.T) { inFileURL := protocol + testBucket + "/" + fPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inFileURL, Path: fPath}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -49,7 +49,7 @@ func TestFTPStorage(t *testing.T) { inDirURL := protocol + testBucket + "/" + dPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inDirURL, Path: dPath, Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test directory:", err) } @@ -111,7 +111,7 @@ func TestFTPStorage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-gs-file.txt"}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } @@ -130,7 +130,7 @@ func TestFTPStorage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outDirURL, Path: "./test_tmp/test-gs-directory", Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download directory:", err) } @@ -218,7 +218,7 @@ func TestFTPStorageConfigAuth(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "localhost:8021" ctx := context.Background() - + parallelXfer := 10 protocol := "ftp://" store, err := storage.NewMux(conf) @@ -230,7 +230,7 @@ func TestFTPStorageConfigAuth(t *testing.T) { inFileURL := protocol + testBucket + "/" + fPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inFileURL, Path: fPath}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -239,7 +239,7 @@ func TestFTPStorageConfigAuth(t *testing.T) { inDirURL := protocol + testBucket + "/" + dPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inDirURL, Path: dPath, Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test directory:", err) } @@ -301,7 +301,7 @@ func TestFTPStorageConfigAuth(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-gs-file.txt"}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } @@ -320,7 +320,7 @@ func TestFTPStorageConfigAuth(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outDirURL, Path: "./test_tmp/test-gs-directory", Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download directory:", err) } diff --git a/tests/storage/generic_s3_test.go b/tests/storage/generic_s3_test.go index ced132f3..0c4d7a45 100644 --- a/tests/storage/generic_s3_test.go +++ b/tests/storage/generic_s3_test.go @@ -31,6 +31,7 @@ func TestGenericS3Storage(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "funnel-e2e-tests-" + tests.RandomString(6) ctx := context.Background() + parallelXfer := 10 client, err := newMinioTest(conf.GenericS3[0]) if err != nil { @@ -55,7 +56,7 @@ func TestGenericS3Storage(t *testing.T) { inFileURL := protocol + testBucket + "/" + fPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inFileURL, Path: fPath}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -64,7 +65,7 @@ func TestGenericS3Storage(t *testing.T) { inDirURL := protocol + testBucket + "/" + dPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inDirURL, Path: dPath, Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test directory:", err) } @@ -126,7 +127,7 @@ func TestGenericS3Storage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } @@ -145,7 +146,7 @@ func TestGenericS3Storage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outDirURL, Path: "./test_tmp/test-s3-directory", Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download directory:", err) } diff --git a/tests/storage/gs_test.go b/tests/storage/gs_test.go index 038f9223..bf14d122 100644 --- a/tests/storage/gs_test.go +++ b/tests/storage/gs_test.go @@ -38,6 +38,7 @@ func TestGoogleStorage(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "funnel-e2e-tests-" + tests.RandomString(6) ctx := context.Background() + parallelXfer := 10 client, err := newGsTest() if err != nil { @@ -62,7 +63,7 @@ func TestGoogleStorage(t *testing.T) { inFileURL := protocol + testBucket + "/" + fPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inFileURL, Path: fPath}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -71,7 +72,7 @@ func TestGoogleStorage(t *testing.T) { inDirURL := protocol + testBucket + "/" + dPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inDirURL, Path: dPath, Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test directory:", err) } @@ -133,7 +134,7 @@ func TestGoogleStorage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-gs-file.txt"}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } @@ -152,7 +153,7 @@ func TestGoogleStorage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outDirURL, Path: "./test_tmp/test-gs-directory", Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download directory:", err) } diff --git a/tests/storage/multi_s3_test.go b/tests/storage/multi_s3_test.go index 4a2d2f99..69af0892 100644 --- a/tests/storage/multi_s3_test.go +++ b/tests/storage/multi_s3_test.go @@ -28,6 +28,7 @@ func TestMultiS3Storage(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "funnel-e2e-tests-" + tests.RandomString(6) ctx := context.Background() + parallelXfer := 10 // Generic S3 backend setup gconf1 := conf.GenericS3[0] @@ -63,7 +64,7 @@ func TestMultiS3Storage(t *testing.T) { g1FileURL := protocol + gconf1.Endpoint + "/" + testBucket + "/" + fPath + tests.RandomString(6) _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: g1FileURL, Path: fPath}, - }, gclient1.fcli, ev) + }, gclient1.fcli, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -71,7 +72,7 @@ func TestMultiS3Storage(t *testing.T) { g2FileURL := protocol + gconf2.Endpoint + "/" + testBucket + "/" + fPath + tests.RandomString(6) _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: g2FileURL, Path: fPath, Type: tes.Directory}, - }, gclient2.fcli, ev) + }, gclient2.fcli, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -127,7 +128,7 @@ func TestMultiS3Storage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"}, - }, gclient1.fcli, ev) + }, gclient1.fcli, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } diff --git a/tests/storage/swift_test.go b/tests/storage/swift_test.go index eb5c1232..dabe3eb0 100644 --- a/tests/storage/swift_test.go +++ b/tests/storage/swift_test.go @@ -26,6 +26,7 @@ func TestSwiftStorage(t *testing.T) { ev := events.NewTaskWriter("test-task", 0, &events.Logger{Log: log}) testBucket := "funnel-e2e-tests-" + tests.RandomString(6) ctx := context.Background() + parallelXfer := 10 client, err := newSwiftTest() if err != nil { @@ -50,7 +51,7 @@ func TestSwiftStorage(t *testing.T) { inFileURL := protocol + testBucket + "/" + fPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inFileURL, Path: fPath}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test file:", err) } @@ -59,7 +60,7 @@ func TestSwiftStorage(t *testing.T) { inDirURL := protocol + testBucket + "/" + dPath _, err = worker.UploadOutputs(ctx, []*tes.Output{ {Url: inDirURL, Path: dPath, Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("error uploading test directory:", err) } @@ -121,7 +122,7 @@ func TestSwiftStorage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outFileURL, Path: "./test_tmp/test-s3-file.txt"}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download file:", err) } @@ -140,7 +141,7 @@ func TestSwiftStorage(t *testing.T) { err = worker.DownloadInputs(ctx, []*tes.Input{ {Url: outDirURL, Path: "./test_tmp/test-s3-directory", Type: tes.Directory}, - }, store, ev) + }, store, ev, parallelXfer) if err != nil { t.Fatal("Failed to download directory:", err) } diff --git a/worker/storage.go b/worker/storage.go index e8b05983..542f5aa5 100644 --- a/worker/storage.go +++ b/worker/storage.go @@ -48,7 +48,7 @@ func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Stora } // DownloadInputs downloads the given inputs. -func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter) error { +func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter, parallelLimit int) error { ctx, cancel := context.WithCancel(pctx) defer cancel() @@ -67,7 +67,7 @@ func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Sto })) } - storage.Download(ctx, store, downloads) + storage.Download(ctx, store, downloads, parallelLimit) var errs util.MultiError for _, x := range downloads { @@ -117,7 +117,7 @@ func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.St } // UploadOutputs uploads the outputs. -func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter) ([]*tes.OutputFileLog, error) { +func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter, parallelLimit int) ([]*tes.OutputFileLog, error) { flat, err := FlattenOutputs(ctx, outputs, store, ev) if err != nil { @@ -130,7 +130,7 @@ func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Sto uploads = append(uploads, storage.Transfer(&upload{ev: ev, out: output})) } - storage.Upload(ctx, store, uploads) + storage.Upload(ctx, store, uploads, parallelLimit) var logs []*tes.OutputFileLog var errs util.MultiError diff --git a/worker/worker.go b/worker/worker.go index ec391360..51b1275b 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -119,7 +119,7 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { // Download inputs if run.ok() { - run.syserr = DownloadInputs(ctx, mapper.Inputs, r.Store, event) + run.syserr = DownloadInputs(ctx, mapper.Inputs, r.Store, event, r.Conf.MaxParallelTransfers) } if run.ok() { @@ -166,7 +166,7 @@ func (r *DefaultWorker) Run(pctx context.Context) (runerr error) { // Upload outputs var outputLog []*tes.OutputFileLog if run.ok() { - outputLog, run.syserr = UploadOutputs(ctx, mapper.Outputs, r.Store, event) + outputLog, run.syserr = UploadOutputs(ctx, mapper.Outputs, r.Store, event, r.Conf.MaxParallelTransfers) } // unmap paths for OutputFileLog