diff --git a/common/testutils/random/test_random.go b/common/testutils/random/test_random.go index b32103fa7a..88eddc86d8 100644 --- a/common/testutils/random/test_random.go +++ b/common/testutils/random/test_random.go @@ -163,3 +163,43 @@ func (r *TestRandom) BLS() *core.KeyPair { sk := new(core.PrivateKey).SetBigInt(n) return core.MakeKeyPair(sk) } + +// Bool generates a random boolean. +func (r *TestRandom) Bool() bool { + return r.BoolWithProbability(0.5) +} + +// BoolWithProbability generates a random boolean with a given probability of being true. +func (r *TestRandom) BoolWithProbability(probability float64) bool { + return r.Float64() < probability +} + +// Uint32Range generates a random uint32 between min (inclusive) and max (exclusive). +func (r *TestRandom) Uint32Range(min uint32, max uint32) uint32 { + return r.Uint32()%(max-min) + min +} + +// Uint64Range generates a random uint64 between min (inclusive) and max (exclusive). +func (r *TestRandom) Uint64Range(min uint64, max uint64) uint64 { + return r.Uint64()%(max-min) + min +} + +// Int32Range generates a random int32 between min (inclusive) and max (exclusive). +func (r *TestRandom) Int32Range(min, max int32) int32 { + return r.Int31n(max-min) + min +} + +// Int64Range generates a random int64 between min (inclusive) and max (exclusive). +func (r *TestRandom) Int64Range(min, max int64) int64 { + return r.Int63n(max-min) + min +} + +// Float32Range generates a random float32 between min (inclusive) and max (exclusive). +func (r *TestRandom) Float32Range(min, max float32) float32 { + return r.Float32()*(max-min) + min +} + +// Float64Range generates a random float64 between min (inclusive) and max (exclusive). +func (r *TestRandom) Float64Range(min, max float64) float64 { + return r.Float64()*(max-min) + min +} diff --git a/litt/db.go b/litt/db.go new file mode 100644 index 0000000000..c281a0daad --- /dev/null +++ b/litt/db.go @@ -0,0 +1,41 @@ +package litt + +// DB is a highly specialized key-value store. It is intentionally very feature poor, sacrificing +// unnecessary features for simplicity, high performance, and low memory usage. +// +// Litt: slang, a synonym for "cool" or "awesome". e.g. "Man, that database is litt, bro!". +// +// Supported features: +// - writing values +// - reading values +// - TTLs and automatic (lazy) deletion of expired values +// - tables with non-overlapping namespaces +// - thread safety: all methods are safe to call concurrently, and all modifications are atomic +// +// Unsupported features: +// - mutating existing values (once a value is written, it cannot be changed) +// - deleting values (values only leave the DB when they expire via a TTL) +// - transactions (individual operations are atomic, but there is no way to group operations atomically) +// - fine granularity for TTL (all data in the same table must have the same TTL) +type DB interface { + // GetTable gets a table by name, creating one if it does not exist. + // + // The first time a table is fetched (either a new table or an existing one loaded from disk), its TTL is always + // set to 0 (i.e. it has no TTL). If you want to set a TTL, you must call Table.SetTTL() to do so. This is + // necessary after each time the database is started/restarted. + GetTable(name string) (Table, error) + + // DropTable deletes a table and all of its data. + // + // Note that it is NOT thread safe to drop a table concurrently with any operation that accesses the table. 
+ // The table returned by GetTable() before DropTable() is called must not be used once DropTable() is called. + DropTable(name string) error + + // Start starts the database. This method must be called before any other method is called. + Start() + + // Stop stops the database. This method must be called when the database is no longer needed. + // Stop ensures that all non-flushed data is crash durable on disk before returning. Calls to + // Put() concurrent with Stop() may not be crash durable after Stop() returns.¬ + Stop() +} diff --git a/litt/disktable/disk_table.go b/litt/disktable/disk_table.go new file mode 100644 index 0000000000..d2b8f99540 --- /dev/null +++ b/litt/disktable/disk_table.go @@ -0,0 +1,46 @@ +package disktable + +import ( + "github.com/Layr-Labs/eigenda/litt" + "time" +) + +var _ litt.ManagedTable = &diskTable{} + +type diskTable struct { +} + +func (d *diskTable) Name() string { + //TODO implement me + panic("implement me") +} + +func (d *diskTable) Put(key []byte, value []byte) error { + //TODO implement me + panic("implement me") +} + +func (d *diskTable) Get(key []byte) ([]byte, error) { + //TODO implement me + panic("implement me") +} + +func (d *diskTable) Flush() error { + //TODO implement me + panic("implement me") +} + +func (d *diskTable) SetTTL(ttl time.Duration) { + //TODO implement me + panic("implement me") +} + +func (d *diskTable) DoGarbageCollection() error { + //TODO implement me + panic("implement me") +} + +func (d *diskTable) Destroy() error { + //TODO implement me + panic("implement me") +} diff --git a/litt/disktable/segment/address.go b/litt/disktable/segment/address.go new file mode 100644 index 0000000000..fdaa4efb23 --- /dev/null +++ b/litt/disktable/segment/address.go @@ -0,0 +1,27 @@ +package segment + +import "fmt" + +// Address describes the location of data on disk. +// The first 4 bytes are the file ID, and the second 4 bytes are the offset of the data within the file. +type Address uint64 + +// NewAddress creates a new address +func NewAddress(index uint32, offset uint32) Address { + return Address(uint64(index)<<32 | uint64(offset)) +} + +// Index returns the file index of the value address. +func (a Address) Index() uint32 { + return uint32(a >> 32) +} + +// Offset returns the offset of the value address. +func (a Address) Offset() uint32 { + return uint32(a) +} + +// String returns a string representation of the address. +func (a Address) String() string { + return fmt.Sprintf("(%d:%d)", a.Index(), a.Offset()) +} diff --git a/litt/disktable/segment/address_test.go b/litt/disktable/segment/address_test.go new file mode 100644 index 0000000000..5fe3d03fef --- /dev/null +++ b/litt/disktable/segment/address_test.go @@ -0,0 +1,18 @@ +package segment + +import ( + "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/require" + "testing" +) + +func TestAddress(t *testing.T) { + rand := random.NewTestRandom(t) + + index := rand.Uint32() + offset := rand.Uint32() + address := NewAddress(index, offset) + + require.Equal(t, index, address.Index()) + require.Equal(t, offset, address.Offset()) +} diff --git a/litt/disktable/segment/key_file.go b/litt/disktable/segment/key_file.go new file mode 100644 index 0000000000..57f673e147 --- /dev/null +++ b/litt/disktable/segment/key_file.go @@ -0,0 +1,181 @@ +package segment + +import ( + "bufio" + "encoding/binary" + "fmt" + "github.com/Layr-Labs/eigensdk-go/logging" + "os" + "path" +) + +// KeysFileExtension is the file extension for the keys file. 
This file contains the keys for the data segment, +// and is used for performing garbage collection on the key index. +const KeysFileExtension = ".keys" + +// keyFile tracks the keys in a segment. It is used to do garbage collection on the key-to-address map. +type keyFile struct { + // The logger for the key file. + logger logging.Logger + + // The segment index. + index uint32 + + // The parent directory containing this file. + parentDirectory string + + // The writer for the file. If the file is sealed, this value is nil. + writer *bufio.Writer +} + +// newKeyFile creates a new key file. +func newKeyFile( + logger logging.Logger, + index uint32, + parentDirectory string, + sealed bool) (*keyFile, error) { + + keys := &keyFile{ + logger: logger, + index: index, + parentDirectory: parentDirectory, + } + + filePath := keys.path() + + exists, _, err := verifyFilePermissions(filePath) + if err != nil { + return nil, fmt.Errorf("file is not writeable: %v", err) + } + + if sealed { + if !exists { + return nil, fmt.Errorf("key file %s does not exist", filePath) + } + } else { + if exists { + return nil, fmt.Errorf("key file %s already exists", filePath) + } + + flags := os.O_RDWR | os.O_CREATE + file, err := os.OpenFile(filePath, flags, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open key file: %v", err) + } + + writer := bufio.NewWriter(file) + keys.writer = writer + } + + return keys, nil +} + +// name returns the name of the key file. +func (k *keyFile) name() string { + return fmt.Sprintf("%d%s", k.index, KeysFileExtension) +} + +// path returns the full path to the key file. +func (k *keyFile) path() string { + return path.Join(k.parentDirectory, k.name()) +} + +// write writes a key to the key file. +func (k *keyFile) write(key []byte) error { + if k.writer == nil { + return fmt.Errorf("key file is sealed") + } + + // First write the length of the key. + err := binary.Write(k.writer, binary.BigEndian, uint32(len(key))) + if err != nil { + return fmt.Errorf("failed to write key length to key file: %v", err) + } + + // Next, write the key itself. + _, err = k.writer.Write(key) + if err != nil { + return fmt.Errorf("failed to write key to key file: %v", err) + } + + return nil +} + +// flush flushes the key file to disk. +func (k *keyFile) flush() error { + if k.writer == nil { + return fmt.Errorf("key file is sealed") + } + + return k.writer.Flush() +} + +// seal seals the key file, preventing further writes. +func (k *keyFile) seal() error { + if k.writer == nil { + return fmt.Errorf("key file is already sealed") + } + + err := k.flush() + if err != nil { + return fmt.Errorf("failed to flush key file: %v", err) + } + k.writer = nil + + return nil +} + +// readKeys reads all keys from the key file. This method returns an error if the key file is not sealed. +func (k *keyFile) readKeys() ([][]byte, error) { + if k.writer != nil { + return nil, fmt.Errorf("key file is not sealed") + } + + file, err := os.Open(k.path()) + if err != nil { + return nil, fmt.Errorf("failed to open key file: %v", err) + } + defer func() { + err = file.Close() + if err != nil { + k.logger.Errorf("failed to close key file: %v", err) + } + }() + + // Key files are small as long as key length is sane. Safe to read the whole file into memory. 
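+	// Each record is a 4-byte big-endian length prefix followed by the raw key bytes, mirroring the
+	// format produced by write() above.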
+ keyBytes, err := os.ReadFile(k.path()) + if err != nil { + return nil, fmt.Errorf("failed to read key file: %v", err) + } + keys := make([][]byte, 0) + + index := 0 + for { + if index+4 >= len(keyBytes) { + break + } + keyLength := binary.BigEndian.Uint32(keyBytes[index : index+4]) + index += 4 + + if index+int(keyLength) > len(keyBytes) { + break + } + + key := keyBytes[index : index+int(keyLength)] + keys = append(keys, key) + index += int(keyLength) + } + + if index != len(keyBytes) { + // This can happen if there is a crash while we are writing to the key file. + // Recoverable, but best to note the event in the logs. + k.logger.Warnf("key file %s has %d corrupted bytes", k.path(), len(keyBytes)-index) + } + + return keys, nil +} + +// delete deletes the key file. +func (k *keyFile) delete() error { + return os.Remove(k.path()) +} diff --git a/litt/disktable/segment/key_file_test.go b/litt/disktable/segment/key_file_test.go new file mode 100644 index 0000000000..135cf2283a --- /dev/null +++ b/litt/disktable/segment/key_file_test.go @@ -0,0 +1,134 @@ +package segment + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func TestReadWriteKeys(t *testing.T) { + rand := random.NewTestRandom(t) + logger, err := common.NewLogger(common.DefaultTextLoggerConfig()) + require.NoError(t, err) + directory := t.TempDir() + + index := rand.Uint32() + + keyCount := rand.Int32Range(100, 200) + keys := make([][]byte, keyCount) + for i := 0; i < int(keyCount); i++ { + keys[i] = rand.VariableBytes(1, 100) + } + + file, err := newKeyFile(logger, index, directory, false) + require.NoError(t, err) + + for _, key := range keys { + err := file.write(key) + require.NoError(t, err) + } + + // Reading the file prior to sealing it is forbidden. + _, err = file.readKeys() + require.Error(t, err) + + err = file.seal() + require.NoError(t, err) + + // Reading the file after sealing it is allowed. + readKeys, err := file.readKeys() + require.NoError(t, err) + + for i, key := range keys { + assert.Equal(t, key, readKeys[i]) + } + + // delete the file + filePath := file.path() + _, err = os.Stat(filePath) + require.NoError(t, err) + + err = file.delete() + require.NoError(t, err) + + _, err = os.Stat(filePath) + require.True(t, os.IsNotExist(err)) +} + +func TestReadingTruncatedKeyFile(t *testing.T) { + rand := random.NewTestRandom(t) + logger, err := common.NewLogger(common.DefaultTextLoggerConfig()) + require.NoError(t, err) + directory := t.TempDir() + + index := rand.Uint32() + + keyCount := rand.Int32Range(100, 200) + keys := make([][]byte, keyCount) + for i := 0; i < int(keyCount); i++ { + keys[i] = rand.VariableBytes(1, 100) + } + + file, err := newKeyFile(logger, index, directory, false) + require.NoError(t, err) + + for _, key := range keys { + err := file.write(key) + require.NoError(t, err) + } + + err = file.seal() + require.NoError(t, err) + + // Truncate the file. Chop off some bytes from the last key, but do not corrupt the length prefix. 
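+	// readKeys is expected to tolerate this: it should return every intact key, drop the partial
+	// trailing record, and log a warning instead of returning an error.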
+ lastKeyLength := len(keys[keyCount-1]) + + filePath := file.path() + + originalBytes, err := os.ReadFile(filePath) + require.NoError(t, err) + + bytesToRemove := rand.Int32Range(1, int32(lastKeyLength)+1) + bytes := originalBytes[:len(originalBytes)-int(bytesToRemove)] + + err = os.WriteFile(filePath, bytes, 0644) + require.NoError(t, err) + + // We should be able to read the keys up to the point where the file was truncated. + readKeys, err := file.readKeys() + require.NoError(t, err) + + require.Equal(t, int(keyCount-1), len(readKeys)) + for i, key := range keys[:keyCount-1] { + assert.Equal(t, key, readKeys[i]) + } + + // Truncate the file. This time, chop off some of the length prefix of the last key. + prefixBytesToRemove := rand.Int32Range(1, 4) + bytes = originalBytes[:len(originalBytes)-int(prefixBytesToRemove)] + + err = os.WriteFile(filePath, bytes, 0644) + require.NoError(t, err) + + // We should not be able to read the keys if the length prefix is truncated. + keys, err = file.readKeys() + require.NoError(t, err) + + require.Equal(t, int(keyCount-1), len(keys)) + for i, key := range keys[:keyCount-1] { + assert.Equal(t, key, keys[i]) + } + + // delete the file + _, err = os.Stat(filePath) + require.NoError(t, err) + + err = file.delete() + require.NoError(t, err) + + _, err = os.Stat(filePath) + require.True(t, os.IsNotExist(err)) +} diff --git a/litt/disktable/segment/metadata_file.go b/litt/disktable/segment/metadata_file.go new file mode 100644 index 0000000000..0e5f855404 --- /dev/null +++ b/litt/disktable/segment/metadata_file.go @@ -0,0 +1,195 @@ +package segment + +import ( + "encoding/binary" + "fmt" + "os" + "path" + "time" +) + +const ( + // The current serialization version. If we ever change how we serialize data, bump this version. + currentSerializationVersion = uint32(0) + + // MetadataFileExtension is the file extension for the metadata file. This file contains metadata about the data + // segment, such as serialization version and expiration time. + MetadataFileExtension = ".metadata" + + // MetadataSwapExtension is the file extension for the metadata swap file. This file is used to atomically update + // the metadata file by doing an atomic rename of the swap file to the metadata file. If this file is ever + // present when the database first starts, it is an artifact of a crash during a metadata update, and should be + // deleted. + MetadataSwapExtension = ".metadata.swap" + + // The size of the metadata file in bytes. This is a constant, so it's convenient to have it here. + // 4 bytes for version, 8 bytes for timestamp, 1 byte for sealed + metadataSize = 13 +) + +// metadataFile contains metadata about a segment. +type metadataFile struct { + // The segment index. This value is encoded in the file name. + index uint32 + + // The serialization version for this segment, used to permit smooth data migrations. + // This value is encoded in file. + serializationVersion uint32 + + // If true, the segment is sealed and no more data can be written to it. If false, then data can still be written to + // this segment. This value is encoded in file. + sealed bool + + // The time when the last value was written into the segment, in nanoseconds since the epoch. A segment can + // only be deleted when all values within it are expired, and so we only need to keep track of the timestamp of + // the last value (which always expires last). This value is irrelevant if the segment is not yet sealed. + // This value is encoded in file. 
+ timestamp uint64 + + // The parent directory containing this file. This value is not encoded in file, and is stored here + // for bookkeeping purposes. + parentDirectory string +} + +// newMetadataFile creates a new metadata file. When this method returns, the metadata file will +// be durably written to disk. +func newMetadataFile(index uint32, parentDirectory string) (*metadataFile, error) { + file := &metadataFile{ + index: index, + parentDirectory: parentDirectory, + } + + filePath := file.path() + exists, _, err := verifyFilePermissions(filePath) + if err != nil { + return nil, fmt.Errorf("file %s has incorrect permissions: %v", filePath, err) + } + + if exists { + // File exists. Load it. + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read metadata file %s: %v", filePath, err) + } + err = file.deserialize(data) + if err != nil { + return nil, fmt.Errorf("failed to deserialize metadata file %s: %v", filePath, err) + } + } else { + // File does not exist. Create it. + file.serializationVersion = currentSerializationVersion + err = file.write() + if err != nil { + return nil, fmt.Errorf("failed to write metadata file: %v", err) + } + } + + return file, nil +} + +// Name returns the file name for this metadata file. +func (m *metadataFile) name() string { + return fmt.Sprintf("%d%s", m.index, MetadataFileExtension) +} + +// Path returns the full path to this metadata file. +func (m *metadataFile) path() string { + return path.Join(m.parentDirectory, m.name()) +} + +// SwapName returns the file name for the swap file for this metadata file. +func (m *metadataFile) swapName() string { + return fmt.Sprintf("%d%s", m.index, MetadataSwapExtension) +} + +// SwapPath returns the full path to the swap file for this metadata file. +func (m *metadataFile) swapPath() string { + return path.Join(m.parentDirectory, m.swapName()) +} + +// Seal seals the segment. This action will atomically write the metadata file to disk one final time, +// and should only be performed when all data that will be written to the key/value files has been made durable. +func (m *metadataFile) seal(now time.Time) error { + m.sealed = true + m.timestamp = uint64(now.UnixNano()) + err := m.write() + if err != nil { + return fmt.Errorf("failed to write sealed metadata file: %v", err) + } + return nil +} + +// serialize serializes the metadata file to a byte array. +func (m *metadataFile) serialize() []byte { + // 4 bytes for version, 8 bytes for timestamp, 1 byte for sealed + data := make([]byte, metadataSize) + + // Write the version + binary.BigEndian.PutUint32(data[0:4], m.serializationVersion) + + // Write the timestamp + binary.BigEndian.PutUint64(data[4:12], m.timestamp) + + // Write the sealed flag + if m.sealed { + data[12] = 1 + } else { + data[12] = 0 + } + + return data +} + +// deserialize deserializes the metadata file from a byte array. +func (m *metadataFile) deserialize(data []byte) error { + if len(data) != metadataSize { + return fmt.Errorf("metadata file is not the correct size: %d", len(data)) + } + + m.serializationVersion = binary.BigEndian.Uint32(data[0:4]) + if m.serializationVersion != currentSerializationVersion { + return fmt.Errorf("unsupported serialization version: %d", m.serializationVersion) + } + m.timestamp = binary.BigEndian.Uint64(data[4:12]) + m.sealed = data[12] == 1 + + return nil +} + +// write atomically writes the metadata file to disk. 
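+// It serializes the metadata into the .metadata.swap file first and then renames the swap file over the
+// .metadata file, so a crash can never leave a partially written metadata file behind.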
+func (m *metadataFile) write() error { + bytes := m.serialize() + swapPath := m.swapPath() + swapFile, err := os.Create(swapPath) + if err != nil { + return fmt.Errorf("failed to create swap file %s: %v", swapPath, err) + } + + _, err = swapFile.Write(bytes) + if err != nil { + return fmt.Errorf("failed to write to swap file %s: %v", swapPath, err) + } + + err = swapFile.Close() + if err != nil { + return fmt.Errorf("failed to close swap file %s: %v", swapPath, err) + } + + metadataPath := m.path() + err = os.Rename(swapPath, metadataPath) + if err != nil { + return fmt.Errorf("failed to rename swap file %s to metadata file %s: %v", swapPath, metadataPath, err) + } + + return nil +} + +// delete deletes the metadata file from disk. +func (m *metadataFile) delete() error { + err := os.Remove(m.path()) + if err != nil { + return fmt.Errorf("failed to remove metadata file %s: %v", m.path(), err) + } + + return nil +} diff --git a/litt/disktable/segment/metadata_file_test.go b/litt/disktable/segment/metadata_file_test.go new file mode 100644 index 0000000000..70cd5f36e8 --- /dev/null +++ b/litt/disktable/segment/metadata_file_test.go @@ -0,0 +1,138 @@ +package segment + +import ( + "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func TestUnsealedSerialization(t *testing.T) { + rand := random.NewTestRandom(t) + directory := t.TempDir() + + index := rand.Uint32() + timestamp := rand.Uint64() + m := &metadataFile{ + index: index, + serializationVersion: currentSerializationVersion, + sealed: false, + timestamp: timestamp, + parentDirectory: directory, + } + err := m.write() + require.NoError(t, err) + + deserialized, err := newMetadataFile(index, m.parentDirectory) + require.NoError(t, err) + require.Equal(t, *m, *deserialized) + + // delete the file + filePath := m.path() + _, err = os.Stat(filePath) + require.NoError(t, err) + + err = m.delete() + require.NoError(t, err) + + _, err = os.Stat(filePath) + require.True(t, os.IsNotExist(err)) +} + +func TestSealedSerialization(t *testing.T) { + rand := random.NewTestRandom(t) + directory := t.TempDir() + + index := rand.Uint32() + timestamp := rand.Uint64() + m := &metadataFile{ + index: index, + serializationVersion: currentSerializationVersion, + sealed: true, + timestamp: timestamp, + parentDirectory: directory, + } + err := m.write() + require.NoError(t, err) + + deserialized, err := newMetadataFile(index, m.parentDirectory) + require.NoError(t, err) + require.Equal(t, *m, *deserialized) + + // delete the file + filePath := m.path() + _, err = os.Stat(filePath) + require.NoError(t, err) + + err = m.delete() + require.NoError(t, err) + + _, err = os.Stat(filePath) + require.True(t, os.IsNotExist(err)) +} + +func TestFreshFileSerialization(t *testing.T) { + rand := random.NewTestRandom(t) + directory := t.TempDir() + + index := rand.Uint32() + m, err := newMetadataFile(index, directory) + require.NoError(t, err) + + require.Equal(t, index, m.index) + require.Equal(t, currentSerializationVersion, m.serializationVersion) + require.False(t, m.sealed) + require.Zero(t, m.timestamp) + require.Equal(t, directory, m.parentDirectory) + + deserialized, err := newMetadataFile(index, m.parentDirectory) + require.NoError(t, err) + require.Equal(t, *m, *deserialized) + + // delete the file + filePath := m.path() + _, err = os.Stat(filePath) + require.NoError(t, err) + + err = m.delete() + require.NoError(t, err) + + _, err = os.Stat(filePath) + require.True(t, os.IsNotExist(err)) +} + 
+func TestSealing(t *testing.T) { + rand := random.NewTestRandom(t) + directory := t.TempDir() + + index := rand.Uint32() + m, err := newMetadataFile(index, directory) + require.NoError(t, err) + + // seal the file + sealTime := rand.Time() + err = m.seal(sealTime) + require.NoError(t, err) + + require.Equal(t, index, m.index) + require.Equal(t, currentSerializationVersion, m.serializationVersion) + require.True(t, m.sealed) + require.Equal(t, uint64(sealTime.UnixNano()), m.timestamp) + require.Equal(t, directory, m.parentDirectory) + + // load the file + deserialized, err := newMetadataFile(index, m.parentDirectory) + require.NoError(t, err) + require.Equal(t, *m, *deserialized) + + // delete the file + filePath := m.path() + _, err = os.Stat(filePath) + require.NoError(t, err) + + err = m.delete() + require.NoError(t, err) + + _, err = os.Stat(filePath) + require.True(t, os.IsNotExist(err)) +} diff --git a/litt/disktable/segment/segment.go b/litt/disktable/segment/segment.go new file mode 100644 index 0000000000..4b0c1a2b41 --- /dev/null +++ b/litt/disktable/segment/segment.go @@ -0,0 +1,244 @@ +package segment + +import ( + "fmt" + "github.com/Layr-Labs/eigensdk-go/logging" + "sync" + "sync/atomic" + "time" +) + +// Segment is a chunk of data stored on disk. All data in a particular data segment is expired at the same time. +// +// This struct is not thread safe, access control must be handled by the caller. +type Segment struct { // TODO do we need to export this type? + // The logger for the segment. + logger logging.Logger + + // The index of the data segment. The first data segment ever created has index 0, the next has index 1, and so on. + index uint32 + + // This file contains metadata about the segment. + metadata *metadataFile + + // This file contains the keys for the data segment, and is used for performing garbage collection on the key index. + keys *keyFile + + // This file contains the values for the data segment. + values *valueFile + + // The target size for value files. + targetFileSize uint32 + + // lock controls access to the segment. + lock sync.RWMutex + + // reservationCount is the number of reservations on this segment. The segment will not be deleted until this count + // reaches zero. + reservationCount atomic.Int32 +} + +// NewSegment creates a new data segment. +func NewSegment( + logger logging.Logger, + index uint32, + parentDirectory string, + targetFileSize uint32) (*Segment, error) { + + metadata, err := newMetadataFile(index, parentDirectory) + if err != nil { + return nil, fmt.Errorf("failed to open metadata file: %v", err) + } + + keys, err := newKeyFile(logger, index, parentDirectory, metadata.sealed) + if err != nil { + return nil, fmt.Errorf("failed to open key file: %v", err) + } + + values, err := newValueFile(logger, index, parentDirectory, metadata.sealed) + if err != nil { + return nil, fmt.Errorf("failed to open value file: %v", err) + } + + segment := &Segment{ + logger: logger, + index: index, + metadata: metadata, + keys: keys, + values: values, + targetFileSize: targetFileSize, + } + + return segment, nil +} + +// Write records a key-value pair in the data segment, returning the resulting address of the data. +// If this file is full, ok will be false and the data will not have been written. +// +// This method does not ensure that the key-value pair is actually written to disk, only that it is recorded +// in the data segment. Flush must be called to ensure that all data previously passed to Put is written to disk. 
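+//
+// A sketch of the intended calling pattern (variable names illustrative, error handling elided):
+//
+//	addr, ok, err := segment.Write(key, value)
+//	if !ok {
+//		// the segment is full; seal it and retry the write against a fresh segment
+//	}
+//	err = segment.Flush() // make previously written data durable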
+func (s *Segment) Write(key []byte, value []byte) (address Address, ok bool, err error) { + s.lock.Lock() + defer s.lock.Unlock() + + resultingSize := s.values.currentSize + uint64(len(value)) + 4 + if resultingSize > uint64(s.targetFileSize) { + // segment is full + return 0, false, nil + } + + err = s.keys.write(key) + if err != nil { + return 0, false, fmt.Errorf("failed to write key: %v", err) + } + + address, err = s.values.write(value) + if err != nil { + return 0, false, fmt.Errorf("failed to write value: %v", err) + } + + return address, true, nil +} + +// CurrentSize returns the current size of the data segment. +func (s *Segment) CurrentSize() uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.values.currentSize +} + +// Read fetches the data for a key from the data segment. +func (s *Segment) Read(dataAddress Address) ([]byte, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + value, err := s.values.read(dataAddress) + if err != nil { + return nil, fmt.Errorf("failed to read value: %v", err) + } + return value, nil +} + +// GetKeys returns all keys in the data segment. Only permitted to be called after the segment has been sealed. +func (s *Segment) GetKeys() ([][]byte, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + keys, err := s.keys.readKeys() + if err != nil { + return nil, fmt.Errorf("failed to read keys: %v", err) + } + return keys, nil +} + +// Flush writes the data segment to disk. +func (s *Segment) Flush() error { + s.lock.Lock() + defer s.lock.Unlock() + + err := s.keys.flush() + if err != nil { + return fmt.Errorf("failed to flush key file: %v", err) + } + + err = s.values.flush() + if err != nil { + return fmt.Errorf("failed to flush value file: %v", err) + } + + return nil +} + +// Seal flushes all data to disk and finalizes the metadata. After this method is called, no more data can be written +// to the data segment. +func (s *Segment) Seal(now time.Time) error { + s.lock.Lock() + defer s.lock.Unlock() + + err := s.keys.seal() + if err != nil { + return fmt.Errorf("failed to seal key file: %v", err) + } + + err = s.values.seal() + if err != nil { + return fmt.Errorf("failed to seal value file: %v", err) + } + + err = s.metadata.seal(now) + if err != nil { + return fmt.Errorf("failed to seal metadata file: %v", err) + } + + return nil +} + +// IsSealed returns true if the segment is sealed, and false otherwise. +func (s *Segment) IsSealed() bool { + s.lock.RLock() + defer s.lock.RUnlock() + + return s.metadata.sealed +} + +// GetSealTime returns the time at which the segment was sealed. If the file is not sealed, this method will return +// the zero time. +func (s *Segment) GetSealTime() time.Time { + s.lock.RLock() + defer s.lock.RUnlock() + + return time.Unix(0, int64(s.metadata.timestamp)) +} + +// Reserve reserves the segment, preventing it from being deleted. Returns true if the reservation was successful, and +// false otherwise. +func (s *Segment) Reserve() bool { + for { + reservations := s.reservationCount.Load() + if reservations <= 0 { + return false + } + + if s.reservationCount.CompareAndSwap(reservations, reservations+1) { + return true + } + } +} + +// Release releases the segment, allowing it to be deleted. Deletion happens inside this method if the reservation count +// reaches zero as a result of this call. 
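+//
+// A typical reader pairs Reserve with Release (a sketch, names illustrative):
+//
+//	if segment.Reserve() {
+//		defer segment.Release()
+//		value, err := segment.Read(address)
+//		// use value ...
+//	}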
+func (s *Segment) Release() { + reservations := s.reservationCount.Add(-1) + if reservations > 0 { + return + } + + go func() { + err := s.delete() + if err != nil { + s.logger.Errorf("failed to delete segment: %v", err) + } + }() +} + +// delete deletes the segment from disk. +func (s *Segment) delete() error { + s.lock.Lock() + defer s.lock.Unlock() + + err := s.keys.delete() + if err != nil { + return fmt.Errorf("failed to delete key file: %v", err) + } + err = s.values.delete() + if err != nil { + return fmt.Errorf("failed to delete value file: %v", err) + } + err = s.metadata.delete() + if err != nil { + return fmt.Errorf("failed to delete metadata file: %v", err) + } + + return nil +} diff --git a/litt/disktable/segment/segment_manager.go b/litt/disktable/segment/segment_manager.go new file mode 100644 index 0000000000..45a6b642c3 --- /dev/null +++ b/litt/disktable/segment/segment_manager.go @@ -0,0 +1,416 @@ +package segment + +import ( + "fmt" + "github.com/Layr-Labs/eigensdk-go/logging" + "math" + "os" + "path" + "sync" + "time" +) + +// SegmentManager manages a table's Segments. +type SegmentManager struct { + // The logger for the segment manager. + logger logging.Logger + + // The root directory for the segment manager. + root string + + // The index of the lowest numbered segment. After initial creation, only the garbage collection + // thread is permitted to read/write this value for the sake of thread safety. + lowestSegmentIndex uint32 + + // The index of the highest numbered segment. All writes are applied to this segment. + highestSegmentIndex uint32 + + // All segments currently in use. + segments map[uint32]*Segment + + // The target size for value files. + targetFileSize uint32 + + // segmentLock protects access to the segments map and highestSegmentIndex. + // Does not protect the segments themselves. + segmentLock sync.RWMutex +} + +// NewSegmentManager creates a new SegmentManager. +func NewSegmentManager( + logger logging.Logger, + root string, + targetFileSize uint32) (*SegmentManager, error) { + + manager := &SegmentManager{ + logger: logger, + root: root, + targetFileSize: targetFileSize, + } + + err := manager.gatherSegmentFiles() + if err != nil { + return nil, fmt.Errorf("failed to gather segment files: %v", err) + } + + return manager, nil +} + +// getFileIndex returns the index of the segment file. Segment files are named as .. +func (s *SegmentManager) getFileIndex(fileName string) (uint32, error) { + indexString := path.Base(fileName) + index, err := fmt.Sscanf(indexString, "%d") + if err != nil { + return 0, fmt.Errorf("failed to parse index from file name %s: %v", fileName, err) + } + + return uint32(index), nil +} + +// TODO: break up this method, possibly into another file + +// gatherSegmentFiles reads the segment files on disk and populates the segments map. 
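+// A healthy segment with index N is represented on disk by three files: N.metadata, N.keys, and N.values.
+// Any leftover N.metadata.swap file is an artifact of a crash during a metadata update and is removed here.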
+func (s *SegmentManager) gatherSegmentFiles() error { + + // metadata files we've found on disk, key is the file's segment index, value is the file's path + metadataFiles := make(map[uint32]string) + // key files we've found on disk, key is the file's segment index, value is the file's path + keyFiles := make(map[uint32]string) + // value files we've found on disk, key is the file's segment index, value is the file's path + valueFiles := make(map[uint32]string) + + s.highestSegmentIndex = uint32(0) + s.lowestSegmentIndex = uint32(math.MaxUint32) + + files, err := os.ReadDir(s.root) + if err != nil { + return fmt.Errorf("failed to read directory %s: %v", s.root, err) + } + + // Catalogue the segment files. While we are at it, delete rogue swap files. + for _, file := range files { + if file.IsDir() { + continue + } + + fileName := file.Name() + extension := path.Ext(fileName) + filePath := path.Join(s.root, fileName) + + switch extension { + case MetadataFileExtension: + index, err := s.getFileIndex(fileName) + if err != nil { + return fmt.Errorf("failed to get index from metadata file: %v", err) + } + if index > s.highestSegmentIndex { + s.highestSegmentIndex = index + } + if index < s.lowestSegmentIndex { + s.lowestSegmentIndex = index + } + metadataFiles[index] = filePath + case MetadataSwapExtension: + s.logger.Warnf("Removing rogue swap file %s", filePath) + err = os.Remove(filePath) + if err != nil { + return fmt.Errorf("failed to remove swap file %s: %v", filePath, err) + } + case KeysFileExtension: + index, err := s.getFileIndex(fileName) + if err != nil { + return fmt.Errorf("failed to get index from keys file: %v", err) + } + if index > s.highestSegmentIndex { + s.highestSegmentIndex = index + } + if index < s.lowestSegmentIndex { + s.lowestSegmentIndex = index + } + keyFiles[index] = filePath + case ValuesFileExtension: + index, err := s.getFileIndex(fileName) + if err != nil { + return fmt.Errorf("failed to get index from values file: %v", err) + } + if index > s.highestSegmentIndex { + s.highestSegmentIndex = index + } + if index < s.lowestSegmentIndex { + s.lowestSegmentIndex = index + } + valueFiles[index] = filePath + default: + s.logger.Debugf("Ignoring unknown file %s", filePath) + } + } + + // For each segment, ensure that we have all the necessary files. + orphanSet := make(map[uint32]struct{}) + lastSegmentOrphaned := false + firstSegmentOrphaned := false + for i := s.lowestSegmentIndex; i <= s.highestSegmentIndex; i++ { + if _, ok := metadataFiles[i]; !ok { + // We are missing a metadata file. + + if i == s.highestSegmentIndex { + // This can happen if we crash while creating a new segment. Recoverable. + s.logger.Warnf("Missing metadata file for last segment %d", i) + orphanSet[i] = struct{}{} + lastSegmentOrphaned = true + } else if i == s.lowestSegmentIndex { + // This can happen when deleting the oldest segment. Recoverable. + s.logger.Warnf("Missing metadata file for first segment %d", i) + orphanSet[i] = struct{}{} + firstSegmentOrphaned = true + } else { + // Database is missing internal files. Catastrophic failure. + return fmt.Errorf("missing metadata file for segment %d", i) + } + } + + if _, ok := keyFiles[i]; !ok { + // We are missing a key file. + + if i == s.highestSegmentIndex { + // This can happen if we crash while creating a new segment. Recoverable. 
+ s.logger.Warnf("Missing key file for last segment %d", i) + orphanSet[i] = struct{}{} + lastSegmentOrphaned = true + } else if i == s.lowestSegmentIndex { + // This can happen when deleting the oldest segment. Recoverable. + s.logger.Warnf("Missing key file for first segment %d", i) + orphanSet[i] = struct{}{} + firstSegmentOrphaned = true + } else { + // Database is missing internal files. Catastrophic failure. + return fmt.Errorf("missing key file for segment %d", i) + } + } + + if _, ok := valueFiles[i]; !ok { + // We are missing a value file. + + if i == s.highestSegmentIndex { + // This can happen if we crash while creating a new segment. Recoverable. + s.logger.Warnf("Missing value file for last segment %d", i) + orphanSet[i] = struct{}{} + lastSegmentOrphaned = true + } else if i == s.lowestSegmentIndex { + // This can happen when deleting the oldest segment. Recoverable. + s.logger.Warnf("Missing value file for first segment %d", i) + orphanSet[i] = struct{}{} + firstSegmentOrphaned = true + } else { + // Database is missing internal files. Catastrophic failure. + return fmt.Errorf("missing value file for segment %d", i) + } + } + } + + // Clean up any orphaned segment files. + for orphanIndex := range orphanSet { + metadataPath, ok := metadataFiles[orphanIndex] + if ok { + s.logger.Infof("Removing orphaned metadata file %s", metadataPath) + err = os.Remove(metadataPath) + if err != nil { + return fmt.Errorf("failed to remove orphaned metadata file %s: %v", metadataPath, err) + } + } + + keyPath, ok := keyFiles[orphanIndex] + if ok { + s.logger.Infof("Removing orphaned key file %s", keyPath) + err = os.Remove(keyPath) + if err != nil { + return fmt.Errorf("failed to remove orphaned key file %s: %v", keyPath, err) + } + } + + valuePath, ok := valueFiles[orphanIndex] + if ok { + s.logger.Infof("Removing orphaned value file %s", valuePath) + err = os.Remove(valuePath) + if err != nil { + return fmt.Errorf("failed to remove orphaned value file %s: %v", valuePath, err) + } + } + } + + if lastSegmentOrphaned { + s.highestSegmentIndex-- + } + if firstSegmentOrphaned { + s.lowestSegmentIndex++ + } + + // Finally, load all healthy segments. + for i := s.lowestSegmentIndex; i <= s.highestSegmentIndex; i++ { + segment, err := NewSegment(s.logger, i, s.root, s.targetFileSize) + if err != nil { + return fmt.Errorf("failed to create segment %d: %v", i, err) + } + s.segments[i] = segment + } + + return nil +} + +// getSegment returns the segment with the given index. Segment is reserved, and it is the caller's responsibility to +// release the reservation when done. +func (s *SegmentManager) getReservedSegment(index uint32) (*Segment, error) { + s.segmentLock.RLock() + defer s.segmentLock.RUnlock() + + segment, ok := s.segments[index] + if !ok { + return nil, fmt.Errorf("segment %d does not exist", index) + } + + ok = segment.Reserve() + if !ok { + // segmented was deleted out from under us + return nil, fmt.Errorf("segment %d was deleted", index) + } + + return segment, nil +} + +func (s *SegmentManager) getMutableSegment() (*Segment, error) { + s.segmentLock.RLock() + defer s.segmentLock.RUnlock() + + segment := s.segments[s.highestSegmentIndex] + + ok := segment.Reserve() + if !ok { + // segmented was deleted out from under us. This should never happen for the mutable segment. + return nil, fmt.Errorf("mutable segment %d was deleted", s.highestSegmentIndex) + } + + return segment, nil +} + +// createNewSegment attempts to create a new mutable segment. 
If multiple goroutines call this method at the same time, +// only one will succeed in creating the new segment. This method should only be called if the last segment is full. +func (s *SegmentManager) attemptSegmentCreation(previousHighestSegmentIndex uint32) error { + s.segmentLock.Lock() + defer s.segmentLock.Unlock() + + if s.highestSegmentIndex != previousHighestSegmentIndex { + // another goroutine beat us to it + s.segmentLock.Unlock() + return nil + } + + // Seal the previous segment. + // TODO can we do this without holding the lock? + now := time.Now() // TODO use time source + err := s.segments[s.highestSegmentIndex].Seal(now) + if err != nil { + return fmt.Errorf("failed to seal segment %d: %v", s.highestSegmentIndex, err) + } + + // Create a new segment. + newSegment, err := NewSegment(s.logger, s.highestSegmentIndex+1, s.root, s.targetFileSize) + if err != nil { + s.segmentLock.Unlock() + return fmt.Errorf("failed to create new segment: %v", err) + } + s.highestSegmentIndex++ + s.segments[s.highestSegmentIndex] = newSegment + + return nil +} + +// Write records a key-value pair in the data segment, returning the resulting address of the data. +// This method does not ensure that the key-value pair is actually written to disk, only that it is recorded +// in the data segment. Flush must be called to ensure that all data previously passed to Put is written to disk. +func (s *SegmentManager) Write(key []byte, value []byte) (Address, error) { + for { + segment, err := s.getMutableSegment() + if err != nil { + return 0, fmt.Errorf("failed to get segment: %v", err) + } + + address, ok, err := segment.Write(key, value) + segment.Release() + if err != nil { + return 0, fmt.Errorf("failed to write key-value pair: %v", err) + } + + if ok { + // We've successfully written the key-value pair. + return address, nil + } + + // The segment filled up, write did not happen. Create a new segment and try again. + err = s.attemptSegmentCreation(segment.index) + if err != nil { + return 0, fmt.Errorf("failed to create new segment: %v", err) + } + } +} + +// Read fetches the data for a key from the data segment. +func (s *SegmentManager) Read(dataAddress Address) (data []byte, err error) { + segment, err := s.getReservedSegment(dataAddress.Index()) + if err != nil { + return nil, fmt.Errorf("failed to get segment: %v", err) + } + defer segment.Release() + + data, err = segment.Read(dataAddress) + + if err != nil { + return nil, fmt.Errorf("failed to read data: %v", err) + } + + return data, nil +} + +// Flush flushes all data to disk. +func (s *SegmentManager) Flush() error { + s.segmentLock.RLock() + defer s.segmentLock.RUnlock() + + err := s.segments[s.highestSegmentIndex].Flush() + if err != nil { + return fmt.Errorf("failed to flush mutable segment: %v", err) + } + + return nil +} + +// TODO create background thread that calls this method + +// DoGarbageCollection performs garbage collection on all segments, deleting old ones as necessary. +// +// Although this method is thread safe with respect to other methods in this class, it should not +// be called concurrently with itself. +func (s *SegmentManager) DoGarbageCollection(now time.Time, ttl time.Duration) { + s.segmentLock.RLock() + defer s.segmentLock.RUnlock() + + for index := s.lowestSegmentIndex; index <= s.highestSegmentIndex; index++ { + segment := s.segments[index] + if !segment.IsSealed() { + // We can't delete an unsealed segment. 
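+			// Only the newest segment can be unsealed, so there is nothing further to collect.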
+ return + } + + segmentAge := now.Sub(segment.GetSealTime()) + if segmentAge < ttl { + // Segment is not old enough to be deleted. + return + } + + // Segment is old enough to be deleted. + // Actual deletion will happen when the segment is released by all reservation holders. + segment.Release() + delete(s.segments, index) + + s.lowestSegmentIndex++ + } +} diff --git a/litt/disktable/segment/segment_test.go b/litt/disktable/segment/segment_test.go new file mode 100644 index 0000000000..3460aaa6f0 --- /dev/null +++ b/litt/disktable/segment/segment_test.go @@ -0,0 +1,196 @@ +package segment + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/require" + "math" + "os" + "testing" +) + +// countFilesInDirectory returns the number of files in the given directory. +func countFilesInDirectory(t *testing.T, directory string) int { + files, err := os.ReadDir(directory) + require.NoError(t, err) + return len(files) +} + +func TestWriteAndReadSegment(t *testing.T) { + rand := random.NewTestRandom(t) + logger, err := common.NewLogger(common.DefaultTextLoggerConfig()) + require.NoError(t, err) + directory := t.TempDir() + + index := rand.Uint32() + valueCount := rand.Int32Range(100, 200) + values := make([][]byte, valueCount) + keys := make([][]byte, valueCount) + for i := 0; i < int(valueCount); i++ { + values[i] = rand.VariableBytes(1, 100) + keys[i] = rand.VariableBytes(1, 100) + } + + addressMap := make(map[Address][]byte) + + seg, err := NewSegment(logger, index, directory, math.MaxUint32) + require.NoError(t, err) + + // Write values to the segment. + for i := 0; i < int(valueCount); i++ { + key := keys[i] + value := values[i] + + address, ok, err := seg.Write(key, value) + require.NoError(t, err) + require.True(t, ok) + addressMap[address] = value + + // Occasionally flush the segment to disk. + if rand.BoolWithProbability(0.25) { + err := seg.Flush() + require.NoError(t, err) + } + + // Occasionally scan all addresses and values in the segment. Some will be on disk, some will be in memory. + if rand.BoolWithProbability(0.1) { + for addr, val := range addressMap { + readValue, err := seg.Read(addr) + require.NoError(t, err) + require.Equal(t, val, readValue) + } + } + } + + // Seal the segment and read all keys and values. + require.False(t, seg.IsSealed()) + sealTime := rand.Time() + err = seg.Seal(sealTime) + require.NoError(t, err) + require.True(t, seg.IsSealed()) + + require.Equal(t, sealTime.UnixNano(), seg.GetSealTime().UnixNano()) + + for addr, val := range addressMap { + readValue, err := seg.Read(addr) + require.NoError(t, err) + require.Equal(t, val, readValue) + } + + keysFromSegment, err := seg.GetKeys() + require.NoError(t, err) + require.Equal(t, keys, keysFromSegment) + + expectedSize := uint64(0) + for _, value := range values { + expectedSize += uint64(len(value)) + 4 + } + require.Equal(t, expectedSize, seg.CurrentSize()) + + // Reopen the segment and read all keys and values. 
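+	// NewSegment finds the sealed metadata file on disk and reopens the existing key and value files
+	// without creating writers for them.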
+ seg2, err := NewSegment(logger, index, directory, math.MaxUint32) + require.NoError(t, err) + require.True(t, seg2.IsSealed()) + + require.Equal(t, sealTime.UnixNano(), seg2.GetSealTime().UnixNano()) + + for addr, val := range addressMap { + readValue, err := seg2.Read(addr) + require.NoError(t, err) + require.Equal(t, val, readValue) + } + + keysFromSegment2, err := seg2.GetKeys() + require.NoError(t, err) + require.Equal(t, keys, keysFromSegment2) + + require.Equal(t, expectedSize, seg2.CurrentSize()) + + // delete the segment + require.Equal(t, 3, countFilesInDirectory(t, directory)) + + err = seg.delete() + require.NoError(t, err) + + require.Equal(t, 0, countFilesInDirectory(t, directory)) +} + +func TestWriteToFullSegment(t *testing.T) { + rand := random.NewTestRandom(t) + logger, err := common.NewLogger(common.DefaultTextLoggerConfig()) + require.NoError(t, err) + directory := t.TempDir() + + sizeOfAllButLastValue := uint64(0) + sizeOfLastValue := uint64(0) + + index := rand.Uint32() + valueCount := rand.Int32Range(100, 200) + values := make([][]byte, valueCount) + keys := make([][]byte, valueCount) + for i := 0; i < int(valueCount); i++ { + values[i] = rand.VariableBytes(1, 100) + keys[i] = rand.VariableBytes(1, 100) + + if i < int(valueCount)-1 { + sizeOfAllButLastValue += uint64(len(values[i])) + 4 + } else { + sizeOfLastValue = uint64(len(values[i])) + 4 + } + } + + addressMap := make(map[Address][]byte) + + capacity := rand.Uint32Range(uint32(sizeOfAllButLastValue), uint32(sizeOfAllButLastValue+sizeOfLastValue)) + seg, err := NewSegment(logger, index, directory, capacity) + require.NoError(t, err) + + // Write the values. All but the last one should fit. + for i := 0; i < int(valueCount)-1; i++ { + key := keys[i] + value := values[i] + + address, ok, err := seg.Write(key, value) + require.NoError(t, err) + require.True(t, ok) + addressMap[address] = value + } + + // The last value should not fit. + key := keys[int(valueCount)-1] + value := values[int(valueCount)-1] + _, ok, err := seg.Write(key, value) + require.NoError(t, err) + require.False(t, ok) + + // Read the values back. All but the last one should be there. + require.False(t, seg.IsSealed()) + sealTime := rand.Time() + err = seg.Seal(sealTime) + require.NoError(t, err) + + for addr, val := range addressMap { + readValue, err := seg.Read(addr) + require.NoError(t, err) + require.Equal(t, val, readValue) + } + + keysFromSegment, err := seg.GetKeys() + require.NoError(t, err) + require.Equal(t, keys[:len(keys)-1], keysFromSegment) + + // Measure the value file size on disk. It should exactly match sizeOfAllButLastValue. + require.Equal(t, sizeOfAllButLastValue, seg.CurrentSize()) + bytesFromFile, err := os.ReadFile(seg.values.path()) + require.NoError(t, err) + require.Equal(t, int(sizeOfAllButLastValue), len(bytesFromFile)) + + // Delete the segment. 
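+	// A segment is exactly three files (.metadata, .keys, and .values), and delete removes all of them.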
+ require.Equal(t, 3, countFilesInDirectory(t, directory)) + err = seg.delete() + require.NoError(t, err) + require.Equal(t, 0, countFilesInDirectory(t, directory)) +} + +// TODO reservation tests +// TODO future cody: start here, then finish garbage collection on the segment manager, then test segment manager diff --git a/litt/disktable/segment/util.go b/litt/disktable/segment/util.go new file mode 100644 index 0000000000..4e9d37c325 --- /dev/null +++ b/litt/disktable/segment/util.go @@ -0,0 +1,54 @@ +package segment + +import ( + "fmt" + "os" + "path/filepath" +) + +// verifyFilePermissions checks if a file has read/write permissions and is a regular file (if it exists), +// returning an error if it does not if the file permissions or file type is not as expected. +// Also returns a boolean indicating if the file exists and its size (to save on additional os.Stat calls). +// +// A file is considered to have the correct permissions/type if: +// - it exists and is a standard file with read+write permissions +// - if it does not exist but its parent directory has read+write permissions. +// +// The arguments for the function are the result of os.Stat(path). There is no need to do error checking on the +// result of os.Stat in the calling context (this method does it for you). +func verifyFilePermissions(path string) (exists bool, size int64, err error) { + info, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + // The file does not exist. Check the parent. + parentPath := filepath.Dir(path) + parentInfo, err := os.Stat(parentPath) + if err != nil { + if os.IsNotExist(err) { + return false, -1, fmt.Errorf("parent directory %s does not exist", parentPath) + } + return false, -1, fmt.Errorf("failed to stat parent directory %s: %w", parentPath, err) + } + + if !parentInfo.IsDir() { + return false, -1, fmt.Errorf("parent directory %s is not a directory", parentPath) + } + + if parentInfo.Mode()&0700 != 0700 { + return false, -1, fmt.Errorf("parent directory %s has insufficent permissions", parentPath) + } + } + + return false, 0, nil + } + + // File exists. Check if it is a regular file and that it is readable+writeable. + if info.IsDir() { + return false, -1, fmt.Errorf("file %s is a directory", path) + } + if info.Mode()&0600 != 0600 { + return false, -1, fmt.Errorf("file %s has insufficent permissions", path) + } + + return true, info.Size(), nil +} diff --git a/litt/disktable/segment/value_file.go b/litt/disktable/segment/value_file.go new file mode 100644 index 0000000000..671b6c4a40 --- /dev/null +++ b/litt/disktable/segment/value_file.go @@ -0,0 +1,216 @@ +package segment + +import ( + "bufio" + "encoding/binary" + "fmt" + "github.com/Layr-Labs/eigensdk-go/logging" + "math" + "os" + "path" +) + +// ValuesFileExtension is the file extension for the values file. This file contains the values for the data +// segment. +const ValuesFileExtension = ".values" + +// valueFile represents a file that stores values. +type valueFile struct { + // The logger for the value file. + logger logging.Logger + + // The segment index. + index uint32 + + // The parent directory containing this file. + parentDirectory string + + // The writer for the file. If the file is sealed, this value is nil. + writer *bufio.Writer + + // The current size of the file in bytes. Includes both flushed and unflushed data. + currentSize uint64 + + // a cache of unflushed data. It's only safe to look up a value from the file if it's not in this map. 
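+	// Writes are buffered in the bufio.Writer until flush() runs, so reads of recently written values
+	// must be served from this map rather than from the file.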
+ unflushedData map[Address][]byte +} + +// newValueFile creates a new value file. +func newValueFile( + logger logging.Logger, + index uint32, + parentDirectory string, + sealed bool) (*valueFile, error) { + + values := &valueFile{ + logger: logger, + index: index, + parentDirectory: parentDirectory, + unflushedData: make(map[Address][]byte), + } + + filePath := values.path() + exists, size, err := verifyFilePermissions(filePath) + if err != nil { + return nil, fmt.Errorf("file %s has incorrect permissions: %v", filePath, err) + } + + values.currentSize = uint64(size) + + if sealed { + if !exists { + return nil, fmt.Errorf("value file %s does not exist", filePath) + } + + } else { + if exists { + return nil, fmt.Errorf("value file %s already exists", filePath) + } + + // Open the file for writing. + file, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open value file %s: %v", filePath, err) + } + + values.writer = bufio.NewWriter(file) + } + + return values, nil +} + +// name returns the name of the value file. +func (v *valueFile) name() string { + return fmt.Sprintf("%d%s", v.index, ValuesFileExtension) +} + +// path returns the path to the value file. +func (v *valueFile) path() string { + return path.Join(v.parentDirectory, v.name()) +} + +// read reads a value from the value file. +func (v *valueFile) read(address Address) ([]byte, error) { + if v.index != address.Index() { + return nil, fmt.Errorf("address %v does not belong to this value file", address) + } + + if value, ok := v.unflushedData[address]; ok { + // The value is in the unflushed data cache. + return value, nil + } + + if uint64(address.Offset()) >= v.currentSize { + return nil, fmt.Errorf("address %v is out of bounds", address) + } + + file, err := os.OpenFile(v.path(), os.O_RDONLY, 0644) + if err != nil { + return nil, fmt.Errorf("failed to open value file: %v", err) + } + defer func() { + err = file.Close() + if err != nil { + v.logger.Errorf("failed to close value file: %v", err) + } + }() + + _, err = file.Seek(int64(address.Offset()), 0) + reader := bufio.NewReader(file) + + // Read the length of the value. + var length uint32 + err = binary.Read(reader, binary.BigEndian, &length) + if err != nil { + return nil, fmt.Errorf("failed to read value length from value file: %v", err) + } + + // Read the value itself. + value := make([]byte, length) + bytesRead, err := reader.Read(value) + if err != nil { + return nil, fmt.Errorf("failed to read value from value file: %v", err) + } + + if uint32(bytesRead) != length { + return nil, fmt.Errorf("failed to read value from value file: read %d bytes, expected %d", bytesRead, length) + } + + return value, nil +} + +// write writes a value to the value file, returning its Address. +func (v *valueFile) write(value []byte) (Address, error) { + if v.writer == nil { + return 0, fmt.Errorf("value file is sealed") + } + + if v.currentSize+uint64(len(value)) >= math.MaxUint32 { + // No matter what, we can't start a new value if its first byte would be at position 2^32 or beyond. + // This is because we only have 32 bits in an address to store the position of a value's first byte. + return 0, fmt.Errorf("value file already contains %d bytes, cannot add a new value", v.currentSize) + } + + address := NewAddress(v.index, uint32(v.currentSize)) + v.unflushedData[address] = value + + // First, write the length of the value. 
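+	// The on-disk record is a 4-byte big-endian length prefix followed by the value bytes, which is why
+	// currentSize advances by len(value)+4 below.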
+ err := binary.Write(v.writer, binary.BigEndian, uint32(len(value))) + if err != nil { + return 0, fmt.Errorf("failed to write value length to value file: %v", err) + } + + // Then, write the value itself. + _, err = v.writer.Write(value) + if err != nil { + return 0, fmt.Errorf("failed to write value to value file: %v", err) + } + + v.currentSize += uint64(len(value)) + 4 + + return address, nil +} + +// flush writes all unflushed data to disk. +func (v *valueFile) flush() error { + if v.writer == nil { + return fmt.Errorf("value file is sealed") + } + + err := v.writer.Flush() + if err != nil { + return fmt.Errorf("failed to flush value file: %v", err) + } + + v.unflushedData = make(map[Address][]byte) + return nil +} + +// seal seals the value file. +func (v *valueFile) seal() error { + if v.writer == nil { + return fmt.Errorf("value file is already sealed") + } + + err := v.flush() + if err != nil { + return fmt.Errorf("failed to flush value file: %v", err) + } + + v.writer = nil + return nil +} + +// delete deletes the value file. +func (v *valueFile) delete() error { + if v.writer != nil { + return fmt.Errorf("value file is not sealed") + } + + err := os.Remove(v.path()) + if err != nil { + return fmt.Errorf("failed to delete value file: %v", err) + } + + return nil +} diff --git a/litt/disktable/segment/value_file_test.go b/litt/disktable/segment/value_file_test.go new file mode 100644 index 0000000000..3bbd004b88 --- /dev/null +++ b/litt/disktable/segment/value_file_test.go @@ -0,0 +1,160 @@ +package segment + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/common/testutils/random" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func TestWriteThenReadValues(t *testing.T) { + rand := random.NewTestRandom(t) + logger, err := common.NewLogger(common.DefaultTextLoggerConfig()) + require.NoError(t, err) + directory := t.TempDir() + + index := rand.Uint32() + valueCount := rand.Int32Range(100, 200) + values := make([][]byte, valueCount) + for i := 0; i < int(valueCount); i++ { + values[i] = rand.VariableBytes(1, 100) + } + + addressMap := make(map[Address][]byte) + + file, err := newValueFile(logger, index, directory, false) + require.NoError(t, err) + + for _, value := range values { + address, err := file.write(value) + require.NoError(t, err) + addressMap[address] = value + + // Occasionally flush the file to disk. + if rand.BoolWithProbability(0.25) { + err := file.flush() + require.NoError(t, err) + } + + // Occasionally scan all addresses and values in the file. Some will be on disk, some will be in memory. + if rand.BoolWithProbability(0.1) { + for key, val := range addressMap { + readValue, err := file.read(key) + require.NoError(t, err) + require.Equal(t, val, readValue) + } + } + } + + // Seal the file and read all values. 
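+	// Sealing flushes the writer and clears the unflushed cache, so every read below must come from disk.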
+ err = file.seal()
+ require.NoError(t, err)
+ for key, val := range addressMap {
+ readValue, err := file.read(key)
+ require.NoError(t, err)
+ require.Equal(t, val, readValue)
+ }
+
+ // delete the file
+ filePath := file.path()
+ _, err = os.Stat(filePath)
+ require.NoError(t, err)
+
+ err = file.delete()
+ require.NoError(t, err)
+
+ _, err = os.Stat(filePath)
+ require.True(t, os.IsNotExist(err))
+}
+
+func TestReadingTruncatedValueFile(t *testing.T) {
+ rand := random.NewTestRandom(t)
+ logger, err := common.NewLogger(common.DefaultTextLoggerConfig())
+ require.NoError(t, err)
+ directory := t.TempDir()
+
+ index := rand.Uint32()
+ valueCount := rand.Int32Range(100, 200)
+ values := make([][]byte, valueCount)
+ for i := 0; i < int(valueCount); i++ {
+ values[i] = rand.VariableBytes(1, 100)
+ }
+
+ addressMap := make(map[Address][]byte)
+
+ file, err := newValueFile(logger, index, directory, false)
+ require.NoError(t, err)
+
+ var lastAddress Address
+ for _, value := range values {
+ address, err := file.write(value)
+ require.NoError(t, err)
+ addressMap[address] = value
+ lastAddress = address
+ }
+
+ err = file.seal()
+ require.NoError(t, err)
+
+ // Truncate the file. Chop off some bytes from the last value, but do not corrupt the length prefix.
+ lastValueLength := len(values[valueCount-1])
+
+ filePath := file.path()
+
+ originalBytes, err := os.ReadFile(filePath)
+ require.NoError(t, err)
+
+ bytesToRemove := rand.Int32Range(1, int32(lastValueLength)+1)
+ bytes := originalBytes[:len(originalBytes)-int(bytesToRemove)]
+
+ err = os.WriteFile(filePath, bytes, 0644)
+ require.NoError(t, err)
+
+ file, err = newValueFile(logger, index, directory, true)
+ require.NoError(t, err)
+
+ // We should be able to read all values except for the last one.
+ for key, val := range addressMap {
+ if key == lastAddress {
+ _, err := file.read(key)
+ require.Error(t, err)
+ } else {
+ readValue, err := file.read(key)
+ require.NoError(t, err)
+ require.Equal(t, val, readValue)
+ }
+ }
+
+ // Truncate the file again, this time removing the entire last value plus part of its length prefix,
+ // leaving a corrupted (partial) length prefix at the end of the file.
+ prefixBytesToRemove := rand.Int32Range(1, 4)
+ bytes = originalBytes[:len(originalBytes)-lastValueLength-int(prefixBytesToRemove)]
+
+ err = os.WriteFile(filePath, bytes, 0644)
+ require.NoError(t, err)
+
+ file, err = newValueFile(logger, index, directory, true)
+ require.NoError(t, err)
+
+ // We should be able to read all values except for the last one.
+ for key, val := range addressMap {
+ if key == lastAddress {
+ _, err := file.read(key)
+ require.Error(t, err)
+ } else {
+ readValue, err := file.read(key)
+ require.NoError(t, err)
+ require.Equal(t, val, readValue)
+ }
+ }
+
+ // delete the file
+ _, err = os.Stat(filePath)
+ require.NoError(t, err)
+
+ err = file.delete()
+ require.NoError(t, err)
+
+ _, err = os.Stat(filePath)
+ require.True(t, os.IsNotExist(err))
+}
diff --git a/litt/littbuilder/builder.go b/litt/littbuilder/builder.go
new file mode 100644
index 0000000000..4e18002d56
--- /dev/null
+++ b/litt/littbuilder/builder.go
@@ -0,0 +1,28 @@
+package littbuilder
+
+import (
+ "fmt"
+ "github.com/Layr-Labs/eigenda/litt"
+ "github.com/Layr-Labs/eigenda/litt/memtable"
+ "time"
+)
+
+// NewDB builds a new litt.DB.
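+//
+// A minimal usage sketch (the MemDB configuration shown here is illustrative):
+//
+// config := DefaultConfig()
+// config.Type = MemDB
+// db, err := NewDB(config)
+// if err != nil {
+// // handle the error
+// }
+// db.Start()
+// defer db.Stop()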
+func NewDB(config *Config) (litt.DB, error) { + var tb tableBuilder + switch config.Type { + case DiskDB: + tb = func(timeSource func() time.Time, name string, ttl time.Duration) (litt.ManagedTable, error) { + return nil, nil // TODO + } + case MemDB: + tb = func(timeSource func() time.Time, name string, ttl time.Duration) (litt.ManagedTable, error) { + return memtable.NewMemTable(timeSource, name, ttl), nil + } + default: + return nil, fmt.Errorf("unsupported DB type: %v", config.Type) + } + + database := newDB(config.TimeSource, config.TTL, config.GCPeriod, tb) + return database, nil +} diff --git a/litt/littbuilder/config.go b/litt/littbuilder/config.go new file mode 100644 index 0000000000..df2c1d744d --- /dev/null +++ b/litt/littbuilder/config.go @@ -0,0 +1,37 @@ +package littbuilder + +import "time" + +type DBType int + +const DiskDB DBType = 0 +const MemDB DBType = 1 + +// Config is configuration for a litt.DB. +type Config struct { + // The path where the database will store its files. If the path does not exist, it will be created. + // If the path exists, the database will attempt to open the existing database at that path. + Path string + + // The type of the DB. Choices are DiskDB and MemDB. Default is DiskDB. + Type DBType + + // The default TTL for newly created tables (either ones with data on disk or new tables). + // The default is 0 (no TTL). + TTL time.Duration + + // The period between garbage collection runs. The default is 5 minutes. + GCPeriod time.Duration + + // The time source used by the database. This can be substituted for an artificial time source + // for testing purposes. The default is time.Now. + TimeSource func() time.Time +} + +// DefaultConfig returns a Config with default values. +func DefaultConfig() *Config { + return &Config{ + TimeSource: time.Now, + GCPeriod: 5 * time.Minute, + } +} diff --git a/litt/littbuilder/db_impl.go b/litt/littbuilder/db_impl.go new file mode 100644 index 0000000000..5a81438406 --- /dev/null +++ b/litt/littbuilder/db_impl.go @@ -0,0 +1,122 @@ +package littbuilder + +import ( + "fmt" + "github.com/Layr-Labs/eigenda/litt" + "sync" + "sync/atomic" + "time" +) + +var _ litt.DB = &db{} + +// tableBuilder is a function that creates a new table. +type tableBuilder func(timeSource func() time.Time, name string, ttl time.Duration) (litt.ManagedTable, error) + +// db is an implementation of DB. +type db struct { + // A function that returns the current time. + timeSource func() time.Time + + // The time-to-live for newly created tables. + ttl time.Duration + + // The period between garbage collection runs. + gcPeriod time.Duration + + // A function that creates new tables. + tableBuilder tableBuilder + + // A map of all tables in the database. + tables map[string]litt.ManagedTable + + // A flag that indicates whether the database is alive (i.e. Stop() has not been called). + alive atomic.Bool + + // Protects access to tables and ttl. 
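+ // Garbage collection copies the current table list while holding this lock and releases it before
+ // collecting, so a slow garbage collection pass does not block GetTable() or DropTable().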
+ lock sync.Mutex
+}
+
+func newDB(
+ timeSource func() time.Time,
+ ttl time.Duration,
+ gcPeriod time.Duration,
+ tableBuilder tableBuilder) litt.DB {
+
+ return &db{
+ timeSource: timeSource,
+ ttl: ttl,
+ gcPeriod: gcPeriod,
+ tableBuilder: tableBuilder,
+ tables: make(map[string]litt.ManagedTable),
+ }
+}
+
+func (d *db) GetTable(name string) (litt.Table, error) {
+ d.lock.Lock()
+ defer d.lock.Unlock()
+
+ table, ok := d.tables[name]
+ if !ok {
+ var err error
+ table, err = d.tableBuilder(d.timeSource, name, d.ttl)
+ if err != nil {
+ return nil, fmt.Errorf("error creating table: %w", err)
+ }
+
+ d.tables[name] = table
+ }
+
+ return table, nil
+}
+
+func (d *db) DropTable(name string) error {
+ d.lock.Lock()
+ defer d.lock.Unlock()
+
+ table, ok := d.tables[name]
+ if !ok {
+ return fmt.Errorf("table %s does not exist", name)
+ }
+
+ err := table.Destroy()
+ if err != nil {
+ return fmt.Errorf("error destroying table: %w", err)
+ }
+ delete(d.tables, name)
+
+ return nil
+}
+
+func (d *db) Start() {
+ d.alive.Store(true)
+
+ ticker := time.NewTicker(d.gcPeriod)
+ go func() {
+ defer ticker.Stop()
+ for d.alive.Load() {
+ <-ticker.C
+ d.doGarbageCollection()
+ }
+ }()
+}
+
+func (d *db) Stop() {
+ d.alive.Store(false)
+}
+
+// doGarbageCollection performs garbage collection on all tables in the database.
+func (d *db) doGarbageCollection() {
+ d.lock.Lock()
+ tables := make([]litt.ManagedTable, 0, len(d.tables))
+ for _, table := range d.tables {
+ tables = append(tables, table)
+ }
+ d.lock.Unlock()
+
+ for _, table := range tables {
+ err := table.DoGarbageCollection()
+ if err != nil {
+ // TODO log!
+ panic(err)
+ }
+ }
+}
diff --git a/litt/memtable/mem_table.go b/litt/memtable/mem_table.go
new file mode 100644
index 0000000000..7df129e355
--- /dev/null
+++ b/litt/memtable/mem_table.go
@@ -0,0 +1,136 @@
+package memtable
+
+import (
+ "fmt"
+ "github.com/Layr-Labs/eigenda/litt"
+ "github.com/emirpasic/gods/queues"
+ "github.com/emirpasic/gods/queues/linkedlistqueue"
+ "sync"
+ "time"
+)
+
+var _ litt.ManagedTable = &memTable{}
+
+// expirationRecord records when a key was inserted into the table, which is used to determine when it
+// should be deleted.
+type expirationRecord struct {
+ // The time at which the key was inserted into the table.
+ creationTime time.Time
+ // A stringified version of the key.
+ key string
+}
+
+// memTable is a simple implementation of a Table that stores its data in memory.
+type memTable struct {
+ // A function that returns the current time.
+ timeSource func() time.Time
+
+ // The name of the table.
+ name string
+
+ // The time-to-live for data in this table.
+ ttl time.Duration
+
+ // The actual data store.
+ data map[string][]byte
+
+ // Keeps track of when data should be deleted.
+ expirationQueue queues.Queue
+
+ // Protects access to data and expirationQueue.
+ //
+ // This implementation could use finer-grained locks to improve multithreaded performance,
+ // at the cost of code complexity. But since this implementation is primarily intended for use in tests,
+ // such optimization is not necessary.
+ lock sync.RWMutex
+}
+
+// NewMemTable creates a new in-memory table.
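+// A ttl of 0 means that values never expire. A minimal usage sketch (the table name and TTL are illustrative):
+//
+// table := NewMemTable(time.Now, "example", time.Minute)
+// err := table.Put([]byte("key"), []byte("value"))
+// value, err := table.Get([]byte("key"))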
+func NewMemTable(timeSource func() time.Time, name string, ttl time.Duration) litt.ManagedTable {
+ return &memTable{
+ timeSource: timeSource,
+ name: name,
+ ttl: ttl,
+ data: make(map[string][]byte),
+ expirationQueue: linkedlistqueue.New(),
+ }
+}
+
+func (m *memTable) Name() string {
+ return m.name
+}
+
+func (m *memTable) Put(key []byte, value []byte) error {
+ stringKey := string(key)
+ expiration := &expirationRecord{
+ creationTime: m.timeSource(),
+ key: stringKey,
+ }
+
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ _, ok := m.data[stringKey]
+ if ok {
+ return fmt.Errorf("key %x already exists", key)
+ }
+ m.data[stringKey] = value
+ m.expirationQueue.Enqueue(expiration)
+
+ return nil
+}
+
+func (m *memTable) Get(key []byte) ([]byte, error) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
+
+ value, ok := m.data[string(key)]
+ if !ok {
+ return nil, fmt.Errorf("key %x does not exist", key)
+ }
+
+ return value, nil
+}
+
+func (m *memTable) Flush() error {
+ // This is a no-op for a memory table. Memory tables are ephemeral by nature.
+ return nil
+}
+
+func (m *memTable) SetTTL(ttl time.Duration) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ m.ttl = ttl
+}
+
+func (m *memTable) DoGarbageCollection() error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ if m.ttl <= 0 {
+ // A TTL of 0 (the default) means data in this table never expires.
+ return nil
+ }
+
+ now := m.timeSource()
+ earliestPermittedCreationTime := now.Add(-m.ttl)
+
+ for {
+ item, ok := m.expirationQueue.Peek()
+ if !ok {
+ break
+ }
+ expiration := item.(*expirationRecord)
+ if expiration.creationTime.After(earliestPermittedCreationTime) {
+ break
+ }
+ m.expirationQueue.Dequeue()
+ delete(m.data, expiration.key)
+ }
+
+ return nil
+}
+
+func (m *memTable) Destroy() error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+
+ m.data = make(map[string][]byte)
+ m.expirationQueue.Clear()
+
+ return nil
+}
diff --git a/litt/table.go b/litt/table.go
new file mode 100644
index 0000000000..19cbdcba64
--- /dev/null
+++ b/litt/table.go
@@ -0,0 +1,48 @@
+package litt
+
+import "time"
+
+// Table is a key-value store with a namespace that does not overlap with other tables.
+// Values may be written to the table, but once written, they may not be changed or deleted (except via TTL).
+type Table interface {
+ // Name returns the name of the table.
+ Name() string
+
+ // Put stores a value in the database. May not be used to overwrite an existing value.
+ // Note that when this method returns, data written may not be crash durable on disk
+ // (although the write does have atomicity). In order to ensure crash durability, call Flush().
+ Put(key []byte, value []byte) error
+
+ // Get retrieves a value from the database. Returns an error if the value does not exist.
+ //
+ // For the sake of performance, the returned data is NOT safe to mutate. If you need to modify the data,
+ // make a copy of it first. Better to avoid a copy if it's not necessary, though.
+ Get(key []byte) ([]byte, error)
+
+ // Flush ensures that all data written to the database is crash durable on disk. When this method returns,
+ // all data written by Put() operations that returned before the call to Flush() is guaranteed to be crash
+ // durable. Put() operations called concurrently with this method may not be crash durable after this
+ // method returns.
+ //
+ // Note that data flushed at the same time is not atomic. If the process crashes mid-flush, some data
+ // being flushed may become persistent, while some may not. Each individual key-value pair is atomic
+ // in the event of a crash, though.
+ Flush() error
+
+ // SetTTL sets the time to live for data in this table.
This TTL is immediately applied to data already in + // the table. Note that deletion is lazy. That is, when the data expires, it may not be deleted immediately. + SetTTL(ttl time.Duration) + + // TODO: methods that return size in keys, size of keys in bytes, and size of data in bytes +} + +// ManagedTable is a Table that can perform garbage collection on its data. This type should not be directly used +// by clients, and is a type that is used internally by the database. +type ManagedTable interface { + Table + + // DoGarbageCollection performs garbage collection on the table, deleting values that have outlived their TTL. + DoGarbageCollection() error + + // Destroy cleans up resources used by the table. Table will not be usable after this method is called. + Destroy() error +}
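+
+// Typical client usage of a Table, as a sketch (the table name, TTL, key, and value are illustrative):
+//
+// table, err := db.GetTable("blobs")
+// table.SetTTL(24 * time.Hour)
+// err = table.Put([]byte("key"), []byte("value"))
+// err = table.Flush()
+// value, err := table.Get([]byte("key"))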