Skip to content

Commit

Permalink
Merge pull request #88 from sfzeng/master
Browse files Browse the repository at this point in the history
Bug fix: Make the max run time of a migration job to be configurable.
  • Loading branch information
wisererik authored Dec 7, 2018
2 parents 595c451 + 3150354 commit 7c13c5e
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
20 changes: 18 additions & 2 deletions datamover/pkg/drivers/https/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

var simuRoutines = 10
var PART_SIZE int64 = 16 * 1024 * 1024 //The max object size that can be moved directly, default is 16M.
var JOB_RUN_TIME_MAX = 86400 //seconds
var JOB_RUN_TIME_MAX = 86400 //seconds, equals 1 day
var s3client osdss3.S3Service
var bkendclient backend.BackendService

Expand Down Expand Up @@ -360,6 +360,10 @@ func multipartMoveObj(obj *osdss3.Object, srcLoca *LocationInfo, destLoca *Locat
err = completeMultipartUpload(uploadObjKey, destLoca, uploadMover)
if err != nil {
logger.Println(err.Error())
err := abortMultipartUpload(obj.ObjectKey, destLoca, uploadMover)
if err != nil {
logger.Printf("Abort s3 multipart upload failed, err:%v\n", err)
}
}

return err
Expand Down Expand Up @@ -679,7 +683,19 @@ func runjob(in *pb.RunJobRequest) error {
ctx := context.Background()
_, ok := ctx.Deadline()
if !ok {
ctx, _ = context.WithTimeout(ctx, 7200*time.Second)
tmoutCfg, err := strconv.ParseInt(os.Getenv("JOB_MAX_RUN_TIME"), 10, 64)
if err != nil || tmoutCfg < 60 || tmoutCfg > 2592000 {
tmoutCfg = int64(JOB_RUN_TIME_MAX)
}
durStr := fmt.Sprintf("%ds", tmoutCfg)
logger.Printf("Vaule of JOB_MAX_RUN_TIME is: %d seconds, durStr:%s.\n", tmoutCfg, durStr)
tmout, err := time.ParseDuration(durStr)
if err == nil {
ctx, _ = context.WithTimeout(ctx, tmout)
} else {
logger.Println("Set timeout to the default value.")
ctx, _ = context.WithTimeout(ctx, 86400*time.Second) //1 day as default
}
}

//prepare for running
Expand Down
1 change: 1 addition & 0 deletions datamover/pkg/hw/obs/obsmover.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (mover *ObsMover) UploadPart(objKey string, destLoca *LocationInfo, upBytes
uploadPartInput.Offset = offset
uploadPartInput.PartSize = upBytes
for tries := 1; tries <= 3; tries++ {
uploadPartInput.Body = bytes.NewReader(buf)
uploadPartInputOutput, err := mover.obsClient.UploadPart(uploadPartInput)
if err != nil {
log.Logf("[obsmover] Upload object[%s] range[partnumber#%d,offset#%d] failed %d times. err:%v\n",
Expand Down
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ services:
- MICRO_REGISTRY=mdns
- DB_HOST=datastore:27017
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
#PARTSIZE for multi-part migration, should not be less than 5(MB) or more than 100(MB)
- PARTSIZE=16
#JOB_MAX_RUN_TIME for each migration job, should not be less than 60(second) or more than 2592000(second)
- JOB_MAX_RUN_TIME=86400
depends_on:
- zookeeper
- kafka
Expand Down

0 comments on commit 7c13c5e

Please sign in to comment.