Skip to content

Commit

Permalink
perf(metarepos): add mrpb.StorageNodeUncommitReport pool
Browse files Browse the repository at this point in the history
This patch adds an `mrpb.StorageNodeUncommitReport` pool, which is
`proto/mrpb.storageNodeUncommitReportPool`. It reduces heap objects allocated repeatedly whenever
processing reports received from storage nodes.

Below heap profiling is the previous result.

```
      flat  flat%   sum%        cum   cum%
  70865580  8.38% 44.73%   71293773  8.43%  github.com/kakao/varlog/internal/metarepos.(*reportCollectExecutor).processReport
```

Now here it is after applying this patch.

```
      flat  flat%   sum%        cum   cum%
  20734504  2.10% 75.92%   42122814  4.26%  github.com/kakao/varlog/internal/metarepos.(*reportCollectExecutor).processReport
```
  • Loading branch information
ijsong committed May 16, 2023
1 parent 626fe12 commit fe4c70e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
16 changes: 8 additions & 8 deletions internal/metarepos/report_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (rce *reportCollectExecutor) getReport(ctx context.Context) error {
}

report := rce.processReport(response)
defer report.Release()
if report.Len() > 0 {
if err := rce.helper.ProposeReport(rce.storageNodeID, report.UncommitReports); err != nil {
rce.reportCtx.setExpire()
Expand All @@ -646,10 +647,8 @@ func (rce *reportCollectExecutor) getReport(ctx context.Context) error {
}

func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse) *mrpb.StorageNodeUncommitReport {
report := &mrpb.StorageNodeUncommitReport{
StorageNodeID: response.StorageNodeID,
UncommitReports: response.UncommitReports,
}
report := mrpb.NewStoragenodeUncommitReport(response.StorageNodeID)
report.UncommitReports = response.UncommitReports

if report.Len() == 0 {
return report
Expand All @@ -664,9 +663,9 @@ func (rce *reportCollectExecutor) processReport(response *snpb.GetReportResponse
return report
}

diff := &mrpb.StorageNodeUncommitReport{
StorageNodeID: report.StorageNodeID,
}
diff := mrpb.NewStoragenodeUncommitReport(report.StorageNodeID)
diff.UncommitReports = make([]snpb.LogStreamUncommitReport, 0, len(report.UncommitReports))
defer report.Release()

i := 0
j := 0
Expand Down Expand Up @@ -919,7 +918,8 @@ func (rc *reportContext) saveReport(report *mrpb.StorageNodeUncommitReport) {
rc.mu.Lock()
defer rc.mu.Unlock()

rc.report = report
rc.report = mrpb.NewStoragenodeUncommitReport(report.StorageNodeID)
rc.report.UncommitReports = report.UncommitReports
}

func (rc *reportContext) getReport() *mrpb.StorageNodeUncommitReport {
Expand Down
19 changes: 19 additions & 0 deletions proto/mrpb/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mrpb

import (
"sort"
"sync"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
Expand Down Expand Up @@ -75,6 +76,24 @@ func (l *LogStreamUncommitReports) Deleted() bool {
return l.Status == varlogpb.LogStreamStatusDeleted
}

var storageNodeUncommitReportPool = sync.Pool{
New: func() any {
return new(StorageNodeUncommitReport)
},
}

func NewStoragenodeUncommitReport(snid types.StorageNodeID) *StorageNodeUncommitReport {
r := storageNodeUncommitReportPool.Get().(*StorageNodeUncommitReport)
r.StorageNodeID = snid
return r
}

func (l *StorageNodeUncommitReport) Release() {
l.StorageNodeID = types.StorageNodeID(0)
l.UncommitReports = nil
storageNodeUncommitReportPool.Put(l)
}

func (l *StorageNodeUncommitReport) Len() int {
return len(l.UncommitReports)
}
Expand Down

0 comments on commit fe4c70e

Please sign in to comment.