Skip to content

Commit

Permalink
⚡ Improve boot sync performance
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Nov 20, 2024
1 parent 51a4270 commit 12bce3f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
3 changes: 2 additions & 1 deletion repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ func (repo *Repo) index0(memo string, context map[string]interface{}) (ret *enti
workerErrLock := sync.Mutex{}
var upserts, removes, latestFiles []*entity.File
if !init {
start = time.Now()
count := atomic.Int32{}
total := len(files)
eventbus.Publish(eventbus.EvtIndexBeforeGetLatestFiles, context, total)
Expand Down Expand Up @@ -751,7 +752,7 @@ func (repo *Repo) index0(memo string, context map[string]interface{}) (ret *enti
}
waitGroup.Wait()
p.Release()

logging.LogInfof("get latest files [files=%d] cost [%s]", len(latestFiles), time.Since(start))
if 0 < len(workerErrs) {
err = workerErrs[0]
logging.LogErrorf("get latest files failed: %s", err)
Expand Down
35 changes: 15 additions & 20 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,19 @@ type TrafficStat struct {
m *sync.Mutex
}

func (repo *Repo) GetSyncCloudFiles(context map[string]interface{}) (fetchedFiles []*entity.File, err error) {
func (repo *Repo) GetSyncCloudFiles(cloudLatest *entity.Index, context map[string]interface{}) (fetchedFiles []*entity.File, err error) {
lock.Lock()
defer lock.Unlock()

fetchedFiles, err = repo.getSyncCloudFiles(context)
fetchedFiles, err = repo.getSyncCloudFiles(cloudLatest, context)
return
}

func (repo *Repo) GetCloudLatest(context map[string]interface{}) (cloudLatest *entity.Index, err error) {
lock.Lock()
defer lock.Unlock()

_, cloudLatest, err = repo.downloadCloudLatest(context)
return
}

Expand Down Expand Up @@ -702,28 +710,13 @@ func (repo *Repo) filterLocalUpserts(localUpserts, cloudUpserts []*entity.File)
return
}

func (repo *Repo) getSyncCloudFiles(context map[string]interface{}) (fetchedFiles []*entity.File, err error) {
func (repo *Repo) getSyncCloudFiles(cloudLatest *entity.Index, context map[string]interface{}) (fetchedFiles []*entity.File, err error) {
latest, err := repo.Latest()
if nil != err {
logging.LogErrorf("get latest failed: %s", err)
return
}

// 从云端获取最新索引
length, cloudLatest, err := repo.downloadCloudLatest(context)
if nil != err {
if !errors.Is(err, cloud.ErrCloudObjectNotFound) {
logging.LogErrorf("download cloud latest failed: %s", err)
return
}
}
trafficStat := &TrafficStat{m: &sync.Mutex{}}
trafficStat.m.Lock()
trafficStat.DownloadFileCount++
trafficStat.DownloadBytes += length
trafficStat.APIGet++
trafficStat.m.Unlock()

if cloudLatest.ID == latest.ID {
// 数据一致,直接返回
return
Expand All @@ -743,11 +736,12 @@ func (repo *Repo) getSyncCloudFiles(context map[string]interface{}) (fetchedFile
}

// 从云端下载缺失文件并入库
length, fetchedFiles, err = repo.downloadCloudFilesPut(fetchFileIDs, context)
length, fetchedFiles, err := repo.downloadCloudFilesPut(fetchFileIDs, context)
if nil != err {
logging.LogErrorf("download cloud files put failed: %s", err)
return
}
trafficStat := &TrafficStat{m: &sync.Mutex{}}
trafficStat.DownloadBytes += length
trafficStat.DownloadFileCount += len(fetchFileIDs)
trafficStat.APIGet += len(fetchFileIDs)
Expand Down Expand Up @@ -1536,6 +1530,7 @@ func (repo *Repo) downloadCloudIndex(id string, context map[string]interface{})
}

func (repo *Repo) downloadCloudLatest(context map[string]interface{}) (downloadBytes int64, index *entity.Index, err error) {
start := time.Now()
index = &entity.Index{}

key := path.Join("refs", "latest")
Expand Down Expand Up @@ -1582,7 +1577,7 @@ func (repo *Repo) downloadCloudLatest(context map[string]interface{}) (downloadB
}
}

logging.LogInfof("got cloud latest [%s]", index.String())
logging.LogInfof("got cloud latest [%s], cost [%s]", index.String(), time.Since(start))
return
}

Expand Down

0 comments on commit 12bce3f

Please sign in to comment.