Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Littdb #1280

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
40 changes: 40 additions & 0 deletions common/testutils/random/test_random.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,43 @@ func (r *TestRandom) BLS() *core.KeyPair {
sk := new(core.PrivateKey).SetBigInt(n)
return core.MakeKeyPair(sk)
}

// Bool generates a random boolean.
func (r *TestRandom) Bool() bool {
return r.BoolWithProbability(0.5)
}

// BoolWithProbability generates a random boolean with a given probability of being true.
func (r *TestRandom) BoolWithProbability(probability float64) bool {
return r.Float64() < probability
}

// Uint32Range generates a random uint32 between min (inclusive) and max (exclusive).
func (r *TestRandom) Uint32Range(min uint32, max uint32) uint32 {
return r.Uint32()%(max-min) + min
}

// Uint64Range generates a random uint64 between min (inclusive) and max (exclusive).
func (r *TestRandom) Uint64Range(min uint64, max uint64) uint64 {
return r.Uint64()%(max-min) + min
}

// Int32Range generates a random int32 between min (inclusive) and max (exclusive).
func (r *TestRandom) Int32Range(min, max int32) int32 {
return r.Int31n(max-min) + min
}

// Int64Range generates a random int64 between min (inclusive) and max (exclusive).
func (r *TestRandom) Int64Range(min, max int64) int64 {
return r.Int63n(max-min) + min
}

// Float32Range generates a random float32 between min (inclusive) and max (exclusive).
func (r *TestRandom) Float32Range(min, max float32) float32 {
return r.Float32()*(max-min) + min
}

// Float64Range generates a random float64 between min (inclusive) and max (exclusive).
func (r *TestRandom) Float64Range(min, max float64) float64 {
return r.Float64()*(max-min) + min
}
41 changes: 41 additions & 0 deletions litt/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package litt

// DB is a highly specialized key-value store. It is intentionally very feature poor, sacrificing
// unnecessary features for simplicity, high performance, and low memory usage.
//
// Litt: slang, a synonym for "cool" or "awesome". e.g. "Man, that database is litt, bro!".
//
// Supported features:
// - writing values
// - reading values
// - TTLs and automatic (lazy) deletion of expired values
// - tables with non-overlapping namespaces
// - thread safety: all methods are safe to call concurrently, and all modifications are atomic
//
// Unsupported features:
// - mutating existing values (once a value is written, it cannot be changed)
// - deleting values (values only leave the DB when they expire via a TTL)
// - transactions (individual operations are atomic, but there is no way to group operations atomically)
// - fine granularity for TTL (all data in the same table must have the same TTL)
type DB interface {
// GetTable gets a table by name, creating one if it does not exist.
//
// The first time a table is fetched (either a new table or an existing one loaded from disk), its TTL is always
// set to 0 (i.e. it has no TTL). If you want to set a TTL, you must call Table.SetTTL() to do so. This is
// necessary after each time the database is started/restarted.
GetTable(name string) (Table, error)

// DropTable deletes a table and all of its data.
//
// Note that it is NOT thread safe to drop a table concurrently with any operation that accesses the table.
// The table returned by GetTable() before DropTable() is called must not be used once DropTable() is called.
DropTable(name string) error

// Start starts the database. This method must be called before any other method is called.
Start()

// Stop stops the database. This method must be called when the database is no longer needed.
// Stop ensures that all non-flushed data is crash durable on disk before returning. Calls to
// Put() concurrent with Stop() may not be crash durable after Stop() returns.¬
Stop()
}
46 changes: 46 additions & 0 deletions litt/disktable/disk_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package disktable

import (
"github.com/Layr-Labs/eigenda/litt"
"time"
)

var _ litt.ManagedTable = &diskTable{}

type diskTable struct {
}

func (d *diskTable) Name() string {
//TODO implement me
panic("implement me")
}

func (d *diskTable) Put(key []byte, value []byte) error {
//TODO implement me
panic("implement me")
}

func (d *diskTable) Get(key []byte) ([]byte, error) {
//TODO implement me
panic("implement me")
}

func (d *diskTable) Flush() error {
//TODO implement me
panic("implement me")
}

func (d *diskTable) SetTTL(ttl time.Duration) {
//TODO implement me
panic("implement me")
}

func (d *diskTable) DoGarbageCollection() error {
//TODO implement me
panic("implement me")
}

func (d *diskTable) Destroy() error {
//TODO implement me
panic("implement me")
}
27 changes: 27 additions & 0 deletions litt/disktable/segment/address.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package segment

import "fmt"

// Address describes the location of data on disk.
// The first 4 bytes are the file ID, and the second 4 bytes are the offset of the data within the file.
type Address uint64

// NewAddress creates a new address
func NewAddress(index uint32, offset uint32) Address {
return Address(uint64(index)<<32 | uint64(offset))
}

// Index returns the file index of the value address.
func (a Address) Index() uint32 {
return uint32(a >> 32)
}

// Offset returns the offset of the value address.
func (a Address) Offset() uint32 {
return uint32(a)
}

