diff --git a/.golangci.yaml b/.golangci.yaml index 02ad861e..9a576cdc 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -19,10 +19,51 @@ run: linters: enable: - goimports + - goconst + - misspell + - errcheck + - gosimple + - govet + - ineffassign + - staticcheck + - unused + # - revive + - asasalint + - asciicheck + - bodyclose + # - contextcheck + - errname + - errorlint + - exportloopref + # - gocritic + # - godot + # - gomoddirectives + # - gosec + - nakedret + # - nilerr + # - nilnil + # - noctx + - nolintlint + # - prealloc + # - predeclared + - promlinter + # - reassign + - rowserrcheck + - sqlclosecheck + - tenv + # - testableexamples + # - thelper + - tparallel + - unconvert + # - unparam + - usestdlibvars + - wastedassign linters-settings: goimports: local-prefixes: github.com/streamnative/oxia + goconst: + ignore-tests: true issues: fix: true \ No newline at end of file diff --git a/cmd/client/cmd_test.go b/cmd/client/cmd_test.go index 51b2b0fa..60633354 100644 --- a/cmd/client/cmd_test.go +++ b/cmd/client/cmd_test.go @@ -77,11 +77,11 @@ func TestClientCmd(t *testing.T) { "\\{\"error\":\"binary flag was set but value is not valid base64\"\\}", "^$", }, - {"put-no-value", []string{"put", "-k", "k-put-np"}, "", put.ErrorExpectedKeyValueInconsistent, + {"put-no-value", []string{"put", "-k", "k-put-np"}, "", put.ErrExpectedKeyValueInconsistent, ".*", "Error: inconsistent flags; key and value flags must be in pairs", }, - {"put-no-key", []string{"put", "-v", "k-put-np"}, "", put.ErrorExpectedKeyValueInconsistent, + {"put-no-key", []string{"put", "-v", "k-put-np"}, "", put.ErrExpectedKeyValueInconsistent, ".*", "Error: inconsistent flags; key and value flags must be in pairs", }, @@ -89,7 +89,7 @@ func TestClientCmd(t *testing.T) { "\\{\"version\":\\{\"version_id\":7,\"created_timestamp\":\\d+,\"modified_timestamp\":\\d+\\,\"modifications_count\":0}\\}\\n{\"version\":\\{\"version_id\":8,\"created_timestamp\":\\d+,\"modified_timestamp\":\\d+\\,\"modifications_count\":0}\\}", "^$", }, - {"put-bad-binary-use", []string{"put", "-b"}, "", put.ErrorIncorrectBinaryFlagUse, + {"put-bad-binary-use", []string{"put", "-b"}, "", put.ErrIncorrectBinaryFlagUse, ".*", "Error: binary flag was set when config is being sourced from stdin", }, @@ -113,7 +113,7 @@ func TestClientCmd(t *testing.T) { "\\{\"binary\":true,\"value\":\"aGVsbG8y\",\"version\":\\{\"version_id\":6,\"created_timestamp\":\\d+,\"modified_timestamp\":\\d+\\,\"modifications_count\":0}\\}\\n{\"binary\":false,\"value\":\"c\",\"version\":\\{\"version_id\":4,\"created_timestamp\":\\d+,\"modified_timestamp\":\\d+\\,\"modifications_count\":0}\\}", "^$", }, - {"get-bad-binary-use", []string{"get", "-b"}, "", get.ErrorIncorrectBinaryFlagUse, + {"get-bad-binary-use", []string{"get", "-b"}, "", get.ErrIncorrectBinaryFlagUse, ".*", "Error: binary flag was set when config is being sourced from stdin", }, @@ -125,11 +125,11 @@ func TestClientCmd(t *testing.T) { "\\{\"keys\":\\[\"k-put\",\"k-put-binary-ok\"\\]\\}", "^$", }, - {"list-no-minimum", []string{"list", "--key-max", "XXY"}, "", list.ErrorExpectedRangeInconsistent, + {"list-no-minimum", []string{"list", "--key-max", "XXY"}, "", list.ErrExpectedRangeInconsistent, ".*", "Error: inconsistent flags; min and max flags must be in pairs", }, - {"list-no-maximum", []string{"list", "--key-min", "XXX"}, "", list.ErrorExpectedRangeInconsistent, + {"list-no-maximum", []string{"list", "--key-min", "XXX"}, "", list.ErrExpectedRangeInconsistent, ".*", "Error: inconsistent flags; min and max flags must be in pairs", }, @@ -169,7 +169,7 @@ func TestClientCmd(t *testing.T) { "\\{\\}", "^$", }, - {"delete-range-with-expected", []string{"delete", "--key-min", "q", "--key-max", "s", "-e", "0"}, "", delete.ErrorExpectedVersionInconsistent, + {"delete-range-with-expected", []string{"delete", "--key-min", "q", "--key-max", "s", "-e", "0"}, "", delete.ErrExpectedVersionInconsistent, ".*", "Error: inconsistent flags; zero or all keys must have an expected version", }, diff --git a/cmd/client/delete/cmd.go b/cmd/client/delete/cmd.go index e5f5f486..a7932c6d 100644 --- a/cmd/client/delete/cmd.go +++ b/cmd/client/delete/cmd.go @@ -28,8 +28,8 @@ import ( var ( Config = flags{} - ErrorExpectedVersionInconsistent = errors.New("inconsistent flags; zero or all keys must have an expected version") - ErrorExpectedRangeInconsistent = errors.New("inconsistent flags; min and max flags must be in pairs") + ErrExpectedVersionInconsistent = errors.New("inconsistent flags; zero or all keys must have an expected version") + ErrExpectedRangeInconsistent = errors.New("inconsistent flags; min and max flags must be in pairs") ) type flags struct { @@ -72,10 +72,10 @@ func exec(cmd *cobra.Command, args []string) error { func _exec(flags flags, in io.Reader, queue common.QueryQueue) error { if len(flags.keyMinimums) != len(flags.keyMaximums) { - return ErrorExpectedRangeInconsistent + return ErrExpectedRangeInconsistent } if len(flags.keys) != len(flags.expectedVersions) && len(flags.expectedVersions) > 0 { - return ErrorExpectedVersionInconsistent + return ErrExpectedVersionInconsistent } if len(flags.keyMinimums) > 0 || len(flags.keys) > 0 { for i, n := range flags.keyMinimums { diff --git a/cmd/client/delete/cmd_test.go b/cmd/client/delete/cmd_test.go index eb60e46e..09be6fb6 100644 --- a/cmd/client/delete/cmd_test.go +++ b/cmd/client/delete/cmd_test.go @@ -121,7 +121,7 @@ func Test_exec(t *testing.T) { flags{ expectedVersions: []int64{1}, }, - ErrorExpectedVersionInconsistent, + ErrExpectedVersionInconsistent, nil}, {"missing-version", "", @@ -129,7 +129,7 @@ func Test_exec(t *testing.T) { keys: []string{"x", "y"}, expectedVersions: []int64{1}, }, - ErrorExpectedVersionInconsistent, + ErrExpectedVersionInconsistent, nil}, {"range-no-max", "", @@ -137,7 +137,7 @@ func Test_exec(t *testing.T) { keyMinimums: []string{"a", "x"}, keyMaximums: []string{"y"}, }, - ErrorExpectedRangeInconsistent, + ErrExpectedRangeInconsistent, nil, }, {"range", @@ -160,7 +160,7 @@ func Test_exec(t *testing.T) { keyMinimums: []string{"a"}, keyMaximums: []string{"b", "y"}, }, - ErrorExpectedRangeInconsistent, + ErrExpectedRangeInconsistent, nil, }, {"stdin", diff --git a/cmd/client/get/cmd.go b/cmd/client/get/cmd.go index d166cb42..08525cc2 100644 --- a/cmd/client/get/cmd.go +++ b/cmd/client/get/cmd.go @@ -29,7 +29,7 @@ import ( var ( Config = flags{} - ErrorIncorrectBinaryFlagUse = errors.New("binary flag was set when config is being sourced from stdin") + ErrIncorrectBinaryFlagUse = errors.New("binary flag was set when config is being sourced from stdin") ) type flags struct { @@ -82,7 +82,7 @@ func _exec(flags flags, in io.Reader, queue common.QueryQueue) error { } } else { if flags.binaryValues { - return ErrorIncorrectBinaryFlagUse + return ErrIncorrectBinaryFlagUse } common.ReadStdin(in, Query{}, queue) } diff --git a/cmd/client/list/cmd.go b/cmd/client/list/cmd.go index d26356c8..011be6be 100644 --- a/cmd/client/list/cmd.go +++ b/cmd/client/list/cmd.go @@ -29,7 +29,7 @@ import ( var ( Config = flags{} - ErrorExpectedRangeInconsistent = errors.New("inconsistent flags; min and max flags must be in pairs") + ErrExpectedRangeInconsistent = errors.New("inconsistent flags; min and max flags must be in pairs") ) type flags struct { @@ -66,7 +66,7 @@ func exec(cmd *cobra.Command, args []string) error { func _exec(flags flags, in io.Reader, queue common.QueryQueue) error { if len(flags.keyMinimums) != len(flags.keyMaximums) { - return ErrorExpectedRangeInconsistent + return ErrExpectedRangeInconsistent } if len(flags.keyMinimums) > 0 { for i, n := range flags.keyMinimums { diff --git a/cmd/client/list/cmd_test.go b/cmd/client/list/cmd_test.go index 1c0d4b5f..9fb21f34 100644 --- a/cmd/client/list/cmd_test.go +++ b/cmd/client/list/cmd_test.go @@ -106,14 +106,14 @@ func Test_exec(t *testing.T) { flags{ keyMaximums: []string{"b"}, }, - ErrorExpectedRangeInconsistent, + ErrExpectedRangeInconsistent, nil}, {"range-no-max", "", flags{ keyMaximums: []string{"b"}, }, - ErrorExpectedRangeInconsistent, + ErrExpectedRangeInconsistent, nil}, } { t.Run(test.name, func(t *testing.T) { diff --git a/cmd/client/put/cmd.go b/cmd/client/put/cmd.go index 4bd5391c..4d549f63 100644 --- a/cmd/client/put/cmd.go +++ b/cmd/client/put/cmd.go @@ -29,10 +29,10 @@ import ( var ( Config = flags{} - ErrorExpectedKeyValueInconsistent = errors.New("inconsistent flags; key and value flags must be in pairs") - ErrorExpectedVersionInconsistent = errors.New("inconsistent flags; zero or all keys must have an expected version") - ErrorBase64ValueInvalid = errors.New("binary flag was set but value is not valid base64") - ErrorIncorrectBinaryFlagUse = errors.New("binary flag was set when config is being sourced from stdin") + ErrExpectedKeyValueInconsistent = errors.New("inconsistent flags; key and value flags must be in pairs") + ErrExpectedVersionInconsistent = errors.New("inconsistent flags; zero or all keys must have an expected version") + ErrBase64ValueInvalid = errors.New("binary flag was set but value is not valid base64") + ErrIncorrectBinaryFlagUse = errors.New("binary flag was set when config is being sourced from stdin") ) type flags struct { @@ -78,10 +78,10 @@ func exec(cmd *cobra.Command, args []string) error { func _exec(flags flags, in io.Reader, queue common.QueryQueue) error { if len(flags.keys) != len(flags.values) && (len(flags.values) > 0 || len(flags.keys) > 0) { - return ErrorExpectedKeyValueInconsistent + return ErrExpectedKeyValueInconsistent } if (len(flags.expectedVersions) > 0) && len(flags.keys) != len(flags.expectedVersions) { - return ErrorExpectedVersionInconsistent + return ErrExpectedVersionInconsistent } if len(flags.keys) > 0 { for i, k := range flags.keys { @@ -97,7 +97,7 @@ func _exec(flags flags, in io.Reader, queue common.QueryQueue) error { } } else { if flags.binaryValues { - return ErrorIncorrectBinaryFlagUse + return ErrIncorrectBinaryFlagUse } common.ReadStdin(in, Query{}, queue) } @@ -143,7 +143,7 @@ func convertValue(binary bool, value string) ([]byte, error) { decoded := make([]byte, int64(float64(len(value))*0.8)) _, err := base64.StdEncoding.Decode(decoded, []byte(value)) if err != nil { - return nil, ErrorBase64ValueInvalid + return nil, ErrBase64ValueInvalid } return decoded, nil } else { diff --git a/cmd/client/put/cmd_test.go b/cmd/client/put/cmd_test.go index ddb312bc..1fd2e3bb 100644 --- a/cmd/client/put/cmd_test.go +++ b/cmd/client/put/cmd_test.go @@ -88,14 +88,14 @@ func Test_exec(t *testing.T) { flags{ values: []string{"y"}, }, - ErrorExpectedKeyValueInconsistent, + ErrExpectedKeyValueInconsistent, nil}, {"entry-no-value", "", flags{ keys: []string{"x"}, }, - ErrorExpectedKeyValueInconsistent, + ErrExpectedKeyValueInconsistent, nil}, {"entry-missing-version", "", @@ -104,7 +104,7 @@ func Test_exec(t *testing.T) { values: []string{"a", "b"}, expectedVersions: []int64{1}, }, - ErrorExpectedVersionInconsistent, + ErrExpectedVersionInconsistent, nil}, {"entry-binary", "", @@ -194,7 +194,7 @@ func Test_exec(t *testing.T) { flags{ binaryValues: true, }, - ErrorIncorrectBinaryFlagUse, + ErrIncorrectBinaryFlagUse, nil}, } { t.Run(test.name, func(t *testing.T) { @@ -312,7 +312,7 @@ func TestConvertValue(t *testing.T) { name: "invalid-binary", value: "hello", binary: true, - expectedErr: ErrorBase64ValueInvalid, + expectedErr: ErrBase64ValueInvalid, }, } { t.Run(test.name, func(t *testing.T) { diff --git a/common/batch/batcher.go b/common/batch/batcher.go index a4e3ed54..3d25f851 100644 --- a/common/batch/batcher.go +++ b/common/batch/batcher.go @@ -21,7 +21,7 @@ import ( "time" ) -var ErrorShuttingDown = errors.New("shutting down") +var ErrShuttingDown = errors.New("shutting down") type Batcher interface { io.Closer @@ -46,7 +46,7 @@ func (b *batcherImpl) Close() error { func (b *batcherImpl) Add(call any) { if b.closed.Load() { - b.failCall(call, ErrorShuttingDown) + b.failCall(call, ErrShuttingDown) } else { b.callC <- call } @@ -103,13 +103,13 @@ func (b *batcherImpl) Run() { case <-b.closeC: if batch != nil { timer.Stop() - batch.Fail(ErrorShuttingDown) + batch.Fail(ErrShuttingDown) batch = nil } for { select { case call := <-b.callC: - b.failCall(call, ErrorShuttingDown) + b.failCall(call, ErrShuttingDown) default: return } diff --git a/common/batch/batcher_test.go b/common/batch/batcher_test.go index 25ac8e2a..d8d064f9 100644 --- a/common/batch/batcher_test.go +++ b/common/batch/batcher_test.go @@ -52,7 +52,7 @@ func (b *testBatch) Complete() { func (b *testBatch) Fail(err error) { b.result <- err - //closeC(b.result) + // closeC(b.result) } func TestBatcher(t *testing.T) { @@ -65,7 +65,7 @@ func TestBatcher(t *testing.T) { }{ {"complete on maxRequestsPerBatch", 1 * time.Second, 1, false, nil}, {"complete on linger", 1 * time.Millisecond, 2, false, nil}, - {"fail on close", 1 * time.Second, 2, true, ErrorShuttingDown}, + {"fail on close", 1 * time.Second, 2, true, ErrShuttingDown}, } { t.Run(item.name, func(t *testing.T) { testBatch := newTestBatch() diff --git a/common/logger.go b/common/logger.go index f6c225fb..db84fe92 100644 --- a/common/logger.go +++ b/common/logger.go @@ -33,13 +33,13 @@ import ( const DefaultLogLevel = slog.LevelInfo var ( - // LogLevel Used for flags + // LogLevel Used for flags. LogLevel slog.Level - // LogJson Used for flags + // LogJson Used for flags. LogJson bool ) -// ParseLogLevel will convert the slog level configuration to slog.Level values +// ParseLogLevel will convert the slog level configuration to slog.Level values. func ParseLogLevel(levelStr string) (slog.Level, error) { switch { case strings.EqualFold(levelStr, slog.LevelDebug.String()): diff --git a/common/memoize.go b/common/memoize.go index 3ffdd958..ec77a3a6 100644 --- a/common/memoize.go +++ b/common/memoize.go @@ -29,7 +29,7 @@ type memoize[T any] struct { } // Memoize is used to cache the result of the invocation of a function -// for a certain amount of time +// for a certain amount of time. func Memoize[T any](provider func() T, cacheTime time.Duration) func() T { m := memoize[T]{ provider: provider, diff --git a/common/metrics/counter.go b/common/metrics/counter.go index 323bdeb2..747d906f 100644 --- a/common/metrics/counter.go +++ b/common/metrics/counter.go @@ -20,7 +20,7 @@ import ( "go.opentelemetry.io/otel/metric" ) -// Counter is a monotonically increasing counter +// Counter is a monotonically increasing counter. type Counter interface { Inc() Add(incr int) @@ -51,7 +51,7 @@ func NewCounter(name string, description string, unit Unit, labels map[string]an } // UpDownCounter is a counter that is incremented and decremented -// to report the current state +// to report the current state. type UpDownCounter interface { Counter Dec() diff --git a/common/metrics/metrics_test.go b/common/metrics/metrics_test.go index 366f159f..91bf00fa 100644 --- a/common/metrics/metrics_test.go +++ b/common/metrics/metrics_test.go @@ -30,6 +30,9 @@ func TestPrometheusMetrics(t *testing.T) { url := fmt.Sprintf("http://localhost:%d/metrics", metrics.Port()) response, err := http.Get(url) assert.NoError(t, err) + if response != nil && response.Body != nil { + defer response.Body.Close() + } assert.Equal(t, 200, response.StatusCode) @@ -42,7 +45,11 @@ func TestPrometheusMetrics(t *testing.T) { err = metrics.Close() assert.NoError(t, err) - response, err = http.Get(url) + response2, err := http.Get(url) assert.ErrorContains(t, err, "connection refused") - assert.Nil(t, response) + assert.Nil(t, response2) + + if response2 != nil && response2.Body != nil { + defer response2.Body.Close() + } } diff --git a/common/pprof.go b/common/pprof.go index eb4ed6c3..5117668b 100644 --- a/common/pprof.go +++ b/common/pprof.go @@ -31,7 +31,7 @@ var ( ) // DoWithLabels attaches the labels to the current go-routine Pprof context, -// for the duration of the call to f +// for the duration of the call to f. func DoWithLabels(labels map[string]string, f func()) { var l []string for k, v := range labels { diff --git a/coordinator/impl/cluster_rebalance.go b/coordinator/impl/cluster_rebalance.go index 95282627..190fe0ea 100644 --- a/coordinator/impl/cluster_rebalance.go +++ b/coordinator/impl/cluster_rebalance.go @@ -29,7 +29,7 @@ type SwapNodeAction struct { } // Make sure every server is assigned a similar number of shards -// Output a list of actions to be taken to rebalance the cluster +// Output a list of actions to be taken to rebalance the cluster. func rebalanceCluster(servers []model.ServerAddress, currentStatus *model.ClusterStatus) []SwapNodeAction { res := make([]SwapNodeAction, 0) diff --git a/coordinator/impl/coordinator.go b/coordinator/impl/coordinator.go index 4d058edd..6c0b140a 100644 --- a/coordinator/impl/coordinator.go +++ b/coordinator/impl/coordinator.go @@ -32,7 +32,7 @@ import ( ) var ( - ErrorNamespaceNotFound = errors.New("namespace not found") + ErrNamespaceNotFound = errors.New("namespace not found") ) type ShardAssignmentsProvider interface { @@ -110,7 +110,7 @@ func NewCoordinator(metadataProvider MetadataProvider, c.assignmentsChanged = common.NewConditionContext(c) c.clusterStatus, c.metadataVersion, err = metadataProvider.Get() - if err != nil && !errors.Is(err, ErrorMetadataNotInitialized) { + if err != nil && !errors.Is(err, ErrMetadataNotInitialized) { return nil, err } @@ -172,7 +172,7 @@ func (c *coordinator) waitForAllNodesToBeAvailable() { } } -// Assign the shards to the available servers +// Assign the shards to the available servers. func (c *coordinator) initialAssignment() error { c.log.Info( "Performing initial assignment", @@ -258,7 +258,7 @@ func (c *coordinator) InitiateLeaderElection(namespace string, shard int64, meta cs := c.clusterStatus.Clone() ns, ok := cs.Namespaces[namespace] if !ok { - return ErrorNamespaceNotFound + return ErrNamespaceNotFound } ns.Shards[shard] = metadata @@ -279,7 +279,7 @@ func (c *coordinator) ElectedLeader(namespace string, shard int64, metadata mode cs := c.clusterStatus.Clone() ns, ok := cs.Namespaces[namespace] if !ok { - return ErrorNamespaceNotFound + return ErrNamespaceNotFound } ns.Shards[shard] = metadata @@ -303,7 +303,7 @@ func (c *coordinator) ShardDeleted(namespace string, shard int64) error { cs := c.clusterStatus.Clone() ns, ok := cs.Namespaces[namespace] if !ok { - return ErrorNamespaceNotFound + return ErrNamespaceNotFound } delete(ns.Shards, shard) @@ -323,7 +323,7 @@ func (c *coordinator) ShardDeleted(namespace string, shard int64) error { return nil } -// This is called while already holding the lock on the coordinator +// This is called while already holding the lock on the coordinator. func (c *coordinator) computeNewAssignments() { c.assignments = &proto.ShardAssignments{ Namespaces: map[string]*proto.NamespaceShardsAssignment{}, diff --git a/coordinator/impl/coordinator_e2e_test.go b/coordinator/impl/coordinator_e2e_test.go index 69690c3d..95503770 100644 --- a/coordinator/impl/coordinator_e2e_test.go +++ b/coordinator/impl/coordinator_e2e_test.go @@ -312,7 +312,7 @@ func TestCoordinator_MultipleNamespaces(t *testing.T) { // Key will not be visible in other namespace res, _, err := clientNs1.Get(ctx, "my-key") - assert.ErrorIs(t, err, oxia.ErrorKeyNotFound) + assert.ErrorIs(t, err, oxia.ErrKeyNotFound) assert.Nil(t, res) version2, err := clientNs1.Put(ctx, "my-key", []byte("my-value-2")) diff --git a/coordinator/impl/k8s_client.go b/coordinator/impl/k8s_client.go index e46e55f1..e104c115 100644 --- a/coordinator/impl/k8s_client.go +++ b/coordinator/impl/k8s_client.go @@ -41,7 +41,7 @@ func NewK8SClientConfig() *rest.Config { } // -//func NewOxiaClientset(config *rest.Config) oxia.Interface { +// func NewOxiaClientset(config *rest.Config) oxia.Interface { // clientset, err := oxia.NewForConfig(config) // if err != nil { // log.Fatal().Err(err).Msg("failed to create client") diff --git a/coordinator/impl/metadata.go b/coordinator/impl/metadata.go index 4e8f8e2f..0ea1814c 100644 --- a/coordinator/impl/metadata.go +++ b/coordinator/impl/metadata.go @@ -25,8 +25,8 @@ import ( type Version string var ( - ErrorMetadataNotInitialized = errors.New("metadata not initialized") - ErrorMetadataBadVersion = errors.New("metadata bad version") + ErrMetadataNotInitialized = errors.New("metadata not initialized") + ErrMetadataBadVersion = errors.New("metadata bad version") ) const MetadataNotExists Version = "-1" diff --git a/coordinator/impl/metadata_configmap.go b/coordinator/impl/metadata_configmap.go index f286497d..25ece381 100644 --- a/coordinator/impl/metadata_configmap.go +++ b/coordinator/impl/metadata_configmap.go @@ -103,14 +103,14 @@ func (m *metadataProviderConfigMap) Store(status *model.ClusterStatus, expectedV } if version != expectedVersion { - err = ErrorMetadataBadVersion + err = ErrMetadataBadVersion return } data := configMap(m.name, status, expectedVersion) cm, err := K8SConfigMaps(m.kubernetes).Upsert(m.namespace, data) if k8sError.IsConflict(err) { - err = ErrorMetadataBadVersion + err = ErrMetadataBadVersion } version = Version(cm.ResourceVersion) m.metadataSize.Store(int64(len(data.Data["status"]))) diff --git a/coordinator/impl/metadata_file.go b/coordinator/impl/metadata_file.go index 876972ef..1d8c9416 100644 --- a/coordinator/impl/metadata_file.go +++ b/coordinator/impl/metadata_file.go @@ -27,7 +27,7 @@ import ( ) // MetadataProviderMemory is a provider that just keeps the cluster status in a local file, -// using a lock mechanism to prevent missing updates +// using a lock mechanism to prevent missing updates. type metadataProviderFile struct { path string fileLock *fslock.Lock @@ -101,7 +101,7 @@ func (m *metadataProviderFile) Store(cs *model.ClusterStatus, expectedVersion Ve } if expectedVersion != existingVersion { - return MetadataNotExists, ErrorMetadataBadVersion + return MetadataNotExists, ErrMetadataBadVersion } newVersion = incrVersion(existingVersion) diff --git a/coordinator/impl/metadata_memory.go b/coordinator/impl/metadata_memory.go index c7628fe5..00bba570 100644 --- a/coordinator/impl/metadata_memory.go +++ b/coordinator/impl/metadata_memory.go @@ -22,7 +22,7 @@ import ( ) // MetadataProviderMemory is a provider that just keeps the cluster status in memory -// Used for unit tests +// Used for unit tests. type metadataProviderMemory struct { sync.Mutex @@ -52,7 +52,7 @@ func (m *metadataProviderMemory) Store(cs *model.ClusterStatus, expectedVersion defer m.Unlock() if expectedVersion != m.version { - return MetadataNotExists, ErrorMetadataBadVersion + return MetadataNotExists, ErrMetadataBadVersion } m.cs = cs.Clone() diff --git a/coordinator/impl/metadata_test.go b/coordinator/impl/metadata_test.go index a6decfd4..84be6678 100644 --- a/coordinator/impl/metadata_test.go +++ b/coordinator/impl/metadata_test.go @@ -56,7 +56,7 @@ func TestMetadataProvider(t *testing.T) { newVersion, err := m.Store(&model.ClusterStatus{ Namespaces: map[string]model.NamespaceStatus{}, }, "") - assert.ErrorIs(t, err, ErrorMetadataBadVersion) + assert.ErrorIs(t, err, ErrMetadataBadVersion) assert.Equal(t, MetadataNotExists, newVersion) newVersion, err = m.Store(&model.ClusterStatus{ diff --git a/coordinator/impl/node_controller.go b/coordinator/impl/node_controller.go index 17bf6844..e1c8e872 100644 --- a/coordinator/impl/node_controller.go +++ b/coordinator/impl/node_controller.go @@ -45,7 +45,7 @@ const ( ) // The NodeController takes care of checking the health-status of each node -// and to push all the service discovery updates +// and to push all the service discovery updates. type NodeController interface { io.Closer diff --git a/coordinator/impl/shard_controller.go b/coordinator/impl/shard_controller.go index 1625d180..f89017a0 100644 --- a/coordinator/impl/shard_controller.go +++ b/coordinator/impl/shard_controller.go @@ -36,15 +36,15 @@ import ( const ( // When fencing quorum of servers, after we reach the majority, wait a bit more - // to include responses from all healthy servers + // to include responses from all healthy servers. quorumFencingGracePeriod = 100 * time.Millisecond - // Timeout when waiting for followers to catchup with leader + // Timeout when waiting for followers to catchup with leader. catchupTimeout = 5 * time.Minute ) // The ShardController is responsible to handle all the state transition for a given a shard -// e.g. electing a new leader +// e.g. electing a new leader. type ShardController interface { io.Closer @@ -423,7 +423,7 @@ func (s *shardController) newTermAndAddFollower(ctx context.Context, node model. } // Send NewTerm to all the ensemble members in parallel and wait for -// a majority of them to reply successfully +// a majority of them to reply successfully. func (s *shardController) newTermQuorum() (map[model.ServerAddress]*proto.EntryId, error) { timer := s.newTermQuorumLatency.Timer() @@ -723,7 +723,7 @@ func (s *shardController) SwapNode(from model.ServerAddress, to model.ServerAddr return nil } -// Check that all the followers in the ensemble are catching up with the leader +// Check that all the followers in the ensemble are catching up with the leader. func (s *shardController) waitForFollowersToCatchUp(ctx context.Context, leader model.ServerAddress, ensemble []model.ServerAddress) error { ctx, cancel := context.WithTimeout(ctx, catchupTimeout) defer cancel() diff --git a/coordinator/model/shard_status.go b/coordinator/model/shard_status.go index f9092167..7aa4a069 100644 --- a/coordinator/model/shard_status.go +++ b/coordinator/model/shard_status.go @@ -46,7 +46,7 @@ var toShardStatus = map[string]ShardStatus{ "Deleting": ShardStatusDeleting, } -// MarshalJSON marshals the enum as a quoted json string +// MarshalJSON marshals the enum as a quoted json string. func (s ShardStatus) MarshalJSON() ([]byte, error) { buffer := bytes.NewBufferString(`"`) buffer.WriteString(toString[s]) @@ -54,7 +54,7 @@ func (s ShardStatus) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// UnmarshalJSON unmarshals a quoted json string to the enum value +// UnmarshalJSON unmarshals a quoted json string to the enum value. func (s *ShardStatus) UnmarshalJSON(b []byte) error { var j string if err := json.Unmarshal(b, &j); err != nil { diff --git a/maelstrom/messages.go b/maelstrom/messages.go index 4cda6958..2c90a404 100644 --- a/maelstrom/messages.go +++ b/maelstrom/messages.go @@ -32,7 +32,7 @@ const ( MsgTypeInit MsgType = "init" MsgTypeError MsgType = "error" - // Maelstrom "lin-kv" workload messages + // Maelstrom "lin-kv" workload messages. MsgTypeWrite MsgType = "write" MsgTypeWriteOk MsgType = "write_ok" @@ -41,7 +41,7 @@ const ( MsgTypeCas MsgType = "cas" MsgTypeCasOk MsgType = "cas_ok" - /* Oxia specific messages */ + /* Oxia specific messages. */ MsgTypeNewTermRequest MsgType = "term-req" MsgTypeNewTermResponse MsgType = "term-resp" diff --git a/maelstrom/replication_rpc_provider.go b/maelstrom/replication_rpc_provider.go index 845ef77e..6276257f 100644 --- a/maelstrom/replication_rpc_provider.go +++ b/maelstrom/replication_rpc_provider.go @@ -87,7 +87,7 @@ func (r *maelstromReplicationRpcProvider) SendSnapshot(ctx context.Context, foll panic("not implemented") } -// //////// ReplicateClient +// //////// ReplicateClient. type maelstromReplicateClient struct { BaseStream diff --git a/oxia/async_client_impl.go b/oxia/async_client_impl.go index fc2749de..cd02d45c 100644 --- a/oxia/async_client_impl.go +++ b/oxia/async_client_impl.go @@ -229,7 +229,7 @@ func (c *clientImpl) List(ctx context.Context, minKeyInclusive string, maxKeyExc for { response, err := client.Recv() - if err == io.EOF { + if errors.Is(err, io.EOF) { break } else if err != nil { ch <- ListResult{Err: err} diff --git a/oxia/async_client_impl_test.go b/oxia/async_client_impl_test.go index e15006ef..f468cfc9 100644 --- a/oxia/async_client_impl_test.go +++ b/oxia/async_client_impl_test.go @@ -72,7 +72,7 @@ func TestAsyncClientImpl(t *testing.T) { getResult = <-client.Get("/a") assert.Equal(t, GetResult{ - Err: ErrorKeyNotFound, + Err: ErrKeyNotFound, }, getResult) deleteRangeResult := <-client.DeleteRange("/c", "/d") @@ -80,7 +80,7 @@ func TestAsyncClientImpl(t *testing.T) { getResult = <-client.Get("/d") assert.Equal(t, GetResult{ - Err: ErrorKeyNotFound, + Err: ErrKeyNotFound, }, getResult) err = client.Close() @@ -249,7 +249,7 @@ func TestAsyncClientImpl_Sessions(t *testing.T) { "Get resulted in", slog.Any("res", res), ) - return errors.Is(res.Err, ErrorKeyNotFound) + return errors.Is(res.Err, ErrKeyNotFound) case <-time.After(1 * time.Second): assert.Fail(t, "Shouldn't have timed out") diff --git a/oxia/cache.go b/oxia/cache.go index 47cf16e0..f8ca7283 100644 --- a/oxia/cache.go +++ b/oxia/cache.go @@ -33,7 +33,7 @@ import ( // // The cached values are automatically updated when there are updates or // deletions. -// The cache is storing de-serialized object +// The cache is storing de-serialized object. type Cache[Value any] interface { io.Closer @@ -70,19 +70,19 @@ type Cache[Value any] interface { Get(ctx context.Context, key string) (Value, Version, error) } -// ModifyFunc is the transformation function to apply on ReadModifyUpdate +// ModifyFunc is the transformation function to apply on ReadModifyUpdate. type ModifyFunc[Value any] func(v Optional[Value]) (Value, error) -// SerializeFunc is the serialization function. eg: [json.Marshall] +// SerializeFunc is the serialization function. eg: [json.Marshall]. type SerializeFunc func(value any) ([]byte, error) -// DeserializeFunc is the deserialization function. eg: [json.Unmarshall] +// DeserializeFunc is the deserialization function. eg: [json.Unmarshall]. type DeserializeFunc func(data []byte, value any) error //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NewCache creates a new cache object for a specific type -// Uses the `serializeFunc` and `deserializeFunc` for SerDe +// Uses the `serializeFunc` and `deserializeFunc` for SerDe. func NewCache[T any](client SyncClient, serializeFunc SerializeFunc, deserializeFunc DeserializeFunc) (Cache[T], error) { c, ok := client.(*syncClientImpl) if !ok { @@ -241,7 +241,7 @@ func (c *cacheImpl[Value]) Put(ctx context.Context, key string, value Value, opt } version, err := c.client.Put(ctx, key, data, options...) - if !errors.Is(err, ErrorUnexpectedVersionId) { + if !errors.Is(err, ErrUnexpectedVersionId) { c.valueCache.Del(key) } @@ -259,7 +259,7 @@ func (c *cacheImpl[Value]) Get(ctx context.Context, key string) (value Value, ve if cv, present := cachedValue.(cachedResult[Value]).Get(); present { return cv.value, cv.version, nil } else { - return value, version, ErrorKeyNotFound + return value, version, ErrKeyNotFound } } @@ -268,7 +268,7 @@ func (c *cacheImpl[Value]) Get(ctx context.Context, key string) (value Value, ve func (c *cacheImpl[Value]) load(ctx context.Context, key string) (value Value, version Version, err error) { data, existingVersion, err := c.client.Get(ctx, key) - if err == ErrorKeyNotFound { + if errors.Is(err, ErrKeyNotFound) { cr := empty[cachedResult[Value]]() c.valueCache.Set(key, cr, 0) return value, version, err @@ -297,7 +297,7 @@ func (c *cacheImpl[Value]) ReadModifyUpdate(ctx context.Context, key string, mod var versionId int64 existingValue, version, err := c.Get(ctx, key) - if err == ErrorKeyNotFound { + if errors.Is(err, ErrKeyNotFound) { optValue = empty[Value]() versionId = VersionIdNotExists } else if err != nil { @@ -314,7 +314,7 @@ func (c *cacheImpl[Value]) ReadModifyUpdate(ctx context.Context, key string, mod _, err = c.Put(ctx, key, newValue, ExpectedVersionId(versionId)) if err != nil { - if err == ErrorUnexpectedVersionId { + if errors.Is(err, ErrUnexpectedVersionId) { // Retry on conflict return err } else { diff --git a/oxia/cache_test.go b/oxia/cache_test.go index 7cefd34d..c903a49d 100644 --- a/oxia/cache_test.go +++ b/oxia/cache_test.go @@ -62,20 +62,20 @@ func TestCache_Empty(t *testing.T) { assert.NoError(t, err) value, version, err := cache.Get(context.Background(), "/non-existing-key") - assert.ErrorIs(t, ErrorKeyNotFound, err) + assert.ErrorIs(t, ErrKeyNotFound, err) assert.Equal(t, Version{}, version) assert.Equal(t, testStruct{}, value) value, version, err = cache.Get(context.Background(), "/non-existing-key/child") - assert.ErrorIs(t, ErrorKeyNotFound, err) + assert.ErrorIs(t, ErrKeyNotFound, err) assert.Equal(t, Version{}, version) assert.Equal(t, testStruct{}, value) err = cache.Delete(context.Background(), "/non-existing-key") - assert.ErrorIs(t, ErrorKeyNotFound, err) + assert.ErrorIs(t, ErrKeyNotFound, err) err = cache.Delete(context.Background(), "/non-existing-key/child") - assert.ErrorIs(t, ErrorKeyNotFound, err) + assert.ErrorIs(t, ErrKeyNotFound, err) assert.NoError(t, cache.Close()) assert.NoError(t, client.Close()) @@ -96,7 +96,7 @@ func TestCache_InsertionDeletion(t *testing.T) { v2 := testStruct{"hello", 2} version2, err := cache.Put(context.Background(), k1, v2, ExpectedRecordNotExists()) - assert.ErrorIs(t, err, ErrorUnexpectedVersionId) + assert.ErrorIs(t, err, ErrUnexpectedVersionId) assert.Equal(t, Version{}, version2) value, version, err := cache.Get(context.Background(), k1) diff --git a/oxia/client.go b/oxia/client.go index 8d8307a2..8dd38a06 100644 --- a/oxia/client.go +++ b/oxia/client.go @@ -23,23 +23,23 @@ import ( ) const ( - // VersionIdNotExists represent the VersionId of a non-existing record + // VersionIdNotExists represent the VersionId of a non-existing record. VersionIdNotExists int64 = -1 ) var ( - // ErrorKeyNotFound A record associated with the specified key was not found - ErrorKeyNotFound = errors.New("key not found") + // ErrKeyNotFound A record associated with the specified key was not found. + ErrKeyNotFound = errors.New("key not found") - // ErrorUnexpectedVersionId The expected version id passed as a condition does not match - // the current version id of the stored record - ErrorUnexpectedVersionId = errors.New("unexpected version id") + // ErrUnexpectedVersionId The expected version id passed as a condition does not match + // the current version id of the stored record. + ErrUnexpectedVersionId = errors.New("unexpected version id") - // ErrorRequestTooLarge is returned when a request is larger than the maximum batch size - ErrorRequestTooLarge = batch.ErrorRequestTooLarge + // ErrRequestTooLarge is returned when a request is larger than the maximum batch size. + ErrRequestTooLarge = batch.ErrRequestTooLarge - // ErrorUnknownStatus Unknown error - ErrorUnknownStatus = errors.New("unknown status") + // ErrUnknownStatus Unknown error. + ErrUnknownStatus = errors.New("unknown status") ) // AsyncClient Oxia client with methods suitable for asynchronous operations. @@ -155,7 +155,7 @@ type SyncClient interface { GetNotifications() (Notifications, error) } -// Version includes some information regarding the state of a record +// Version includes some information regarding the state of a record. type Version struct { // VersionId represents an identifier that can be used to refer to a particular version // of a record. @@ -186,7 +186,7 @@ type Version struct { } // PutResult structure is wrapping the version information for the result -// of a `Put` operation and an eventual error in the [AsyncClient] +// of a `Put` operation and an eventual error in the [AsyncClient]. type PutResult struct { // The Version information Version Version @@ -196,7 +196,7 @@ type PutResult struct { } // GetResult structure is wrapping a Value, its version information and -// an eventual error as results for a `Get` operation in the [AsyncClient] +// an eventual error as results for a `Get` operation in the [AsyncClient]. type GetResult struct { // Value is the value of the record Value []byte @@ -209,7 +209,7 @@ type GetResult struct { } // ListResult structure is wrapping a list of keys, and a potential error as -// results for a `List` operation in the [AsyncClient] +// results for a `List` operation in the [AsyncClient]. type ListResult struct { // The list of keys returned by [List] Keys []string @@ -218,7 +218,7 @@ type ListResult struct { } // Notifications allow applications to receive the feed of changes -// that are happening in the Oxia database +// that are happening in the Oxia database. type Notifications interface { io.Closer @@ -226,15 +226,15 @@ type Notifications interface { Ch() <-chan *Notification } -// NotificationType represents the type of the notification event +// NotificationType represents the type of the notification event. type NotificationType int const ( - // KeyCreated A record that didn't exist was created + // KeyCreated A record that didn't exist was created. KeyCreated NotificationType = iota - // KeyModified An existing record was modified + // KeyModified An existing record was modified. KeyModified - // KeyDeleted A record was deleted + // KeyDeleted A record was deleted. KeyDeleted ) @@ -251,7 +251,7 @@ func (n NotificationType) String() string { return "Unknown" } -// Notification represents one change in the Oxia database +// Notification represents one change in the Oxia database. type Notification struct { // The type of the modification Type NotificationType diff --git a/oxia/internal/batch/manager_test.go b/oxia/internal/batch/manager_test.go index b24cf0a9..7093e1de 100644 --- a/oxia/internal/batch/manager_test.go +++ b/oxia/internal/batch/manager_test.go @@ -23,7 +23,7 @@ import ( "github.com/streamnative/oxia/common/batch" ) -var closeErr = errors.New("closed") +var errClose = errors.New("closed") type testBatcher struct { closed bool @@ -31,7 +31,7 @@ type testBatcher struct { func (b *testBatcher) Close() error { b.closed = true - return closeErr + return errClose } func (b *testBatcher) Add(any) {} @@ -58,12 +58,12 @@ func TestManager(t *testing.T) { assert.Equal(t, 1, newBatcherInvocations) err := manager.Close() - assert.ErrorIs(t, err, closeErr) + assert.ErrorIs(t, err, errClose) assert.True(t, testBatcher.closed) _ = manager.Get(shardId) - //proves that the batcher was removed on Close - //as it had to recreate it on Get + // proves that the batcher was removed on Close + // as it had to recreate it on Get assert.Equal(t, 2, newBatcherInvocations) } diff --git a/oxia/internal/batch/read_batch.go b/oxia/internal/batch/read_batch.go index c82a0b41..4600cf09 100644 --- a/oxia/internal/batch/read_batch.go +++ b/oxia/internal/batch/read_batch.go @@ -16,6 +16,7 @@ package batch import ( "context" + "errors" "io" "log/slog" "time" @@ -121,7 +122,7 @@ func (b *readBatch) doRequest(ctx context.Context, request *proto.ReadRequest) ( for { recv, err := stream.Recv() - if err == io.EOF { + if errors.Is(err, io.EOF) { return response, nil } if err != nil { diff --git a/oxia/internal/batch/write_batch.go b/oxia/internal/batch/write_batch.go index 636190e0..24810403 100644 --- a/oxia/internal/batch/write_batch.go +++ b/oxia/internal/batch/write_batch.go @@ -29,7 +29,7 @@ import ( "github.com/streamnative/oxia/proto" ) -var ErrorRequestTooLarge = errors.New("put request is too large") +var ErrRequestTooLarge = errors.New("put request is too large") type writeBatchFactory struct { execute func(context.Context, *proto.WriteRequest) (*proto.WriteResponse, error) diff --git a/oxia/internal/shard_manager.go b/oxia/internal/shard_manager.go index 6188219c..6d87bcf1 100644 --- a/oxia/internal/shard_manager.go +++ b/oxia/internal/shard_manager.go @@ -210,7 +210,7 @@ func (s *shardManagerImpl) update(updates []Shard) { for _, update := range updates { if _, ok := s.shards[update.Id]; !ok { - //delete overlaps + // delete overlaps for shardId, existing := range s.shards { if overlap(update.HashRange, existing.HashRange) { s.logger.Info( diff --git a/oxia/notifications.go b/oxia/notifications.go index c92d7514..010e4d31 100644 --- a/oxia/notifications.go +++ b/oxia/notifications.go @@ -16,6 +16,7 @@ package oxia import ( "context" + "errors" "fmt" "io" "log/slog" @@ -107,7 +108,7 @@ func (nm *notifications) Close() error { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Manages the notifications for a specific shard +// Manages the notifications for a specific shard. type shardNotificationsManager struct { shard int64 ctx context.Context @@ -142,7 +143,7 @@ func newShardNotificationsManager(shard int64, nm *notifications) *shardNotifica func (snm *shardNotificationsManager) getNotificationsWithRetries() { _ = backoff.RetryNotify(snm.getNotifications, snm.backoff, func(err error, duration time.Duration) { - if err != context.Canceled { + if !errors.Is(err, context.Canceled) { snm.log.Error( "Error while getting notifications", slog.Any("error", err), diff --git a/oxia/optional.go b/oxia/optional.go index 82b86b5d..73e53ab0 100644 --- a/oxia/optional.go +++ b/oxia/optional.go @@ -14,7 +14,7 @@ package oxia -// Optional represents a wrapper for some value that can be present or not +// Optional represents a wrapper for some value that can be present or not. type Optional[T any] interface { // Present is true if the optional value is set Present() bool diff --git a/oxia/options.go b/oxia/options.go index 91f482d1..af961bbe 100644 --- a/oxia/options.go +++ b/oxia/options.go @@ -38,13 +38,13 @@ const ( ) var ( - ErrorInvalidOptionBatchLinger = errors.New("BatchLinger must be greater than or equal to zero") - ErrorInvalidOptionMaxRequestsPerBatch = errors.New("MaxRequestsPerBatch must be greater than zero") - ErrorInvalidOptionMaxBatchSize = errors.New("MaxBatchSize must be greater than zero") - ErrorInvalidOptionRequestTimeout = errors.New("RequestTimeout must be greater than zero") - ErrorInvalidOptionSessionTimeout = errors.New("SessionTimeout must be greater than zero") - ErrorInvalidOptionIdentity = errors.New("Identity must be non-empty") - ErrorInvalidOptionNamespace = errors.New("Namespace cannot be empty") + ErrInvalidOptionBatchLinger = errors.New("BatchLinger must be greater than or equal to zero") + ErrInvalidOptionMaxRequestsPerBatch = errors.New("MaxRequestsPerBatch must be greater than zero") + ErrInvalidOptionMaxBatchSize = errors.New("MaxBatchSize must be greater than zero") + ErrInvalidOptionRequestTimeout = errors.New("RequestTimeout must be greater than zero") + ErrInvalidOptionSessionTimeout = errors.New("SessionTimeout must be greater than zero") + ErrInvalidOptionIdentity = errors.New("Identity must be non-empty") + ErrInvalidOptionNamespace = errors.New("Namespace cannot be empty") ) // clientOptions contains options for the Oxia client. @@ -106,11 +106,11 @@ func (f clientOptionFunc) apply(c clientOptions) (clientOptions, error) { } // WithNamespace set the Oxia namespace to be used for this client. -// If not set, the client will be using the `default` namespace +// If not set, the client will be using the `default` namespace. func WithNamespace(namespace string) ClientOption { return clientOptionFunc(func(options clientOptions) (clientOptions, error) { if namespace == "" { - return options, ErrorInvalidOptionNamespace + return options, ErrInvalidOptionNamespace } options.namespace = namespace return options, nil @@ -122,7 +122,7 @@ func WithNamespace(namespace string) ClientOption { func WithBatchLinger(batchLinger time.Duration) ClientOption { return clientOptionFunc(func(options clientOptions) (clientOptions, error) { if batchLinger < 0 { - return options, ErrorInvalidOptionBatchLinger + return options, ErrInvalidOptionBatchLinger } options.batchLinger = batchLinger return options, nil @@ -134,7 +134,7 @@ func WithBatchLinger(batchLinger time.Duration) ClientOption { func WithMaxRequestsPerBatch(maxRequestsPerBatch int) ClientOption { return clientOptionFunc(func(options clientOptions) (clientOptions, error) { if maxRequestsPerBatch <= 0 { - return options, ErrorInvalidOptionMaxRequestsPerBatch + return options, ErrInvalidOptionMaxRequestsPerBatch } options.maxRequestsPerBatch = maxRequestsPerBatch return options, nil @@ -144,7 +144,7 @@ func WithMaxRequestsPerBatch(maxRequestsPerBatch int) ClientOption { func WithRequestTimeout(requestTimeout time.Duration) ClientOption { return clientOptionFunc(func(options clientOptions) (clientOptions, error) { if requestTimeout <= 0 { - return options, ErrorInvalidOptionRequestTimeout + return options, ErrInvalidOptionRequestTimeout } options.requestTimeout = requestTimeout return options, nil @@ -167,11 +167,11 @@ func WithGlobalMeterProvider() ClientOption { return WithMeterProvider(otel.GetMeterProvider()) } -// WithSessionTimeout specifies the session timeout to +// WithSessionTimeout specifies the session timeout to. func WithSessionTimeout(sessionTimeout time.Duration) ClientOption { return clientOptionFunc(func(options clientOptions) (clientOptions, error) { if sessionTimeout <= 0 { - return options, ErrorInvalidOptionSessionTimeout + return options, ErrInvalidOptionSessionTimeout } options.sessionTimeout = sessionTimeout return options, nil @@ -181,7 +181,7 @@ func WithSessionTimeout(sessionTimeout time.Duration) ClientOption { func WithIdentity(identity string) ClientOption { return clientOptionFunc(func(options clientOptions) (clientOptions, error) { if identity == "" { - return options, ErrorInvalidOptionIdentity + return options, ErrInvalidOptionIdentity } options.identity = identity return options, nil @@ -195,7 +195,7 @@ type putOptions struct { ephemeral bool } -// PutOption represents an option for the [SyncClient.Put] operation +// PutOption represents an option for the [SyncClient.Put] operation. type PutOption interface { applyPut(opts putOptions) putOptions } @@ -218,7 +218,7 @@ type deleteOptions struct { expectedVersion *int64 } -// DeleteOption represents an option for the [SyncClient.Delete] operation +// DeleteOption represents an option for the [SyncClient.Delete] operation. type DeleteOption interface { PutOption applyDelete(opts deleteOptions) deleteOptions @@ -233,7 +233,7 @@ func newDeleteOptions(opts []DeleteOption) deleteOptions { } // ExpectedVersionId Marks that the operation should only be successful -// if the versionId of the record stored in the server matches the expected one +// if the versionId of the record stored in the server matches the expected one. func ExpectedVersionId(versionId int64) DeleteOption { return &expectedVersionId{versionId} } diff --git a/oxia/options_test.go b/oxia/options_test.go index d3adcca5..ef0b36b6 100644 --- a/oxia/options_test.go +++ b/oxia/options_test.go @@ -37,7 +37,7 @@ func TestWithBatchLinger(t *testing.T) { expectedBatchLinger time.Duration expectedErr error }{ - {-1, DefaultBatchLinger, ErrorInvalidOptionBatchLinger}, + {-1, DefaultBatchLinger, ErrInvalidOptionBatchLinger}, {0, 0, nil}, {1, 1, nil}, } { @@ -53,8 +53,8 @@ func TestWithMaxRequestsPerBatch(t *testing.T) { expectedMaxRequestsPerBatch int expectedErr error }{ - {-1, DefaultMaxRequestsPerBatch, ErrorInvalidOptionMaxRequestsPerBatch}, - {0, DefaultMaxRequestsPerBatch, ErrorInvalidOptionMaxRequestsPerBatch}, + {-1, DefaultMaxRequestsPerBatch, ErrInvalidOptionMaxRequestsPerBatch}, + {0, DefaultMaxRequestsPerBatch, ErrInvalidOptionMaxRequestsPerBatch}, {1, 1, nil}, } { options, err := newClientOptions("serviceAddress", WithMaxRequestsPerBatch(item.maxRequestsPerBatch)) @@ -69,8 +69,8 @@ func TestWithRequestTimeout(t *testing.T) { expectedRequestTimeout time.Duration expectedErr error }{ - {-1, DefaultRequestTimeout, ErrorInvalidOptionRequestTimeout}, - {0, DefaultRequestTimeout, ErrorInvalidOptionRequestTimeout}, + {-1, DefaultRequestTimeout, ErrInvalidOptionRequestTimeout}, + {0, DefaultRequestTimeout, ErrInvalidOptionRequestTimeout}, {1, 1, nil}, } { options, err := newClientOptions("serviceAddress", WithRequestTimeout(item.requestTimeout)) diff --git a/oxia/proto_utils.go b/oxia/proto_utils.go index f07c5cd1..4ce63877 100644 --- a/oxia/proto_utils.go +++ b/oxia/proto_utils.go @@ -67,10 +67,10 @@ func toError(status proto.Status) error { case proto.Status_OK: return nil case proto.Status_UNEXPECTED_VERSION_ID: - return ErrorUnexpectedVersionId + return ErrUnexpectedVersionId case proto.Status_KEY_NOT_FOUND: - return ErrorKeyNotFound + return ErrKeyNotFound default: - return ErrorUnknownStatus + return ErrUnknownStatus } } diff --git a/oxia/sessions.go b/oxia/sessions.go index f7b2cb99..abd53f7a 100644 --- a/oxia/sessions.go +++ b/oxia/sessions.go @@ -112,7 +112,7 @@ func (cs *clientSession) executeWithId(callback func(int64, error)) { cs.Unlock() } case <-cs.ctx.Done(): - if cs.ctx.Err() != nil && cs.ctx.Err() != context.Canceled { + if cs.ctx.Err() != nil && !errors.Is(cs.ctx.Err(), context.Canceled) { callback(-1, cs.ctx.Err()) } } @@ -122,7 +122,7 @@ func (cs *clientSession) createSessionWithRetries() { backOff := common.NewBackOff(cs.ctx) err := backoff.RetryNotify(cs.createSession, backOff, func(err error, duration time.Duration) { - if err != context.Canceled { + if !errors.Is(err, context.Canceled) { cs.log.Error( "Error while creating session", slog.Any("error", err), @@ -131,7 +131,7 @@ func (cs *clientSession) createSessionWithRetries() { } }) - if err != nil && err != context.Canceled { + if err != nil && !errors.Is(err, context.Canceled) { cs.Lock() cs.started <- err close(cs.started) diff --git a/perf/perf.go b/perf/perf.go index b54d5637..3053c000 100644 --- a/perf/perf.go +++ b/perf/perf.go @@ -194,7 +194,7 @@ func (p *perf) generateReadTraffic(ctx context.Context, client oxia.AsyncClient, ch := client.Get(key) go func() { r := <-ch - if r.Err != nil && !errors.Is(r.Err, oxia.ErrorKeyNotFound) { + if r.Err != nil && !errors.Is(r.Err, oxia.ErrKeyNotFound) { slog.Warn( "Operation has failed", slog.Any("error", r.Err), diff --git a/server/follower_controller.go b/server/follower_controller.go index 4352bb7a..8909764a 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -34,7 +34,7 @@ import ( "github.com/streamnative/oxia/server/wal" ) -// FollowerController handles all the operations of a given shard's follower +// FollowerController handles all the operations of a given shard's follower. type FollowerController interface { io.Closer @@ -51,7 +51,7 @@ type FollowerController interface { // - send any entries to followers if it was a leader. // // Any existing follow cursors are destroyed as is any state - //regarding reconfigurations. + // regarding reconfigurations. NewTerm(req *proto.NewTermRequest) (*proto.NewTermResponse, error) // Truncate @@ -209,7 +209,7 @@ func (fc *followerController) closeStream(err error) { } func (fc *followerController) closeStreamNoMutex(err error) { - if err != nil && err != io.EOF && err != context.Canceled && status.Code(err) != codes.Canceled { + if err != nil && !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) && status.Code(err) != codes.Canceled { fc.log.Warn( "Error in handle Replicate stream", slog.Any("error", err), @@ -509,7 +509,7 @@ func (fc *followerController) processCommittedEntries(maxInclusive int64) error for reader.HasNext() { entry, err := reader.ReadNext() - if err == wal.ErrorReaderClosed { + if errors.Is(err, wal.ErrReaderClosed) { fc.log.Info("Stopped reading committed entries") return err } else if err != nil { diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index 0bbfee17..3f1869a1 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -158,7 +158,7 @@ func TestReadingUpToCommitOffset(t *testing.T) { stream := newMockServerReplicateStream() go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -247,7 +247,7 @@ func TestFollower_AdvanceCommitOffsetToHead(t *testing.T) { stream := newMockServerReplicateStream() go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -309,7 +309,7 @@ func TestFollower_NewTerm(t *testing.T) { // If a node is restarted, it might get the truncate request // when it's in the `NotMember` state. That is ok, provided // the request comes in the same term that the follower -// currently has +// currently has. func TestFollower_TruncateAfterRestart(t *testing.T) { var shardId int64 kvFactory, err := kv.NewPebbleKVFactory(&kv.KVFactoryOptions{DataDir: t.TempDir()}) @@ -415,7 +415,7 @@ func TestFollower_CommitOffsetLastEntry(t *testing.T) { stream := newMockServerReplicateStream() go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -489,7 +489,7 @@ func TestFollowerController_RejectEntriesWithDifferentTerm(t *testing.T) { stream.AddRequest(createAddRequest(t, 5, 0, map[string]string{"a": "2", "b": "2"}, wal.InvalidOffset)) go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -707,7 +707,7 @@ func TestFollower_DisconnectLeader(t *testing.T) { assert.Nil(t, fc.(*followerController).closeStreamWg) go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -728,7 +728,7 @@ func TestFollower_DupEntries(t *testing.T) { stream := newMockServerReplicateStream() go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -767,7 +767,7 @@ func TestFollowerController_DeleteShard(t *testing.T) { stream := newMockServerReplicateStream() go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() @@ -860,7 +860,7 @@ func TestFollower_GetStatus(t *testing.T) { stream := newMockServerReplicateStream() go func() { - //cancelled due to fc.Close() below + // cancelled due to fc.Close() below assert.ErrorIs(t, fc.Replicate(stream), context.Canceled) }() diff --git a/server/follower_cursor.go b/server/follower_cursor.go index 3fc0c1e8..50dc81d0 100644 --- a/server/follower_cursor.go +++ b/server/follower_cursor.go @@ -16,6 +16,7 @@ package server import ( "context" + "errors" "fmt" "io" "log/slog" @@ -37,7 +38,7 @@ import ( // ReplicateStreamProvider // This is a provider for the ReplicateStream Grpc handler -// It's used to allow passing in a mocked version of the Grpc service +// It's used to allow passing in a mocked version of the Grpc service. type ReplicateStreamProvider interface { GetReplicateStream(ctx context.Context, follower string, namespace string, shard int64) (proto.OxiaLogReplication_ReplicateClient, error) SendSnapshot(ctx context.Context, follower string, namespace string, shard int64) (proto.OxiaLogReplication_SendSnapshotClient, error) @@ -391,7 +392,7 @@ func (fc *followerCursor) streamEntries() error { func (fc *followerCursor) receiveAcks(cancel context.CancelFunc, stream proto.OxiaLogReplication_ReplicateClient) { for { res, err := stream.Recv() - if err == io.EOF { + if errors.Is(err, io.EOF) { fc.log.Info("Ack stream finished") return } diff --git a/server/kv/db.go b/server/kv/db.go index 12a1b12d..107a607b 100644 --- a/server/kv/db.go +++ b/server/kv/db.go @@ -30,7 +30,7 @@ import ( "github.com/streamnative/oxia/server/wal" ) -var ErrorBadVersionId = errors.New("oxia: bad version id") +var ErrBadVersionId = errors.New("oxia: bad version id") const ( commitOffsetKey = common.InternalKeyPrefix + "commit-offset" @@ -319,7 +319,7 @@ func (d *db) ReadTerm() (term int64, err error) { func (d *db) applyPut(commitOffset int64, batch WriteBatch, notifications *notifications, putReq *proto.PutRequest, timestamp uint64, updateOperationCallback UpdateOperationCallback) (*proto.PutResponse, error) { se, err := checkExpectedVersionId(batch, putReq.Key, putReq.ExpectedVersionId) - if errors.Is(err, ErrorBadVersionId) { + if errors.Is(err, ErrBadVersionId) { return &proto.PutResponse{ Status: proto.Status_UNEXPECTED_VERSION_ID, }, nil @@ -395,7 +395,7 @@ func (d *db) applyDelete(batch WriteBatch, notifications *notifications, delReq defer se.ReturnToVTPool() } - if errors.Is(err, ErrorBadVersionId) { + if errors.Is(err, ErrBadVersionId) { return &proto.DeleteResponse{Status: proto.Status_UNEXPECTED_VERSION_ID}, nil } else if err != nil { return nil, errors.Wrap(err, "oxia db: failed to apply batch") @@ -462,7 +462,7 @@ func (d *db) applyDeleteRange(batch WriteBatch, notifications *notifications, de func applyGet(kv KV, getReq *proto.GetRequest) (*proto.GetResponse, error) { value, closer, err := kv.Get(getReq.Key) - if errors.Is(err, ErrorKeyNotFound) { + if errors.Is(err, ErrKeyNotFound) { return &proto.GetResponse{Status: proto.Status_KEY_NOT_FOUND}, nil } else if err != nil { return nil, errors.Wrap(err, "oxia db: failed to apply batch") @@ -523,12 +523,12 @@ func GetStorageEntry(batch WriteBatch, key string) (*proto.StorageEntry, error) func checkExpectedVersionId(batch WriteBatch, key string, expectedVersionId *int64) (*proto.StorageEntry, error) { se, err := GetStorageEntry(batch, key) if err != nil { - if errors.Is(err, ErrorKeyNotFound) { + if errors.Is(err, ErrKeyNotFound) { if expectedVersionId == nil || *expectedVersionId == -1 { // OK, we were checking that the key was not there, and it's indeed not there return nil, nil } else { - return nil, ErrorBadVersionId + return nil, ErrBadVersionId } } return nil, err @@ -536,7 +536,7 @@ func checkExpectedVersionId(batch WriteBatch, key string, expectedVersionId *int if expectedVersionId != nil && se.VersionId != *expectedVersionId { se.ReturnToVTPool() - return nil, ErrorBadVersionId + return nil, ErrBadVersionId } return se, nil diff --git a/server/kv/db_test.go b/server/kv/db_test.go index cc0f9d7b..43768e26 100644 --- a/server/kv/db_test.go +++ b/server/kv/db_test.go @@ -121,7 +121,7 @@ func TestDBSimple(t *testing.T) { // TODO: Add the request call and the verification /// Second batch - //req = &proto.WriteRequest{ + // req = &proto.WriteRequest{ // Puts: []*proto.PutRequest{ // { // Should succeed: no version check // Key: "a", diff --git a/server/kv/kv.go b/server/kv/kv.go index 09422834..77499a0e 100644 --- a/server/kv/kv.go +++ b/server/kv/kv.go @@ -21,7 +21,7 @@ import ( ) var ( - ErrorKeyNotFound = errors.New("oxia: key not found") + ErrKeyNotFound = errors.New("oxia: key not found") MaxSnapshotChunkSize int64 = 1024 * 1024 // bytes ) diff --git a/server/kv/kv_pebble.go b/server/kv/kv_pebble.go index 189225f1..2820f706 100644 --- a/server/kv/kv_pebble.go +++ b/server/kv/kv_pebble.go @@ -365,7 +365,7 @@ func (p *Pebble) NewWriteBatch() WriteBatch { func (p *Pebble) Get(key string) ([]byte, io.Closer, error) { value, closer, err := p.db.Get([]byte(key)) if errors.Is(err, pebble.ErrNotFound) { - err = ErrorKeyNotFound + err = ErrKeyNotFound } else if err != nil { p.readErrors.Inc() } @@ -466,7 +466,7 @@ func (b *PebbleBatch) Delete(key string) error { func (b *PebbleBatch) Get(key string) ([]byte, io.Closer, error) { value, closer, err := b.b.Get([]byte(key)) if errors.Is(err, pebble.ErrNotFound) { - err = ErrorKeyNotFound + err = ErrKeyNotFound } else if err != nil { b.p.readErrors.Inc() } @@ -691,8 +691,8 @@ func (ps *pebbleSnapshot) Chunk() (SnapshotChunk, error) { } return &pebbleSnapshotChunk{ ps.files[0], - int32(ps.chunkIndex), - int32(ps.chunkCount), + ps.chunkIndex, + ps.chunkCount, content}, nil } @@ -743,7 +743,7 @@ func (ps *pebbleSnapshot) NextChunkContent() ([]byte, error) { } content := make([]byte, MaxSnapshotChunkSize) byteCount, err := io.ReadFull(ps.file, content) - if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF { + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { return nil, err } if int64(byteCount) < MaxSnapshotChunkSize { diff --git a/server/kv/kv_pebble_test.go b/server/kv/kv_pebble_test.go index fe1473c4..85d38f39 100644 --- a/server/kv/kv_pebble_test.go +++ b/server/kv/kv_pebble_test.go @@ -60,7 +60,7 @@ func TestPebbbleSimple(t *testing.T) { assert.NoError(t, closer.Close()) res, closer, err = kv.Get("non-existing") - assert.ErrorIs(t, err, ErrorKeyNotFound) + assert.ErrorIs(t, err, ErrKeyNotFound) assert.Nil(t, res) assert.Nil(t, closer) @@ -84,7 +84,7 @@ func TestPebbbleSimple(t *testing.T) { assert.NoError(t, closer.Close()) res, closer, err = kv.Get("c") - assert.ErrorIs(t, err, ErrorKeyNotFound) + assert.ErrorIs(t, err, ErrKeyNotFound) assert.Nil(t, res) assert.Nil(t, closer) @@ -320,7 +320,7 @@ func TestPebbbleGetWithinBatch(t *testing.T) { assert.NoError(t, closer.Close()) value, closer, err = wb.Get("non-existent") - assert.ErrorIs(t, err, ErrorKeyNotFound) + assert.ErrorIs(t, err, ErrKeyNotFound) assert.Nil(t, value) assert.Nil(t, closer) @@ -345,7 +345,7 @@ func TestPebbbleGetWithinBatch(t *testing.T) { assert.NoError(t, wb.Delete("a")) value, closer, err = wb.Get("a") - assert.ErrorIs(t, err, ErrorKeyNotFound) + assert.ErrorIs(t, err, ErrKeyNotFound) assert.Nil(t, value) assert.Nil(t, closer) @@ -479,7 +479,7 @@ func TestPebbbleDeleteRangeInBatch(t *testing.T) { res, closer, err := kv.Get("/a/b/a/a") assert.Nil(t, res) assert.Nil(t, closer) - assert.ErrorIs(t, err, ErrorKeyNotFound) + assert.ErrorIs(t, err, ErrKeyNotFound) res, closer, err = kv.Get("/a/a/a/zzzzzz") assert.Equal(t, "/a/a/a/zzzzzz", string(res)) @@ -662,7 +662,7 @@ func TestPebbleSnapshot_Loader(t *testing.T) { } r, closer, err := kv2.Get("my-key") - assert.ErrorIs(t, err, ErrorKeyNotFound) + assert.ErrorIs(t, err, ErrKeyNotFound) assert.Nil(t, r) assert.Nil(t, closer) diff --git a/server/kv/notifications_trimmer.go b/server/kv/notifications_trimmer.go index 06486b5b..857891be 100644 --- a/server/kv/notifications_trimmer.go +++ b/server/kv/notifications_trimmer.go @@ -188,7 +188,7 @@ func (t *notificationsTrimmer) getFirstLast() (first, last int64, err error) { return first, last, nil } -// Perform binary search to find the highest entry that falls within the cutoff time +// Perform binary search to find the highest entry that falls within the cutoff time. func (t *notificationsTrimmer) binarySearch(firstOffset, lastOffset int64, cutoffTime time.Time) (int64, error) { for firstOffset < lastOffset { med := (firstOffset + lastOffset) / 2 diff --git a/server/public_rpc_server.go b/server/public_rpc_server.go index e63f1fd3..9ec3ce68 100644 --- a/server/public_rpc_server.go +++ b/server/public_rpc_server.go @@ -29,8 +29,8 @@ import ( ) const ( - maxTotalReadValueSize = 4 << (10 * 2) //4Mi - maxTotalListKeySize = 4 << (10 * 2) //4Mi + maxTotalReadValueSize = 4 << (10 * 2) // 4Mi + maxTotalListKeySize = 4 << (10 * 2) // 4Mi ) type publicRpcServer struct { diff --git a/server/quorum_ack_tracker.go b/server/quorum_ack_tracker.go index 35e08ee4..527483b6 100644 --- a/server/quorum_ack_tracker.go +++ b/server/quorum_ack_tracker.go @@ -27,8 +27,8 @@ import ( ) var ( - ErrorTooManyCursors = errors.New("too many cursors") - ErrorInvalidHeadOffset = errors.New("invalid head offset") + ErrTooManyCursors = errors.New("too many cursors") + ErrInvalidHeadOffset = errors.New("invalid head offset") ) // QuorumAckTracker @@ -37,7 +37,7 @@ var ( // - Commit offset: the oldest entry that is considered "fully committed", as it has received the requested amount // of acks from the followers // -// The quorum ack tracker is also used to block until the head offset or commit offset are advanced +// The quorum ack tracker is also used to block until the head offset or commit offset are advanced. type QuorumAckTracker interface { io.Closer @@ -196,11 +196,11 @@ func (q *quorumAckTracker) NewCursorAcker(ackOffset int64) (CursorAcker, error) defer q.Unlock() if uint32(q.cursorIdxGenerator) >= q.replicationFactor-1 { - return nil, ErrorTooManyCursors + return nil, ErrTooManyCursors } if ackOffset > q.headOffset.Load() { - return nil, ErrorInvalidHeadOffset + return nil, ErrInvalidHeadOffset } qa := &cursorAcker{ diff --git a/server/quorum_ack_tracker_test.go b/server/quorum_ack_tracker_test.go index bd3fb77f..be5b387b 100644 --- a/server/quorum_ack_tracker_test.go +++ b/server/quorum_ack_tracker_test.go @@ -139,7 +139,7 @@ func TestQuorumAckTrackerMaxCursors(t *testing.T) { assert.NotNil(t, c2) c3, err := at.NewCursorAcker(wal.InvalidOffset) - assert.ErrorIs(t, err, ErrorTooManyCursors) + assert.ErrorIs(t, err, ErrTooManyCursors) assert.Nil(t, c3) } @@ -230,7 +230,7 @@ func TestQuorumAckTracker_AddingCursors_RF3(t *testing.T) { c, err := at.NewCursorAcker(11) assert.Nil(t, c) - assert.ErrorIs(t, err, ErrorInvalidHeadOffset) + assert.ErrorIs(t, err, ErrInvalidHeadOffset) c1, err := at.NewCursorAcker(7) assert.NotNil(t, c1) @@ -255,7 +255,7 @@ func TestQuorumAckTracker_AddingCursors_RF5(t *testing.T) { c, err := at.NewCursorAcker(11) assert.Nil(t, c) - assert.ErrorIs(t, err, ErrorInvalidHeadOffset) + assert.ErrorIs(t, err, ErrInvalidHeadOffset) c1, err := at.NewCursorAcker(7) assert.NotNil(t, c1) diff --git a/server/server_test.go b/server/server_test.go index 5eb0f104..84a7d8fa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -36,6 +36,9 @@ func TestNewServer(t *testing.T) { url := fmt.Sprintf("http://localhost:%d/metrics", server.metrics.Port()) response, err := http.Get(url) assert.NoError(t, err) + if response != nil && response.Body != nil { + defer response.Body.Close() + } assert.Equal(t, 200, response.StatusCode) diff --git a/server/session_manager.go b/server/session_manager.go index ef021838..61aef0f0 100644 --- a/server/session_manager.go +++ b/server/session_manager.go @@ -316,7 +316,7 @@ func (_ *updateCallback) OnPut(batch kv.WriteBatch, request *proto.PutRequest, e // We are adding an ephemeral value, let's check if the session exists var _, closer, err = batch.Get(SessionKey(SessionId(*sessionId))) if err != nil { - if errors.Is(err, kv.ErrorKeyNotFound) { + if errors.Is(err, kv.ErrKeyNotFound) { return proto.Status_SESSION_DOES_NOT_EXIST, nil } return proto.Status_SESSION_DOES_NOT_EXIST, err @@ -336,7 +336,7 @@ func (_ *updateCallback) OnPut(batch kv.WriteBatch, request *proto.PutRequest, e func deleteShadow(batch kv.WriteBatch, key string, existingEntry *proto.StorageEntry) (proto.Status, error) { existingSessionId := SessionId(*existingEntry.SessionId) err := batch.Delete(SessionKey(existingSessionId) + url.PathEscape(key)) - if err != nil && !errors.Is(err, kv.ErrorKeyNotFound) { + if err != nil && !errors.Is(err, kv.ErrKeyNotFound) { return proto.Status_SESSION_DOES_NOT_EXIST, err } return proto.Status_OK, nil @@ -346,7 +346,7 @@ func (_ *updateCallback) OnDelete(batch kv.WriteBatch, key string) error { se, err := kv.GetStorageEntry(batch, key) defer se.ReturnToVTPool() - if errors.Is(err, kv.ErrorKeyNotFound) { + if errors.Is(err, kv.ErrKeyNotFound) { return nil } if err == nil && se.SessionId != nil { diff --git a/server/session_manager_test.go b/server/session_manager_test.go index ba0bcada..7f041bd1 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -87,7 +87,7 @@ func (m mockWriteBatch) Delete(key string) error { func (m mockWriteBatch) Get(key string) ([]byte, io.Closer, error) { val, found := m[key] if !found { - return nil, nil, kv.ErrorKeyNotFound + return nil, nil, kv.ErrKeyNotFound } err, wasError := val.(error) if wasError { diff --git a/server/util/bitset.go b/server/util/bitset.go index 727f49b8..d3bfe685 100644 --- a/server/util/bitset.go +++ b/server/util/bitset.go @@ -22,7 +22,7 @@ import ( const MaxBitSetSize = 16 // BitSet -// Simplified and compact bitset +// Simplified and compact bitset. type BitSet struct { bits uint16 } diff --git a/server/util/stream_reader.go b/server/util/stream_reader.go index 38f10e46..f5ce1f10 100644 --- a/server/util/stream_reader.go +++ b/server/util/stream_reader.go @@ -15,6 +15,7 @@ package util import ( + "errors" "io" "log/slog" @@ -81,7 +82,7 @@ func (s *streamReader[T, U]) handleServerStream() { } func (s *streamReader[T, U]) close(err error) { - if err != nil && err != io.EOF && status.Code(err) != codes.Canceled { + if err != nil && !errors.Is(err, io.EOF) && status.Code(err) != codes.Canceled { s.log.Warn( "error while handling stream", slog.Any("error", err), diff --git a/server/wal/wal.go b/server/wal/wal.go index a532abb5..2ffa5873 100644 --- a/server/wal/wal.go +++ b/server/wal/wal.go @@ -25,10 +25,10 @@ import ( ) var ( - ErrorEntryNotFound = errors.New("oxia: entry not found") - ErrorOffsetOutOfBounds = errors.New("oxia: offset out of bounds") - ErrorReaderClosed = errors.New("oxia: reader already closed") - ErrorInvalidNextOffset = errors.New("oxia: invalid next offset in wal") + ErrEntryNotFound = errors.New("oxia: entry not found") + ErrOffsetOutOfBounds = errors.New("oxia: offset out of bounds") + ErrReaderClosed = errors.New("oxia: reader already closed") + ErrInvalidNextOffset = errors.New("oxia: invalid next offset in wal") InvalidTerm int64 = -1 InvalidOffset int64 = -1 diff --git a/server/wal/wal_impl.go b/server/wal/wal_impl.go index f1f95f6e..1295b470 100644 --- a/server/wal/wal_impl.go +++ b/server/wal/wal_impl.go @@ -383,7 +383,7 @@ func (t *wal) checkNextOffset(nextOffset int64) error { expectedOffset := lastAppendedOffset + 1 if lastAppendedOffset != InvalidOffset && nextOffset != expectedOffset { - return errors.Wrapf(ErrorInvalidNextOffset, + return errors.Wrapf(ErrInvalidNextOffset, "%d can not immediately follow %d", nextOffset, lastAppendedOffset) } return nil diff --git a/server/wal/wal_reader.go b/server/wal/wal_reader.go index d3d8e2a2..29dcbaca 100644 --- a/server/wal/wal_reader.go +++ b/server/wal/wal_reader.go @@ -24,7 +24,7 @@ func (t *wal) NewReader(after int64) (WalReader, error) { firstOffset := after + 1 if firstOffset < t.FirstOffset() { - return nil, ErrorEntryNotFound + return nil, ErrEntryNotFound } r := &forwardReader{ @@ -87,7 +87,7 @@ func (r *forwardReader) ReadNext() (*proto.LogEntry, error) { defer r.Unlock() if r.closed { - return nil, ErrorReaderClosed + return nil, ErrReaderClosed } index := r.nextOffset @@ -113,7 +113,7 @@ func (r *forwardReader) HasNext() bool { func (r *reverseReader) ReadNext() (*proto.LogEntry, error) { if r.closed { - return nil, ErrorReaderClosed + return nil, ErrReaderClosed } entry, err := r.wal.readAtIndex(r.nextOffset) diff --git a/server/wal/wal_ro_segment.go b/server/wal/wal_ro_segment.go index 3cae7a86..a868c800 100644 --- a/server/wal/wal_ro_segment.go +++ b/server/wal/wal_ro_segment.go @@ -117,7 +117,7 @@ func (ms *readonlySegment) LastOffset() int64 { func (ms *readonlySegment) Read(offset int64) ([]byte, error) { if offset < ms.baseOffset || offset > ms.lastOffset { - return nil, ErrorOffsetOutOfBounds + return nil, ErrOffsetOutOfBounds } fileOffset := fileOffset(ms.idxMappedFile, ms.baseOffset, offset) @@ -251,7 +251,7 @@ func (r *readOnlySegmentsGroup) Get(offset int64) (common.RefCount[ReadOnlySegme return res, nil } - return nil, ErrorOffsetOutOfBounds + return nil, ErrOffsetOutOfBounds } func (r *readOnlySegmentsGroup) TrimSegments(offset int64) error { diff --git a/server/wal/wal_ro_segment_test.go b/server/wal/wal_ro_segment_test.go index 1d602128..69ec8c49 100644 --- a/server/wal/wal_ro_segment_test.go +++ b/server/wal/wal_ro_segment_test.go @@ -45,11 +45,11 @@ func TestReadOnlySegment(t *testing.T) { data, err := ro.Read(100) assert.Nil(t, data) - assert.ErrorIs(t, err, ErrorOffsetOutOfBounds) + assert.ErrorIs(t, err, ErrOffsetOutOfBounds) data, err = ro.Read(-1) assert.Nil(t, data) - assert.ErrorIs(t, err, ErrorOffsetOutOfBounds) + assert.ErrorIs(t, err, ErrOffsetOutOfBounds) assert.NoError(t, ro.Close()) } diff --git a/server/wal/wal_rw_segment.go b/server/wal/wal_rw_segment.go index 4d1256a8..83cedb1c 100644 --- a/server/wal/wal_rw_segment.go +++ b/server/wal/wal_rw_segment.go @@ -130,7 +130,7 @@ func (ms *readWriteSegment) Append(offset int64, data []byte) error { defer ms.Unlock() if offset != ms.lastOffset+1 { - return ErrorInvalidNextOffset + return ErrInvalidNextOffset } entryOffset := ms.currentFileOffset @@ -210,7 +210,7 @@ func (ms *readWriteSegment) writeIndex() error { func (ms *readWriteSegment) Truncate(lastSafeOffset int64) error { if lastSafeOffset < ms.baseOffset || lastSafeOffset > ms.lastOffset { - return ErrorOffsetOutOfBounds + return ErrOffsetOutOfBounds } // Write zeroes in the section to clear diff --git a/server/wal/wal_rw_segment_test.go b/server/wal/wal_rw_segment_test.go index 7346d6f2..47d08cd7 100644 --- a/server/wal/wal_rw_segment_test.go +++ b/server/wal/wal_rw_segment_test.go @@ -77,11 +77,11 @@ func TestReadWriteSegment_NonZero(t *testing.T) { assert.EqualValues(t, 5, rw.BaseOffset()) assert.EqualValues(t, 6, rw.LastOffset()) - assert.ErrorIs(t, rw.Append(4, []byte("entry-4")), ErrorInvalidNextOffset) + assert.ErrorIs(t, rw.Append(4, []byte("entry-4")), ErrInvalidNextOffset) assert.EqualValues(t, 5, rw.BaseOffset()) assert.EqualValues(t, 6, rw.LastOffset()) - assert.ErrorIs(t, rw.Append(8, []byte("entry-8")), ErrorInvalidNextOffset) + assert.ErrorIs(t, rw.Append(8, []byte("entry-8")), ErrInvalidNextOffset) assert.EqualValues(t, 5, rw.BaseOffset()) assert.EqualValues(t, 6, rw.LastOffset()) diff --git a/server/wal/wal_test.go b/server/wal/wal_test.go index bce3d5e4..bf6fb879 100644 --- a/server/wal/wal_test.go +++ b/server/wal/wal_test.go @@ -148,7 +148,7 @@ func TestAppend(t *testing.T) { Offset: int64(88), Value: []byte("E"), }) - assert.ErrorIs(t, err, ErrorInvalidNextOffset) + assert.ErrorIs(t, err, ErrInvalidNextOffset) err = w.Close() assert.NoError(t, err) @@ -466,7 +466,7 @@ func TestTrim(t *testing.T) { // Test reading a trimmed offset r, err = w.NewReader(48) - assert.ErrorIs(t, err, ErrorEntryNotFound) + assert.ErrorIs(t, err, ErrEntryNotFound) assert.Nil(t, r) assert.NoError(t, w.Close()) diff --git a/server/wal/wal_trimmer.go b/server/wal/wal_trimmer.go index ebf5dcca..059681e0 100644 --- a/server/wal/wal_trimmer.go +++ b/server/wal/wal_trimmer.go @@ -162,7 +162,7 @@ func (t *trimmer) doTrim() error { return nil } -// Perform binary search to find the highest entry that falls within the cutoff time +// Perform binary search to find the highest entry that falls within the cutoff time. func (t *trimmer) binarySearch(firstOffset, lastOffset int64, cutoffTime time.Time) (int64, error) { for firstOffset < lastOffset { med := (firstOffset + lastOffset) / 2