Skip to content

Commit

Permalink
engine: Do not ignore+detach shards with failed Open/Init ops (#2464)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider authored Aug 8, 2023
2 parents 91f716b + fddda3e commit 4a0198f
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Changelog for NeoFS Node
- Skip unexpected notary events on notary request parsing step (#2315)
- Session inactivity on object PUT request relay (#2460)
- Missing connection retries on IR node startup when the first configured mainnet RPC node is not in sync (#2474)
- Storage node no longer ignores unhealthy shards on startup (#2464)

### Removed
- Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400)
Expand Down
10 changes: 2 additions & 8 deletions pkg/local_object_storage/blobstor/control.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blobstor

import (
"errors"
"fmt"

"go.uber.org/zap"
Expand All @@ -14,20 +13,15 @@ func (b *BlobStor) Open(readOnly bool) error {
for i := range b.storage {
err := b.storage[i].Storage.Open(readOnly)
if err != nil {
return err
return fmt.Errorf("open substorage %s: %w", b.storage[i].Storage.Type(), err)
}
}
return nil
}

// ErrInitBlobovniczas is returned when blobovnicza initialization fails.
var ErrInitBlobovniczas = errors.New("failure on blobovnicza initialization stage")

// Init initializes internal data structures and system resources.
//
// If BlobStor is already initialized, no action is taken.
//
// Returns wrapped ErrInitBlobovniczas on blobovnicza tree's initializaiton failure.
func (b *BlobStor) Init() error {
b.log.Debug("initializing...")

Expand All @@ -38,7 +32,7 @@ func (b *BlobStor) Init() error {
for i := range b.storage {
err := b.storage[i].Storage.Init()
if err != nil {
return fmt.Errorf("%w: %v", ErrInitBlobovniczas, err)
return fmt.Errorf("init substorage %s: %w", b.storage[i].Storage.Type(), err)
}
}
return nil
Expand Down
86 changes: 4 additions & 82 deletions pkg/local_object_storage/engine/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,11 @@ import (
"fmt"
"path/filepath"
"strings"
"sync"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"go.uber.org/zap"
)

type shardInitError struct {
err error
id string
}

// Open opens all StorageEngine's components.
func (e *StorageEngine) Open() error {
return e.open()
Expand All @@ -26,41 +19,9 @@ func (e *StorageEngine) open() error {
e.mtx.Lock()
defer e.mtx.Unlock()

var wg sync.WaitGroup
var errCh = make(chan shardInitError, len(e.shards))

for id, sh := range e.shards {
wg.Add(1)
go func(id string, sh *shard.Shard) {
defer wg.Done()
if err := sh.Open(); err != nil {
errCh <- shardInitError{
err: err,
id: id,
}
}
}(id, sh.Shard)
}
wg.Wait()
close(errCh)

for res := range errCh {
if res.err != nil {
e.log.Error("could not open shard, closing and skipping",
zap.String("id", res.id),
zap.Error(res.err))

sh := e.shards[res.id]
delete(e.shards, res.id)

err := sh.Close()
if err != nil {
e.log.Error("could not close partially initialized shard",
zap.String("id", res.id),
zap.Error(res.err))
}

continue
if err := sh.Open(); err != nil {
return fmt.Errorf("open shard %s: %w", id, err)
}
}

Expand All @@ -72,51 +33,12 @@ func (e *StorageEngine) Init() error {
e.mtx.Lock()
defer e.mtx.Unlock()

var wg sync.WaitGroup
var errCh = make(chan shardInitError, len(e.shards))

for id, sh := range e.shards {
wg.Add(1)
go func(id string, sh *shard.Shard) {
defer wg.Done()
if err := sh.Init(); err != nil {
errCh <- shardInitError{
err: err,
id: id,
}
}
}(id, sh.Shard)
}
wg.Wait()
close(errCh)

for res := range errCh {
if res.err != nil {
if errors.Is(res.err, blobstor.ErrInitBlobovniczas) {
e.log.Error("could not initialize shard, closing and skipping",
zap.String("id", res.id),
zap.Error(res.err))

sh := e.shards[res.id]
delete(e.shards, res.id)

err := sh.Close()
if err != nil {
e.log.Error("could not close partially initialized shard",
zap.String("id", res.id),
zap.Error(res.err))
}

continue
}
return fmt.Errorf("could not initialize shard %s: %w", res.id, res.err)
if err := sh.Init(); err != nil {
return fmt.Errorf("init shard %s: %w", id, err)
}
}

if len(e.shards) == 0 {
return errors.New("failed initialization on all shards")
}

e.wg.Add(1)
go e.setModeLoop()

Expand Down
7 changes: 1 addition & 6 deletions pkg/local_object_storage/engine/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,13 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [
}
}

e.mtx.RLock()
shardCount := len(e.shards)
e.mtx.RUnlock()
require.Equal(t, 0, shardCount)

require.NoError(t, os.Chmod(badDir, os.ModePerm))
require.NoError(t, e.Reload(ReConfiguration{
shards: map[string][]shard.Option{configID: s},
}))

e.mtx.RLock()
shardCount = len(e.shards)
shardCount := len(e.shards)
e.mtx.RUnlock()
require.Equal(t, 1, shardCount)
}
Expand Down

0 comments on commit 4a0198f

Please sign in to comment.