From 12bce3fc2a586352e1cf97304d363ed2373526ab Mon Sep 17 00:00:00 2001 From: Daniel <845765@qq.com> Date: Thu, 21 Nov 2024 01:04:49 +0800 Subject: [PATCH] :zap: Improve boot sync performance --- repo.go | 3 ++- sync.go | 35 +++++++++++++++-------------------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/repo.go b/repo.go index c19a97b..3ad284c 100644 --- a/repo.go +++ b/repo.go @@ -681,6 +681,7 @@ func (repo *Repo) index0(memo string, context map[string]interface{}) (ret *enti workerErrLock := sync.Mutex{} var upserts, removes, latestFiles []*entity.File if !init { + start = time.Now() count := atomic.Int32{} total := len(files) eventbus.Publish(eventbus.EvtIndexBeforeGetLatestFiles, context, total) @@ -751,7 +752,7 @@ func (repo *Repo) index0(memo string, context map[string]interface{}) (ret *enti } waitGroup.Wait() p.Release() - + logging.LogInfof("get latest files [files=%d] cost [%s]", len(latestFiles), time.Since(start)) if 0 < len(workerErrs) { err = workerErrs[0] logging.LogErrorf("get latest files failed: %s", err) diff --git a/sync.go b/sync.go index 1cff49a..69502cc 100644 --- a/sync.go +++ b/sync.go @@ -81,11 +81,19 @@ type TrafficStat struct { m *sync.Mutex } -func (repo *Repo) GetSyncCloudFiles(context map[string]interface{}) (fetchedFiles []*entity.File, err error) { +func (repo *Repo) GetSyncCloudFiles(cloudLatest *entity.Index, context map[string]interface{}) (fetchedFiles []*entity.File, err error) { lock.Lock() defer lock.Unlock() - fetchedFiles, err = repo.getSyncCloudFiles(context) + fetchedFiles, err = repo.getSyncCloudFiles(cloudLatest, context) + return +} + +func (repo *Repo) GetCloudLatest(context map[string]interface{}) (cloudLatest *entity.Index, err error) { + lock.Lock() + defer lock.Unlock() + + _, cloudLatest, err = repo.downloadCloudLatest(context) return } @@ -702,28 +710,13 @@ func (repo *Repo) filterLocalUpserts(localUpserts, cloudUpserts []*entity.File) return } -func (repo *Repo) getSyncCloudFiles(context map[string]interface{}) (fetchedFiles []*entity.File, err error) { +func (repo *Repo) getSyncCloudFiles(cloudLatest *entity.Index, context map[string]interface{}) (fetchedFiles []*entity.File, err error) { latest, err := repo.Latest() if nil != err { logging.LogErrorf("get latest failed: %s", err) return } - // 从云端获取最新索引 - length, cloudLatest, err := repo.downloadCloudLatest(context) - if nil != err { - if !errors.Is(err, cloud.ErrCloudObjectNotFound) { - logging.LogErrorf("download cloud latest failed: %s", err) - return - } - } - trafficStat := &TrafficStat{m: &sync.Mutex{}} - trafficStat.m.Lock() - trafficStat.DownloadFileCount++ - trafficStat.DownloadBytes += length - trafficStat.APIGet++ - trafficStat.m.Unlock() - if cloudLatest.ID == latest.ID { // 数据一致,直接返回 return @@ -743,11 +736,12 @@ func (repo *Repo) getSyncCloudFiles(context map[string]interface{}) (fetchedFile } // 从云端下载缺失文件并入库 - length, fetchedFiles, err = repo.downloadCloudFilesPut(fetchFileIDs, context) + length, fetchedFiles, err := repo.downloadCloudFilesPut(fetchFileIDs, context) if nil != err { logging.LogErrorf("download cloud files put failed: %s", err) return } + trafficStat := &TrafficStat{m: &sync.Mutex{}} trafficStat.DownloadBytes += length trafficStat.DownloadFileCount += len(fetchFileIDs) trafficStat.APIGet += len(fetchFileIDs) @@ -1536,6 +1530,7 @@ func (repo *Repo) downloadCloudIndex(id string, context map[string]interface{}) } func (repo *Repo) downloadCloudLatest(context map[string]interface{}) (downloadBytes int64, index *entity.Index, err error) { + start := time.Now() index = &entity.Index{} key := path.Join("refs", "latest") @@ -1582,7 +1577,7 @@ func (repo *Repo) downloadCloudLatest(context map[string]interface{}) (downloadB } } - logging.LogInfof("got cloud latest [%s]", index.String()) + logging.LogInfof("got cloud latest [%s], cost [%s]", index.String(), time.Since(start)) return }