diff --git a/.gitignore b/.gitignore index 39a633b..6b85f4e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,15 +2,18 @@ *.o *.a *.so +__*.* # Folders _obj _test +_build/ _build/linux_amd64 _build/windows_amd64 _wd _old .vscode +*_testdata # Architecture specific extensions/prefixes *.[568vq] diff --git a/README.md b/README.md index 0d25369..f05b061 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Sources and targets are decoupled, this design enables the composition of variou Download, extract and set permissions: ```bash -wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.09/bp_linux.tar.gz +wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_linux.tar.gz tar -xvf bp_linux.tar.gz linux_amd64/blobporter chmod +x ~/linux_amd64/blobporter cd ~/linux_amd64 @@ -46,7 +46,7 @@ export ACCOUNT_KEY= ### Windows -Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.09/bp_windows.zip) +Download [BlobPorter.exe](https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_windows.zip) Set environment variables (if using the command prompt): diff --git a/args.go b/args.go index d94b749..d790497 100644 --- a/args.go +++ b/args.go @@ -72,6 +72,7 @@ type arguments struct { readTokenExp int numberOfHandlesPerFile int //numberOfHandlesPerFile = defaultNumberOfHandlesPerFile numberOfFilesInBatch int //numberOfFilesInBatch = defaultNumberOfFilesInBatch + transferStatusPath string } //represents validated parameters @@ -95,6 +96,7 @@ type validatedParameters struct { blobSource blobParams blobTarget blobParams perfSourceDefinitions []sources.SourceDefinition + tracker *internal.TransferTracker } type s3Source struct { @@ -166,12 +168,14 @@ func (p *paramParserValidator) parseAndValidate() error { p.targetSegment = t err = p.runParseAndValidationRules( p.pvgCalculateReadersAndWorkers, + p.pvgTransferStatusPathIsPresent, p.pvgBatchLimits, p.pvgHTTPTimeOut, p.pvgDupCheck, p.pvgParseBlockSize, p.pvgQuietMode, - p.pvgKeepDirectoryStructure) + p.pvgKeepDirectoryStructure, + p.pvgUseExactMatch) if err != nil { return err @@ -254,6 +258,27 @@ func (p *paramParserValidator) getSourceRules() ([]parseAndValidationRule, error //************************** //Global rules.... +func (p *paramParserValidator) pvgUseExactMatch() error { + p.params.useExactMatch = p.args.exactNameMatch + return nil +} + +func (p *paramParserValidator) pvgTransferStatusPathIsPresent() error { + + if p.args.transferStatusPath != "" { + if !p.args.quietMode{ + fmt.Printf("Transfer is resumable. Transfer status file:%v \n", p.args.transferStatusPath) + } + tracker, err := internal.NewTransferTracker(p.args.transferStatusPath) + + if err != nil { + return err + } + + p.params.tracker = tracker + } + return nil +} func (p *paramParserValidator) pvgKeepDirectoryStructure() error { p.params.keepDirStructure = !p.args.removeDirStructure return nil @@ -503,7 +528,7 @@ func (p *paramParserValidator) pvSourceInfoForS3IsReq() error { burl, err := url.Parse(p.params.sourceURIs[0]) if err != nil { - return fmt.Errorf("Invalid S3 endpoint URL. Parsing error: %v.\nThe format is s3://[END_POINT]/[BUCKET]/[OBJECT]", err) + return fmt.Errorf("Invalid S3 endpoint URL. 
Parsing error: %v.\nThe format is s3://[END_POINT]/[BUCKET]/[PREFIX]", err) } p.params.s3Source.endpoint = burl.Hostname() @@ -514,10 +539,14 @@ func (p *paramParserValidator) pvSourceInfoForS3IsReq() error { segments := strings.Split(burl.Path, "/") + if len(segments) < 2 { + return fmt.Errorf("Invalid S3 endpoint URL. Bucket not specified. The format is s3://[END_POINT]/[BUCKET]/[PREFIX]") + } + p.params.s3Source.bucket = segments[1] if p.params.s3Source.bucket == "" { - return fmt.Errorf("Invalid source S3 URI. Bucket name could be parsed") + return fmt.Errorf("Invalid source S3 URI. Bucket name could not be parsed") } prefix := "" diff --git a/blobporter.go b/blobporter.go index 199a409..50c1428 100644 --- a/blobporter.go +++ b/blobporter.go @@ -10,13 +10,12 @@ import ( "strconv" "sync/atomic" + "github.com/Azure/blobporter/internal" "github.com/Azure/blobporter/pipeline" "github.com/Azure/blobporter/transfer" "github.com/Azure/blobporter/util" - "github.com/Azure/blobporter/internal" ) - var argsUtil paramParserValidator func init() { @@ -46,6 +45,7 @@ func init() { numberOfHandlersPerFileMsg = "Number of open handles for concurrent reads and writes per file." numberOfFilesInBatchMsg = "Maximum number of files in a transfer.\n\tIf the number is exceeded new transfers are created" readTokenExpMsg = "Expiration in minutes of the read-only access token that will be generated to read from S3 or Azure Blob sources." + transferStatusFileMsg = "Transfer status file location. If set, blobporter will use this file to track the status of the transfer.\n\tIn case of failure and if the option is set the same status file, source files that were transferred will be skipped.\n\tIf the transfer is successful a summary will be appended." ) flag.Usage = func() { @@ -68,6 +68,7 @@ func init() { util.PrintUsageDefaults("h", "handles_per_file", strconv.Itoa(argsUtil.args.numberOfHandlesPerFile), numberOfHandlersPerFileMsg) util.PrintUsageDefaults("x", "files_per_transfer", strconv.Itoa(argsUtil.args.numberOfFilesInBatch), numberOfFilesInBatchMsg) util.PrintUsageDefaults("o", "read_token_exp", strconv.Itoa(defaultReadTokenExp), readTokenExpMsg) + util.PrintUsageDefaults("l", "transfer_status", "", transferStatusFileMsg) } util.StringListVarAlias(&argsUtil.args.sourceURIs, "f", "source_file", "", fileMsg) @@ -89,39 +90,35 @@ func init() { util.IntVarAlias(&argsUtil.args.numberOfHandlesPerFile, "h", "handles_per_file", defaultNumberOfHandlesPerFile, numberOfHandlersPerFileMsg) util.IntVarAlias(&argsUtil.args.numberOfFilesInBatch, "x", "files_per_transfer", defaultNumberOfFilesInBatch, numberOfFilesInBatchMsg) util.IntVarAlias(&argsUtil.args.readTokenExp, "o", "read_token_exp", defaultReadTokenExp, readTokenExpMsg) - + util.StringVarAlias(&argsUtil.args.transferStatusPath, "l", "transfer_status", "", transferStatusFileMsg) } var dataTransferred uint64 var targetRetries int32 -func displayFilesToTransfer(sourcesInfo []pipeline.SourceInfo, numOfBatches int, batchNumber int) { - if numOfBatches == 1 { - fmt.Printf("Files to Transfer (%v) :\n", argsUtil.params.transferType) - var totalSize uint64 - summary := "" - - for _, source := range sourcesInfo { - //if the source is URL, remove the QS - display := source.SourceName - if u, err := url.Parse(source.SourceName); err == nil { - display = fmt.Sprintf("%v%v", u.Hostname(), u.Path) - } - summary = summary + fmt.Sprintf("Source: %v Size:%v \n", display, source.Size) - totalSize = totalSize + source.Size - } +func displayFilesToTransfer(sourcesInfo 
[]pipeline.SourceInfo) { + fmt.Printf("\nFiles to Transfer (%v) :\n", argsUtil.params.transferType) + var totalSize uint64 + summary := "" - if len(sourcesInfo) < 10 { - fmt.Printf(summary) - return + for _, source := range sourcesInfo { + //if the source is URL, remove the QS + display := source.SourceName + if u, err := url.Parse(source.SourceName); err == nil { + display = fmt.Sprintf("%v%v", u.Hostname(), u.Path) } + summary = summary + fmt.Sprintf("Source: %v Size:%v \n", display, source.Size) + totalSize = totalSize + source.Size + } - fmt.Printf("%v files. Total size:%v\n", len(sourcesInfo), totalSize) - + if len(sourcesInfo) < 10 { + fmt.Printf(summary) return } - fmt.Printf("\nBatch transfer (%v).\nFiles per Batch: %v.\nBatch: %v of %v\n ", argsUtil.params.transferType, len(sourcesInfo), batchNumber+1, numOfBatches) + fmt.Printf("%v files. Total size:%v\n", len(sourcesInfo), totalSize) + + return } func main() { @@ -141,21 +138,32 @@ func main() { stats := transfer.NewStats(argsUtil.params.numberOfWorkers, argsUtil.params.numberOfReaders) - for b, sourcePipeline := range sourcePipelines { - sourcesInfo := sourcePipeline.GetSourcesInfo() + for sourcePipeline := range sourcePipelines { + + if sourcePipeline.Err != nil { + log.Fatal(sourcePipeline.Err) + } + + sourcesInfo := sourcePipeline.Source.GetSourcesInfo() - tfer := transfer.NewTransfer(&sourcePipeline, &targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize) + tfer := transfer.NewTransfer(sourcePipeline.Source, targetPipeline, argsUtil.params.numberOfReaders, argsUtil.params.numberOfWorkers, argsUtil.params.blockSize) + tfer.SetTransferTracker(argsUtil.params.tracker) - displayFilesToTransfer(sourcesInfo, len(sourcePipelines), b) + displayFilesToTransfer(sourcesInfo) pb := getProgressBarDelegate(tfer.TotalSize, argsUtil.params.quietMode) tfer.StartTransfer(argsUtil.params.dedupeLevel, pb) - tfer.WaitForCompletion() stats.AddTransferInfo(tfer.GetStats()) } + if argsUtil.params.tracker != nil { + if err = argsUtil.params.tracker.TrackTransferComplete(); err != nil { + log.Fatal(err) + } + } + stats.DisplaySummary() } diff --git a/docs/bptransfer.png b/docs/bptransfer.png new file mode 100644 index 0000000..fd4321c Binary files /dev/null and b/docs/bptransfer.png differ diff --git a/docs/conf.py b/docs/conf.py index dc6d9e8..c1b6301 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -15,12 +15,12 @@ # import os # import sys # sys.path.insert(0, os.path.abspath('.')) - +import sphinx_bootstrap_theme # -- Project information ----------------------------------------------------- project = u'BlobPorter' -#copyright = u'2018, Jesus Aguilar' +copyright = u'2018, BlobPorter Contributors' author = u'BlobPorter Contributors' # The short X.Y version @@ -74,7 +74,9 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'alabaster' +html_theme = 'sphinx_rtd_theme' +#html_theme = 'bootstrap' +#html_theme_path = sphinx_bootstrap_theme.get_html_theme_path() # Theme options are theme-specific and customize the look and feel of a theme # further. 
For a list of options available for each theme, see the diff --git a/docs/examples.rst b/docs/examples.rst new file mode 100644 index 0000000..ef15ce2 --- /dev/null +++ b/docs/examples.rst @@ -0,0 +1,173 @@ +Examples +======== + + +Upload to Azure Block Blob Storage +----------------------------------- + +Single file upload: + +:: + + ./blobporter -f /datadrive/myfile.tar -c mycontainer -n myfile.tar + +.. note:: BlobPorter will create the container if it doesn't exist. + +Upload all files that match the pattern: + +:: + + ./blobporter -f "/datadrive/*.tar" -c mycontainer + +You can also specify a list of files or patterns explicitly: + +:: + + ./blobporter -f "/datadrive/*.tar" -f "/datadrive/readme.md" -f "/datadrive/log" -c mycontainer + +If you want to rename the target file name, you can use the -n option: + +:: + + ./blobporter -f /datadrive/f1.tar -n newname.tar -c mycontainer + +Upload to Azure Page Blob Storage +---------------------------------- + +Same as uploading to block blob storage, but with the transfer definition (-t option) set to ``file-pageblob``. For example, a single file upload to page blob: + +:: + + ./blobporter -f /datadrive/mydisk.vhd -c mycontainer -n mydisk.vhd -t file-pageblob + + +.. note:: The file size and block size must be a multiple of 512 (bytes). The maximum block size is 4MB. + +Transfer data from S3 to Azure Storage +--------------------------------------- + +You can transfer data from an S3 compatible endpoint. + +First, you must specify the access and secret keys via environment variables. + +:: + + export S3_ACCESS_KEY= + export S3_SECRET_KEY= + +Then you can specify an S3 URI, with the following format: + +:: + + [HOST]/[BUCKET]/[PREFIX] + +For example: + +:: + + ./blobporter -f s3://mys3api.com/mybucket/mydata -c froms3 -t s3-blockblob + +.. note:: + + BlobPorter will upload the data as it downloads it from the source. + The performance of the transfer will be constrained by the bandwidth of the host running BlobPorter. Consider running this type of transfer from a Virtual Machine running in the same Azure region as the target or the source. + +Transfer data between Azure Storage accounts, containers and blob types +----------------------------------------------------------------------- + +First, you must set the account key of the source storage account. + +:: + + export SOURCE_ACCOUNT_KEY= + + +Then you can specify the URI of the source. The source could be a page, block or append blob. Prefixes are supported. + +:: + + ./blobporter -f "https://mysourceaccount.blob.core.windows.net/container/myblob" -c mycontainer -t blob-blockblob + +.. note:: + + BlobPorter will upload the data as it downloads it from the source. + The performance of the transfer will be constrained by the bandwidth of the host running BlobPorter. Consider running this type of transfer from a Virtual Machine running in the same Azure region as the target or the source. + +Transfer from an HTTP/HTTPS source to Azure Blob Storage +-------------------------------------------------------- + +To block blob storage: + +:: + + ./blobporter -f "http://mysource/file.bam" -c mycontainer -n file.bam -t http-blockblob + +To page blob storage: + +:: + + ./blobporter -f "http://mysource/my.vhd" -c mycontainer -n my.vhd -t http-pageblob + +.. note:: + + BlobPorter will upload the data as it downloads it from the source. + The performance of the transfer will be constrained by the bandwidth of the host running BlobPorter. 
Consider running this type of transfer from a Virtual Machine running in the same Azure region as the target or the source. + +Download from Azure Blob Storage +-------------------------------- + +For download scenarios, the source can be a page, append or block blob: + +:: + + ./blobporter -c mycontainer -n file.bam -t blob-file + +You can use the -n option to specify a prefix. All blobs that match the prefix will be downloaded. + +The following will download all blobs in the container that start with f: + +:: + + ./blobporter -c mycontainer -n f -t blob-file + + +Without the -n option, all files in the container will be downloaded. + +:: + + ./blobporter -c mycontainer -t blob-file + + +By default, files are downloaded keeping the same directory structure as the remote source. + +If you want to download to the same directory where you are running blobporter, set the -i option. + +:: + + ./blobporter -c mycontainer -t blob-file -i + + +For scenarios where the blob endpoint is from a sovereign cloud (e.g. China and Germany), Azure Gov or Azure Stack, you can specify the fully qualified domain name: + +:: + + ./blobporter -f "https://[ACCOUNT_NAME].[BASE_URL]/[CONTAINER_NAME]/[PREFIX]" -t blob-file + +The source account key must be set via an environment variable. + +:: + + export SOURCE_ACCOUNT_KEY= + + + +Download a file from an HTTP source +----------------------------------- + +:: + + ./blobporter -f "http://mysource/file.bam" -n /datadrive/file.bam -t http-file + +.. note:: + + The ACCOUNT_NAME and ACCOUNT_KEY environment variables are not required. diff --git a/docs/gettingstarted.rst b/docs/gettingstarted.rst new file mode 100644 index 0000000..2e1709c --- /dev/null +++ b/docs/gettingstarted.rst @@ -0,0 +1,94 @@ +=============== +Getting Started +=============== + +Linux +----- + +Download, extract and set permissions: + +:: + + wget -O bp_linux.tar.gz https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_linux.tar.gz + tar -xvf bp_linux.tar.gz linux_amd64/blobporter + chmod +x ~/linux_amd64/blobporter + cd ~/linux_amd64 + +Set environment variables: :: + + export ACCOUNT_NAME= + export ACCOUNT_KEY= + +.. note:: + + You can also set these values via options. + +Windows +------- + +Download `BlobPorter.exe <https://github.com/Azure/blobporter/releases/download/v0.6.12/bp_windows.zip>`_ + +Set environment variables (if using the command prompt): :: + + set ACCOUNT_NAME= + set ACCOUNT_KEY= + +Set environment variables (if using PowerShell): :: + + $env:ACCOUNT_NAME="" + $env:ACCOUNT_KEY="" + + +Command Options +--------------- + + -f, --source_file (string) URL, Azure Blob or S3 Endpoint, + file or files (e.g. /data/\*.gz) to upload. + + -c, --container_name (string) Container name (e.g. mycontainer). + -n, --blob_name (string) Blob name (e.g. myblob.txt) or prefix for download scenarios. + -g, --concurrent_workers (int) Number of go-routines for parallel upload. + -r, --concurrent_readers (int) Number of go-routines for parallel reading of the input. + -b, --block_size (string) Desired size of each blob block. + + Can be specified as an integer byte count or integer suffixed with B, KB or MB. + + -a, --account_name (string) Storage account name (e.g. mystorage). + + Can also be specified via the ACCOUNT_NAME environment variable. + + -k, --account_key (string) Storage account key string. + + Can also be specified via the ACCOUNT_KEY environment variable. + -s, --http_timeout (int) HTTP client timeout in seconds. Default value is 600s. + -t, --transfer_type (string) Defines the source and target of the transfer. 
+ + Must be one of :: + + file-blockblob, file-pageblob, http-blockblob, + http-pageblob, blob-file, pageblock-file (alias of blob-file), + blockblob-file (alias of blob-file), http-file, + blob-pageblob, blob-blockblob, s3-pageblob and s3-blockblob. + + + -m, --compute_blockmd5 (bool) If set, block level MD5 hash will be computed and included + as a header when the block is sent to blob storage. + + Default is false. + -q, --quiet_mode (bool) If set, the progress indicator is not displayed. + + The files to transfer, errors, warnings and the transfer completion summary are still displayed. + -x, --files_per_transfer (int) Number of files in a batch transfer. Default is 500. + -h, --handles_per_file (int) Number of open handles for concurrent reads and writes per file. Default is 2. + -i, --remove_directories (bool) If set, blobs are downloaded or uploaded without keeping the directory structure of the source. + + Not applicable when the source is an HTTP endpoint. + -o, --read_token_exp (int) Expiration in minutes of the read-only access token that will be generated to read from S3 or Azure Blob sources. + + Default value: 360. + -l, --transfer_status (string) Transfer status file location. + If set, blobporter will use this file to track the status of the transfer. + + In case of failure, if the same status file is referenced, the source files that were already transferred will be skipped. + + If the transfer is successful, a summary will be appended. diff --git a/docs/index.rst b/docs/index.rst index 123ef7b..f3fd7f3 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -3,18 +3,23 @@ You can adapt this file completely to your liking, but it should at least contain the root `toctree` directive. -Welcome to BlobPorter's documentation! +BlobPorter ====================================== -.. toctree:: - :maxdepth: 2 - :caption: Contents: +BlobPorter is a data transfer tool for Azure Blob Storage that maximizes throughput through concurrent reads and writes that can scale up and down independently. +:: + +.. image :: bptransfer.png -Indices and tables -================== +.. toctree:: + :maxdepth: 2 + :caption: Contents: -* :ref:`genindex` -* :ref:`modindex` -* :ref:`search` + gettingstarted + examples + resumabletrans + perfmode + + diff --git a/docs/perfmode.rst b/docs/perfmode.rst index 47df12b..a66e09e 100644 --- a/docs/perfmode.rst +++ b/docs/perfmode.rst @@ -1,42 +1,72 @@ -Performance Mode -====================================== +Performance Considerations +========================== -If you want to maximize performance, and your source and target are public HTTP based end-points (Blob, S3, and HTTP), running the transfer in a high bandwidth environment such as a VM on the cloud, is strongly recommended. This recommendation comes from the fact that blob to blob, S3 to blob or HTTP to blob transfers are bidirectional where BlobPorter downloads the data (without writing to disk) and uploads it as it is received. +Best Practices +-------------- -When running in the cloud, consider the region where the transfer VM ( where BlobPorter will be running), will be deployed. When possible, deploy the transfer VM in the same the same region as the target of the transfer. Running in the same region as the target minimizes the transfer costs (egress from the VM to the target storage account) and the network performance impact (lower latency) for the upload operation. -For downloads or uploads of multiple or large files the disk i/o could be the constraining resource that slows down the transfer. 
And often identifying if this is the case, is a cumbersome process. But if done, it could lead to informed decisions about the environment where BlobPorter runs. +- By default, BlobPorter creates 5 readers and 8 workers for each core on the computer. You can overwrite these values by using the options -r (number of readers) and -g (number of workers). When overriding these options there are few considerations: -To help with this indentification process, BlobPorter has a performance mode that uploads random data generated in memory and measures the performance of the operation without the impact of disk i/o. -The performance mode for uploads could help you identify the potential upper limit of throughput that the network and the target storage account can provide. + - If during the transfer the buffer level is constant at 000%, workers could be waiting for data. Consider increasing the number of readers. If the level is 100% the opposite applies; increasing the number of workers could help. + + - Each reader or worker correlates to one goroutine. Goroutines are lightweight and a Go program can create a high number of goroutines, however, there's a point where the overhead of context switching impacts overall performance. Increase these values in small increments, e.g. 5. + +- For transfers from fast disks (SSD) or HTTP sources reducing the number readers or workers could provide better performance than the default values. Reduce these values if you want to minimize resource utilization. Lowering these numbers reduces contention and the likelihood of experiencing throttling conditions. + +- Transfers can be batched. Each batch transfer will concurrently read and transfer up to 500 files (default value) from the source. The batch size can be modified using the -x option. + +- Blobs smaller than the block size are transferred in a single operation. With relatively small files (<32MB) performance may be better if you set a block size equal to the size of the files. Setting the number of workers and readers to the number of files could yield performance gains. + +- The block size can have a significant memory impact if set to a large value (e.g. 100MB). For large files, use a block size that is close to the minimum required for the transfer and reduce the number of workers (g option). + + The following table list the maximum file size for typical block sizes. + + =============== =================== + Block Size (MB) Max File Size (GB) + =============== =================== + 8 400 + 16 800 + 32 1600 + 64 3200 + =============== =================== + +Performance Measurement Mode +---------------------------- + +BlobPorter has a performance measurement mode that uploads random data generated in memory and measures the performance of the operation without the impact of disk i/o. +The performance mode for uploads could help you identify the potential upper limit of the data throughput that your environment can provide. For example, the following command will upload 10 x 10GB files to a storage account. -``` -blobporter -f "1GB:10" -c perft -t perf-blockblob -``` +:: + + blobporter -f "1GB:10" -c perft -t perf-blockblob You can also use this mode to see if increasing (or decreasing) the number of workers/writers (-g option) will have a potential impact. -``` -blobporter -f "1GB:10" -c perft -t perf-blockblob -g 20 -``` +:: + + blobporter -f "1GB:10" -c perft -t perf-blockblob -g 20 Similarly, for downloads, you can simulate downloading data from a storage account without writing to disk. 
This mode could also help you fine-tune the number of readers (-r option) and get an idea of the maximum download throughput. -The following command will download the data we previously uploaded. +The following command downloads the data previously uploaded. + +:: + + export SRC_ACCOUNT_KEY=$ACCOUNT_KEY + +.. + +:: -``` -export SRC_ACCOUNT_KEY=$ACCOUNT_KEY -blobporter -f "https://myaccount.blob.core.windows.net/perft" -t blob-perf -``` + blobporter -f "https://myaccount.blob.core.windows.net/perft" -t blob-perf -Then you can try downloading to disk. -``` -blobporter -c perft -t blob-file -``` +Then you can download the file to disk. -If the performance difference is significant then you can conclude that disk i/o is the bottleneck, at which point you can consider an SSD backed VM. +:: + blobporter -c perft -t blob-file +The performance difference will provide you with a baseline of the impact of disk i/o. \ No newline at end of file diff --git a/docs/resumabletrans.rst b/docs/resumabletrans.rst new file mode 100644 index 0000000..846078d --- /dev/null +++ b/docs/resumabletrans.rst @@ -0,0 +1,56 @@ +Resumable Transfers +=================== + +BlobPorter supports resumable transfers. This feature is enabled when the -l option is set with the path where the transfer status file will be created. +In case of failure, if the same status file is specified, BlobPorter will skip files that were already transferred. + +:: + + blobporter -f "manyfiles/*" -c many -l mylog + +For each file in the transfer, two entries will be created in the status file: one when the file is queued (Started) and another when the file is successfully transferred (Completed). + +The log entries are created with the following tab-delimited format: + +:: + + [Timestamp] [Filename] [Status (1:Started,2:Completed,3:Ignored)] [Size] [Transfer ID] + + +The following output from a transfer status file shows that three files were included in the transfer: **file10**, **file11** and **file15**. +However, only **file10** and **file11** were successfully transferred. For **file15**, the output indicates that it was queued, but the lack of a second entry confirming completion (status = 2) indicates that the transfer process was interrupted. :: + + 2018-03-05T03:31:13.034245807Z file10 1 104857600 938520246_mylog + 2018-03-05T03:31:13.034390509Z file11 1 104857600 938520246_mylog + 2018-03-05T03:31:13.034437109Z file15 1 104857600 938520246_mylog + 2018-03-05T03:31:25.232572306Z file10 2 104857600 938520246_mylog + 2018-03-05T03:31:25.591239355Z file11 2 104857600 938520246_mylog + +Consider the previous scenario and assume that the transfer was executed again. +In this case, the status file shows two new entries for **file15** in a new transfer (the transfer ID is different), which indicates that this time the file was transferred successfully. :: + + 2018-03-05T03:31:13.034245807Z file10 1 104857600 938520246_mylog + 2018-03-05T03:31:13.034390509Z file11 1 104857600 938520246_mylog + 2018-03-05T03:31:13.034437109Z file15 1 104857600 938520246_mylog + 2018-03-05T03:31:25.232572306Z file10 2 104857600 938520246_mylog + 2018-03-05T03:31:25.591239355Z file11 2 104857600 938520246_mylog + 2018-03-05T03:54:33.660161772Z file15 1 104857600 495675852_mylog + 2018-03-05T03:54:34.579295059Z file15 2 104857600 495675852_mylog + +Finally, since the process completed successfully, a summary is appended to the transfer status file. 
:: + + ---------------------------------------------------------- + Transfer Completed---------------------------------------- + Start Summary--------------------------------------------- + Last Transfer ID:495675852_mylog + Date:Mon Mar 5 03:54:34 UTC 2018 + File:file15 Size:104857600 TID:495675852_mylog + File:file10 Size:104857600 TID:938520246_mylog + File:file11 Size:104857600 TID:938520246_mylog + Transferred Files:3 Total Size:314572800 + End Summary----------------------------------------------- + + + + + diff --git a/internal/azutil.go b/internal/azutil.go index 3dd4c33..6ef0505 100644 --- a/internal/azutil.go +++ b/internal/azutil.go @@ -11,7 +11,7 @@ import ( "os" "syscall" "time" - + "github.com/Azure/blobporter/util" "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/2016-05-31/azblob" @@ -190,6 +190,7 @@ func (p *AzUtil) PutBlockBlob(blobName string, body io.ReadSeeker, md5 []byte) e h := azblob.BlobHTTPHeaders{} + //16 is md5.Size if len(md5) != 16 { var md5bytes [16]byte @@ -375,6 +376,24 @@ func isWinsockTimeOutError(err error) net.Error { return nil } +func isDialConnectError(err error) net.Error { + if uerr, ok := err.(*url.Error); ok { + if derr, ok := uerr.Err.(*net.OpError); ok { + if serr, ok := derr.Err.(*os.SyscallError); ok && serr.Syscall == "connect" { + return &retriableError{error: err} + } + } + } + return nil +} + +func isRetriableDialError(err error) net.Error { + if derr := isWinsockTimeOutError(err); derr != nil { + return derr + } + return isDialConnectError(err) +} + type retriableError struct { error } @@ -387,11 +406,19 @@ func (*retriableError) Temporary() bool { return true } +const tcpKeepOpenMinLength = 8 * int64(util.MB) + func (p *clientPolicy) Do(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { - r, err := pipelineHTTPClient.Do(request.WithContext(ctx)) + req := request.WithContext(ctx) + + if req.ContentLength < tcpKeepOpenMinLength { + req.Close=true + } + + r, err := pipelineHTTPClient.Do(req) pipresp := pipeline.NewHTTPResponse(r) if err != nil { - if derr := isWinsockTimeOutError(err); derr != nil { + if derr := isRetriableDialError(err); derr != nil { return pipresp, derr } err = pipeline.NewError(err, "HTTP request failed") @@ -411,9 +438,9 @@ func newpipelineHTTPClient() *http.Client { KeepAlive: 30 * time.Second, DualStack: true, }).Dial, - MaxIdleConns: 0, + MaxIdleConns: 100, MaxIdleConnsPerHost: 100, - IdleConnTimeout: 90 * time.Second, + IdleConnTimeout: 60 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, DisableKeepAlives: false, diff --git a/internal/const.go b/internal/const.go index 0cbdcf4..7530994 100644 --- a/internal/const.go +++ b/internal/const.go @@ -6,7 +6,7 @@ import ( ) //ProgramVersion blobporter version -const ProgramVersion = "0.6.09" +const ProgramVersion = "0.6.12" //HTTPClientTimeout HTTP client timeout when reading from HTTP sources and try timeout for blob storage operations. var HTTPClientTimeout = 90 diff --git a/internal/tracker.go b/internal/tracker.go new file mode 100644 index 0000000..adf6dcc --- /dev/null +++ b/internal/tracker.go @@ -0,0 +1,341 @@ +package internal + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + "time" +) + +type fileStatus struct { + sourcename string + size int64 + source string + status TransferStatus + tid string +} + +//TransferStatus TODO +type TransferStatus int + +const ( + //None No status avaiable. 
+ None = iota + //Started Indicates that the file was included in the transfer. + Started + //Completed Indicates that the file was transferred succesfully. + Completed + //Ignored Indicates that the file was ignored - e.g. empty files... + Ignored +) + +func (t TransferStatus) String() string { + switch t { + case None: + return "0" + case Started: + return "1" + case Completed: + return "2" + case Ignored: + return "3" + } + + return "" +} + +func parseTransferStatus(val string) (TransferStatus, error) { + var ts int + var err error + if ts, err = strconv.Atoi(val); err != nil { + return None, fmt.Errorf("Invalid transfer status, %v, err:%v", val, err) + } + switch ts { + case None: + return None, nil + case Completed: + return Completed, nil + case Started: + return Started, nil + case Ignored: + return Ignored, nil + } + return None, fmt.Errorf("Invalid transfer status, %v", val) +} + +func newLogEntry(line string) (*fileStatus, error) { + f := fileStatus{} + + if line == summaryheader { + return nil, fmt.Errorf("Invalid transfer status file. The file contains information from a completed transfer.\nUse a new file name") + } + + tokens := strings.Split(line, "\t") + + if len(tokens) < 5 { + return nil, fmt.Errorf("Invalid log entry. Less than 5 tokens were found. Value:%v ", line) + } + + f.sourcename = tokens[1] + var size int64 + var err error + if size, err = strconv.ParseInt(tokens[3], 10, 64); err != nil { + return nil, err + } + + f.size = size + var status TransferStatus + + status, err = parseTransferStatus(tokens[2]) + + f.status = status + + f.tid = tokens[4] + + if err != nil { + return nil, err + } + + return &f, nil +} + +//Timestamp, sourcename, status, size, id +const formatString = "%s\t%s\t%s\t%v\t%s\t" + +func (f *fileStatus) toLogEntry() string { + return fmt.Sprintf(formatString, time.Now().Format(time.RFC3339Nano), f.sourcename, f.status, f.size, f.tid) +} + +func (f *fileStatus) key() string { + return fmt.Sprintf("%s%v%v", f.sourcename, f.size, f.status) +} + +type transferCompletedRequest struct { + duration time.Duration + response chan error +} + +type fileTransferredRequest struct { + fileName string + response chan error +} + +type isInLogRequest struct { + fileName string + size int64 + status TransferStatus + response chan isInLogResponse +} +type isInLogResponse struct { + inlog bool + err error +} + +//TransferTracker TODO +type TransferTracker struct { + id string + loghandle *os.File + restoredStatus map[string]fileStatus + currentStatus map[string]fileStatus + isInLogReq chan isInLogRequest + fileTransferredReq chan fileTransferredRequest + complete chan error + //transferCompletedReq chan transferCompletedRequest +} + +//NewTransferTracker TODO +func NewTransferTracker(logPath string) (*TransferTracker, error) { + var loghandle *os.File + var err error + load := true + + if loghandle, err = os.OpenFile(logPath, os.O_APPEND|os.O_RDWR, os.ModePerm); err != nil { + if os.IsNotExist(err) { + if loghandle, err = os.Create(logPath); err != nil { + return nil, err + } + load = false + } else { + return nil, err + } + } + + tt := TransferTracker{loghandle: loghandle, + restoredStatus: make(map[string]fileStatus), + currentStatus: make(map[string]fileStatus), + isInLogReq: make(chan isInLogRequest, 500), + fileTransferredReq: make(chan fileTransferredRequest, 500), + complete: make(chan error), + id: fmt.Sprintf("%v_%s", time.Now().Nanosecond(), logPath), + } + if load { + err = tt.load() + if err != nil { + return nil, err + } + } + tt.startTracker() + + 
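+	//The status maps and the log file handle are owned by the goroutine that
+	//startTracker launches; IsTransferredAndTrackIfNot and TrackFileTransferComplete
+	//send requests to it over channels, so they are safe to call concurrently.
+	//TrackTransferComplete closes those channels, which makes the goroutine write
+	//the summary and stop.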
return &tt, err +} + +//IsTransferredAndTrackIfNot returns true if the file was previously transferred. If not, track/log that the transferred +//was started. +func (t *TransferTracker) IsTransferredAndTrackIfNot(name string, size int64) (bool, error) { + req := isInLogRequest{fileName: name, + size: size, status: Completed, + response: make(chan isInLogResponse), + } + t.isInLogReq <- req + + resp := <-req.response + + return resp.inlog, resp.err +} + +//TrackFileTransferComplete TODO +func (t *TransferTracker) TrackFileTransferComplete(name string) error { + req := fileTransferredRequest{fileName: name, + response: make(chan error), + } + t.fileTransferredReq <- req + + return <-req.response +} + +//TrackTransferComplete TODO +func (t *TransferTracker) TrackTransferComplete() error { + t.closeChannels() + return <-t.complete +} + +func (t *TransferTracker) isInLog(name string, size int64, status TransferStatus) bool { + fs := fileStatus{sourcename: name, size: size, status: status} + + _, ok := t.restoredStatus[fs.key()] + + return ok +} + +func (t *TransferTracker) load() error { + scanner := bufio.NewScanner(t.loghandle) + for scanner.Scan() { + line := scanner.Text() + s, err := newLogEntry(line) + + if err != nil { + return err + } + + t.restoredStatus[s.key()] = *s + } + + return scanner.Err() +} + +const summaryheader = "----------------------------------------------------------" + +func (t *TransferTracker) writeSummary() error { + t.loghandle.Write([]byte(fmt.Sprintf("%v\n", summaryheader))) + t.loghandle.Write([]byte(fmt.Sprintf("Transfer Completed----------------------------------------\n"))) + t.loghandle.Write([]byte(fmt.Sprintf("Start Summary---------------------------------------------\n"))) + t.loghandle.Write([]byte(fmt.Sprintf("Last Transfer ID:%s\n", t.id))) + t.loghandle.Write([]byte(fmt.Sprintf("Date:%v\n", time.Now().Format(time.UnixDate)))) + + tsize, f, err := t.writeSummaryEntries(t.currentStatus) + if err != nil { + return err + } + tsize2, f2, err := t.writeSummaryEntries(t.restoredStatus) + if err != nil { + return err + } + + t.loghandle.Write([]byte(fmt.Sprintf("Transferred Files:%v\tTotal Size:%d\n", f+f2, tsize+tsize2))) + t.loghandle.Write([]byte(fmt.Sprintf("End Summary-----------------------------------------------\n"))) + + return t.loghandle.Close() +} +func (t *TransferTracker) writeSummaryEntries(entries map[string]fileStatus) (size int64, n int, err error) { + for _, entry := range entries { + if entry.status == Completed { + size = size + entry.size + _, err = t.loghandle.Write([]byte(fmt.Sprintf("File:%s\tSize:%v\tTID:%s\n", entry.sourcename, entry.size, entry.tid))) + if err != nil { + return + } + n++ + } + } + + return +} +func (t *TransferTracker) writeStartedEntry(name string, size int64) error { + + var status TransferStatus = Started + if size == 0 { + status = Ignored + } + + fs := fileStatus{sourcename: name, size: size, status: status, tid: t.id} + t.currentStatus[name] = fs + + line := fmt.Sprintf("%v\n", fs.toLogEntry()) + _, err := t.loghandle.Write([]byte(line)) + return err +} + +func (t *TransferTracker) writeCompleteEntry(name string) error { + fs, ok := t.currentStatus[name] + + if !ok { + return fmt.Errorf("The current status tracker is not consistent. 
Started entry was not found") + } + + fs.status = Completed + + line := fmt.Sprintf("%v\n", fs.toLogEntry()) + _, err := t.loghandle.Write([]byte(line)) + t.currentStatus[name] = fs + return err +} + +func (t *TransferTracker) startTracker() { + + go func() { + for { + select { + case incReq, ok := <-t.isInLogReq: + if !ok { + break + } + + inlog := t.isInLog(incReq.fileName, incReq.size, Completed) + resp := isInLogResponse{inlog: inlog} + if !inlog { + resp.err = t.writeStartedEntry(incReq.fileName, incReq.size) + } + + incReq.response <- resp + case ftReq, ok := <-t.fileTransferredReq: + if !ok { + t.complete <- t.writeSummary() + return + } + ftReq.response <- t.writeCompleteEntry(ftReq.fileName) + } + } + }() +} + +func (t *TransferTracker) closeChannels() { + close(t.isInLogReq) + close(t.fileTransferredReq) +} + +//SourceFilter TODO +type SourceFilter interface { + IsTransferredAndTrackIfNot(name string, size int64) (bool, error) +} diff --git a/internal/tracker_test.go b/internal/tracker_test.go new file mode 100644 index 0000000..044cdfe --- /dev/null +++ b/internal/tracker_test.go @@ -0,0 +1,78 @@ +package internal + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +//IMPORTANT: This test will create local files in internal_testdata/ +//make sure *_testdata is in gitinore. +const workdir = "internal_testdata" +const logfile = workdir + "/my_log" + +func TestBasicTracking(t *testing.T) { + os.Mkdir(workdir, 466) + os.Remove(logfile) + + tracker, err := NewTransferTracker(logfile) + + assert.NoError(t, err, "unexpected error") + + istrans, err := tracker.IsTransferredAndTrackIfNot("file1", 10) + assert.NoError(t, err, "unexpected error") + assert.False(t, istrans, "must be fales, i.e. not in the log") + + err = tracker.TrackFileTransferComplete("file1") + assert.NoError(t, err, "unexpected error") + + err = tracker.TrackTransferComplete() + assert.NoError(t, err, "unexpected error") + os.Remove(logfile) + +} + +func TestBasicTrackingAndResume(t *testing.T) { + os.Mkdir(workdir, 0666) + os.Remove(logfile) + + tracker, err := NewTransferTracker(logfile) + + assert.NoError(t, err, "unexpected error") + + istrans, err := tracker.IsTransferredAndTrackIfNot("file1", 10) + assert.NoError(t, err, "unexpected error") + assert.False(t, istrans, "must be false, i.e. not in the log") + + istrans, err = tracker.IsTransferredAndTrackIfNot("file2", 10) + assert.NoError(t, err, "unexpected error") + assert.False(t, istrans, "must be false, i.e. not in the log") + + //only one succeeds. + err = tracker.TrackFileTransferComplete("file1") + assert.NoError(t, err, "unexpected error") + + //closing the handle manually as the program will continue (handle to the file will exists) and need to simulate a crash + err = tracker.loghandle.Close() + assert.NoError(t, err, "unexpected error") + + //create another instance of a tracker at this point only one file should be transferred. 
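+	//(the new tracker reloads the status file, so file1 should be reported as transferred and file2 should not)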
+ tracker, err = NewTransferTracker(logfile) + assert.NoError(t, err, "unexpected error") + + istrans, err = tracker.IsTransferredAndTrackIfNot("file1", 10) + assert.NoError(t, err, "unexpected error") + assert.True(t, istrans, "must be true, as this file was signaled as succesfully transferred") + + istrans, err = tracker.IsTransferredAndTrackIfNot("file2", 10) + assert.NoError(t, err, "unexpected error") + assert.False(t, istrans, "must be false, this was was not signaled as complete") + + err = tracker.TrackFileTransferComplete("file2") + assert.NoError(t, err, "unexpected error") + + err = tracker.TrackTransferComplete() + assert.NoError(t, err, "unexpected error") + os.Remove(logfile) +} diff --git a/pipelinefactory.go b/pipelinefactory.go index 72da262..52cf23c 100644 --- a/pipelinefactory.go +++ b/pipelinefactory.go @@ -9,11 +9,11 @@ import ( "github.com/Azure/blobporter/transfer" ) -func newTransferPipelines(params *validatedParameters) ([]pipeline.SourcePipeline, pipeline.TargetPipeline, error) { +func newTransferPipelines(params *validatedParameters) (<-chan sources.FactoryResult, pipeline.TargetPipeline, error) { fact := newPipelinesFactory(params) - var sourcesp []pipeline.SourcePipeline + var sourcesp <-chan sources.FactoryResult var targetp pipeline.TargetPipeline var err error @@ -72,7 +72,7 @@ func (p *pipelinesFactory) newTargetPipeline() (pipeline.TargetPipeline, error) return nil, fmt.Errorf("Invalid target segment:%v", p.target) } -func (p *pipelinesFactory) newSourcePipelines() ([]pipeline.SourcePipeline, error) { +func (p *pipelinesFactory) newSourcePipelines() (<-chan sources.FactoryResult, error) { params, err := p.newSourceParams() @@ -83,18 +83,19 @@ func (p *pipelinesFactory) newSourcePipelines() ([]pipeline.SourcePipeline, erro switch p.source { case transfer.File: params := params.(sources.FileSystemSourceParams) - return sources.NewFileSystemSourcePipeline(¶ms), nil + return sources.NewFileSystemSourcePipelineFactory(¶ms), nil case transfer.HTTP: params := params.(sources.HTTPSourceParams) - return []pipeline.SourcePipeline{sources.NewHTTPSourcePipeline(params.SourceURIs, params.TargetAliases, params.SourceParams.CalculateMD5)}, nil + return sources.NewHTTPSourcePipelineFactory(params), nil case transfer.S3: params := params.(sources.S3Params) - return sources.NewS3SourcePipeline(¶ms), nil + return sources.NewS3SourcePipelineFactory(¶ms), nil case transfer.Blob: params := params.(sources.AzureBlobParams) - return sources.NewAzureBlobSourcePipeline(¶ms), nil + return sources.NewAzBlobSourcePipelineFactory(¶ms), nil case transfer.Perf: - return sources.NewPerfSourcePipeline(params.(sources.PerfSourceParams)), nil + params := params.(sources.PerfSourceParams) + return sources.NewPerfSourcePipelineFactory(params), nil } return nil, fmt.Errorf("Invalid source segment:%v", p.source) @@ -104,13 +105,16 @@ func (p *pipelinesFactory) newSourceParams() (interface{}, error) { switch p.source { case transfer.File: return sources.FileSystemSourceParams{ - SourcePatterns: p.valParams.sourceURIs, - BlockSize: p.valParams.blockSize, - TargetAliases: p.valParams.targetAliases, - NumOfPartitions: p.valParams.numberOfReaders, //TODO make this more explicit by numofpartitions as param.. 
- MD5: p.valParams.calculateMD5, - KeepDirStructure: p.valParams.keepDirStructure, - FilesPerPipeline: p.valParams.numberOfFilesInBatch}, nil + SourcePatterns: p.valParams.sourceURIs, + BlockSize: p.valParams.blockSize, + TargetAliases: p.valParams.targetAliases, + NumOfPartitions: p.valParams.numberOfReaders, //TODO make this more explicit by numofpartitions as param.. + SourceParams: sources.SourceParams{ + Tracker: p.valParams.tracker, + CalculateMD5: p.valParams.calculateMD5, + UseExactNameMatch: p.valParams.useExactMatch, + FilesPerPipeline: p.valParams.numberOfFilesInBatch, + KeepDirStructure: p.valParams.keepDirStructure}}, nil case transfer.HTTP: return sources.HTTPSourceParams{ SourceURIs: p.valParams.sourceURIs, @@ -126,6 +130,7 @@ func (p *pipelinesFactory) newSourceParams() (interface{}, error) { AccessKey: p.valParams.s3Source.accessKey, SecretKey: p.valParams.s3Source.secretKey, SourceParams: sources.SourceParams{ + Tracker: p.valParams.tracker, CalculateMD5: p.valParams.calculateMD5, UseExactNameMatch: p.valParams.useExactMatch, FilesPerPipeline: p.valParams.numberOfFilesInBatch, @@ -140,6 +145,7 @@ func (p *pipelinesFactory) newSourceParams() (interface{}, error) { BaseBlobURL: p.valParams.blobSource.baseBlobURL, SasExp: p.valParams.blobSource.sasExpMin, SourceParams: sources.SourceParams{ + Tracker: p.valParams.tracker, CalculateMD5: p.valParams.calculateMD5, UseExactNameMatch: p.valParams.useExactMatch, FilesPerPipeline: p.valParams.numberOfFilesInBatch, diff --git a/sources/azblobinfo.go b/sources/azblobinfo.go index 02991dd..4d3ef98 100644 --- a/sources/azblobinfo.go +++ b/sources/azblobinfo.go @@ -1,7 +1,6 @@ package sources import ( - "fmt" "log" "path" "time" @@ -39,35 +38,59 @@ func newazBlobInfoProvider(params *AzureBlobParams) *azBlobInfoProvider { return &azBlobInfoProvider{params: params, azUtil: azutil} } -//getSourceInfo gets a list of SourceInfo that represent the list of azure blobs returned by the service -// based on the provided criteria (container/prefix). If the exact match flag is set, then a specific match is -// performed instead of the prefix. 
Marker semantics are also honored so a complete list is expected -func (b *azBlobInfoProvider) getSourceInfo() ([]pipeline.SourceInfo, error) { - var err error +func (b *azBlobInfoProvider) toSourceInfo(obj *azblob.Blob) (*pipeline.SourceInfo, error) { exp := b.params.SasExp if exp == 0 { exp = defaultSasExpHours } date := time.Now().Add(time.Duration(exp) * time.Minute).UTC() - sourceURIs := make([]pipeline.SourceInfo, 0) + + sourceURLWithSAS := b.azUtil.GetBlobURLWithReadOnlySASToken(obj.Name, date) + + targetAlias := obj.Name + if !b.params.KeepDirStructure { + targetAlias = path.Base(obj.Name) + } + + return &pipeline.SourceInfo{ + SourceName: sourceURLWithSAS.String(), + Size: uint64(*obj.Properties.ContentLength), + TargetAlias: targetAlias}, nil +} + +func (b *azBlobInfoProvider) listObjects(filter internal.SourceFilter) <-chan ObjectListingResult { + sources := make(chan ObjectListingResult, 2) + list := make([]pipeline.SourceInfo, 0) + bsize := 0 blobCallback := func(blob *azblob.Blob, prefix string) (bool, error) { include := true if b.params.UseExactNameMatch { include = blob.Name == prefix } - if include { - sourceURLWithSAS := b.azUtil.GetBlobURLWithReadOnlySASToken(blob.Name, date) - targetAlias := blob.Name - if !b.params.KeepDirStructure { - targetAlias = path.Base(blob.Name) + transferred, err := filter.IsTransferredAndTrackIfNot(blob.Name, int64(*blob.Properties.ContentLength)) + + if err != nil { + return true, err + } + + if include && !transferred { + + si, err := b.toSourceInfo(blob) + + if err != nil { + return true, err + } + + list = append(list, *si) + + if bsize++; bsize == b.params.FilesPerPipeline { + sources <- ObjectListingResult{Sources: list} + list = make([]pipeline.SourceInfo, 0) + bsize = 0 } - sourceURIs = append(sourceURIs, pipeline.SourceInfo{ - SourceName: sourceURLWithSAS.String(), - Size: uint64(*blob.Properties.ContentLength), - TargetAlias: targetAlias}) if b.params.UseExactNameMatch { //true, stops iteration return true, nil @@ -78,19 +101,21 @@ func (b *azBlobInfoProvider) getSourceInfo() ([]pipeline.SourceInfo, error) { return false, nil } - for _, blobName := range b.params.BlobNames { - if err = b.azUtil.IterateBlobList(blobName, blobCallback); err != nil { - return nil, err + go func() { + for _, blobName := range b.params.BlobNames { + if err := b.azUtil.IterateBlobList(blobName, blobCallback); err != nil { + sources <- ObjectListingResult{Err: err} + return + } + if bsize > 0 { + sources <- ObjectListingResult{Sources: list} + list = make([]pipeline.SourceInfo, 0) + bsize = 0 + } } - } + close(sources) - if len(sourceURIs) == 0 { - nameMatchMode := "prefix" - if b.params.UseExactNameMatch { - nameMatchMode = "name" - } - return nil, fmt.Errorf(" the %v %s did not match any blob names ", nameMatchMode, b.params.BlobNames) - } + }() - return sourceURIs, nil + return sources } diff --git a/sources/fileinfo.go b/sources/fileinfo.go new file mode 100644 index 0000000..df16c4b --- /dev/null +++ b/sources/fileinfo.go @@ -0,0 +1,153 @@ +package sources + +import ( + "fmt" + "os" + "path/filepath" +) + +type fileInfoProvider struct { + params *FileSystemSourceParams +} + +type fileInfoResponse struct { + fileInfos map[string]FileInfo + totalNumOfBlocks int64 + totalSize int64 + err error +} + +//FileInfo Contains the metadata associated with a file to be transferred +type FileInfo struct { + FileStats os.FileInfo + SourceURI string + TargetAlias string + NumOfBlocks int +} + +func newfileInfoProvider(params *FileSystemSourceParams) 
*fileInfoProvider { + return &fileInfoProvider{params: params} +} + +func (f *fileInfoProvider) listSourcesInfo() <-chan fileInfoResponse { + ret := make(chan fileInfoResponse, 1) + finfos := make(map[string]FileInfo) + var tblocks int64 + var tsize int64 + + walkFunc := func(path string, n int, info *FileInfo) (bool, error) { + tblocks += int64(info.NumOfBlocks) + tsize += info.FileStats.Size() + finfos[info.SourceURI] = *info + if (n+1)%f.params.FilesPerPipeline == 0 { //n counter is zero based + ret <- fileInfoResponse{fileInfos: finfos, totalNumOfBlocks: tblocks, totalSize: tsize} + finfos = make(map[string]FileInfo) + tblocks = 0 + tsize = 0 + } + + return false, nil + } + + go func() { + defer close(ret) + for _, pattern := range f.params.SourcePatterns { + if err := f.walkPattern(pattern, walkFunc); err != nil { + ret <- fileInfoResponse{err: err} + break + } + } + + if len(finfos) > 0 { + ret <- fileInfoResponse{fileInfos: finfos, totalNumOfBlocks: tblocks, totalSize: tsize} + } + }() + + return ret +} + +func (f *fileInfoProvider) newFileInfo(sourceURI string, fileStat os.FileInfo, alias string) (*FileInfo, error) { + + //directories are not allowed... + if fileStat.IsDir() { + return nil, nil + } + + if fileStat.Size() == 0 { + return nil, fmt.Errorf("Empty files are not allowed. The file %v is empty", fileStat.Name()) + } + + numOfBlocks := int(fileStat.Size()+int64(f.params.BlockSize-1)) / int(f.params.BlockSize) + + targetName := fileStat.Name() + + if f.params.KeepDirStructure { + targetName = sourceURI + } + + if alias != "" { + targetName = alias + } + return &FileInfo{FileStats: fileStat, SourceURI: sourceURI, TargetAlias: targetName, NumOfBlocks: numOfBlocks}, nil +} + +func (f *fileInfoProvider) walkPattern(pattern string, walkFunc func(path string, n int, info *FileInfo) (bool, error)) error { + matches, err := filepath.Glob(pattern) + + if err != nil { + return err + } + + if len(matches) == 0 { + return fmt.Errorf(" the pattern %v did not match any files", pattern) + } + + include := true + useAlias := len(f.params.SourcePatterns) == 1 && len(f.params.TargetAliases) == len(matches) + n := 0 + for fi := 0; fi < len(matches); fi++ { + fsinfo, err := os.Stat(matches[fi]) + + if err != nil { + return err + } + + if f.params.Tracker != nil { + var transferred bool + if transferred, err = f.params.Tracker.IsTransferredAndTrackIfNot(matches[fi], fsinfo.Size()); err != nil { + return err + } + + include = !transferred + } + + alias := "" + + if useAlias { + alias = f.params.TargetAliases[fi] + } + + if include { + info, err := f.newFileInfo(matches[fi], fsinfo, alias) + + if err != nil { + return err + } + + if info != nil { + + stop, err := walkFunc(matches[fi], n, info) + + if err != nil { + return err + } + n++ + if stop { + return nil + } + } + } + } + + return nil +} diff --git a/sources/http.go b/sources/http.go index 94b7c95..2f6e531 100644 --- a/sources/http.go +++ b/sources/http.go @@ -11,6 +11,7 @@ import ( "fmt" "net/http" + "net/url" "github.com/Azure/blobporter/internal" "github.com/Azure/blobporter/pipeline" @@ -30,51 +31,9 @@ type HTTPSource struct { includeMD5 bool } -type sourceHTTPPipelineFactory func(httpSource HTTPSource) (pipeline.SourcePipeline, error) - -func newHTTPSource(sourceListManager objectListManager, pipelineFactory sourceHTTPPipelineFactory, numOfFilePerPipeline int, includeMD5 bool) ([]pipeline.SourcePipeline, error) { - var err error - var sourceInfos []pipeline.SourceInfo - - if sourceInfos, err = sourceListManager.getSourceInfo(); err != 
nil { - return nil, err - } - - if numOfFilePerPipeline <= 0 { - return nil, fmt.Errorf("Invalid operation. The number of files per batch must be greater than zero") - } - - numOfBatches := (len(sourceInfos) + numOfFilePerPipeline - 1) / numOfFilePerPipeline - pipelines := make([]pipeline.SourcePipeline, numOfBatches) - numOfFilesInBatch := numOfFilePerPipeline - filesSent := len(sourceInfos) - start := 0 - - for b := 0; b < numOfBatches; b++ { - - start = b * numOfFilesInBatch - - if filesSent < numOfFilesInBatch { - numOfFilesInBatch = filesSent - } - - httpSource := HTTPSource{Sources: sourceInfos[start : start+numOfFilesInBatch], HTTPClient: httpSourceHTTPClient, includeMD5: includeMD5} - - pipelines[b], err = pipelineFactory(httpSource) - - if err != nil { - return nil, err - } - - filesSent = filesSent - numOfFilesInBatch - } - - return pipelines, err -} - -//NewHTTPSourcePipeline creates a new instance of an HTTP source +//newHTTPSourcePipeline creates a new instance of an HTTP source //To get the file size, a HTTP HEAD request is issued and the Content-Length header is inspected. -func NewHTTPSourcePipeline(sourceURIs []string, targetAliases []string, md5 bool) pipeline.SourcePipeline { +func newHTTPSourcePipeline(sourceURIs []string, targetAliases []string, md5 bool) pipeline.SourcePipeline { setTargetAlias := len(sourceURIs) == len(targetAliases) sources := make([]pipeline.SourceInfo, len(sourceURIs)) for i := 0; i < len(sourceURIs); i++ { @@ -83,7 +42,7 @@ func NewHTTPSourcePipeline(sourceURIs []string, targetAliases []string, md5 bool targetAlias = targetAliases[i] } else { var err error - targetAlias, err = util.GetFileNameFromURL(sourceURIs[i]) + targetAlias, err = getFileNameFromURL(sourceURIs[i]) if err != nil { log.Fatal(err) @@ -98,6 +57,24 @@ func NewHTTPSourcePipeline(sourceURIs []string, targetAliases []string, md5 bool return &HTTPSource{Sources: sources, HTTPClient: httpSourceHTTPClient, includeMD5: md5} } +// returns last part of URL (filename) +func getFileNameFromURL(sourceURI string) (string, error) { + + purl, err := url.Parse(sourceURI) + + if err != nil { + return "", err + } + + parts := strings.Split(purl.Path, "/") + + if len(parts) == 0 { + return "", fmt.Errorf("Invalid URL file was not found in the path") + } + + return parts[len(parts)-1], nil +} + func getSourceSize(sourceURI string) (size int) { client := &http.Client{} resp, err := client.Head(sourceURI) diff --git a/sources/multifile.go b/sources/multifile.go index cf66a3e..ca6a3a7 100644 --- a/sources/multifile.go +++ b/sources/multifile.go @@ -5,8 +5,6 @@ import ( "os" "sync" - "path/filepath" - "fmt" "io" @@ -31,132 +29,17 @@ type FileSystemSource struct { handlePool *internal.FileHandlePool } -//FileInfo Contains the metadata associated with a file to be transferred -type FileInfo struct { - FileStats *os.FileInfo - SourceURI string - TargetAlias string - NumOfBlocks int -} - //FileSystemSourceParams parameters used to create a new instance of multi-file source pipeline type FileSystemSourceParams struct { - SourcePatterns []string - BlockSize uint64 - TargetAliases []string - NumOfPartitions int - MD5 bool - FilesPerPipeline int - KeepDirStructure bool -} - -// NewFileSystemSourcePipeline creates a new MultiFilePipeline. -// If the sourcePattern results in a single file and the targetAlias is set, the alias will be used as the target name. -// Otherwise the original file name will be used. 
-func NewFileSystemSourcePipeline(params *FileSystemSourceParams) []pipeline.SourcePipeline { - var files []string - var err error - //get files from patterns - for i := 0; i < len(params.SourcePatterns); i++ { - var sourceFiles []string - if sourceFiles, err = filepath.Glob(params.SourcePatterns[i]); err != nil { - log.Fatal(err) - } - files = append(files, sourceFiles...) - } - - if params.FilesPerPipeline <= 0 { - log.Fatal(fmt.Errorf("Invalid operation. The number of files per batch must be greater than zero")) - } - - if len(files) == 0 { - log.Fatal(fmt.Errorf("The pattern(s) %v did not match any files", fmt.Sprint(params.SourcePatterns))) - } - - numOfBatches := (len(files) + params.FilesPerPipeline - 1) / params.FilesPerPipeline - pipelines := make([]pipeline.SourcePipeline, numOfBatches) - numOfFilesInBatch := params.FilesPerPipeline - filesSent := len(files) - start := 0 - for b := 0; b < numOfBatches; b++ { - var targetAlias []string - - if filesSent < numOfFilesInBatch { - numOfFilesInBatch = filesSent - } - start = b * numOfFilesInBatch - - if len(params.TargetAliases) == len(files) { - targetAlias = params.TargetAliases[start : start+numOfFilesInBatch] - } - pipelines[b] = newMultiFilePipeline(files[start:start+numOfFilesInBatch], - targetAlias, - params.BlockSize, - params.NumOfPartitions, - params.MD5, - params.KeepDirStructure) - filesSent = filesSent - numOfFilesInBatch - } - - return pipelines + SourceParams + SourcePatterns []string + BlockSize uint64 + TargetAliases []string + NumOfPartitions int } const maxNumOfHandlesPerFile int = 4 -func newMultiFilePipeline(files []string, targetAliases []string, blockSize uint64, numOfPartitions int, md5 bool, keepDirStructure bool) pipeline.SourcePipeline { - totalNumberOfBlocks := 0 - var totalSize uint64 - var err error - fileInfos := make(map[string]FileInfo) - useTargetAlias := len(targetAliases) == len(files) - for f := 0; f < len(files); f++ { - var fileStat os.FileInfo - var sName string - - if fileStat, err = os.Stat(files[f]); err != nil { - log.Fatalf("Error: %v", err) - } - - //directories are not allowed... so skipping them - if fileStat.IsDir() { - continue - } - - if fileStat.Size() == 0 { - log.Fatalf("Empty files are not allowed. The file %v is empty", files[f]) - } - numOfBlocks := int(uint64(fileStat.Size())+(blockSize-1)) / int(blockSize) - totalSize = totalSize + uint64(fileStat.Size()) - totalNumberOfBlocks = totalNumberOfBlocks + numOfBlocks - - //use the param instead of the original filename only when - //the number of targets matches the number files to transfer - if useTargetAlias { - sName = targetAliases[f] - } else { - sName = fileStat.Name() - if keepDirStructure { - sName = files[f] - } - - } - - fileInfo := FileInfo{FileStats: &fileStat, SourceURI: files[f], TargetAlias: sName, NumOfBlocks: numOfBlocks} - fileInfos[files[f]] = fileInfo - } - - handlePool := internal.NewFileHandlePool(maxNumOfHandlesPerFile, internal.Read, false) - - return &FileSystemSource{filesInfo: fileInfos, - totalNumberOfBlocks: totalNumberOfBlocks, - blockSize: blockSize, - totalSize: totalSize, - numOfPartitions: numOfPartitions, - includeMD5: md5, - handlePool: handlePool, - } -} - //ExecuteReader implements ExecuteReader from the pipeline.SourcePipeline Interface. //For each file the reader will maintain a open handle from which data will be read. // This implementation uses partitions (group of parts that can be read sequentially). 
@@ -234,7 +117,7 @@ func (f *FileSystemSource) GetSourcesInfo() []pipeline.SourceInfo { sources := make([]pipeline.SourceInfo, len(f.filesInfo)) var i = 0 for _, file := range f.filesInfo { - sources[i] = pipeline.SourceInfo{SourceName: file.SourceURI, TargetAlias: file.TargetAlias, Size: uint64((*file.FileStats).Size())} + sources[i] = pipeline.SourceInfo{SourceName: file.SourceURI, TargetAlias: file.TargetAlias, Size: uint64(file.FileStats.Size())} i++ } @@ -283,7 +166,7 @@ func (f *FileSystemSource) ConstructBlockInfoQueue(blockSize uint64) (partitions pindex := 0 maxpartitionNumber := 0 for _, source := range f.filesInfo { - partitions := pipeline.ConstructPartsPartition(f.numOfPartitions, (*source.FileStats).Size(), int64(blockSize), source.SourceURI, source.TargetAlias, bufferQ) + partitions := pipeline.ConstructPartsPartition(f.numOfPartitions, source.FileStats.Size(), int64(blockSize), source.SourceURI, source.TargetAlias, bufferQ) allPartitions[pindex] = partitions if len(partitions) > maxpartitionNumber { maxpartitionNumber = len(partitions) diff --git a/sources/ostorefactory.go b/sources/ostorefactory.go deleted file mode 100644 index 46dfde7..0000000 --- a/sources/ostorefactory.go +++ /dev/null @@ -1,75 +0,0 @@ -package sources - -import ( - "fmt" - "log" - - "github.com/Azure/blobporter/pipeline" -) - -//AzureBlobSource constructs parts channel and implements data readers for Azure Blobs exposed via HTTP -type AzureBlobSource struct { - HTTPSource - container string - blobNames []string - exactNameMatch bool -} - -//NewAzureBlobSourcePipeline creates a new instance of the HTTPPipeline for Azure Blobs -func NewAzureBlobSourcePipeline(params *AzureBlobParams) []pipeline.SourcePipeline { - var err error - var azObjStorage objectListManager - azObjStorage = newazBlobInfoProvider(params) - - if params.FilesPerPipeline <= 0 { - log.Fatal(fmt.Errorf("Invalid operation. The number of files per batch must be greater than zero")) - } - - factory := func(httpSource HTTPSource) (pipeline.SourcePipeline, error) { - return &AzureBlobSource{container: params.Container, - blobNames: params.BlobNames, - HTTPSource: httpSource, - exactNameMatch: params.SourceParams.UseExactNameMatch}, nil - } - - var pipelines []pipeline.SourcePipeline - if pipelines, err = newHTTPSource(azObjStorage, factory, params.SourceParams.FilesPerPipeline, params.SourceParams.CalculateMD5); err != nil { - log.Fatal(err) - } - - return pipelines -} - -//S3Source S3 source HTTP based pipeline -type S3Source struct { - HTTPSource - exactNameMatch bool -} - -//NewS3SourcePipeline creates a new instance of the HTTPPipeline for S3 -func NewS3SourcePipeline(params *S3Params) []pipeline.SourcePipeline { - var err error - var s3ObjStorage objectListManager - s3ObjStorage, err = newS3InfoProvider(params) - - if err != nil { - log.Fatal(err) - } - - if params.FilesPerPipeline <= 0 { - log.Fatal(fmt.Errorf("Invalid operation. 
The number of files per batch must be greater than zero")) - } - - factory := func(httpSource HTTPSource) (pipeline.SourcePipeline, error) { - return &S3Source{ - HTTPSource: httpSource, - exactNameMatch: params.SourceParams.UseExactNameMatch}, nil - } - - var pipelines []pipeline.SourcePipeline - if pipelines, err = newHTTPSource(s3ObjStorage, factory, params.SourceParams.FilesPerPipeline, params.SourceParams.CalculateMD5); err != nil { - log.Fatal(err) - } - - return pipelines -} diff --git a/sources/perfsource.go b/sources/perfsource.go index 37cba4e..ff8cb84 100644 --- a/sources/perfsource.go +++ b/sources/perfsource.go @@ -80,8 +80,8 @@ type PerfSourceParams struct { BlockSize uint64 } -//NewPerfSourcePipeline TODO -func NewPerfSourcePipeline(params PerfSourceParams) []pipeline.SourcePipeline { +//newPerfSourcePipeline TODO +func newPerfSourcePipeline(params PerfSourceParams) []pipeline.SourcePipeline { ssps := make([]pipeline.SourcePipeline, 1) ssp := PerfSourcePipeline{ definitions: params.Definitions, diff --git a/sources/s3info.go b/sources/s3info.go index 3ab879c..a281d07 100644 --- a/sources/s3info.go +++ b/sources/s3info.go @@ -1,15 +1,15 @@ package sources import ( - "fmt" "log" "net/url" "path" + "strings" "time" - "github.com/minio/minio-go" - + "github.com/Azure/blobporter/internal" "github.com/Azure/blobporter/pipeline" + "github.com/minio/minio-go" ) //S3Params parameters used to create a new instance of a S3 source pipeline @@ -33,6 +33,7 @@ type s3InfoProvider struct { func newS3InfoProvider(params *S3Params) (*s3InfoProvider, error) { s3client, err := minio.New(params.Endpoint, params.AccessKey, params.SecretKey, true) + if err != nil { log.Fatalln(err) } @@ -41,89 +42,83 @@ func newS3InfoProvider(params *S3Params) (*s3InfoProvider, error) { } -//getSourceInfo gets a list of SourceInfo that represent the list of objects returned by the service -// based on the provided criteria (bucket/prefix). If the exact match flag is set, then a specific match is -// performed instead of the prefix. Marker semantics are also honored so a complete list is expected -func (s *s3InfoProvider) getSourceInfo() ([]pipeline.SourceInfo, error) { +func (s *s3InfoProvider) toSourceInfo(obj *minio.ObjectInfo) (*pipeline.SourceInfo, error) { + exp := time.Duration(s.params.PreSignedExpMin) * time.Minute + + u, err := s.s3client.PresignedGetObject(s.params.Bucket, obj.Key, exp, url.Values{}) - objLists, err := s.getObjectLists() if err != nil { return nil, err } - exp := time.Duration(s.params.PreSignedExpMin) * time.Minute - - sourceURIs := make([]pipeline.SourceInfo, 0) - for prefix, objList := range objLists { - - for _, obj := range objList { - - include := true - - if s.params.SourceParams.UseExactNameMatch { - include = obj.Key == prefix - } + targetAlias := obj.Key + if !s.params.KeepDirStructure { + targetAlias = path.Base(obj.Key) + } - if include { + return &pipeline.SourceInfo{ + SourceName: u.String(), + Size: uint64(obj.Size), + TargetAlias: targetAlias}, nil - var u *url.URL - u, err = s.s3client.PresignedGetObject(s.params.Bucket, obj.Key, exp, url.Values{}) +} - if err != nil { - return nil, err +func (s *s3InfoProvider) listObjects(filter internal.SourceFilter) <-chan ObjectListingResult { + sources := make(chan ObjectListingResult, 2) + go func() { + list := make([]pipeline.SourceInfo, 0) + bsize := 0 + + for _, prefix := range s.params.Prefixes { + // Create a done channel to control 'ListObjects' go routine. 
+ done := make(chan struct{}) + defer close(done) + for object := range s.s3client.ListObjects(s.params.Bucket, prefix, true, done) { + if object.Err != nil { + sources <- ObjectListingResult{Err: object.Err} + return } + include := true - targetAlias := obj.Key - if !s.params.KeepDirStructure { - targetAlias = path.Base(obj.Key) + if s.params.SourceParams.UseExactNameMatch { + include = object.Key == prefix } - sourceURIs = append(sourceURIs, pipeline.SourceInfo{ - SourceName: u.String(), - Size: uint64(obj.Size), - TargetAlias: targetAlias}) + isfolder := strings.HasSuffix(object.Key, "/") && object.Size == 0 - } - } - } - - if len(sourceURIs) == 0 { - nameMatchMode := "prefix" - if s.params.UseExactNameMatch { - nameMatchMode = "name" - } - return nil, fmt.Errorf(" the %v %s did not match any object key(s) ", nameMatchMode, s.params.Prefixes) - } - - return sourceURIs, nil + transferred, err := filter.IsTransferredAndTrackIfNot(object.Key, object.Size) -} -func (s *s3InfoProvider) getObjectLists() (map[string][]minio.ObjectInfo, error) { - listLength := 1 + if err != nil { + sources <- ObjectListingResult{Err: err} + return + } - if len(s.params.Prefixes) > 1 { - listLength = len(s.params.Prefixes) - } + if include && !isfolder && !transferred { + si, err := s.toSourceInfo(&object) - listOfLists := make(map[string][]minio.ObjectInfo, listLength) + if err != nil { + sources <- ObjectListingResult{Err: err} + return + } + list = append(list, *si) - for _, prefix := range s.params.Prefixes { - list := make([]minio.ObjectInfo, 0) + if bsize++; bsize == s.params.FilesPerPipeline { + sources <- ObjectListingResult{Sources: list} + list = make([]pipeline.SourceInfo, 0) + bsize = 0 + } + } - // Create a done channel to control 'ListObjects' go routine. - done := make(chan struct{}) + } - defer close(done) - for object := range s.s3client.ListObjects(s.params.Bucket, prefix, true, done) { - if object.Err != nil { - return nil, object.Err + if bsize > 0 { + sources <- ObjectListingResult{Sources: list} + list = make([]pipeline.SourceInfo, 0) + bsize = 0 } - list = append(list, object) } + close(sources) + }() - listOfLists[prefix] = list - } - - return listOfLists, nil - + return sources } diff --git a/sources/sourcefactory.go b/sources/sourcefactory.go new file mode 100644 index 0000000..1014997 --- /dev/null +++ b/sources/sourcefactory.go @@ -0,0 +1,164 @@ +package sources + +import ( + "fmt" + + "github.com/Azure/blobporter/internal" + "github.com/Azure/blobporter/pipeline" +) + +//AzureBlobSource constructs parts channel and implements data readers for Azure Blobs exposed via HTTP +type AzureBlobSource struct { + HTTPSource + container string + blobNames []string + exactNameMatch bool +} + +//S3Source S3 source HTTP based pipeline +type S3Source struct { + HTTPSource + exactNameMatch bool +} + +//FactoryResult TODO +type FactoryResult struct { + Source pipeline.SourcePipeline + Err error +} + +//NewHTTPSourcePipelineFactory TODO +func NewHTTPSourcePipelineFactory(params HTTPSourceParams) <-chan FactoryResult { + result := make(chan FactoryResult, 1) + defer close(result) + + result <- FactoryResult{ + Source: newHTTPSourcePipeline(params.SourceURIs, + params.TargetAliases, + params.SourceParams.CalculateMD5), + } + return result +} + +//NewPerfSourcePipelineFactory TODO +func NewPerfSourcePipelineFactory(params PerfSourceParams) <-chan FactoryResult { + result := make(chan FactoryResult, 1) + defer close(result) + + perf := newPerfSourcePipeline(params) + result <- FactoryResult{Source: 
perf[0]} + return result +} + +//NewS3SourcePipelineFactory returns TODO +func NewS3SourcePipelineFactory(params *S3Params) <-chan FactoryResult { + var err error + s3Provider, err := newS3InfoProvider(params) + + if err != nil { + return factoryError(err) + } + + var filter internal.SourceFilter + filter = &defaultItemFilter{} + + if params.Tracker != nil { + filter = params.Tracker + } + return newObjectListPipelineFactory(s3Provider, filter, params.CalculateMD5, params.FilesPerPipeline) +} + +//NewAzBlobSourcePipelineFactory TODO +func NewAzBlobSourcePipelineFactory(params *AzureBlobParams) <-chan FactoryResult { + azProvider := newazBlobInfoProvider(params) + + var filter internal.SourceFilter + filter = &defaultItemFilter{} + + if params.Tracker != nil { + filter = params.Tracker + } + + return newObjectListPipelineFactory(azProvider, filter, params.CalculateMD5, params.FilesPerPipeline) +} + +//NewFileSystemSourcePipelineFactory TODO +func NewFileSystemSourcePipelineFactory(params *FileSystemSourceParams) <-chan FactoryResult { + result := make(chan FactoryResult, 1) + provider := newfileInfoProvider(params) + + go func() { + defer close(result) + for fInfoResp := range provider.listSourcesInfo() { + + if fInfoResp.err != nil { + result <- FactoryResult{Err: fInfoResp.err} + return + } + + handlePool := internal.NewFileHandlePool(maxNumOfHandlesPerFile, internal.Read, false) + result <- FactoryResult{ + Source: &FileSystemSource{filesInfo: fInfoResp.fileInfos, + totalNumberOfBlocks: int(fInfoResp.totalNumOfBlocks), + blockSize: params.BlockSize, + totalSize: uint64(fInfoResp.totalSize), + numOfPartitions: params.NumOfPartitions, + includeMD5: params.CalculateMD5, + handlePool: handlePool, + }, + } + } + return + }() + + return result +} + +func newObjectListPipelineFactory(provider objectListProvider, filter internal.SourceFilter, includeMD5 bool, filesPerPipeline int) <-chan FactoryResult { + result := make(chan FactoryResult, 1) + var err error + + if filesPerPipeline <= 0 { + err = fmt.Errorf("Invalid operation. 
The number of files per batch must be greater than zero") + return factoryError(err) + } + + go func() { + defer close(result) + for lst := range provider.listObjects(filter) { + if lst.Err != nil { + result <- FactoryResult{Err: lst.Err} + return + } + + httpSource := &HTTPSource{Sources: lst.Sources, + HTTPClient: httpSourceHTTPClient, + includeMD5: includeMD5, + } + result <- FactoryResult{Source: httpSource} + } + }() + + return result +} + +func factoryError(err error) chan FactoryResult { + result := make(chan FactoryResult, 1) + defer close(result) + result <- FactoryResult{Err: err} + return result +} + +type defaultItemFilter struct { +} + +//IsIncluded TODO +func (f *defaultItemFilter) IsTransferredAndTrackIfNot(name string, size int64) (bool, error) { + return false, nil +} + +//ObjectListingResult TODO +type ObjectListingResult struct { + Sources []pipeline.SourceInfo + Err error +} diff --git a/sources/types.go b/sources/types.go index 65c60f0..cd4b9ac 100644 --- a/sources/types.go +++ b/sources/types.go @@ -1,10 +1,9 @@ package sources -import "github.com/Azure/blobporter/pipeline" +import "github.com/Azure/blobporter/internal" -//objectListManager abstracs the oerations required to get a list of sources/objects from a underlying service such as Azure Object storage and S3 -type objectListManager interface { - getSourceInfo() ([]pipeline.SourceInfo, error) +type objectListProvider interface { + listObjects(filter internal.SourceFilter) <-chan ObjectListingResult } //SourceParams input base parameters for blob and S3 based pipelines @@ -13,6 +12,7 @@ type SourceParams struct { UseExactNameMatch bool KeepDirStructure bool FilesPerPipeline int + Tracker *internal.TransferTracker } //HTTPSourceParams input parameters for HTTP pipelines diff --git a/transfer/transfer.go b/transfer/transfer.go index e21ee15..ccdfa72 100644 --- a/transfer/transfer.go +++ b/transfer/transfer.go @@ -11,6 +11,8 @@ import ( "sync/atomic" "time" + "github.com/Azure/blobporter/internal" + "github.com/Azure/blobporter/pipeline" "github.com/Azure/blobporter/util" ) @@ -266,8 +268,8 @@ type ProgressUpdate func(results pipeline.WorkerResult, committedCount int, buff //Transfer top data structure holding the state of the transfer. 
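With sourcefactory.go and types.go above, every source pipeline is now obtained from a factory that streams FactoryResult values over a channel, so callers drain the channel and check Err on each result before using the Source. A consumer sketch based on the signatures introduced in this diff; the parameter values are placeholders:

```go
package main

import (
	"log"

	"github.com/Azure/blobporter/sources"
)

func main() {
	params := &sources.FileSystemSourceParams{
		SourcePatterns:  []string{"_data/*.bin"}, // hypothetical pattern
		BlockSize:       8 * 1024 * 1024,
		NumOfPartitions: 4,
		SourceParams: sources.SourceParams{
			FilesPerPipeline: 200,
			KeepDirStructure: true,
			CalculateMD5:     true,
		},
	}

	// Each FactoryResult carries one batch of sources as a SourcePipeline,
	// or an error; the channel is closed when the listing is exhausted.
	for result := range sources.NewFileSystemSourcePipelineFactory(params) {
		if result.Err != nil {
			log.Fatal(result.Err)
		}
		log.Printf("pipeline ready with %d sources", len(result.Source.GetSourcesInfo()))
	}
}
```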
type Transfer struct { - SourcePipeline *pipeline.SourcePipeline - TargetPipeline *pipeline.TargetPipeline + SourcePipeline pipeline.SourcePipeline + TargetPipeline pipeline.TargetPipeline NumOfReaders int NumOfWorkers int TotalNumOfBlocks int @@ -276,6 +278,7 @@ type Transfer struct { SyncWaitGroups *WaitGroups ControlChannels *Channels TimeStats *TimeStatsInfo + tracker *internal.TransferTracker readPartsBufferSize int blockSize uint64 totalNumberOfRetries int32 @@ -309,7 +312,7 @@ const extraThreadTarget = 4 //NewTransfer creates a new Transfer, this will adjust the thread target //and initialize the channels and the wait groups for the writers, readers and the committers -func NewTransfer(source *pipeline.SourcePipeline, target *pipeline.TargetPipeline, readers int, workers int, blockSize uint64) *Transfer { +func NewTransfer(source pipeline.SourcePipeline, target pipeline.TargetPipeline, readers int, workers int, blockSize uint64) *Transfer { threadTarget := readers + workers + extraThreadTarget runtime.GOMAXPROCS(runtime.NumCPU()) @@ -321,7 +324,7 @@ func NewTransfer(source *pipeline.SourcePipeline, target *pipeline.TargetPipelin blockSize: blockSize} //validate that all sourceURIs are unique - sources := (*source).GetSourcesInfo() + sources := source.GetSourcesInfo() if s := getFirstDuplicateSource(sources); s != nil { log.Fatalf("Invalid input. You can't start a transfer with duplicate sources.\nFirst duplicate detected:%v\n", s) @@ -332,7 +335,7 @@ func NewTransfer(source *pipeline.SourcePipeline, target *pipeline.TargetPipelin t.SyncWaitGroups.Commits.Add(1) //Create buffered channels - channels.Partitions, channels.Parts, t.TotalNumOfBlocks, t.TotalSize = (*source).ConstructBlockInfoQueue(blockSize) + channels.Partitions, channels.Parts, t.TotalNumOfBlocks, t.TotalSize = source.ConstructBlockInfoQueue(blockSize) readParts := make(chan pipeline.Part, t.getReadPartsBufferSize()) results := make(chan pipeline.WorkerResult, t.TotalNumOfBlocks) @@ -382,14 +385,14 @@ func (t *Transfer) StartTransfer(dupeLevel DupeCheckLevel, progressBarDelegate P //Concurrely calls the PreProcessSourceInfo implementation of the target pipeline for each source in the transfer. func (t *Transfer) preprocessSources() { - sourcesInfo := (*t.SourcePipeline).GetSourcesInfo() + sourcesInfo := t.SourcePipeline.GetSourcesInfo() var wg sync.WaitGroup wg.Add(len(sourcesInfo)) for i := 0; i < len(sourcesInfo); i++ { go func(s *pipeline.SourceInfo, b uint64) { defer wg.Done() - if err := (*t.TargetPipeline).PreProcessSourceInfo(s, b); err != nil { + if err := t.TargetPipeline.PreProcessSourceInfo(s, b); err != nil { log.Fatal(err) } }(&sourcesInfo[i], t.blockSize) @@ -408,14 +411,13 @@ func (t *Transfer) WaitForCompletion() (time.Duration, time.Duration) { close(t.ControlChannels.Results) t.SyncWaitGroups.Commits.Wait() // Ensure all commits complete t.TimeStats.Duration = time.Now().Sub(t.TimeStats.StartTime) - return t.TimeStats.Duration, t.TimeStats.CumWriteDuration } //GetStats returns the statistics of the transfer. func (t *Transfer) GetStats() *StatInfo { return &StatInfo{ - NumberOfFiles: len((*t.SourcePipeline).GetSourcesInfo()), + NumberOfFiles: len(t.SourcePipeline.GetSourcesInfo()), Duration: t.TimeStats.Duration, CumWriteDuration: t.TimeStats.CumWriteDuration, TotalNumberOfBlocks: t.TotalNumOfBlocks, @@ -425,7 +427,7 @@ func (t *Transfer) GetStats() *StatInfo { // StartWorkers creates and starts the set of Workers to send data blocks // from the to the target. 
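A short aside on the signature change just above: Transfer now holds pipeline.SourcePipeline and pipeline.TargetPipeline by interface value instead of pointer-to-interface, which removes the (*source).Method() dereferences. The sketch below only illustrates why the indirection was unnecessary; the interface and types are stand-ins, not the real pipeline package.

```go
package main

import "fmt"

// SourcePipeline stands in for the pipeline.SourcePipeline interface; the
// real interface has more methods, this is only to show the call pattern.
type SourcePipeline interface {
	GetSourcesInfo() []string
}

type fakeSource struct{ names []string }

func (f *fakeSource) GetSourcesInfo() []string { return f.names }

// Before this change the transfer held *SourcePipeline and had to write
// (*source).GetSourcesInfo(); an interface value already references its
// concrete implementation, so passing it by value is enough.
func countSources(source SourcePipeline) int {
	return len(source.GetSourcesInfo())
}

func main() {
	src := &fakeSource{names: []string{"a.bin", "b.bin"}}
	fmt.Println(countSources(src)) // prints 2
}
```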
Workers are started after waiting workerDelayStarTime. -func (t *Transfer) startWorkers(workerQ chan pipeline.Part, resultQ chan pipeline.WorkerResult, numOfWorkers int, wg *sync.WaitGroup, d DupeCheckLevel, target *pipeline.TargetPipeline) { +func (t *Transfer) startWorkers(workerQ chan pipeline.Part, resultQ chan pipeline.WorkerResult, numOfWorkers int, wg *sync.WaitGroup, d DupeCheckLevel, target pipeline.TargetPipeline) { for w := 0; w < numOfWorkers; w++ { worker := newWorker(w, workerQ, resultQ, wg, d) go worker.startWorker(target) @@ -434,7 +436,7 @@ func (t *Transfer) startWorkers(workerQ chan pipeline.Part, resultQ chan pipelin //ProcessAndCommitResults reads from the results channel and calls the target's implementations of the ProcessWrittenPart // and CommitList, if the last part is received. The update delegate is called as well. -func (t *Transfer) processAndCommitResults(resultQ chan pipeline.WorkerResult, update ProgressUpdate, target *pipeline.TargetPipeline, commitWg *sync.WaitGroup) { +func (t *Transfer) processAndCommitResults(resultQ chan pipeline.WorkerResult, update ProgressUpdate, target pipeline.TargetPipeline, commitWg *sync.WaitGroup) { lists := make(map[string]pipeline.TargetCommittedListInfo) blocksProcessed := make(map[string]int) @@ -456,7 +458,7 @@ func (t *Transfer) processAndCommitResults(resultQ chan pipeline.WorkerResult, u list = lists[result.SourceURI] numOfBlocks = blocksProcessed[result.SourceURI] - if requeue, err = (*target).ProcessWrittenPart(&result, &list); err == nil { + if requeue, err = target.ProcessWrittenPart(&result, &list); err == nil { lists[result.SourceURI] = list if requeue { resultQ <- result @@ -471,10 +473,15 @@ func (t *Transfer) processAndCommitResults(resultQ chan pipeline.WorkerResult, u } if numOfBlocks == result.NumberOfBlocks { - if _, err := (*target).CommitList(&list, numOfBlocks, result.TargetName); err != nil { + if _, err := target.CommitList(&list, numOfBlocks, result.TargetName); err != nil { log.Fatal(err) } committedCount++ + if t.tracker != nil { + if err := t.tracker.TrackFileTransferComplete(result.TargetName); err != nil { + log.Fatal(err) + } + } } workerBufferLevel = int(float64(len(t.ControlChannels.ReadParts)) / float64(t.getReadPartsBufferSize()) * 100) @@ -483,23 +490,28 @@ func (t *Transfer) processAndCommitResults(resultQ chan pipeline.WorkerResult, u } } +//SetTransferTracker TODO +func (t *Transfer) SetTransferTracker(tracker *internal.TransferTracker) { + t.tracker = tracker +} + // StartReaders starts 'n' readers. Each reader is a routine that executes the source's implementations of ExecuteReader. -func (t *Transfer) startReaders(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, workerQ chan pipeline.Part, numberOfReaders int, wg *sync.WaitGroup, pipeline *pipeline.SourcePipeline) { +func (t *Transfer) startReaders(partitionsQ chan pipeline.PartsPartition, partsQ chan pipeline.Part, workerQ chan pipeline.Part, numberOfReaders int, wg *sync.WaitGroup, pipeline pipeline.SourcePipeline) { for i := 0; i < numberOfReaders; i++ { - go (*pipeline).ExecuteReader(partitionsQ, partsQ, workerQ, i, wg) + go pipeline.ExecuteReader(partitionsQ, partsQ, workerQ, i, wg) } } // startWorker starts a worker that reads from the worker queue channel. Which contains the read parts from the source. // Calls the target's WritePart implementation and sends the result to the results channel. 
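The transfer.go changes above wire up the resumable-transfer support: during listing, sources already recorded in the status file are skipped via internal.SourceFilter's IsTransferredAndTrackIfNot, and each file is marked done with TrackFileTransferComplete after its block list commits. Below is a self-contained sketch of that skip-then-track flow; the in-memory tracker is illustrative only and not the real file-backed TransferTracker.

```go
package main

import "fmt"

// sourceFilter mirrors the shape of internal.SourceFilter used above: the
// listing loop asks whether a source was already transferred and, if not,
// the filter starts tracking it.
type sourceFilter interface {
	IsTransferredAndTrackIfNot(name string, size int64) (bool, error)
}

// memoryTracker is a purely illustrative in-memory stand-in for the status
// file based tracker; it is not the real implementation.
type memoryTracker struct{ done map[string]bool }

func (m *memoryTracker) IsTransferredAndTrackIfNot(name string, size int64) (bool, error) {
	if m.done[name] {
		return true, nil // already transferred in a previous run: skip it
	}
	return false, nil
}

func (m *memoryTracker) TrackFileTransferComplete(name string) error {
	m.done[name] = true // called after the target commits the file
	return nil
}

func main() {
	tracker := &memoryTracker{done: map[string]bool{"old.bin": true}}
	var filter sourceFilter = tracker

	for _, name := range []string{"old.bin", "new.bin"} {
		skipped, err := filter.IsTransferredAndTrackIfNot(name, 0)
		if err != nil || skipped {
			continue
		}
		// ...transfer the file, then record completion...
		if err := tracker.TrackFileTransferComplete(name); err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println("transferred:", name)
	}
}
```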
-func (w *Worker) startWorker(target *pipeline.TargetPipeline) { +func (w *Worker) startWorker(target pipeline.TargetPipeline) { var tb pipeline.Part var ok bool var duration time.Duration var startTime time.Time var retries int var err error - var t = (*target) + var t = target defer w.Wg.Done() for { tb, ok = <-w.WorkerQueue diff --git a/transfer/transfer_test.go b/transfer/transfer_test.go index 23a703e..6960af3 100644 --- a/transfer/transfer_test.go +++ b/transfer/transfer_test.go @@ -21,7 +21,7 @@ import ( //******** //IMPORTANT: // -- Tests require a valid storage account and env variables set up accordingly. -// -- Tests create a working directory named _wd and temp files under it. Plase make sure the working directory is in .gitignore +// -- Tests create a working directory named transfer_testdata and temp files under it. Plase make sure the working directory is in .gitignore //******** var sourceFiles = make([]string, 1) var accountName = os.Getenv("ACCOUNT_NAME") @@ -36,7 +36,7 @@ var targetBaseBlobURL = "" const ( containerName1 = "bptest" containerName2 = "bphttptest" - tempDir = "_wd" + tempDir = "transfer_testdata" ) func TestFileToPageHTTPToPage(t *testing.T) { @@ -45,14 +45,17 @@ func TestFileToPageHTTPToPage(t *testing.T) { var sourceFile = createPageFile("tb", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source + tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -60,7 +63,7 @@ func TestFileToPageHTTPToPage(t *testing.T) { BaseBlobURL: targetBaseBlobURL} ap := targets.NewAzurePageTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() sourceURI, err := createSasTokenURL(sourceFile, container) @@ -73,8 +76,16 @@ func TestFileToPageHTTPToPage(t *testing.T) { BaseBlobURL: targetBaseBlobURL} ap = targets.NewAzurePageTargetPipeline(tparams) - fp = sources.NewHTTPSourcePipeline([]string{sourceURI}, []string{sourceFile}, true) - tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + + httpparams := sources.HTTPSourceParams{SourceParams: sources.SourceParams{ + CalculateMD5: true}, + SourceURIs: []string{sourceURI}, + TargetAliases: []string{sourceFile}, + } + + fpf = <-sources.NewHTTPSourcePipelineFactory(httpparams) + fp = fpf.Source + tfer = NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -86,14 +97,16 @@ func TestFileToPage(t *testing.T) { container, _ := getContainers() sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, 
+ SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -102,7 +115,7 @@ func TestFileToPage(t *testing.T) { pt := targets.NewAzurePageTargetPipeline(tparams) - tfer := NewTransfer(&fp, &pt, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, pt, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -114,18 +127,20 @@ func TestFileToFile(t *testing.T) { var destFile = sourceFile + "d" sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - TargetAliases: []string{destFile}, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + TargetAliases: []string{destFile}, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source ft := targets.NewFileSystemTargetPipeline(true, numOfWorkers) - tfer := NewTransfer(&fp, &ft, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ft, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -139,14 +154,16 @@ func TestFileToBlob(t *testing.T) { var sourceFile = createFile("tb", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -154,7 +171,7 @@ func TestFileToBlob(t *testing.T) { ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -166,21 +183,23 @@ func TestFileToBlobToBlock(t *testing.T) { var sourceFile = createFile("tb", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, Container: container} ap := 
targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -193,14 +212,16 @@ func TestFileToBlobWithLargeBlocks(t *testing.T) { bsize := uint64(16 * util.MiByte) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -208,7 +229,7 @@ func TestFileToBlobWithLargeBlocks(t *testing.T) { ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, bsize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, bsize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -223,14 +244,15 @@ func TestFilesToBlob(t *testing.T) { var sf4 = createFile("tbm", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{tempDir + "/tbm*"}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{tempDir + "/tbm*"}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -239,7 +261,7 @@ func TestFilesToBlob(t *testing.T) { ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -255,14 +277,16 @@ func TestFileToBlobHTTPToBlob(t *testing.T) { var sourceFile = createFile("tb", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, @@ -272,7 +296,7 @@ func TestFileToBlobHTTPToBlob(t *testing.T) { ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() sourceURI, err := 
createSasTokenURL(sourceFile, container) @@ -287,8 +311,15 @@ func TestFileToBlobHTTPToBlob(t *testing.T) { ap = targets.NewAzureBlockTargetPipeline(tparams) - fp = sources.NewHTTPSourcePipeline([]string{sourceURI}, []string{sourceFile}, true) - tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + httpparams := sources.HTTPSourceParams{SourceParams: sources.SourceParams{ + CalculateMD5: true}, + SourceURIs: []string{sourceURI}, + TargetAliases: []string{sourceFile}, + } + + fpf = <-sources.NewHTTPSourcePipelineFactory(httpparams) + fp = fpf.Source + tfer = NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -299,14 +330,16 @@ func TestFileToBlobHTTPToFile(t *testing.T) { var sourceFile = createFile("tb", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -315,7 +348,7 @@ func TestFileToBlobHTTPToFile(t *testing.T) { ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() sourceURI, err := createSasTokenURL(sourceFile, container) @@ -324,8 +357,16 @@ func TestFileToBlobHTTPToFile(t *testing.T) { sourceFiles[0] = sourceFile + "d" ap = targets.NewFileSystemTargetPipeline(true, numOfWorkers) - fp = sources.NewHTTPSourcePipeline([]string{sourceURI}, sourceFiles, true) - tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + + httpparams := sources.HTTPSourceParams{SourceParams: sources.SourceParams{ + CalculateMD5: true}, + SourceURIs: []string{sourceURI}, + TargetAliases: []string{sourceFile}, + } + + fpf = <-sources.NewHTTPSourcePipelineFactory(httpparams) + fp = fpf.Source + tfer = NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -338,14 +379,16 @@ func TestFileToBlobToFile(t *testing.T) { var sourceFile = createFile("tb", 1) sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -353,7 +396,7 @@ func TestFileToBlobToFile(t *testing.T) { BaseBlobURL: ""} ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer 
:= NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -368,8 +411,10 @@ func TestFileToBlobToFile(t *testing.T) { CalculateMD5: true, KeepDirStructure: true, UseExactNameMatch: false}} - fp = sources.NewAzureBlobSourcePipeline(azureBlobParams)[0] - tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + + fpf = <-sources.NewAzBlobSourcePipelineFactory(azureBlobParams) + fp = fpf.Source + tfer = NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -382,15 +427,17 @@ func TestFileToBlobToFileWithAlias(t *testing.T) { var alias = sourceFile + "alias" sourceParams := &sources.FileSystemSourceParams{ - SourcePatterns: []string{sourceFile}, - BlockSize: blockSize, - TargetAliases: []string{alias}, - FilesPerPipeline: filesPerPipeline, - NumOfPartitions: numOfReaders, - KeepDirStructure: true, - MD5: true} - - fp := sources.NewFileSystemSourcePipeline(sourceParams)[0] + SourcePatterns: []string{sourceFile}, + BlockSize: blockSize, + TargetAliases: []string{alias}, + NumOfPartitions: numOfReaders, + SourceParams: sources.SourceParams{ + FilesPerPipeline: filesPerPipeline, + KeepDirStructure: true, + CalculateMD5: true}} + + fpf := <-sources.NewFileSystemSourcePipelineFactory(sourceParams) + fp := fpf.Source tparams := targets.AzureTargetParams{ AccountName: accountName, AccountKey: accountKey, @@ -399,7 +446,7 @@ func TestFileToBlobToFileWithAlias(t *testing.T) { ap := targets.NewAzureBlockTargetPipeline(tparams) - tfer := NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + tfer := NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() @@ -414,8 +461,9 @@ func TestFileToBlobToFileWithAlias(t *testing.T) { CalculateMD5: true, KeepDirStructure: true, UseExactNameMatch: true}} - fp = sources.NewAzureBlobSourcePipeline(azureBlobParams)[0] - tfer = NewTransfer(&fp, &ap, numOfReaders, numOfWorkers, blockSize) + fpf = <-sources.NewAzBlobSourcePipelineFactory(azureBlobParams) + fp = fpf.Source + tfer = NewTransfer(fp, ap, numOfReaders, numOfWorkers, blockSize) tfer.StartTransfer(None, delegate) tfer.WaitForCompletion() diff --git a/util/util.go b/util/util.go index 8f1097e..f9ac1cf 100644 --- a/util/util.go +++ b/util/util.go @@ -5,12 +5,9 @@ import ( "fmt" "log" "os" - "regexp" "strconv" "strings" "time" - - "net/url" ) // Verbose mode active? @@ -202,62 +199,10 @@ func RetriableOperation(operation func(r int) error) (duration time.Duration, st } } -//AskUser places a yes/no question to the user provided by the stdin -func AskUser(question string) bool { - fmt.Printf(question) - for { - var input string - n, err := fmt.Scanln(&input) - if n < 1 || err != nil { - fmt.Println("invalid input") - } - input = strings.ToLower(input) - switch input { - case "y": - return true - case "n": - return false - default: - fmt.Printf("Invalid response.\n") - fmt.Printf(question) - } - } -} - -//isValidContainerName is true if the name of the container is valid, false if not -func isValidContainerName(name string) bool { - if len(name) < 3 { - return false - } - expr := "^[a-z0-9]+([-]?[a-z0-9]){1,63}$" - valid := regexp.MustCompile(expr) - resp := valid.MatchString(name) - if !resp { - fmt.Printf("The name provided for the container is invalid, it must conform the following rules:\n") - fmt.Printf("1. 
Container names must start with a letter or number, and can contain only letters, numbers, and the dash (-) character.\n") - fmt.Printf("2. Every dash (-) character must be immediately preceded and followed by a letter or number; consecutive dashes are not permitted in container names.\n") - fmt.Printf("3. All letters in a container name must be lowercase.\n") - fmt.Printf("4. Container names must be from 3 through 63 characters long.\n") - } - return resp -} - func handleExceededRetries(err error) { - errMsg := fmt.Sprintf("The number of retries has exceeded the maximum allowed.\nError: %v\nSuggestion:%v\n", err.Error(), getSuggestion(err)) + errMsg := fmt.Sprintf("The number of retries has exceeded the maximum allowed.\nError: %v\n", err.Error()) log.Fatal(errMsg) } -func getSuggestion(err error) string { - switch { - case strings.Contains(err.Error(), "ErrorMessage=The specified blob or block content is invalid"): - return "Try using a different container or upload and then delete a small blob with the same name." - case strings.Contains(err.Error(), "Client.Timeout"): - return "Try increasing the timeout using the -s option or reducing the number of workers and readers, options: -r and -g" - case strings.Contains(err.Error(), "too many open files"): - return "Try reducing the number of sources or batch size" - default: - return "" - } -} //PrintfIfDebug TODO func PrintfIfDebug(format string, values ...interface{}) { @@ -266,21 +211,3 @@ func PrintfIfDebug(format string, values ...interface{}) { fmt.Printf("%v\t%s\n", time.Now(), msg) } } - -//GetFileNameFromURL returns last part of URL (filename) -func GetFileNameFromURL(sourceURI string) (string, error) { - - purl, err := url.Parse(sourceURI) - - if err != nil { - return "", err - } - - parts := strings.Split(purl.Path, "/") - - if len(parts) == 0 { - return "", fmt.Errorf("Invalid URL file was not found in the path") - } - - return parts[len(parts)-1], nil -}
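Closing note on the s3info.go rewrite earlier in this change: the new listObjects streams results in batches from a goroutine that accumulates up to FilesPerPipeline sources, sends each full batch over the channel, flushes the remainder, and closes the channel so consumers can simply range over it. A generic sketch of that producer pattern, with illustrative types rather than the package's real ones:

```go
package main

import "fmt"

type listingResult struct {
	items []string
	err   error
}

// listInBatches mimics the producer used by listObjects: items are
// accumulated until the batch size is reached, each full batch is sent, any
// remainder is flushed, and the channel is closed at the end.
func listInBatches(all []string, batchSize int) <-chan listingResult {
	out := make(chan listingResult, 2)
	go func() {
		defer close(out)
		batch := make([]string, 0, batchSize)
		for _, item := range all {
			batch = append(batch, item)
			if len(batch) == batchSize {
				out <- listingResult{items: batch}
				batch = make([]string, 0, batchSize)
			}
		}
		if len(batch) > 0 {
			out <- listingResult{items: batch} // flush the partial batch
		}
	}()
	return out
}

func main() {
	keys := []string{"k1", "k2", "k3", "k4", "k5"}
	for res := range listInBatches(keys, 2) {
		if res.err != nil {
			fmt.Println("listing error:", res.err)
			return
		}
		fmt.Println("batch:", res.items)
	}
}
```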