diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index 8dc16603604..21cd5552708 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -7,35 +7,53 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + streammock "github.com/onflow/flow-go/engine/access/state_stream/mock" + "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" dpmock "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers/mock" connmock "github.com/onflow/flow-go/engine/access/rest/websockets/mock" "github.com/onflow/flow-go/engine/access/rest/websockets/models" + "github.com/onflow/flow-go/engine/access/state_stream/backend" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" ) +// WsControllerSuite is a test suite for the WebSocket Controller. type WsControllerSuite struct { suite.Suite logger zerolog.Logger wsConfig Config + + connection *connmock.WebsocketConnection + dataProviderFactory *dpmock.DataProviderFactory + + streamApi *streammock.API + streamConfig backend.Config } +func TestControllerSuite(t *testing.T) { + suite.Run(t, new(WsControllerSuite)) +} + +// SetupTest initializes the test suite with required dependencies. func (s *WsControllerSuite) SetupTest() { s.logger = unittest.Logger() s.wsConfig = NewDefaultWebsocketConfig() -} -func TestWsControllerSuite(t *testing.T) { - suite.Run(t, new(WsControllerSuite)) + s.connection = connmock.NewWebsocketConnection(s.T()) + s.dataProviderFactory = dpmock.NewDataProviderFactory(s.T()) + + s.streamApi = streammock.NewAPI(s.T()) + s.streamConfig = backend.Config{} } // TestSubscribeRequest tests the subscribe to topic flow. 
diff --git a/engine/access/rpc/connection/connection_test.go b/engine/access/rpc/connection/connection_test.go index 4ef7d9a978b..33e993f0b8b 100644 --- a/engine/access/rpc/connection/connection_test.go +++ b/engine/access/rpc/connection/connection_test.go @@ -40,7 +40,11 @@ func TestProxyAccessAPI(t *testing.T) { req := &access.PingRequest{} expected := &access.PingResponse{} - cn.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(expected, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -85,7 +89,11 @@ func TestProxyExecutionAPI(t *testing.T) { req := &execution.PingRequest{} expected := &execution.PingResponse{} - en.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) + en.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). + Return(expected, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -130,7 +138,11 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) { req := &access.PingRequest{} expected := &access.PingResponse{} - cn.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(expected, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -188,7 +200,11 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) { req := &execution.PingRequest{} expected := &execution.PingResponse{} - en.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) + en.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). 
+ Return(expected, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -250,7 +266,12 @@ func TestExecutionNodeClientTimeout(t *testing.T) { // setup the handler mock to not respond within the timeout req := &execution.PingRequest{} resp := &execution.PingResponse{} - en.handler.On("Ping", testifymock.Anything, req).After(timeout+time.Second).Return(resp, nil) + en.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). + After(timeout+time.Second). + Return(resp, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -302,7 +323,12 @@ func TestCollectionNodeClientTimeout(t *testing.T) { // setup the handler mock to not respond within the timeout req := &access.PingRequest{} resp := &access.PingResponse{} - cn.handler.On("Ping", testifymock.Anything, req).After(timeout+time.Second).Return(resp, nil) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + After(timeout+time.Second). + Return(resp, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -353,11 +379,22 @@ func TestConnectionPoolFull(t *testing.T) { defer cn2.stop(t) defer cn3.stop(t) - req := &access.PingRequest{} expected := &access.PingResponse{} - cn1.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) - cn2.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) - cn3.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) + cn1.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(expected, nil) + cn2.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(expected, nil) + cn3.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). 
+ Return(expected, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -436,7 +473,11 @@ func TestConnectionPoolStale(t *testing.T) { req := &access.PingRequest{} expected := &access.PingResponse{} - cn.handler.On("Ping", testifymock.Anything, req).Return(expected, nil) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(expected, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -525,9 +566,14 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) { req := &execution.PingRequest{} resp := &execution.PingResponse{} respSent := atomic.NewUint64(0) - en.handler.On("Ping", testifymock.Anything, req).Run(func(_ testifymock.Arguments) { - respSent.Inc() - }).Return(resp, nil) + en.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). + Run(func(_ testifymock.Arguments) { + respSent.Inc() + }). + Return(resp, nil) // create the factory connectionFactory := new(ConnectionFactoryImpl) @@ -615,14 +661,18 @@ func TestEvictingCacheClients(t *testing.T) { // Set up mock handlers for Ping and GetNetworkParameters pingReq := &access.PingRequest{} pingResp := &access.PingResponse{} - cn.handler.On("Ping", testifymock.Anything, pingReq).Return( - func(context.Context, *access.PingRequest) *access.PingResponse { - close(startPing) - <-returnFromPing // keeps request open until returnFromPing is closed - return pingResp - }, - func(context.Context, *access.PingRequest) error { return nil }, - ) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). 
+ Return( + func(context.Context, *access.PingRequest) *access.PingResponse { + close(startPing) + <-returnFromPing // keeps request open until returnFromPing is closed + return pingResp + }, + func(context.Context, *access.PingRequest) error { return nil }, + ) netReq := &access.GetNetworkParametersRequest{} netResp := &access.GetNetworkParametersResponse{} @@ -748,7 +798,9 @@ func TestConcurrentConnections(t *testing.T) { requestCount := rapid.IntRange(50, 1000).Draw(tt, "r") responsesSent := atomic.NewInt32(0) en.handler. - On("Ping", testifymock.Anything, req). + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). Return(func(_ context.Context, _ *execution.PingRequest) (*execution.PingResponse, error) { time.Sleep(getSleep() * time.Microsecond) @@ -891,7 +943,13 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { ctx := context.Background() // Set up the handler mock to not respond within the requestTimeout. - en.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) + en.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). + After(2*requestTimeout). + Return(resp, nil). + Once() // Call and measure the duration for the first invocation. duration, err := callAndMeasurePingDuration(ctx) @@ -900,12 +958,15 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { // Call and measure the duration for the second invocation (circuit breaker state is now "Open"). duration, err = callAndMeasurePingDuration(ctx) - assert.Equal(t, gobreaker.ErrOpenState, err) + assert.ErrorIs(t, err, gobreaker.ErrOpenState) assert.Greater(t, requestTimeout, duration) - // Reset the mock Ping for the next invocation to return response without delay - en.handler.On("Ping", testifymock.Anything, req).Unset() - en.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + en.handler. 
+ On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). + Return(resp, nil). + Once() // Wait until the circuit breaker transitions to the "HalfOpen" state. time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) @@ -913,15 +974,19 @@ func TestCircuitBreakerExecutionNode(t *testing.T) { // Call and measure the duration for the third invocation (circuit breaker state is now "HalfOpen"). duration, err = callAndMeasurePingDuration(ctx) assert.Greater(t, requestTimeout, duration) - assert.Equal(t, nil, err) + assert.NoError(t, err) }) for _, code := range successCodes { t.Run(fmt.Sprintf("test error %s treated as a success for circuit breaker ", code.String()), func(t *testing.T) { ctx := context.Background() - en.handler.On("Ping", testifymock.Anything, req).Unset() - en.handler.On("Ping", testifymock.Anything, req).Return(nil, status.Error(code, code.String())) + en.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*execution.PingRequest")). + Return(nil, status.Error(code, code.String())). + Once() duration, err := callAndMeasurePingDuration(ctx) require.Error(t, err) @@ -997,7 +1062,13 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { ctx := context.Background() // Set up the handler mock to not respond within the requestTimeout. - cn.handler.On("Ping", testifymock.Anything, req).After(2*requestTimeout).Return(resp, nil) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + After(2*requestTimeout). + Return(resp, nil). + Once() // Call and measure the duration for the first invocation. 
duration, err := callAndMeasurePingDuration(ctx) @@ -1009,9 +1080,12 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { assert.Equal(t, gobreaker.ErrOpenState, err) assert.Greater(t, requestTimeout, duration) - // Reset the mock Ping for the next invocation to return response without delay - cn.handler.On("Ping", testifymock.Anything, req).Unset() - cn.handler.On("Ping", testifymock.Anything, req).Return(resp, nil) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(resp, nil). + Once() // Wait until the circuit breaker transitions to the "HalfOpen" state. time.Sleep(circuitBreakerRestoreTimeout + (500 * time.Millisecond)) @@ -1026,8 +1100,12 @@ func TestCircuitBreakerCollectionNode(t *testing.T) { t.Run(fmt.Sprintf("test error %s treated as a success for circuit breaker ", code.String()), func(t *testing.T) { ctx := context.Background() - cn.handler.On("Ping", testifymock.Anything, req).Unset() - cn.handler.On("Ping", testifymock.Anything, req).Return(nil, status.Error(code, code.String())) + cn.handler. + On("Ping", + testifymock.Anything, + testifymock.AnythingOfType("*access.PingRequest")). + Return(nil, status.Error(code, code.String())). 
+ Once() duration, err := callAndMeasurePingDuration(ctx) require.Error(t, err) diff --git a/ledger/complete/wal/checkpoint_v6_writer.go b/ledger/complete/wal/checkpoint_v6_writer.go index 5c420b8842d..b72eff4392e 100644 --- a/ledger/complete/wal/checkpoint_v6_writer.go +++ b/ledger/complete/wal/checkpoint_v6_writer.go @@ -19,6 +19,7 @@ import ( "github.com/onflow/flow-go/ledger/complete/mtrie/node" "github.com/onflow/flow-go/ledger/complete/mtrie/trie" utilsio "github.com/onflow/flow-go/utils/io" + "github.com/onflow/flow-go/utils/merr" ) const subtrieLevel = 4 @@ -708,39 +709,7 @@ func decodeSubtrieCount(encoded []byte) (uint16, error) { return binary.BigEndian.Uint16(encoded), nil } -// closeAndMergeError close the closable and merge the closeErr with the given err into a multierror -// Note: when using this function in a defer function, don't use as below: -// func XXX() ( -// -// err error, -// ) { -// def func() { -// // bad, because the definition of err might get overwritten -// err = closeAndMergeError(closable, err) -// }() -// -// Better to use as below: -// func XXX() ( -// -// errToReturn error, -// ) { -// def func() { -// // good, because the error to returned is only updated here, and guaranteed to be returned -// errToReturn = closeAndMergeError(closable, errToReturn) -// }() -func closeAndMergeError(closable io.Closer, err error) error { - var merr *multierror.Error - if err != nil { - merr = multierror.Append(merr, err) - } - - closeError := closable.Close() - if closeError != nil { - merr = multierror.Append(merr, closeError) - } - - return merr.ErrorOrNil() -} +var closeAndMergeError = merr.CloseAndMergeError // withFile opens the file at the given path, and calls the given function with the opened file. // it handles closing the file and evicting the file from Linux page cache. 
diff --git a/storage/mock/iter_item.go b/storage/mock/iter_item.go new file mode 100644 index 00000000000..5d699511fb8 --- /dev/null +++ b/storage/mock/iter_item.go @@ -0,0 +1,82 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import mock "github.com/stretchr/testify/mock" + +// IterItem is an autogenerated mock type for the IterItem type +type IterItem struct { + mock.Mock +} + +// Key provides a mock function with given fields: +func (_m *IterItem) Key() []byte { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Key") + } + + var r0 []byte + if rf, ok := ret.Get(0).(func() []byte); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// KeyCopy provides a mock function with given fields: dst +func (_m *IterItem) KeyCopy(dst []byte) []byte { + ret := _m.Called(dst) + + if len(ret) == 0 { + panic("no return value specified for KeyCopy") + } + + var r0 []byte + if rf, ok := ret.Get(0).(func([]byte) []byte); ok { + r0 = rf(dst) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + return r0 +} + +// Value provides a mock function with given fields: _a0 +func (_m *IterItem) Value(_a0 func([]byte) error) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Value") + } + + var r0 error + if rf, ok := ret.Get(0).(func(func([]byte) error) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewIterItem creates a new instance of IterItem. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewIterItem(t interface { + mock.TestingT + Cleanup(func()) +}) *IterItem { + mock := &IterItem{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/iterator.go b/storage/mock/iterator.go new file mode 100644 index 00000000000..1b094ac15e1 --- /dev/null +++ b/storage/mock/iterator.go @@ -0,0 +1,106 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" +) + +// Iterator is an autogenerated mock type for the Iterator type +type Iterator struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Iterator) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// First provides a mock function with given fields: +func (_m *Iterator) First() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for First") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// IterItem provides a mock function with given fields: +func (_m *Iterator) IterItem() storage.IterItem { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IterItem") + } + + var r0 storage.IterItem + if rf, ok := ret.Get(0).(func() storage.IterItem); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.IterItem) + } + } + + return r0 +} + +// Next provides a mock function with given fields: +func (_m *Iterator) Next() { + _m.Called() +} + +// Valid provides a mock function with given fields: +func (_m *Iterator) Valid() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Valid") + } + + var r0 bool + 
if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// NewIterator creates a new instance of Iterator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewIterator(t interface { + mock.TestingT + Cleanup(func()) +}) *Iterator { + mock := &Iterator{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/reader.go b/storage/mock/reader.go new file mode 100644 index 00000000000..f9b15b532b5 --- /dev/null +++ b/storage/mock/reader.go @@ -0,0 +1,98 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mock + +import ( + io "io" + + storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" +) + +// Reader is an autogenerated mock type for the Reader type +type Reader struct { + mock.Mock +} + +// Get provides a mock function with given fields: key +func (_m *Reader) Get(key []byte) ([]byte, io.Closer, error) { + ret := _m.Called(key) + + if len(ret) == 0 { + panic("no return value specified for Get") + } + + var r0 []byte + var r1 io.Closer + var r2 error + if rf, ok := ret.Get(0).(func([]byte) ([]byte, io.Closer, error)); ok { + return rf(key) + } + if rf, ok := ret.Get(0).(func([]byte) []byte); ok { + r0 = rf(key) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func([]byte) io.Closer); ok { + r1 = rf(key) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(io.Closer) + } + } + + if rf, ok := ret.Get(2).(func([]byte) error); ok { + r2 = rf(key) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// NewIter provides a mock function with given fields: startPrefix, endPrefix, ops +func (_m *Reader) NewIter(startPrefix []byte, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + ret := 
_m.Called(startPrefix, endPrefix, ops) + + if len(ret) == 0 { + panic("no return value specified for NewIter") + } + + var r0 storage.Iterator + var r1 error + if rf, ok := ret.Get(0).(func([]byte, []byte, storage.IteratorOption) (storage.Iterator, error)); ok { + return rf(startPrefix, endPrefix, ops) + } + if rf, ok := ret.Get(0).(func([]byte, []byte, storage.IteratorOption) storage.Iterator); ok { + r0 = rf(startPrefix, endPrefix, ops) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.Iterator) + } + } + + if rf, ok := ret.Get(1).(func([]byte, []byte, storage.IteratorOption) error); ok { + r1 = rf(startPrefix, endPrefix, ops) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewReader creates a new instance of Reader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReader(t interface { + mock.TestingT + Cleanup(func()) +}) *Reader { + mock := &Reader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/reader_batch_writer.go b/storage/mock/reader_batch_writer.go new file mode 100644 index 00000000000..c64c340704e --- /dev/null +++ b/storage/mock/reader_batch_writer.go @@ -0,0 +1,72 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import ( + storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" +) + +// ReaderBatchWriter is an autogenerated mock type for the ReaderBatchWriter type +type ReaderBatchWriter struct { + mock.Mock +} + +// AddCallback provides a mock function with given fields: _a0 +func (_m *ReaderBatchWriter) AddCallback(_a0 func(error)) { + _m.Called(_a0) +} + +// GlobalReader provides a mock function with given fields: +func (_m *ReaderBatchWriter) GlobalReader() storage.Reader { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GlobalReader") + } + + var r0 storage.Reader + if rf, ok := ret.Get(0).(func() storage.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.Reader) + } + } + + return r0 +} + +// Writer provides a mock function with given fields: +func (_m *ReaderBatchWriter) Writer() storage.Writer { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Writer") + } + + var r0 storage.Writer + if rf, ok := ret.Get(0).(func() storage.Writer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage.Writer) + } + } + + return r0 +} + +// NewReaderBatchWriter creates a new instance of ReaderBatchWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReaderBatchWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *ReaderBatchWriter { + mock := &ReaderBatchWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/mock/writer.go b/storage/mock/writer.go new file mode 100644 index 00000000000..f80b206d39c --- /dev/null +++ b/storage/mock/writer.go @@ -0,0 +1,81 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. 
+ +package mock + +import ( + storage "github.com/onflow/flow-go/storage" + mock "github.com/stretchr/testify/mock" +) + +// Writer is an autogenerated mock type for the Writer type +type Writer struct { + mock.Mock +} + +// Delete provides a mock function with given fields: key +func (_m *Writer) Delete(key []byte) error { + ret := _m.Called(key) + + if len(ret) == 0 { + panic("no return value specified for Delete") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]byte) error); ok { + r0 = rf(key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteByRange provides a mock function with given fields: globalReader, startPrefix, endPrefix +func (_m *Writer) DeleteByRange(globalReader storage.Reader, startPrefix []byte, endPrefix []byte) error { + ret := _m.Called(globalReader, startPrefix, endPrefix) + + if len(ret) == 0 { + panic("no return value specified for DeleteByRange") + } + + var r0 error + if rf, ok := ret.Get(0).(func(storage.Reader, []byte, []byte) error); ok { + r0 = rf(globalReader, startPrefix, endPrefix) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Set provides a mock function with given fields: k, v +func (_m *Writer) Set(k []byte, v []byte) error { + ret := _m.Called(k, v) + + if len(ret) == 0 { + panic("no return value specified for Set") + } + + var r0 error + if rf, ok := ret.Get(0).(func([]byte, []byte) error); ok { + r0 = rf(k, v) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewWriter creates a new instance of Writer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *Writer { + mock := &Writer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage/operation/prefix.go b/storage/operation/prefix.go new file mode 100644 index 00000000000..19363c9656f --- /dev/null +++ b/storage/operation/prefix.go @@ -0,0 +1,150 @@ +//nolint:golint,unused +package operation + +import ( + "encoding/binary" + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +const ( + + // codes for special database markers + // codeMax = 1 // deprecated + codeDBType = 2 // specifies a database type + + // codes for views with special meaning + codeSafetyData = 10 // safety data for hotstuff state + codeLivenessData = 11 // liveness data for hotstuff state + + // codes for fields associated with the root state + codeSporkID = 13 + codeProtocolVersion = 14 + codeEpochCommitSafetyThreshold = 15 + codeSporkRootBlockHeight = 16 + + // code for heights with special meaning + codeFinalizedHeight = 20 // latest finalized block height + codeSealedHeight = 21 // latest sealed block height + codeClusterHeight = 22 // latest finalized height on cluster + codeExecutedBlock = 23 // latest executed block with max height + codeFinalizedRootHeight = 24 // the height of the highest finalized block contained in the root snapshot + codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received + codeEpochFirstHeight = 26 // the height of the first block in a given epoch + codeSealedRootHeight = 27 // the height of the highest sealed block contained in the root snapshot + + // codes for single entity storage + codeHeader = 30 + _ = 31 // DEPRECATED: 31 was used for identities before epochs + codeGuarantee = 32 + codeSeal = 33 + codeTransaction = 34 + codeCollection = 35 + codeExecutionResult = 36 + codeResultApproval = 37 + codeChunk = 38 + codeExecutionReceiptMeta = 39 // NOTE: prior to Mainnet25, this 
erroneously had the same value as codeExecutionResult (36) + + // codes for indexing single identifier by identifier/integer + codeHeightToBlock = 40 // index mapping height to block ID + codeBlockIDToLatestSealID = 41 // index mapping a block its last payload seal + codeClusterBlockToRefBlock = 42 // index cluster block ID to reference block ID + codeRefHeightToClusterBlock = 43 // index reference block height to cluster block IDs + codeBlockIDToFinalizedSeal = 44 // index _finalized_ seal by sealed block ID + codeBlockIDToQuorumCertificate = 45 // index of quorum certificates by block ID + codeEpochProtocolStateByBlockID = 46 // index of epoch protocol state entry by block ID + codeProtocolKVStoreByBlockID = 47 // index of protocol KV store entry by block ID + + // codes for indexing multiple identifiers by identifier + codeBlockChildren = 50 // index mapping block ID to children blocks + _ = 51 // DEPRECATED: 51 was used for identity indexes before epochs + codePayloadGuarantees = 52 // index mapping block ID to payload guarantees + codePayloadSeals = 53 // index mapping block ID to payload seals + codeCollectionBlock = 54 // index mapping collection ID to block ID + codeOwnBlockReceipt = 55 // index mapping block ID to execution receipt ID for execution nodes + _ = 56 // DEPRECATED: 56 was used for block->epoch status prior to Dynamic Protocol State in Mainnet25 + codePayloadReceipts = 57 // index mapping block ID to payload receipts + codePayloadResults = 58 // index mapping block ID to payload results + codeAllBlockReceipts = 59 // index mapping of blockID to multiple receipts + codePayloadProtocolStateID = 60 // index mapping block ID to payload protocol state ID + + // codes related to protocol level information + codeEpochSetup = 61 // EpochSetup service event, keyed by ID + codeEpochCommit = 62 // EpochCommit service event, keyed by ID + codeBeaconPrivateKey = 63 // BeaconPrivateKey, keyed by epoch counter + codeDKGStarted = 64 // flag that the DKG for an 
epoch has been started + codeDKGEnded = 65 // flag that the DKG for an epoch has ended (stores end state) + codeVersionBeacon = 67 // flag for storing version beacons + codeEpochProtocolState = 68 + codeProtocolKVStore = 69 + + // code for ComputationResult upload status storage + // NOTE: for now only GCP uploader is supported. When other uploader (AWS e.g.) needs to + // be supported, we will need to define new code. + codeComputationResults = 66 + + // job queue consumers and producers + codeJobConsumerProcessed = 70 + codeJobQueue = 71 + codeJobQueuePointer = 72 + + // legacy codes (should be cleaned up) + codeChunkDataPack = 100 + codeCommit = 101 + codeEvent = 102 + codeExecutionStateInteractions = 103 + codeTransactionResult = 104 + codeFinalizedCluster = 105 + codeServiceEvent = 106 + codeTransactionResultIndex = 107 + codeLightTransactionResult = 108 + codeLightTransactionResultIndex = 109 + codeTransactionResultErrorMessage = 110 + codeTransactionResultErrorMessageIndex = 111 + codeIndexCollection = 200 + codeIndexExecutionResultByBlock = 202 + codeIndexCollectionByTransaction = 203 + codeIndexResultApprovalByChunk = 204 + + // TEMPORARY codes + blockedNodeIDs = 205 // manual override for adding node IDs to list of ejected nodes, applies to networking layer only + + // internal failure information that should be preserved across restarts + codeExecutionFork = 254 + codeEpochEmergencyFallbackTriggered = 255 +) + +func MakePrefix(code byte, keys ...interface{}) []byte { + prefix := make([]byte, 1) + prefix[0] = code + for _, key := range keys { + prefix = append(prefix, KeyPartToBytes(key)...) 
+ } + return prefix +} + +func KeyPartToBytes(v interface{}) []byte { + switch i := v.(type) { + case uint8: + return []byte{i} + case uint32: + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, i) + return b + case uint64: + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b + case string: + return []byte(i) + case flow.Role: + return []byte{byte(i)} + case flow.Identifier: + return i[:] + case flow.ChainID: + return []byte(i) + default: + panic(fmt.Sprintf("unsupported type to convert (%T)", v)) + } +} diff --git a/storage/operation/reads.go b/storage/operation/reads.go index 6c14102bbc3..dba9b0835d5 100644 --- a/storage/operation/reads.go +++ b/storage/operation/reads.go @@ -9,6 +9,7 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/utils/merr" ) // CheckFunc is a function that checks if the value should be read and decoded. @@ -51,7 +52,7 @@ func IterateKeysByPrefixRange(r storage.Reader, startPrefix []byte, endPrefix [] // IterateKeys will iterate over all entries in the database, where the key starts with a prefixes in // the range [startPrefix, endPrefix] (both inclusive). // No errors expected during normal operations. 
-func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) error { +func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) (errToReturn error) { if len(startPrefix) == 0 { return fmt.Errorf("startPrefix prefix is empty") } @@ -69,7 +70,9 @@ func IterateKeys(r storage.Reader, startPrefix []byte, endPrefix []byte, iterFun if err != nil { return fmt.Errorf("can not create iterator: %w", err) } - defer it.Close() + defer func() { + errToReturn = merr.CloseAndMergeError(it, errToReturn) + }() for it.First(); it.Valid(); it.Next() { item := it.IterItem() @@ -130,7 +133,7 @@ func TraverseByPrefix(r storage.Reader, prefix []byte, iterFunc IterationFunc, o // When this returned function is executed (and only then), it will write into the `keyExists` whether // the key exists. // No errors are expected during normal operation. -func KeyExists(r storage.Reader, key []byte) (bool, error) { +func KeyExists(r storage.Reader, key []byte) (exist bool, errToReturn error) { _, closer, err := r.Get(key) if err != nil { // the key does not exist in the database @@ -140,7 +143,9 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) { // exception while checking for the key return false, irrecoverable.NewExceptionf("could not load data: %w", err) } - defer closer.Close() + defer func() { + errToReturn = merr.CloseAndMergeError(closer, errToReturn) + }() // the key does exist in the database return true, nil @@ -153,13 +158,15 @@ func KeyExists(r storage.Reader, key []byte) (bool, error) { // - storage.ErrNotFound if the key does not exist in the database // - generic error in case of unexpected failure from the database layer, or failure // to decode an existing database value -func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error { +func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) (errToReturn error) { 
val, closer, err := r.Get(key) if err != nil { return err } - defer closer.Close() + defer func() { + errToReturn = merr.CloseAndMergeError(closer, errToReturn) + }() err = msgpack.Unmarshal(val, entity) if err != nil { @@ -172,7 +179,7 @@ func RetrieveByKey(r storage.Reader, key []byte, entity interface{}) error { // keys with the format prefix` + `height` (where "+" denotes concatenation of binary strings). The height // is encoded as Big-Endian (entries with numerically smaller height have lexicographically smaller key). // The function finds the *highest* key with the given prefix and height equal to or below the given height. -func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) error { +func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64, entity interface{}) (errToReturn error) { if len(prefix) == 0 { return fmt.Errorf("prefix must not be empty") } @@ -182,7 +189,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64 if err != nil { return fmt.Errorf("can not create iterator: %w", err) } - defer it.Close() + defer func() { + errToReturn = merr.CloseAndMergeError(it, errToReturn) + }() var highestKey []byte @@ -203,7 +212,9 @@ func FindHighestAtOrBelowByPrefix(r storage.Reader, prefix []byte, height uint64 return err } - defer closer.Close() + defer func() { + errToReturn = merr.CloseAndMergeError(closer, errToReturn) + }() err = msgpack.Unmarshal(val, entity) if err != nil { diff --git a/storage/operation/writes.go b/storage/operation/writes.go index 590526b4686..11b4fdf9d29 100644 --- a/storage/operation/writes.go +++ b/storage/operation/writes.go @@ -10,61 +10,55 @@ import ( "github.com/onflow/flow-go/storage" ) -// Upsert will encode the given entity using msgpack and will insert the resulting +// UpsertByKey will encode the given entity using msgpack and will insert the resulting // binary data under the provided key. 
// If the key already exists, the value will be overwritten. // Error returns: // - generic error in case of unexpected failure from the database layer or // encoding failure. -func Upsert(key []byte, val interface{}) func(storage.Writer) error { - return func(w storage.Writer) error { - value, err := msgpack.Marshal(val) - if err != nil { - return irrecoverable.NewExceptionf("failed to encode value: %w", err) - } - - err = w.Set(key, value) - if err != nil { - return irrecoverable.NewExceptionf("failed to store data: %w", err) - } +func UpsertByKey(w storage.Writer, key []byte, val interface{}) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } - return nil + err = w.Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) } + + return nil } -// Remove removes the entity with the given key, if it exists. If it doesn't +// RemoveByKey removes the entity with the given key, if it exists. If it doesn't // exist, this is a no-op. 
// Error returns: // * generic error in case of unexpected database error -func Remove(key []byte) func(storage.Writer) error { - return func(w storage.Writer) error { - err := w.Delete(key) - if err != nil { - return irrecoverable.NewExceptionf("could not delete item: %w", err) - } - return nil +func RemoveByKey(w storage.Writer, key []byte) error { + err := w.Delete(key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) } + return nil } -// RemoveByPrefix removes all keys with the given prefix +// RemoveByKeyPrefix removes all keys with the given prefix // Error returns: // * generic error in case of unexpected database error -func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error { - return RemoveByRange(reader, key, key) +func RemoveByKeyPrefix(reader storage.Reader, w storage.Writer, key []byte) error { + return RemoveByKeyRange(reader, w, key, key) } -// RemoveByRange removes all keys with a prefix that falls within the range [start, end], both inclusive. +// RemoveByKeyRange removes all keys with a prefix that falls within the range [start, end], both inclusive. 
// It returns error if endPrefix < startPrefix // no other errors are expected during normal operation -func RemoveByRange(reader storage.Reader, startPrefix []byte, endPrefix []byte) func(storage.Writer) error { - return func(w storage.Writer) error { - if bytes.Compare(startPrefix, endPrefix) > 0 { - return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key") - } - err := w.DeleteByRange(reader, startPrefix, endPrefix) - if err != nil { - return irrecoverable.NewExceptionf("could not delete item: %w", err) - } - return nil +func RemoveByKeyRange(reader storage.Reader, w storage.Writer, startPrefix []byte, endPrefix []byte) error { + if bytes.Compare(startPrefix, endPrefix) > 0 { + return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key") + } + err := w.DeleteByRange(reader, startPrefix, endPrefix) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) } + return nil } diff --git a/storage/operation/writes_functors.go b/storage/operation/writes_functors.go new file mode 100644 index 00000000000..1ee182d040b --- /dev/null +++ b/storage/operation/writes_functors.go @@ -0,0 +1,33 @@ +package operation + +import "github.com/onflow/flow-go/storage" + +// Leo: This package includes deprecated functions that wrap the operation of writing to the database. +// They are needed because the original badger implementation is also implemented in the same wrapped function manner, +// since badger requires writes to be done in a transaction, which is stateful. +// Using these deprecated functions could minimize the changes during the refactor and make the changes easier to review. +// The simplified implementations of the functions are in the writes.go file, and are encouraged to be used instead.
+ +func Upsert(key []byte, val interface{}) func(storage.Writer) error { + return func(w storage.Writer) error { + return UpsertByKey(w, key, val) + } +} + +func Remove(key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + return RemoveByKey(w, key) + } +} + +func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + return RemoveByKeyPrefix(reader, w, key) + } +} + +func RemoveByRange(reader storage.Reader, startPrefix []byte, endPrefix []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + return RemoveByKeyRange(reader, w, startPrefix, endPrefix) + } +} diff --git a/utils/merr/closer.go b/utils/merr/closer.go new file mode 100644 index 00000000000..bb59c143b47 --- /dev/null +++ b/utils/merr/closer.go @@ -0,0 +1,31 @@ +package merr + +import ( + "io" + + "github.com/hashicorp/go-multierror" +) + +// CloseAndMergeError closes the closable and merges the close error with the given err into a multierror +// Note: when using this function in a deferred function, don't use as below: +// func XXX() ( +// +// err error, +// ) { +// defer func() { +// // bad, because the definition of err might get overwritten by another deferred function +// err = CloseAndMergeError(closable, err) +// }() +// +// Better to use as below: +// func XXX() ( +// +// errToReturn error, +// ) { +// defer func() { +// // good, because the error to be returned is only updated here, and guaranteed to be returned +// errToReturn = CloseAndMergeError(closable, errToReturn) +// }() +func CloseAndMergeError(closable io.Closer, err error) error { + return multierror.Append(err, closable.Close()).ErrorOrNil() +} diff --git a/utils/merr/closer_test.go b/utils/merr/closer_test.go new file mode 100644 index 00000000000..61015bbc8d6 --- /dev/null +++ b/utils/merr/closer_test.go @@ -0,0 +1,64 @@ +package merr + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + 
"github.com/onflow/flow-go/storage" +) + +// mockCloser is a mock implementation of io.Closer +type mockCloser struct { + closeError error +} + +// Close is a mock implementation of io.Closer.Close +func (c *mockCloser) Close() error { + return c.closeError +} + +func TestCloseAndMergeError(t *testing.T) { + // Create a mock closer + closer := &mockCloser{} + + // Test case 1: no error + err := CloseAndMergeError(closer, nil) + require.Nil(t, err) + + // Test case 2: only original error + err = CloseAndMergeError(closer, errors.New("original error")) + require.Error(t, err) + require.Contains(t, err.Error(), "original error") + + // Test case 3: only close error + closer.closeError = errors.New("close error") + err = CloseAndMergeError(closer, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "close error") + + // Test case 4: both original error and close error + err = CloseAndMergeError(closer, errors.New("original error")) + require.Error(t, err) + require.Contains(t, err.Error(), "original error") + require.Contains(t, err.Error(), "close error") + + // Test case 5: original error is storage.ErrNotFound + err = CloseAndMergeError(closer, fmt.Errorf("not found error: %w", storage.ErrNotFound)) + require.Error(t, err) + require.ErrorIs(t, err, storage.ErrNotFound) + + // Test case 6: close error is storage.ErrNotFound + closer.closeError = fmt.Errorf("not found error: %w", storage.ErrNotFound) + err = CloseAndMergeError(closer, nil) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrNotFound)) + + // Test case 7: error check works with multierror + closer.closeError = fmt.Errorf("exception") + err = CloseAndMergeError(closer, fmt.Errorf("not found error: %w", storage.ErrNotFound)) + require.Error(t, err) + require.True(t, errors.Is(err, storage.ErrNotFound)) +}