Skip to content

Commit

Permalink
fix: incorrect release of sharky locations (#2848)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloknerurkar authored Mar 16, 2022
1 parent 447d24d commit dda5606
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 4 deletions.
21 changes: 18 additions & 3 deletions pkg/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,12 @@ func (db *DB) putRequest(
return false, 0, 0, err
}

l, err := sharky.LocationFromBinary(previous.Location)
previousIdx, err := db.retrievalDataIndex.Get(previous)
if err != nil {
return false, 0, 0, fmt.Errorf("could not fetch previous item: %w", err)
}

l, err := sharky.LocationFromBinary(previousIdx.Location)
if err != nil {
return false, 0, 0, err
}
Expand Down Expand Up @@ -383,7 +388,12 @@ func (db *DB) putUpload(
return false, 0, fmt.Errorf("same slot remove: %w", err)
}

l, err := sharky.LocationFromBinary(previous.Location)
previousIdx, err := db.retrievalDataIndex.Get(previous)
if err != nil {
return false, 0, fmt.Errorf("could not fetch previous item: %w", err)
}

l, err := sharky.LocationFromBinary(previousIdx.Location)
if err != nil {
return false, 0, err
}
Expand Down Expand Up @@ -443,7 +453,12 @@ func (db *DB) putSync(batch *leveldb.Batch, loc *releaseLocations, binIDs map[ui
return false, 0, 0, err
}

l, err := sharky.LocationFromBinary(previous.Location)
previousIdx, err := db.retrievalDataIndex.Get(previous)
if err != nil {
return false, 0, 0, fmt.Errorf("could not fetch previous item: %w", err)
}

l, err := sharky.LocationFromBinary(previousIdx.Location)
if err != nil {
return false, 0, 0, err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/localstore/mode_put_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ func TestModePutSyncUpload_SameIndex(t *testing.T) {
if !yes {
t.Fatal("chunk should be there")
}

out, err := db.retrievalDataIndex.Get(chunkToItem(chunks[1]))
if err != nil {
t.Fatal(err)
}

validateData(t, db, out, chunks[1].Data())

binIDs := make(map[uint8]uint64)

for _, ch := range chunks {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sharky/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (s *Store) Release(ctx context.Context, loc Location) error {
s.metrics.TotalReleaseCalls.Inc()
if err == nil {
shard := strconv.Itoa(int(sh.index))
s.metrics.CurrentShardSize.WithLabelValues(shard).Inc()
s.metrics.CurrentShardSize.WithLabelValues(shard).Dec()
s.metrics.ShardFragmentation.WithLabelValues(shard).Sub(float64(s.maxDataSize - int(loc.Length)))
} else {
s.metrics.TotalReleaseCallsErr.Inc()
Expand Down

0 comments on commit dda5606

Please sign in to comment.