Skip to content

Commit

Permalink
node: Make Peapod's flush interval configurable
Browse files Browse the repository at this point in the history
There may be a need to tune time interval b/w batch writes to disk in
Peapod component.

Add storage node's config with `flush_interval` key of type duration
defaulting to 10ms.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Aug 15, 2023
1 parent c060b16 commit b01baf6
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 20 deletions.
3 changes: 2 additions & 1 deletion cmd/blobovnicza-to-peapod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"strings"
"time"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
Expand Down Expand Up @@ -67,7 +68,7 @@ func main() {
}

ppdPath := filepath.Join(filepath.Dir(bbcz.Path()), "peapod.db")
ppd := peapod.New(ppdPath, perm)
ppd := peapod.New(ppdPath, perm, 10*time.Millisecond)

var compressCfg compression.Config
compressCfg.Enabled = sc.Compress()
Expand Down
16 changes: 12 additions & 4 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
Expand Down Expand Up @@ -159,16 +160,21 @@ func (c *shardCfg) id() string {

type subStorageCfg struct {
// common for all storages
typ string
path string
perm fs.FileMode
typ string
path string
perm fs.FileMode

// tree-specific (FS and blobovnicza)
depth uint64
noSync bool

// blobovnicza-specific
size uint64
width uint64
openedCacheSize int

// Peapod-specific
flushInterval time.Duration
}

// readConfig fills applicationConfiguration with raw configuration values
Expand Down Expand Up @@ -266,6 +272,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
sCfg.depth = sub.Depth()
sCfg.noSync = sub.NoSync()
case peapod.Type:
peapodCfg := peapodconfig.From((*config.Config)(storagesCfg[i]))
sCfg.flushInterval = peapodCfg.FlushInterval()
default:
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
}
Expand Down Expand Up @@ -736,7 +744,7 @@ func (c *cfg) shardOpts() []shardOptsWithID {
})
case peapod.Type:
ss = append(ss, blobstor.SubStorage{
Storage: peapod.New(sRead.path, sRead.perm),
Storage: peapod.New(sRead.path, sRead.perm, sRead.flushInterval),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
Expand Down
4 changes: 3 additions & 1 deletion cmd/neofs-node/config/engine/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
blobovniczaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
peapodconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/peapod"
piloramaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/pilorama"
configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/peapod"
Expand Down Expand Up @@ -133,10 +134,11 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 102400, sc.SmallSizeLimit())

require.Equal(t, 2, len(ss))

ppd := peapodconfig.From((*config.Config)(ss[0]))
require.Equal(t, "tmp/1/blob/peapod.db", ss[0].Path())
require.EqualValues(t, 0644, ss[0].Perm())
require.EqualValues(t, peapod.Type, ss[0].Type())
require.EqualValues(t, 30*time.Millisecond, ppd.FlushInterval())

require.Equal(t, "tmp/1/blob", ss[1].Path())
require.EqualValues(t, 0644, ss[1].Perm())
Expand Down
34 changes: 34 additions & 0 deletions cmd/neofs-node/config/engine/shard/blobstor/peapod/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package peapodconfig

import (
"time"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
)

// Config is a wrapper over the config section
// which provides access to Peapod configurations.
type Config config.Config

// Various Peapod config defaults.
const (
// DefaultFlushInterval is a default time interval between Peapod's batch writes
// to disk.
DefaultFlushInterval = 10 * time.Millisecond
)

// From wraps config section into Config.
func From(c *config.Config) *Config {
return (*Config)(c)
}

// FlushInterval returns the value of "flush_interval" config parameter.
//
// Returns DefaultFlushInterval if the value is not a positive duration.
func (x *Config) FlushInterval() time.Duration {
d := config.DurationSafe((*config.Config)(x), "flush_interval")
if d > 0 {
return d
}
return DefaultFlushInterval
}
1 change: 1 addition & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ NEOFS_STORAGE_SHARD_1_SMALL_OBJECT_SIZE=102400
NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_TYPE=peapod
NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PATH=tmp/1/blob/peapod.db
NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_PERM=0644
NEOFS_STORAGE_SHARD_1_BLOBSTOR_0_FLUSH_INTERVAL=30ms
### FSTree config
NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_TYPE=fstree
NEOFS_STORAGE_SHARD_1_BLOBSTOR_1_PATH=tmp/1/blob
Expand Down
5 changes: 1 addition & 4 deletions config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,7 @@
"type": "peapod",
"path": "tmp/1/blob/peapod.db",
"perm": "0644",
"size": 4194304,
"depth": 1,
"width": 4,
"opened_cache_capacity": 50
"flush_interval": "30ms"
},
{
"type": "fstree",
Expand Down
1 change: 1 addition & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ storage:
blobstor:
- type: peapod
path: tmp/1/blob/peapod.db # path to Peapod database
flush_interval: 30ms # time interval between batch writes to disk (defaults to 10ms)
- type: fstree
path: tmp/1/blob # blobstor path
no_sync: true
Expand Down
1 change: 1 addition & 0 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ blobstor:
|---------------------|-----------|---------------|-------------------------------------------------------|
| `path` | `string` | | Path to the Peapod database file. |
| `perm` | file mode | `0660` | Default permission for created files and directories. |
| `flush_interval` | `duration`| `10ms` | Time interval between batch writes to disk. |

### `gc` subsection

Expand Down
3 changes: 2 additions & 1 deletion pkg/local_object_storage/blobstor/common/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/rand"
"path/filepath"
"testing"
"time"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
Expand Down Expand Up @@ -43,7 +44,7 @@ func TestCopy(t *testing.T) {

require.NoError(t, src.Close())

dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600)
dst := peapod.New(filepath.Join(dir, "peapod.db"), 0600, 10*time.Millisecond)

err := common.Copy(dst, src)
require.NoError(t, err)
Expand Down
20 changes: 15 additions & 5 deletions pkg/local_object_storage/blobstor/peapod/peapod.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Peapod struct {
path string
perm fs.FileMode

flushInterval time.Duration

compress *compression.Config

readOnly bool
Expand All @@ -63,18 +65,24 @@ var errMissingRootBucket = errors.New("missing root bucket")

// New creates new Peapod instance to be located at the given path with
// specified permissions.
func New(path string, perm fs.FileMode) *Peapod {
//
// Specified flush interval MUST be positive (see Init).
func New(path string, perm fs.FileMode, flushInterval time.Duration) *Peapod {
if flushInterval <= 0 {
panic(fmt.Sprintf("non-positive flush interval %v", flushInterval))
}
return &Peapod{
path: path,
perm: perm,

flushInterval: flushInterval,
}
}

func (x *Peapod) flushLoop() {
defer close(x.chFlushDone)

const flushInterval = 10 * time.Millisecond
t := time.NewTimer(flushInterval)
t := time.NewTimer(x.flushInterval)
defer t.Stop()

for {
Expand All @@ -88,7 +96,7 @@ func (x *Peapod) flushLoop() {

x.flushCurrentBatch(true)

interval := flushInterval - time.Since(st)
interval := x.flushInterval - time.Since(st)
if interval <= 0 {
interval = time.Microsecond
}
Expand Down Expand Up @@ -207,7 +215,9 @@ func (x *Peapod) Open(readOnly bool) error {
return nil
}

// Init initializes internal structure of the underlying database.
// Init initializes internal structure of the underlying database and runs
// flushing routine. The routine writes data batches into disk once per time
// interval configured in New.
func (x *Peapod) Init() error {
if x.readOnly {
// no extra actions needed in read-only mode
Expand Down
9 changes: 5 additions & 4 deletions pkg/local_object_storage/blobstor/peapod/peapod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/internal/blobstortest"
Expand All @@ -23,20 +24,20 @@ func TestGeneric(t *testing.T) {
}

blobstortest.TestAll(t, func(t *testing.T) common.Storage {
return peapod.New(newPath(), 0600)
return peapod.New(newPath(), 0600, 10*time.Millisecond)
}, 2048, 16*1024)

t.Run("info", func(t *testing.T) {
path := newPath()
blobstortest.TestInfo(t, func(t *testing.T) common.Storage {
return peapod.New(path, 0600)
return peapod.New(path, 0600, 10*time.Millisecond)
}, peapod.Type, path)
})
}

func TestControl(t *testing.T) {
blobstortest.TestControl(t, func(t *testing.T) common.Storage {
return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600)
return peapod.New(filepath.Join(t.TempDir(), "peapod.db"), 0600, 10*time.Millisecond)
}, 2048, 2048)
}

Expand Down Expand Up @@ -73,7 +74,7 @@ func newTestPeapodReadOnly(tb testing.TB) (*peapod.Peapod, oid.Address) {
}

func _newTestPeapod(tb testing.TB, path string, readOnly bool) *peapod.Peapod {
ppd := peapod.New(path, 0600)
ppd := peapod.New(path, 0600, 10*time.Millisecond)
require.NoError(tb, ppd.Open(readOnly))
require.NoError(tb, ppd.Init())

Expand Down

0 comments on commit b01baf6

Please sign in to comment.