Skip to content

Commit

Permalink
cmd/mount: add option upload-hours to limit the period for background uploads (#4250)
Browse files Browse the repository at this point in the history

Co-authored-by: Zhoulin <443619637@qq.com>
  • Loading branch information
SandyXSD and feeyman authored Dec 12, 2023
1 parent 6f2440c commit 124f58d
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 7 deletions.
8 changes: 6 additions & 2 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,16 @@ func dataCacheFlags() []cli.Flag {
},
&cli.BoolFlag{
Name: "writeback",
Usage: "upload objects in background",
Usage: "upload blocks in background",
},
&cli.StringFlag{
Name: "upload-delay",
Value: "0",
Usage: "delayed duration (in seconds) for uploading objects",
Usage: "delayed duration (in seconds) for uploading blocks",
},
&cli.StringFlag{
Name: "upload-hours",
Usage: "(start,end) hour of a day between which the delayed blocks can be uploaded",
},
&cli.StringFlag{
Name: "cache-dir",
Expand Down
1 change: 1 addition & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
UploadLimit: c.Int64("upload-limit") * 1e6 / 8,
DownloadLimit: c.Int64("download-limit") * 1e6 / 8,
UploadDelay: duration(c.String("upload-delay")),
UploadHours: c.String("upload-hours"),

CacheDir: c.String("cache-dir"),
CacheSize: int64(c.Int("cache-size")),
Expand Down
62 changes: 57 additions & 5 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (s *wSlice) upload(indx int) {
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
} else {
s.errors <- nil
if s.store.conf.UploadDelay == 0 {
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
select {
case s.store.currentUpload <- true:
defer func() { <-s.store.currentUpload }()
Expand All @@ -441,7 +441,7 @@ func (s *wSlice) upload(indx int) {
}
}
block.Release()
s.store.addDelayedStaging(key, stagingPath, time.Now(), s.store.conf.UploadDelay == 0)
s.store.addDelayedStaging(key, stagingPath, time.Now(), false)
return
}
}
Expand Down Expand Up @@ -524,6 +524,7 @@ type Config struct {
DownloadLimit int64 // bytes per second
Writeback bool
UploadDelay time.Duration
UploadHours string
HashPrefix bool
BlockSize int
GetTimeout time.Duration
Expand All @@ -543,9 +544,14 @@ func (c *Config) SelfCheck(uuid string) {
}
c.CacheDir = "memory"
}
if !c.Writeback && c.UploadDelay > 0 {
if !c.Writeback && (c.UploadDelay > 0 || c.UploadHours != "") {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
}
if _, _, err := c.parseHours(); err != nil {
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
c.UploadHours = ""
}
if c.MaxUpload <= 0 {
logger.Warnf("max-uploads should be greater than 0, set it to 1")
Expand Down Expand Up @@ -574,6 +580,27 @@ func (c *Config) SelfCheck(uuid string) {
}
}

// parseHours parses the UploadHours setting ("start,end") into a pair of
// hour numbers. Both values must lie within [0, 23]. An empty setting is
// valid and yields (0, 0) with a nil error, meaning no time restriction.
func (c *Config) parseHours() (start, end int, err error) {
	if c.UploadHours == "" {
		return
	}
	// Exactly one comma is allowed, separating the two hour fields.
	first, second, found := strings.Cut(c.UploadHours, ",")
	if !found || strings.Contains(second, ",") {
		return 0, 0, errors.New("unexpected number of fields")
	}
	start, err = strconv.Atoi(first)
	if err != nil {
		return
	}
	end, err = strconv.Atoi(second)
	if err != nil {
		return
	}
	if start < 0 || start > 23 || end < 0 || end > 23 {
		err = errors.New("invalid hour number")
	}
	return
}

type cachedStore struct {
storage object.ObjectStorage
bcache CacheManager
Expand All @@ -584,6 +611,8 @@ type cachedStore struct {
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
Expand Down Expand Up @@ -715,6 +744,12 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
store.downLimit = ratelimit.NewBucketWithRate(float64(config.DownloadLimit)*0.85, config.DownloadLimit)
}
store.initMetrics()
if store.conf.CacheDir != "memory" && store.conf.Writeback {
store.startHour, store.endHour, _ = config.parseHours()
if store.startHour != store.endHour {
logger.Infof("background upload at %d:00 ~ %d:00", store.startHour, store.endHour)
}
}
store.bcache = newCacheManager(&config, reg, func(key, fpath string, force bool) bool {
if fi, err := os.Stat(fpath); err == nil {
return store.addDelayedStaging(key, fpath, fi.ModTime(), force)
Expand Down Expand Up @@ -742,9 +777,11 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
}
interval := time.Minute
if d := store.conf.UploadDelay; d > 0 {
logger.Infof("delay uploading by %s", d)
if d < time.Minute {
interval = d
logger.Warnf("delay uploading by %s (this value is too small, and is not recommended)", d)
} else {
logger.Infof("delay uploading by %s", d)
}
}
go func() {
Expand Down Expand Up @@ -848,6 +885,9 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
<-store.currentUpload
}()

if !store.canUpload() {
return
}
store.pendingMutex.Lock()
item, ok := store.pendingKeys[key]
store.pendingMutex.Unlock()
Expand Down Expand Up @@ -901,7 +941,7 @@ func (store *cachedStore) addDelayedStaging(key, stagingPath string, added time.
logger.Debugf("Key %s is ignored since it's already being uploaded", key)
return true
}
if force || time.Since(added) > store.conf.UploadDelay {
if force || store.canUpload() && time.Since(added) > store.conf.UploadDelay {
select {
case store.pendingCh <- item:
item.uploading = true
Expand All @@ -919,6 +959,9 @@ func (store *cachedStore) removePending(key string) {
}

func (store *cachedStore) scanDelayedStaging() {
if !store.canUpload() {
return
}
cutoff := time.Now().Add(-store.conf.UploadDelay)
store.pendingMutex.Lock()
defer store.pendingMutex.Unlock()
Expand All @@ -938,6 +981,15 @@ func (store *cachedStore) uploader() {
}
}

// canUpload reports whether background uploading is currently allowed by
// the configured upload window [startHour, endHour). Equal start and end
// hours mean no restriction; a start greater than end denotes a window
// that wraps around midnight.
func (store *cachedStore) canUpload() bool {
	start, end := store.startHour, store.endHour
	if start == end {
		return true // no window configured
	}
	now := time.Now().Hour()
	if start < end {
		return now >= start && now < end
	}
	// Window crosses midnight, e.g. 22,6 allows 22:00-23:59 and 00:00-05:59.
	return now >= start || now < end
}

// NewReader returns a Reader over the slice identified by id with the
// given length, backed by this store.
func (store *cachedStore) NewReader(id uint64, length int) Reader {
	r := sliceForRead(id, length, store)
	return r
}
Expand Down

0 comments on commit 124f58d

Please sign in to comment.