A tool based on Hadoop DistCp that copies files from HDFS to S3.
The copy runs as a map-only job and uses multipart uploads to improve throughput.
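As a rough illustration of how multipart uploads divide a file, the sketch below estimates how many parts a file would be sent in, using the defaults of the `--multipartUploadThreshold` (16 MB) and `--multipartUploadChunkSize` (5 MB) parameters. The function name `plan_upload` is hypothetical, and the boundary rule (files at or above the threshold use multipart) is an assumption; this is not the tool's actual code.

```python
import math

MB = 1024 * 1024


def plan_upload(file_size_bytes, threshold_mb=16, chunk_size_mb=5):
    """Estimate how a file of the given size would be uploaded.

    Assumption: a file whose size is at or above the threshold is sent as a
    multipart upload; the exact comparison used by S3MapReduceCp may differ.
    """
    if threshold_mb <= 0 or chunk_size_mb <= 0:
        raise ValueError("threshold and chunk size must be greater than zero")
    if file_size_bytes < threshold_mb * MB:
        # Small files go up in a single request.
        return {"multipart": False, "parts": 1}
    # Larger files are cut into chunk-sized parts; the last part may be smaller.
    parts = math.ceil(file_size_bytes / (chunk_size_mb * MB))
    return {"multipart": True, "parts": parts}
```

With the defaults, a 100 MB file would be split into 20 parts, while a 10 MB file would be uploaded in a single request.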
Parameter | Required | Description |
---|---|---|
`--help` | No | Print help text. |
`--async` | No | If passed, `S3MapReduceCp` submits the copy job and returns control to the shell immediately. By default, `S3MapReduceCp` is a blocking process. |
`--src` | Yes | A comma-separated list of absolute HDFS paths to be copied to S3. |
`--dest` | Yes | The target location to which the sources will be copied. Must be an absolute URI, ending with `/` if the destination is a directory. |
`--credentialsProvider` | Yes | Absolute URI of the JCEKS file containing the AWS credentials. See the Hadoop credential docs for details. |
`--multipartUploadChunkSize` | No | The part size, in MB, used for multipart uploads. Must be a number greater than zero. Defaults to 5 MB. |
`--multipartUploadThreshold` | No | The file size threshold, in MB, at which multipart uploads are used. Must be a number greater than zero. Defaults to 16 MB. |
`--s3ServerSideEncryption` | No | Enable encryption for the uploaded files. |
`--storageClass` | No | S3 storage class identifier. See the IDs in [com.amazonaws.services.s3.model.StorageClass](https://github.com/aws/aws-sdk-java/blob/1.11.100/aws-java-sdk-s3/src/main/java/com/amazonaws/services/s3/model/StorageClass.java). Defaults to `STANDARD`. |
`--region` | No | AWS region. See `com.amazonaws.regions.Regions` for default values. If not specified, `S3MapReduceCp` will try to get the region of the target bucket. Defaults to `null`. |
`--maxBandwidth` | No | Maximum bandwidth per task, in MB/second. Each map is restricted to consume only the specified bandwidth. This is not always exact: the map throttles back its bandwidth consumption during a copy so that the net bandwidth used tends towards the specified value. Defaults to `100`. |
`--numberOfUploadWorkers` | No | Number of threads per mapper that perform uploads to S3. Defaults to `20`. |
`--maxMaps` | No | The number of maps used to copy data. Note that more maps may not necessarily improve throughput. Defaults to `20`. |
`--copyStrategy` | No | Possible values are `static` (also known as `uniformsize`) and `dynamic`. By default, `uniformsize` is used (i.e. maps are balanced on the total size of files copied by each map). If `dynamic` is specified, `DynamicInputFormat` is used instead. Refer to Input-formats and Map-Reduce Components for more details. |
`--logPath` | No | Location of the log files generated by the job. Defaults to `null`, which means log files will be written to `JobStagingDir/_logs`. |
`--ignoreFailures` | No | This option keeps more accurate statistics about the copy than the default case. It also preserves logs from failed copies, which can be valuable for debugging. Finally, a failing map will not cause the job to fail before all splits are attempted. Defaults to `false`. |
`--s3EndpointUri` | No | URI of the S3 endpoint used by the S3 client. Defaults to `null`, which means the client will select the endpoint. |
`--uploadRetryCount` | No | Maximum number of upload retries. Defaults to `3`. |
`--uploadRetryDelayMs` | No | Milliseconds between upload retries. The actual delay is computed as `delay = attempt * uploadRetryDelayMs`, where `attempt` is the current retry number. Defaults to 300 ms. |
`--uploadBufferSize` | No | Size of the buffer used to upload the stream of data. If the value is `0`, the upload uses the value of the HDFS property `io.file.buffer.size` to configure the buffer. Defaults to `0`. |
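The linear retry delay described for `--uploadRetryDelayMs` can be worked through with a small sketch. The helper name `retry_delays_ms` is hypothetical, and it assumes that retry numbering starts at 1, which the description above does not state explicitly.

```python
def retry_delays_ms(upload_retry_count=3, upload_retry_delay_ms=300):
    """Return the delay before each retry, using delay = attempt * uploadRetryDelayMs.

    Assumption: the first retry is attempt 1, so its delay equals
    uploadRetryDelayMs rather than zero.
    """
    return [
        attempt * upload_retry_delay_ms
        for attempt in range(1, upload_retry_count + 1)
    ]
```

Under that assumption, the default settings would wait 300 ms, 600 ms and 900 ms before the first, second and third retry respectively.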
The architecture of `S3MapReduceCp` is based on DistCp and offers similar functionality.
The components of `S3MapReduceCp` may be classified into the following categories:
- S3MapReduceCp Driver
- Copy-listing generator
- Input-formats and Map-Reduce components
The `S3MapReduceCp` Driver components are responsible for:
- Parsing the arguments passed to the `S3MapReduceCp` command on the command line, via `OptionsParser` and `S3MapReduceCpOptions`.
- Assembling the command arguments into an appropriate `S3MapReduceCpOptions` object, and initializing `S3MapReduceCp`. These arguments include:
  - Source paths
  - Target location
  - Credentials provider
  - Copy options (e.g. whether to apply bandwidth throttling, etc.)
- Orchestrating the copy operation by:
  - Invoking the copy-listing generator to create the list of files to be copied.
  - Setting up and launching the Hadoop Map-Reduce Job to carry out the copy.
  - Based on the options, either returning a handle to the Hadoop MR Job immediately, or waiting until completion.
The parser elements are exercised only from the command line (or if `S3MapReduceCp::run()` is invoked). The `S3MapReduceCp` class may also be used programmatically, by constructing the `S3MapReduceCpOptions` object and initializing an `S3MapReduceCp` object appropriately.
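To make the parse-then-assemble flow of the driver more concrete, here is a minimal Python analogue. It is not the tool's Java code: `CopyOptions` and `parse_options` are hypothetical stand-ins for `S3MapReduceCpOptions` and `OptionsParser`, only a handful of the documented flags are modelled, and the paths in the usage note are made up.

```python
import argparse
from dataclasses import dataclass
from typing import List


@dataclass
class CopyOptions:
    """Hypothetical stand-in for S3MapReduceCpOptions."""
    sources: List[str]
    target: str
    credentials_provider: str
    max_maps: int = 20
    copy_strategy: str = "uniformsize"
    blocking: bool = True


def parse_options(argv):
    """Parse a subset of the documented flags into a CopyOptions object."""
    parser = argparse.ArgumentParser(prog="s3-mapreduce-cp-sketch")
    parser.add_argument("--src", required=True)
    parser.add_argument("--dest", required=True)
    parser.add_argument("--credentialsProvider", required=True)
    parser.add_argument("--maxMaps", type=int, default=20)
    parser.add_argument("--copyStrategy", default="uniformsize",
                        choices=["static", "uniformsize", "dynamic"])
    # "async" is a Python keyword, so the value is stored under another name.
    parser.add_argument("--async", dest="run_async", action="store_true")
    args = parser.parse_args(argv)
    return CopyOptions(
        # --src is a comma-separated list of source paths.
        sources=[p.strip() for p in args.src.split(",") if p.strip()],
        target=args.dest,
        credentials_provider=args.credentialsProvider,
        max_maps=args.maxMaps,
        copy_strategy=args.copyStrategy,
        blocking=not args.run_async,
    )
```

For example, `parse_options(["--src", "hdfs://nn/a,hdfs://nn/b", "--dest", "s3://bucket/dir/", "--credentialsProvider", "jceks://hdfs/creds.jceks"])` would yield an options object with two sources and the blocking behaviour left at its default.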
The copy-listing-generator classes are responsible for creating the list of files/directories to be copied from source. They examine the contents of the source paths (files/directories, including wildcards), and record all paths that need to be copied into a sequence file, for consumption by the `S3MapReduceCp` Hadoop Job. The main classes in this module include:
- `CopyListing`: The interface that should be implemented by any copy-listing-generator implementation. Also provides the factory method by which the concrete `CopyListing` implementation is chosen.
- `SimpleCopyListing`: An implementation of `CopyListing` that accepts multiple source paths (files/directories), and recursively lists all the individual files under each for copy.

One may customize the method by which the copy-listing is constructed by providing a custom implementation of the `CopyListing` interface.
The Input-formats and Map-Reduce components are responsible for the actual copy of files and directories from the source to the destination path. The listing-file created during copy-listing generation is consumed at this point, when the copy is carried out. The classes of interest here include:
- `UniformSizeInputFormat`: This implementation of `org.apache.hadoop.mapreduce.InputFormat` provides equivalence with Legacy DistCp in balancing load across maps. The aim of the `UniformSizeInputFormat` is to make each map copy roughly the same number of bytes. To that end, the listing file is split into groups of paths such that the sum of file sizes in each `InputSplit` is nearly equal to that of every other split. The splitting isn't always perfect, but its trivial implementation keeps the setup time low.
- `DynamicInputFormat` and `DynamicRecordReader`: The `DynamicInputFormat` implements `org.apache.hadoop.mapreduce.InputFormat`. The listing file is split into several "chunk-files", the exact number of chunk-files being a multiple of the number of maps requested for the Hadoop Job. Each map task is "assigned" one of the chunk-files (by renaming the chunk to the task's id) before the Job is launched. Paths are read from each chunk using the `DynamicRecordReader` and processed in the `CopyMapper`. After all the paths in a chunk are processed, the current chunk is deleted and a new chunk is acquired. The process continues until no more chunks are available. This `dynamic` approach allows faster map tasks to consume more paths than slower ones, thus speeding up the `S3MapReduceCp` job overall.
- `CopyMapper`: This class implements the physical file copy. Unlike DistCp, `S3MapReduceCp` does not perform any smart checks to determine whether a file must be copied or not, i.e. every file in the copy listing will always be transferred to S3.
- `CopyCommitter`: This class is responsible for the commit phase of the `S3MapReduceCp` job, which only cleans up temporary files, working directories, etc.
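The idea behind size-balanced splitting can be sketched in a few lines. This is an illustrative greedy grouping, not the actual `UniformSizeInputFormat` implementation: the function name `uniform_size_splits` and the `(path, size)` listing format are assumptions made for the example.

```python
import math


def uniform_size_splits(listing, num_splits):
    """Group (path, size_bytes) entries, in listing order, into at most
    num_splits groups whose total sizes are roughly equal."""
    if num_splits <= 0:
        raise ValueError("num_splits must be greater than zero")
    total = sum(size for _, size in listing)
    target = math.ceil(total / num_splits) if total else 0
    splits, current, current_bytes = [], [], 0
    for path, size in listing:
        # Close the current group once this file would push it past the
        # target, unless the last allowed group is already being filled.
        if current and current_bytes + size > target and len(splits) < num_splits - 1:
            splits.append(current)
            current, current_bytes = [], 0
        current.append(path)
        current_bytes += size
    if current:
        splits.append(current)
    return splits
```

Because a file is never divided between groups, a listing with one very large file produces a single group however many splits are requested, which is why adding maps does not always add parallelism.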
By default, `S3MapReduceCp` makes an attempt to size each map comparably so that each copies roughly the same number of bytes. Note that files are the finest level of granularity, so increasing the number of simultaneous copiers (i.e. maps) may not always increase the number of simultaneous copies nor the overall throughput.
`S3MapReduceCp` also provides a strategy to dynamically size maps, allowing faster data-nodes to copy more bytes than slower nodes. With `--copyStrategy dynamic` (explained in the Architecture), rather than assigning a fixed set of source files to each map task, files are instead split into several sets. The number of sets exceeds the number of maps, usually by a factor of 2-3. Each map picks up and copies all files listed in a chunk. When a chunk is exhausted, a new chunk is acquired and processed, until no more chunks remain.
By not assigning a source-path to a fixed map, faster map-tasks (i.e. data-nodes) are able to consume more chunks, and thus copy more data, than slower nodes. While this distribution isn't uniform, it is fair with regard to each mapper's capacity.
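The effect of letting faster tasks pick up more chunks can be seen in a small deterministic simulation. It is a simplified model under stated assumptions, not the `DynamicInputFormat` code: the function name `simulate_dynamic`, the fixed per-task speeds, and the rule that whichever task becomes free first takes the next chunk are all illustrative choices.

```python
import heapq


def simulate_dynamic(chunk_sizes, task_speeds):
    """Return the bytes copied by each map task.

    chunk_sizes: size in bytes of each chunk, in the order they are handed out.
    task_speeds: copy speed of each map task in bytes per second (assumed constant).
    """
    copied = [0] * len(task_speeds)
    # Heap of (time at which the task becomes free, task index).
    free_at = [(0.0, i) for i in range(len(task_speeds))]
    heapq.heapify(free_at)
    for size in chunk_sizes:
        # The next chunk goes to whichever task is free first.
        now, task = heapq.heappop(free_at)
        copied[task] += size
        heapq.heappush(free_at, (now + size / task_speeds[task], task))
    return copied
```

With six equal chunks of 100 bytes and two tasks, one twice as fast as the other, `simulate_dynamic([100] * 6, [2.0, 1.0])` gives the fast task four chunks and the slow task two: an uneven split that matches each task's capacity.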
The dynamic strategy is implemented by the `DynamicInputFormat`. It provides superior performance under most conditions.
Tuning the number of maps to the size of the source and destination clusters, the size of the copy, and the available bandwidth is recommended for long-running and regularly run jobs.