diff --git a/core/blockstm/mvhashmap_test.go b/core/blockstm/mvhashmap_test.go
new file mode 100644
index 0000000000..7ed728426c
--- /dev/null
+++ b/core/blockstm/mvhashmap_test.go
@@ -0,0 +1,344 @@
+package blockstm
+
+import (
+	"fmt"
+	"math/big"
+	"math/rand"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/ethereum/go-ethereum/common"
+)
+
+var randomness = rand.Intn(10) + 10
+
+// create test data for a given txIdx and incarnation
+func valueFor(txIdx, inc int) []byte {
+	return []byte(fmt.Sprintf("%ver:%ver:%ver", txIdx*5, txIdx+inc, inc*5))
+}
+
+func getCommonAddress(i int) common.Address {
+	return common.BigToAddress(big.NewInt(int64(i % randomness)))
+}
+
+func TestHelperFunctions(t *testing.T) {
+	t.Parallel()
+
+	ap1 := NewAddressKey(getCommonAddress(1))
+	ap2 := NewAddressKey(getCommonAddress(2))
+
+	mvh := MakeMVHashMap()
+
+	mvh.Write(ap1, Version{0, 1}, valueFor(0, 1))
+	mvh.Write(ap1, Version{0, 2}, valueFor(0, 2))
+	res := mvh.Read(ap1, 0)
+	require.Equal(t, -1, res.DepIdx())
+	require.Equal(t, -1, res.Incarnation())
+	require.Equal(t, 2, res.Status())
+
+	mvh.Write(ap2, Version{1, 1}, valueFor(1, 1))
+	mvh.Write(ap2, Version{1, 2}, valueFor(1, 2))
+	res = mvh.Read(ap2, 1)
+	require.Equal(t, -1, res.DepIdx())
+	require.Equal(t, -1, res.Incarnation())
+	require.Equal(t, 2, res.Status())
+
+	mvh.Write(ap1, Version{2, 1}, valueFor(2, 1))
+	mvh.Write(ap1, Version{2, 2}, valueFor(2, 2))
+	res = mvh.Read(ap1, 2)
+	require.Equal(t, 0, res.DepIdx())
+	require.Equal(t, 2, res.Incarnation())
+	require.Equal(t, valueFor(0, 2), res.Value().([]byte))
+	require.Equal(t, 0, res.Status())
+}
+
+func TestFlushMVWrite(t *testing.T) {
+	t.Parallel()
+
+	ap1 := NewAddressKey(getCommonAddress(1))
+	ap2 := NewAddressKey(getCommonAddress(2))
+
+	mvh := MakeMVHashMap()
+
+	var res MVReadResult
+
+	wd := []WriteDescriptor{}
+
+	wd = append(wd, WriteDescriptor{
+		Path: ap1,
+		V:    Version{0, 1},
+		Val:  valueFor(0, 1),
+	})
+	wd = append(wd, WriteDescriptor{
+		Path: ap1,
+		V:    Version{0, 2},
+		Val:  valueFor(0, 2),
+	})
+	wd = append(wd, WriteDescriptor{
+		Path: ap2,
+		V:    Version{1, 1},
+		Val:  valueFor(1, 1),
+	})
+	wd = append(wd, WriteDescriptor{
+		Path: ap2,
+		V:    Version{1, 2},
+		Val:  valueFor(1, 2),
+	})
+	wd = append(wd, WriteDescriptor{
+		Path: ap1,
+		V:    Version{2, 1},
+		Val:  valueFor(2, 1),
+	})
+	wd = append(wd, WriteDescriptor{
+		Path: ap1,
+		V:    Version{2, 2},
+		Val:  valueFor(2, 2),
+	})
+
+	mvh.FlushMVWriteSet(wd)
+
+	res = mvh.Read(ap1, 0)
+	require.Equal(t, -1, res.DepIdx())
+	require.Equal(t, -1, res.Incarnation())
+	require.Equal(t, 2, res.Status())
+
+	res = mvh.Read(ap2, 1)
+	require.Equal(t, -1, res.DepIdx())
+	require.Equal(t, -1, res.Incarnation())
+	require.Equal(t, 2, res.Status())
+
+	res = mvh.Read(ap1, 2)
+	require.Equal(t, 0, res.DepIdx())
+	require.Equal(t, 2, res.Incarnation())
+	require.Equal(t, valueFor(0, 2), res.Value().([]byte))
+	require.Equal(t, 0, res.Status())
+}
+
+// TODO - handle panic
+func TestLowerIncarnation(t *testing.T) {
+	t.Parallel()
+
+	ap1 := NewAddressKey(getCommonAddress(1))
+
+	mvh := MakeMVHashMap()
+
+	mvh.Write(ap1, Version{0, 2}, valueFor(0, 2))
+	mvh.Read(ap1, 0)
+	mvh.Write(ap1, Version{1, 2}, valueFor(1, 2))
+	mvh.Write(ap1, Version{0, 5}, valueFor(0, 5))
+	mvh.Write(ap1, Version{1, 5}, valueFor(1, 5))
+}
+
+func TestMarkEstimate(t *testing.T) {
+	t.Parallel()
+
+	ap1 := NewAddressKey(getCommonAddress(1))
+
+	mvh := MakeMVHashMap()
+
+	mvh.Write(ap1, Version{7, 2}, valueFor(7, 2))
+	mvh.MarkEstimate(ap1, 7)
+
+	mvh.Write(ap1, Version{7, 4}, valueFor(7, 4))
+}
+
+func TestMVHashMapBasics(t *testing.T) {
+	t.Parallel()
+
+	// memory locations
+	ap1 := NewAddressKey(getCommonAddress(1))
+	ap2 := NewAddressKey(getCommonAddress(2))
+	ap3 := NewAddressKey(getCommonAddress(3))
+
+	mvh := MakeMVHashMap()
+
+	res := mvh.Read(ap1, 5)
+	require.Equal(t, -1, res.depIdx)
+
+	mvh.Write(ap1, Version{10, 1}, valueFor(10, 1))
+
+	res = mvh.Read(ap1, 9)
+	require.Equal(t, -1, res.depIdx, "reads that should go to the DB return dependency -1")
+	res = mvh.Read(ap1, 10)
+	require.Equal(t, -1, res.depIdx, "Read at txn 10 returns only entries from smaller txns, not txn 10 itself")
+
+	// Reads for a higher txn return the entry written by txn 10.
+	res = mvh.Read(ap1, 15)
+	require.Equal(t, 10, res.depIdx, "reads for a higher txn return the entry written by txn 10")
+	require.Equal(t, 1, res.incarnation)
+	require.Equal(t, valueFor(10, 1), res.value)
+
+	// More writes.
+	mvh.Write(ap1, Version{12, 0}, valueFor(12, 0))
+	mvh.Write(ap1, Version{8, 3}, valueFor(8, 3))
+
+	// Verify reads.
+	res = mvh.Read(ap1, 15)
+	require.Equal(t, 12, res.depIdx)
+	require.Equal(t, 0, res.incarnation)
+	require.Equal(t, valueFor(12, 0), res.value)
+
+	res = mvh.Read(ap1, 11)
+	require.Equal(t, 10, res.depIdx)
+	require.Equal(t, 1, res.incarnation)
+	require.Equal(t, valueFor(10, 1), res.value)
+
+	res = mvh.Read(ap1, 10)
+	require.Equal(t, 8, res.depIdx)
+	require.Equal(t, 3, res.incarnation)
+	require.Equal(t, valueFor(8, 3), res.value)
+
+	// Mark the entry written by txn 10 as an estimate.
+	mvh.MarkEstimate(ap1, 10)
+
+	res = mvh.Read(ap1, 11)
+	require.Equal(t, 10, res.depIdx)
+	require.Equal(t, -1, res.incarnation, "dep at tx 10 is now an estimate")
+
+	// Delete the entry written by txn 10, then write to a different path.
+	mvh.Delete(ap1, 10)
+	mvh.Write(ap2, Version{10, 2}, valueFor(10, 2))
+
+	// Read by txn 11 no longer observes the entry from txn 10.
+	res = mvh.Read(ap1, 11)
+	require.Equal(t, 8, res.depIdx)
+	require.Equal(t, 3, res.incarnation)
+	require.Equal(t, valueFor(8, 3), res.value)
+
+	// Reads, writes for ap2 and ap3.
+	mvh.Write(ap2, Version{5, 0}, valueFor(5, 0))
+	mvh.Write(ap3, Version{20, 4}, valueFor(20, 4))
+
+	res = mvh.Read(ap2, 10)
+	require.Equal(t, 5, res.depIdx)
+	require.Equal(t, 0, res.incarnation)
+	require.Equal(t, valueFor(5, 0), res.value)
+
+	res = mvh.Read(ap3, 21)
+	require.Equal(t, 20, res.depIdx)
+	require.Equal(t, 4, res.incarnation)
+	require.Equal(t, valueFor(20, 4), res.value)
+
+	// Clear ap1 and ap3.
+	mvh.Delete(ap1, 12)
+	mvh.Delete(ap1, 8)
+	mvh.Delete(ap3, 20)
+
+	// Reads from ap1 and ap3 go to the db.
+	res = mvh.Read(ap1, 30)
+	require.Equal(t, -1, res.depIdx)
+
+	res = mvh.Read(ap3, 30)
+	require.Equal(t, -1, res.depIdx)
+
+	// No-op delete at ap2 - doesn't panic because an entry for ap2 exists
+	mvh.Delete(ap2, 11)
+
+	// Read the entry written by txn 10 at ap2.
+	res = mvh.Read(ap2, 15)
+	require.Equal(t, 10, res.depIdx)
+	require.Equal(t, 2, res.incarnation)
+	require.Equal(t, valueFor(10, 2), res.value)
+}
+
+func BenchmarkWriteTimeSameLocationDifferentTxIdx(b *testing.B) {
+	mvh2 := MakeMVHashMap()
+	ap2 := NewAddressKey(getCommonAddress(2))
+
+	randInts := []int{}
+	for i := 0; i < b.N; i++ {
+		randInts = append(randInts, rand.Intn(1000000000000000))
+	}
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		mvh2.Write(ap2, Version{randInts[i], 1}, valueFor(randInts[i], 1))
+	}
+}
+
+func BenchmarkReadTimeSameLocationDifferentTxIdx(b *testing.B) {
+	mvh2 := MakeMVHashMap()
+	ap2 := NewAddressKey(getCommonAddress(2))
+	txIdxSlice := []int{}
+
+	for i := 0; i < b.N; i++ {
+		txIdx := rand.Intn(1000000000000000)
+		txIdxSlice = append(txIdxSlice, txIdx)
+		mvh2.Write(ap2, Version{txIdx, 1}, valueFor(txIdx, 1))
+	}
+
+	b.ResetTimer()
+
+	for _, value := range txIdxSlice {
+		mvh2.Read(ap2, value)
+	}
+}
+
+func TestTimeComplexity(t *testing.T) {
+	t.Parallel()
+
+	// 1000000 reads and writes with no dependencies, each at a different memory location
+	mvh1 := MakeMVHashMap()
+
+	for i := 0; i < 1000000; i++ {
+		ap1 := NewAddressKey(getCommonAddress(i))
+		mvh1.Write(ap1, Version{i, 1}, valueFor(i, 1))
+		mvh1.Read(ap1, i)
+	}
+
+	// 1000000 reads and writes with dependencies, all at the same memory location
+	mvh2 := MakeMVHashMap()
+	ap2 := NewAddressKey(getCommonAddress(2))
+
+	for i := 0; i < 1000000; i++ {
+		mvh2.Write(ap2, Version{i, 1}, valueFor(i, 1))
+		mvh2.Read(ap2, i)
+	}
+}
+
+func TestWriteTimeSameLocationDifferentTxnIdx(t *testing.T) {
+	t.Parallel()
+
+	mvh1 := MakeMVHashMap()
+	ap1 := NewAddressKey(getCommonAddress(1))
+
+	for i := 0; i < 1000000; i++ {
+		mvh1.Write(ap1, Version{i, 1}, valueFor(i, 1))
+	}
+}
+
+func TestWriteTimeSameLocationSameTxnIdx(t *testing.T) {
+	t.Parallel()
+
+	mvh1 := MakeMVHashMap()
+	ap1 := NewAddressKey(getCommonAddress(1))
+
+	for i := 0; i < 1000000; i++ {
+		mvh1.Write(ap1, Version{1, i}, valueFor(i, 1))
+	}
+}
+
+func TestWriteTimeDifferentLocation(t *testing.T) {
+	t.Parallel()
+
+	mvh1 := MakeMVHashMap()
+
+	for i := 0; i < 1000000; i++ {
+		ap1 := NewAddressKey(getCommonAddress(i))
+		mvh1.Write(ap1, Version{i, 1}, valueFor(i, 1))
+	}
+}
+
+func TestReadTimeSameLocation(t *testing.T) {
+	t.Parallel()
+
+	mvh1 := MakeMVHashMap()
+	ap1 := NewAddressKey(getCommonAddress(1))
+
+	mvh1.Write(ap1, Version{1, 1}, valueFor(1, 1))
+
+	for i := 0; i < 1000000; i++ {
+		mvh1.Read(ap1, 2)
+	}
+}
diff --git a/core/blockstm/status_test.go b/core/blockstm/status_test.go
new file mode 100644
index 0000000000..d76ebaba04
--- /dev/null
+++ b/core/blockstm/status_test.go
@@ -0,0 +1,84 @@
+package blockstm
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestStatusBasics(t *testing.T) {
+	t.Parallel()
+
+	s := makeStatusManager(10)
+
+	x := s.takeNextPending()
+	require.Equal(t, 0, x)
+	require.True(t, s.checkInProgress(x))
+
+	x = s.takeNextPending()
+	require.Equal(t, 1, x)
+	require.True(t, s.checkInProgress(x))
+
+	x = s.takeNextPending()
+	require.Equal(t, 2, x)
+	require.True(t, s.checkInProgress(x))
+
+	s.markComplete(0)
+	require.False(t, s.checkInProgress(0))
+	s.markComplete(1)
+	s.markComplete(2)
+	require.False(t, s.checkInProgress(1))
+	require.False(t, s.checkInProgress(2))
+	require.Equal(t, 2, s.maxAllComplete())
+
+	x = s.takeNextPending()
+	require.Equal(t, 3, x)
+
+	x = s.takeNextPending()
+	require.Equal(t, 4, x)
+
+	s.markComplete(x)
+	require.False(t, s.checkInProgress(4))
+	// PSP - is this correct? {s.maxAllComplete() -> 2}
+	// s -> {[5 6 7 8 9] [3] [0 1 2 4] map[] map[]}
+	require.Equal(t, 2, s.maxAllComplete(), "zero should still be min complete")
+
+	exp := []int{1, 2}
+	require.Equal(t, exp, s.getRevalidationRange(1))
+}
+
+func TestMaxComplete(t *testing.T) {
+	t.Parallel()
+
+	s := makeStatusManager(10)
+
+	for {
+		tx := s.takeNextPending()
+
+		if tx == -1 {
+			break
+		}
+
+		if tx != 7 {
+			s.markComplete(tx)
+		}
+	}
+
+	require.Equal(t, 6, s.maxAllComplete())
+
+	s2 := makeStatusManager(10)
+
+	for {
+		tx := s2.takeNextPending()
+
+		if tx == -1 {
+			break
+		}
+	}
+	s2.markComplete(2)
+	s2.markComplete(4)
+	require.Equal(t, -1, s2.maxAllComplete())
+
+	s2.complete = insertInList(s2.complete, 4)
+	require.Equal(t, 2, s2.countComplete())
+}
diff --git a/go.mod b/go.mod
index cf5a532d77..880ec8581f 100644
--- a/go.mod
+++ b/go.mod
@@ -85,6 +85,8 @@ require (
 	pgregory.net/rapid v0.4.8
 )
 
+require github.com/orcaman/concurrent-map/v2 v2.0.0 // indirect
+
 require (
 	github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect
 	github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect
diff --git a/go.sum b/go.sum
index 1bd56c0e36..de33a31248 100644
--- a/go.sum
+++ b/go.sum
@@ -405,6 +405,8 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
 github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
 github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
 github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
+github.com/orcaman/concurrent-map/v2 v2.0.0 h1:iSMwuBQvQ1nX5i9gYuGMiSy0fjWHmazdjF+NdSO9JzI=
+github.com/orcaman/concurrent-map/v2 v2.0.0/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
 github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
 github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
 github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=