diff --git a/datamover/pkg/drivers/https/migration.go b/datamover/pkg/drivers/https/migration.go
index 200192a5d..4fa1745a1 100644
--- a/datamover/pkg/drivers/https/migration.go
+++ b/datamover/pkg/drivers/https/migration.go
@@ -27,7 +27,7 @@ import (
 var simuRoutines = 10
 var PART_SIZE int64 = 16 * 1024 * 1024 //The max object size that can be moved directly, default is 16M.
-var JOB_RUN_TIME_MAX = 86400 //seconds
+var JOB_RUN_TIME_MAX = 86400 //seconds, equals 1 day
 
 var s3client osdss3.S3Service
 var bkendclient backend.BackendService
 
@@ -360,6 +360,10 @@ func multipartMoveObj(obj *osdss3.Object, srcLoca *LocationInfo, destLoca *Locat
 	err = completeMultipartUpload(uploadObjKey, destLoca, uploadMover)
 	if err != nil {
 		logger.Println(err.Error())
+		err := abortMultipartUpload(obj.ObjectKey, destLoca, uploadMover)
+		if err != nil {
+			logger.Printf("Abort s3 multipart upload failed, err:%v\n", err)
+		}
 	}
 
 	return err
@@ -679,7 +683,19 @@ func runjob(in *pb.RunJobRequest) error {
 	ctx := context.Background()
 	_, ok := ctx.Deadline()
 	if !ok {
-		ctx, _ = context.WithTimeout(ctx, 7200*time.Second)
+		tmoutCfg, err := strconv.ParseInt(os.Getenv("JOB_MAX_RUN_TIME"), 10, 64)
+		if err != nil || tmoutCfg < 60 || tmoutCfg > 2592000 {
+			tmoutCfg = int64(JOB_RUN_TIME_MAX)
+		}
+		durStr := fmt.Sprintf("%ds", tmoutCfg)
+		logger.Printf("Value of JOB_MAX_RUN_TIME is: %d seconds, durStr:%s.\n", tmoutCfg, durStr)
+		tmout, err := time.ParseDuration(durStr)
+		if err == nil {
+			ctx, _ = context.WithTimeout(ctx, tmout)
+		} else {
+			logger.Println("Set timeout to the default value.")
+			ctx, _ = context.WithTimeout(ctx, 86400*time.Second) //1 day as default
+		}
 	}
 
 	//prepare for running
diff --git a/datamover/pkg/hw/obs/obsmover.go b/datamover/pkg/hw/obs/obsmover.go
index 7a74516b3..763809e08 100644
--- a/datamover/pkg/hw/obs/obsmover.go
+++ b/datamover/pkg/hw/obs/obsmover.go
@@ -232,6 +232,7 @@ func (mover *ObsMover) UploadPart(objKey string, destLoca *LocationInfo, upBytes
 	uploadPartInput.Offset = offset
 	uploadPartInput.PartSize = upBytes
 	for tries := 1; tries <= 3; tries++ {
+		uploadPartInput.Body = bytes.NewReader(buf)
 		uploadPartInputOutput, err := mover.obsClient.UploadPart(uploadPartInput)
 		if err != nil {
 			log.Logf("[obsmover] Upload object[%s] range[partnumber#%d,offset#%d] failed %d times. err:%v\n",
diff --git a/docker-compose.yml b/docker-compose.yml
index a3e75c3d2..60840d368 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -66,7 +66,10 @@ services:
       - MICRO_REGISTRY=mdns
       - DB_HOST=datastore:27017
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
+      #PARTSIZE is the part size for multipart migration; it must be between 5(MB) and 100(MB)
       - PARTSIZE=16
+      #JOB_MAX_RUN_TIME is the timeout for each migration job; it must be between 60 and 2592000 seconds (30 days)
+      - JOB_MAX_RUN_TIME=86400
     depends_on:
       - zookeeper
       - kafka
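
The abort-on-failed-complete addition in migration.go matters because most S3-compatible stores retain (and bill for) uploaded parts until the multipart upload is either completed or aborted. A minimal runnable sketch of the pattern, with hypothetical helper names standing in for completeMultipartUpload and abortMultipartUpload:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for completeMultipartUpload and abortMultipartUpload.
func completeUpload(key string) error { return errors.New("complete failed") }
func abortUpload(key string) error    { return nil }

// finishUpload mirrors the error handling added in the diff: on a failed
// completion, best-effort abort the upload so no orphaned parts are left
// behind, log if the abort also fails, and still return the original
// completion error to the caller.
func finishUpload(key string) error {
	err := completeUpload(key)
	if err != nil {
		fmt.Println(err.Error())
		if abortErr := abortUpload(key); abortErr != nil {
			fmt.Printf("Abort s3 multipart upload failed, err:%v\n", abortErr)
		}
	}
	return err
}

func main() {
	fmt.Println("result:", finishUpload("my-object-key"))
}
```

Note that the diff's inner `err :=` deliberately shadows the outer `err`, so `multipartMoveObj` still returns the completion error rather than the abort error.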
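
The runjob change replaces the hard-coded 7200-second deadline with a value read from JOB_MAX_RUN_TIME, validated against the [60, 2592000] range and falling back to JOB_RUN_TIME_MAX. One simplification worth considering: once the integer has been validated, the fmt.Sprintf/time.ParseDuration round-trip can never fail, so the else branch is unreachable. A sketch (helper name hypothetical) that builds the duration directly:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

const jobRunTimeMax = 86400 // seconds, equals 1 day

// jobTimeout reads JOB_MAX_RUN_TIME and clamps it to the same bounds as the
// diff; multiplying the validated integer by time.Second cannot fail, so no
// ParseDuration fallback branch is needed.
func jobTimeout() time.Duration {
	secs, err := strconv.ParseInt(os.Getenv("JOB_MAX_RUN_TIME"), 10, 64)
	if err != nil || secs < 60 || secs > 2592000 {
		secs = jobRunTimeMax // default on a missing or out-of-range value
	}
	return time.Duration(secs) * time.Second
}

func main() {
	os.Setenv("JOB_MAX_RUN_TIME", "7200")
	fmt.Println(jobTimeout()) // prints 2h0m0s
}
```

Also note that `context.WithTimeout` returns a cancel function; discarding it with `_` keeps the context's timer alive until the deadline fires, so capturing it and calling `defer cancel()` would be slightly cleaner.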
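
The obsmover.go change is the subtle one: the upload consumes `uploadPartInput.Body`, so after a failed attempt the retry loop would otherwise re-send an already-drained reader, and a fresh reader has to be built on every iteration. A self-contained sketch of the failure mode and the fix, with hypothetical names (uploadOnce stands in for obsClient.UploadPart):

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// uploadOnce drains the body like a real upload would, then fails while
// failuresLeft is positive to force retries.
func uploadOnce(body io.Reader, failuresLeft *int) error {
	n, _ := io.Copy(io.Discard, body)
	fmt.Printf("attempt sent %d bytes\n", n)
	if *failuresLeft > 0 {
		*failuresLeft--
		return errors.New("transient error")
	}
	return nil
}

func main() {
	buf := []byte("part payload")
	failures := 2
	for tries := 1; tries <= 3; tries++ {
		// Fresh reader per attempt, as in the diff; without this line the
		// second and third attempts would send 0 bytes.
		body := bytes.NewReader(buf)
		if err := uploadOnce(body, &failures); err == nil {
			fmt.Printf("upload succeeded on try %d\n", tries)
			return
		}
	}
	fmt.Println("upload failed after 3 tries")
}
```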