// String returns a string representation of the address.
func (a Address) String() string {
return fmt.Sprintf("(%d:%d)", a.Index(), a.Offset())
}
18 changes: 18 additions & 0 deletions litt/disktable/segment/address_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package segment

import (
"github.com/Layr-Labs/eigenda/common/testutils/random"
"github.com/stretchr/testify/require"
"testing"
)

func TestAddress(t *testing.T) {
rand := random.NewTestRandom(t)

index := rand.Uint32()
offset := rand.Uint32()
address := NewAddress(index, offset)

require.Equal(t, index, address.Index())
require.Equal(t, offset, address.Offset())
}
181 changes: 181 additions & 0 deletions litt/disktable/segment/key_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package segment

import (
"bufio"
"encoding/binary"
"fmt"
"github.com/Layr-Labs/eigensdk-go/logging"
"os"
"path"
)

// KeysFileExtension is the file extension for the keys file. This file contains the keys for the data segment,
// and is used for performing garbage collection on the key index.
const KeysFileExtension = ".keys"

// keyFile tracks the keys in a segment. It is used to do garbage collection on the key-to-address map.
type keyFile struct {
// The logger for the key file.
logger logging.Logger

// The segment index.
index uint32

// The parent directory containing this file.
parentDirectory string

// The writer for the file. If the file is sealed, this value is nil.
writer *bufio.Writer
}

// newKeyFile creates a new key file.
func newKeyFile(
logger logging.Logger,
index uint32,
parentDirectory string,
sealed bool) (*keyFile, error) {

keys := &keyFile{
logger: logger,
index: index,
parentDirectory: parentDirectory,
}

filePath := keys.path()

exists, _, err := verifyFilePermissions(filePath)
if err != nil {
return nil, fmt.Errorf("file is not writeable: %v", err)
}

if sealed {
if !exists {
return nil, fmt.Errorf("key file %s does not exist", filePath)
}
} else {
if exists {
return nil, fmt.Errorf("key file %s already exists", filePath)
}

flags := os.O_RDWR | os.O_CREATE
file, err := os.OpenFile(filePath, flags, 0644)
if err != nil {
return nil, fmt.Errorf("failed to open key file: %v", err)
}

writer := bufio.NewWriter(file)
keys.writer = writer
}

return keys, nil
}

// name returns the name of the key file.
func (k *keyFile) name() string {
return fmt.Sprintf("%d%s", k.index, KeysFileExtension)
}

// path returns the full path to the key file.
func (k *keyFile) path() string {
return path.Join(k.parentDirectory, k.name())
}

// write writes a key to the key file.
func (k *keyFile) write(key []byte) error {
if k.writer == nil {
return fmt.Errorf("key file is sealed")
}

// First write the length of the key.
err := binary.Write(k.writer, binary.BigEndian, uint32(len(key)))
if err != nil {
return fmt.Errorf("failed to write key length to key file: %v", err)
}

// Next, write the key itself.
_, err = k.writer.Write(key)
if err != nil {
return fmt.Errorf("failed to write key to key file: %v", err)
}

return nil
}

// flush flushes the key file to disk.
func (k *keyFile) flush() error {
if k.writer == nil {
return fmt.Errorf("key file is sealed")
}

return k.writer.Flush()
}

// seal seals the key file, preventing further writes.
func (k *keyFile) seal() error {
if k.writer == nil {
return fmt.Errorf("key file is already sealed")
}

err := k.flush()
if err != nil {
return fmt.Errorf("failed to flush key file: %v", err)
}
k.writer = nil

return nil
}

// readKeys reads all keys from the key file. This method returns an error if the key file is not sealed.
func (k *keyFile) readKeys() ([][]byte, error) {
if k.writer != nil {
return nil, fmt.Errorf("key file is not sealed")
}

file, err := os.Open(k.path())
if err != nil {
return nil, fmt.Errorf("failed to open key file: %v", err)
}
defer func() {
err = file.Close()
if err != nil {
k.logger.Errorf("failed to close key file: %v", err)
}
}()

// Key files are small as long as key length is sane. Safe to read the whole file into memory.
keyBytes, err := os.ReadFile(k.path())
if err != nil {
return nil, fmt.Errorf("failed to read key file: %v", err)
}
keys := make([][]byte, 0)

index := 0
for {
if index+4 >= len(keyBytes) {
break
}
keyLength := binary.BigEndian.Uint32(keyBytes[index : index+4])
index += 4

if index+int(keyLength) > len(keyBytes) {
break
}

key := keyBytes[index : index+int(keyLength)]
keys = append(keys, key)
index += int(keyLength)
}

if index != len(keyBytes) {
// This can happen if there is a crash while we are writing to the key file.
// Recoverable, but best to note the event in the logs.
k.logger.Warnf("key file %s has %d corrupted bytes", k.path(), len(keyBytes)-index)
}

return keys, nil
}

// delete deletes the key file.
func (k *keyFile) delete() error {
return os.Remove(k.path())
}
Loading
Loading