meta: fix protection check for compaction (#4901)
SandyXSD committed Jun 20, 2024
1 parent 7ae55d0 commit 6355277
Showing 4 changed files with 69 additions and 43 deletions.
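
The fix is the same in all three metadata backends below (redis.go, sql.go, tkv.go): before committing a compaction, the protection check used to compare the compacted range only against the first and last skipped slices; it now tests every skipped slice for overlap and dumps all slices in the panic message when a violation is found. Below is a minimal, self-contained sketch of that per-slice overlap test — the `slice` type here is reduced to the `pos`/`len` fields the check reads, and the sample values are made up, not taken from JuiceFS.

```go
package main

import "fmt"

// slice keeps only the fields the protection check reads; the real
// struct in pkg/meta carries more (id, size, off, ...).
type slice struct {
	pos, len uint32
}

// overlapsSkipped mirrors the new check: the compacted range
// [pos, pos+size) must not intersect any skipped slice [s.pos, s.pos+s.len).
func overlapsSkipped(skipped []*slice, pos, size uint32) bool {
	for _, s := range skipped {
		if pos+size > s.pos && s.pos+s.len > pos {
			return true
		}
	}
	return false
}

func main() {
	// Made-up layout: only the middle skipped slice intersects the
	// compacted range [4 MiB, 6 MiB).
	skipped := []*slice{
		{pos: 0, len: 1 << 20},
		{pos: 5 << 20, len: 1 << 20},
		{pos: 9 << 20, len: 1 << 20},
	}
	fmt.Println(overlapsSkipped(skipped, 4<<20, 2<<20)) // true
}
```

Checking each skipped slice individually removes the assumption that ss[0] and ss[skipped-1] bound the whole skipped region, which is what the removed first/last span check relied on.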
20 changes: 20 additions & 0 deletions pkg/meta/base_test.go
@@ -1304,6 +1304,26 @@ func testCompaction(t *testing.T, m Meta, trash bool) {
if len(slices) != 1 || slices[0].Len != 3<<20 {
t.Fatalf("inode %d should be compacted, but have %d slices, size %d", inode, len(slices), slices[0].Len)
}

m.NewSlice(ctx, &sliceId)
_ = m.Write(ctx, inode, 2, 0, Slice{Id: sliceId, Size: 2338508, Len: 2338508}, time.Now())
m.NewSlice(ctx, &sliceId)
_ = m.Write(ctx, inode, 2, 8829056, Slice{Id: sliceId, Size: 1074933, Len: 1074933}, time.Now())
m.NewSlice(ctx, &sliceId)
_ = m.Write(ctx, inode, 2, 7663608, Slice{Id: sliceId, Size: 41480, Len: 4148}, time.Now())
_ = m.Fallocate(ctx, inode, fallocZeroRange, 2*ChunkSize+4515328, 3152428, nil)
_ = m.Fallocate(ctx, inode, fallocZeroRange, 2*ChunkSize+4515328, 2607724, nil)
if c, ok := m.(compactor); ok {
c.compactChunk(inode, 2, true)
}
if st := m.Read(ctx, inode, 2, &slices); st != 0 {
t.Fatalf("read 1: %s", st)
}
// compact twice: 4515328+2607724-2338508 = 4784544; 8829056+1074933-2338508-4784544=2780937
if len(slices) != 3 || slices[0].Len != 2338508 || slices[1].Len != 4784544 || slices[2].Len != 2780937 {
t.Fatalf("inode %d should be compacted, but have %d slices, size %d,%d,%d",
inode, len(slices), slices[0].Len, slices[1].Len, slices[2].Len)
}
}

func testConcurrentWrite(t *testing.T, m Meta) {
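The expected lengths in the new assertion come from the arithmetic in the test comment (the chunk ends up being compacted twice); a quick standalone check of those numbers:

```go
package main

import "fmt"

func main() {
	// Values copied from the test comment in base_test.go above.
	first := 2338508
	second := 4515328 + 2607724 - first         // 4784544
	third := 8829056 + 1074933 - first - second // 2780937
	fmt.Println(first, second, third)           // 2338508 4784544 2780937
}
```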
32 changes: 17 additions & 15 deletions pkg/meta/redis.go
@@ -2918,43 +2918,45 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
return
}
skipped := skipSome(ss)
- var first, last *slice
- if skipped > 0 {
- first, last = ss[0], ss[skipped-1]
- }
- ss = ss[skipped:]
- pos, size, slices := compactChunk(ss)
- if len(ss) < 2 || size == 0 {
+ compacted := ss[skipped:]
+ pos, size, slices := compactChunk(compacted)
+ if len(compacted) < 2 || size == 0 {
return
}
- if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos {
- panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size))
+ for _, s := range ss[:skipped] {
+ if pos+size > s.pos && s.pos+s.len > pos {
+ var sstring string
+ for _, s := range ss {
+ sstring += fmt.Sprintf("\n%+v", *s)
+ }
+ panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring))
+ }
}

var id uint64
st := m.NewSlice(ctx, &id)
if st != 0 {
return
}
- logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size)
+ logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size)
err = m.newMsg(CompactChunk, slices, id)
if err != nil {
if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") {
- logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err)
+ logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err)
}
return
}
var buf []byte // trash enabled: track delayed slices
var rs []*redis.IntCmd // trash disabled: check reference of slices
trash := m.toTrash(0)
if trash {
- for _, s := range ss {
+ for _, s := range compacted {
if s.id > 0 {
buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...)
}
}
} else {
- rs = make([]*redis.IntCmd, len(ss))
+ rs = make([]*redis.IntCmd, len(compacted))
}
key := m.chunkKey(inode, indx)
errno := errno(m.txn(ctx, func(tx *redis.Tx) error {
@@ -2983,7 +2985,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
pipe.HSet(ctx, m.delSlices(), fmt.Sprintf("%d_%d", id, time.Now().Unix()), buf)
}
} else {
- for i, s := range ss {
+ for i, s := range compacted {
if s.id > 0 {
rs[i] = pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.id, s.size), -1)
}
@@ -3010,7 +3012,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) {
m.of.InvalidateChunk(inode, indx)
m.cleanupZeroRef(m.sliceKey(id, size))
if !trash {
- for i, s := range ss {
+ for i, s := range compacted {
if s.id > 0 && rs[i].Err() == nil && rs[i].Val() < 0 {
m.deleteSlice(s.id, s.size)
}
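In the trash-disabled branch of the Redis implementation above, the reference count of every compacted slice is decremented inside the transaction pipeline, and slices whose count drops below zero are deleted afterwards. A rough illustration of that pipeline pattern with go-redis — the key and field names are invented for the example and are not JuiceFS's real keys:

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"}) // assumes a local Redis

	// Queue one HINCRBY per slice, keep the *IntCmd handles, then Exec;
	// each handle carries the post-decrement value after the pipeline runs.
	fields := []string{"k100_4194304", "k101_1048576"} // hypothetical slice keys
	cmds := make([]*redis.IntCmd, len(fields))
	pipe := rdb.Pipeline()
	for i, f := range fields {
		cmds[i] = pipe.HIncrBy(ctx, "sliceRef", f, -1)
	}
	if _, err := pipe.Exec(ctx); err != nil {
		fmt.Println("pipeline failed:", err)
		return
	}

	// A negative count means no remaining reference: the slice data
	// could now be removed (deleteSlice in the real code).
	for i, c := range cmds {
		if c.Err() == nil && c.Val() < 0 {
			fmt.Println("would delete slice", fields[i])
		}
	}
}
```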
30 changes: 16 additions & 14 deletions pkg/meta/sql.go
@@ -2774,36 +2774,38 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
return
}
skipped := skipSome(ss)
- var first, last *slice
- if skipped > 0 {
- first, last = ss[0], ss[skipped-1]
- }
- ss = ss[skipped:]
- pos, size, slices := compactChunk(ss)
- if len(ss) < 2 || size == 0 {
+ compacted := ss[skipped:]
+ pos, size, slices := compactChunk(compacted)
+ if len(compacted) < 2 || size == 0 {
return
}
- if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos {
- panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size))
+ for _, s := range ss[:skipped] {
+ if pos+size > s.pos && s.pos+s.len > pos {
+ var sstring string
+ for _, s := range ss {
+ sstring += fmt.Sprintf("\n%+v", *s)
+ }
+ panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring))
+ }
}

var id uint64
st := m.NewSlice(Background, &id)
if st != 0 {
return
}
- logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size)
+ logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size)
err = m.newMsg(CompactChunk, slices, id)
if err != nil {
if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") {
- logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err)
+ logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err)
}
return
}
var buf []byte
trash := m.toTrash(0)
if trash {
- for _, s := range ss {
+ for _, s := range compacted {
if s.id > 0 {
buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...)
}
@@ -2835,7 +2837,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
}
}
} else {
- for _, s_ := range ss {
+ for _, s_ := range compacted {
if s_.id == 0 {
continue
}
@@ -2871,7 +2873,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) {
} else if err == nil {
m.of.InvalidateChunk(inode, indx)
if !trash {
- for _, s := range ss {
+ for _, s := range compacted {
if s.id == 0 {
continue
}
30 changes: 16 additions & 14 deletions pkg/meta/tkv.go
@@ -2399,36 +2399,38 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
return
}
skipped := skipSome(ss)
- var first, last *slice
- if skipped > 0 {
- first, last = ss[0], ss[skipped-1]
- }
- ss = ss[skipped:]
- pos, size, slices := compactChunk(ss)
- if len(ss) < 2 || size == 0 {
+ compacted := ss[skipped:]
+ pos, size, slices := compactChunk(compacted)
+ if len(compacted) < 2 || size == 0 {
return
}
- if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos {
- panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size))
+ for _, s := range ss[:skipped] {
+ if pos+size > s.pos && s.pos+s.len > pos {
+ var sstring string
+ for _, s := range ss {
+ sstring += fmt.Sprintf("\n%+v", *s)
+ }
+ panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring))
+ }
}

var id uint64
st := m.NewSlice(Background, &id)
if st != 0 {
return
}
- logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size)
+ logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size)
err = m.newMsg(CompactChunk, slices, id)
if err != nil {
if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") {
- logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err)
+ logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err)
}
return
}
var dsbuf []byte
trash := m.toTrash(0)
if trash {
- for _, s := range ss {
+ for _, s := range compacted {
if s.id > 0 {
dsbuf = append(dsbuf, m.encodeDelayedSlice(s.id, s.size)...)
}
@@ -2450,7 +2452,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
tx.set(m.delSliceKey(time.Now().Unix(), id), dsbuf)
}
} else {
- for _, s := range ss {
+ for _, s := range compacted {
if s.id > 0 {
tx.incrBy(m.sliceKey(s.id, s.size), -1)
}
@@ -2480,7 +2482,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) {
m.cleanupZeroRef(id, size)
if !trash {
var refs int64
- for _, s := range ss {
+ for _, s := range compacted {
if s.id > 0 && m.client.txn(func(tx *kvTxn) error {
refs = tx.incrBy(m.sliceKey(s.id, s.size), 0)
return nil
