Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/mount: add option upload-hours to limit the period for background uploads #4250

Merged
merged 12 commits into from
Dec 12, 2023
8 changes: 6 additions & 2 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,16 @@ func dataCacheFlags() []cli.Flag {
},
&cli.BoolFlag{
Name: "writeback",
Usage: "upload objects in background",
Usage: "upload blocks in background",
},
&cli.StringFlag{
Name: "upload-delay",
Value: "0",
Usage: "delayed duration (in seconds) for uploading objects",
Usage: "delayed duration (in seconds) for uploading blocks",
},
&cli.StringFlag{
Name: "upload-hours",
Usage: "(start,end) hour of a day between which the delayed blocks can be uploaded",
},
&cli.StringFlag{
Name: "cache-dir",
Expand Down
1 change: 1 addition & 0 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
UploadLimit: c.Int64("upload-limit") * 1e6 / 8,
DownloadLimit: c.Int64("download-limit") * 1e6 / 8,
UploadDelay: duration(c.String("upload-delay")),
UploadHours: c.String("upload-hours"),

CacheDir: c.String("cache-dir"),
CacheSize: int64(c.Int("cache-size")),
Expand Down
62 changes: 57 additions & 5 deletions pkg/chunk/cached_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (s *wSlice) upload(indx int) {
logger.Warnf("write %s to disk: %s, upload it directly", stagingPath, err)
} else {
s.errors <- nil
if s.store.conf.UploadDelay == 0 {
if s.store.conf.UploadDelay == 0 && s.store.canUpload() {
select {
case s.store.currentUpload <- true:
defer func() { <-s.store.currentUpload }()
Expand All @@ -441,7 +441,7 @@ func (s *wSlice) upload(indx int) {
}
}
block.Release()
s.store.addDelayedStaging(key, stagingPath, time.Now(), s.store.conf.UploadDelay == 0)
s.store.addDelayedStaging(key, stagingPath, time.Now(), false)
return
}
}
Expand Down Expand Up @@ -524,6 +524,7 @@ type Config struct {
DownloadLimit int64 // bytes per second
Writeback bool
UploadDelay time.Duration
UploadHours string
HashPrefix bool
BlockSize int
GetTimeout time.Duration
Expand All @@ -543,9 +544,14 @@ func (c *Config) SelfCheck(uuid string) {
}
c.CacheDir = "memory"
}
if !c.Writeback && c.UploadDelay > 0 {
if !c.Writeback && (c.UploadDelay > 0 || c.UploadHours != "") {
logger.Warnf("delayed upload is disabled in non-writeback mode")
c.UploadDelay = 0
c.UploadHours = ""
}
if _, _, err := c.parseHours(); err != nil {
logger.Warnf("invalid value (%s) for upload-hours: %s", c.UploadHours, err)
c.UploadHours = ""
}
if c.MaxUpload <= 0 {
logger.Warnf("max-uploads should be greater than 0, set it to 1")
Expand Down Expand Up @@ -574,6 +580,27 @@ func (c *Config) SelfCheck(uuid string) {
}
}

// parseHours parses the UploadHours setting, expected in the form
// "start,end" where both fields are hours of day in [0, 23].
// An empty UploadHours is valid and yields (0, 0), meaning no restriction.
// Fields are trimmed of surrounding whitespace so inputs like "0, 6" work.
func (c *Config) parseHours() (start, end int, err error) {
	if c.UploadHours == "" {
		return
	}
	// Split once on the comma; exactly two fields are required.
	first, second, ok := strings.Cut(c.UploadHours, ",")
	if !ok || strings.Contains(second, ",") {
		err = errors.New("unexpected number of fields")
		return
	}
	if start, err = strconv.Atoi(strings.TrimSpace(first)); err != nil {
		return
	}
	if end, err = strconv.Atoi(strings.TrimSpace(second)); err != nil {
		return
	}
	// Hours are clock hours of a day; wrap-around windows (start > end)
	// are allowed and handled by canUpload.
	if start < 0 || start > 23 || end < 0 || end > 23 {
		err = errors.New("invalid hour number")
	}
	return
}

type cachedStore struct {
storage object.ObjectStorage
bcache CacheManager
Expand All @@ -584,6 +611,8 @@ type cachedStore struct {
pendingCh chan *pendingItem
pendingKeys map[string]*pendingItem
pendingMutex sync.Mutex
startHour int
endHour int
compressor compress.Compressor
seekable bool
upLimit *ratelimit.Bucket
Expand Down Expand Up @@ -715,6 +744,12 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
store.downLimit = ratelimit.NewBucketWithRate(float64(config.DownloadLimit)*0.85, config.DownloadLimit)
}
store.initMetrics()
if store.conf.CacheDir != "memory" && store.conf.Writeback {
store.startHour, store.endHour, _ = config.parseHours()
if store.startHour != store.endHour {
logger.Infof("background upload at %d:00 ~ %d:00", store.startHour, store.endHour)
}
}
store.bcache = newCacheManager(&config, reg, func(key, fpath string, force bool) bool {
if fi, err := os.Stat(fpath); err == nil {
return store.addDelayedStaging(key, fpath, fi.ModTime(), force)
Expand Down Expand Up @@ -742,9 +777,11 @@ func NewCachedStore(storage object.ObjectStorage, config Config, reg prometheus.
}
interval := time.Minute
if d := store.conf.UploadDelay; d > 0 {
logger.Infof("delay uploading by %s", d)
if d < time.Minute {
interval = d
logger.Warnf("delay uploading by %s (this value is too small, and is not recommended)", d)
} else {
logger.Infof("delay uploading by %s", d)
}
}
go func() {
Expand Down Expand Up @@ -848,6 +885,9 @@ func (store *cachedStore) uploadStagingFile(key string, stagingPath string) {
<-store.currentUpload
}()

if !store.canUpload() {
return
}
store.pendingMutex.Lock()
item, ok := store.pendingKeys[key]
store.pendingMutex.Unlock()
Expand Down Expand Up @@ -901,7 +941,7 @@ func (store *cachedStore) addDelayedStaging(key, stagingPath string, added time.
logger.Debugf("Key %s is ignored since it's already being uploaded", key)
return true
}
if force || time.Since(added) > store.conf.UploadDelay {
if force || store.canUpload() && time.Since(added) > store.conf.UploadDelay {
select {
case store.pendingCh <- item:
item.uploading = true
Expand All @@ -919,6 +959,9 @@ func (store *cachedStore) removePending(key string) {
}

func (store *cachedStore) scanDelayedStaging() {
if !store.canUpload() {
return
}
cutoff := time.Now().Add(-store.conf.UploadDelay)
store.pendingMutex.Lock()
defer store.pendingMutex.Unlock()
Expand All @@ -938,6 +981,15 @@ func (store *cachedStore) uploader() {
}
}

// canUpload reports whether the current wall-clock hour falls inside the
// configured background-upload window [startHour, endHour). Equal bounds
// mean no window is configured, so uploads are always allowed; a window
// with startHour > endHour wraps around midnight.
func (store *cachedStore) canUpload() bool {
	start, end := store.startHour, store.endHour
	if start == end {
		return true // no restriction configured
	}
	now := time.Now().Hour()
	if start < end {
		// Simple same-day window, e.g. 2 ~ 6.
		return now >= start && now < end
	}
	// Window crosses midnight, e.g. 22 ~ 6.
	return now >= start || now < end
}

func (store *cachedStore) NewReader(id uint64, length int) Reader {
return sliceForRead(id, length, store)
}
Expand Down