Skip to content

Commit

Permalink
Uninline slab when it is overwritten or removed
Browse files Browse the repository at this point in the history
Currently, Set() and Remove() return overwritten or removed storable.
If storable is inlined slab, it is not stored in storage (because it
is removed from its parent slab which is in storage), so any future
changes to it would be lost.

On the other hand, if overwritten or removed storable is SlabIDStorable,
any future changes to it can still be persisted because it is in
its own slab in storage.

This inconsistency (not merged or deployed yet) can cause potential
data loss or unexpected behavior if deployed.

This commit uninlines inlined slabs that is overwritten or removed,
stores them in storage, and returns their SlabID as storable to caller.
  • Loading branch information
fxamacker committed Oct 1, 2023
1 parent a75e388 commit 042eb68
Show file tree
Hide file tree
Showing 4 changed files with 1,229 additions and 55 deletions.
146 changes: 116 additions & 30 deletions array.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,56 @@ func (a *ArrayDataSlab) Inlinable(maxInlineSize uint64) bool {
return uint64(inlinedSize) <= maxInlineSize
}

// inline converts not-inlined ArrayDataSlab to inlined ArrayDataSlab and removes it from storage.
func (a *ArrayDataSlab) inline(storage SlabStorage) error {
if a.inlined {
return NewFatalError(fmt.Errorf("failed to inline ArrayDataSlab %s: it is inlined already", a.header.slabID))
}

id := a.header.slabID

// Remove slab from storage because it is going to be inlined.
err := storage.Remove(id)
if err != nil {
// Wrap err as external error (if needed) because err is returned by SlabStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
}

// Update data slab size as inlined slab.
a.header.size = a.header.size -
arrayRootDataSlabPrefixSize +
inlinedArrayDataSlabPrefixSize

// Update data slab inlined status.
a.inlined = true

return nil
}

// uninline converts an inlined ArrayDataSlab to uninlined ArrayDataSlab and stores it in storage.
func (a *ArrayDataSlab) uninline(storage SlabStorage) error {
if !a.inlined {
return NewFatalError(fmt.Errorf("failed to un-inline ArrayDataSlab %s: it is not inlined", a.header.slabID))
}

// Update data slab size
a.header.size = a.header.size -
inlinedArrayDataSlabPrefixSize +
arrayRootDataSlabPrefixSize

// Update data slab inlined status
a.inlined = false

// Store slab in storage
err := storage.Store(a.header.slabID, a)
if err != nil {
// Wrap err as external error (if needed) because err is returned by SlabStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", a.header.slabID))
}

return nil
}

func (a *ArrayDataSlab) hasPointer() bool {
for _, e := range a.elements {
if hasPointer(e) {
Expand Down Expand Up @@ -2740,7 +2790,7 @@ func (a *Array) setCallbackWithChild(i uint64, child Value, maxInlineSize uint64

// Set child value with parent array using updated index.
// Set() calls c.Storable() which returns inlined or not-inlined child storable.
existingValueStorable, err := a.Set(index, c)
existingValueStorable, err := a.set(index, c)
if err != nil {
return err
}
Expand Down Expand Up @@ -2813,6 +2863,34 @@ func (a *Array) Get(i uint64) (Value, error) {
}

func (a *Array) Set(index uint64, value Value) (Storable, error) {
existingStorable, err := a.set(index, value)
if err != nil {
return nil, err
}

// If overwritten storable is an inlined slab, uninline the slab and store it in storage.
// This is to prevent potential data loss because the overwritten inlined slab was not in
// storage and any future changes to it would have been lost.
switch s := existingStorable.(type) {
case *ArrayDataSlab:
err = s.uninline(a.Storage)
if err != nil {
return nil, err
}
existingStorable = SlabIDStorable(s.header.slabID)

case *MapDataSlab:
err = s.uninline(a.Storage)
if err != nil {
return nil, err
}
existingStorable = SlabIDStorable(s.header.slabID)
}

return existingStorable, nil
}

func (a *Array) set(index uint64, value Value) (Storable, error) {
existingStorable, err := a.root.Set(a.Storage, a.Address(), index, value)
if err != nil {
// Don't need to wrap error as external error because err is already categorized by ArraySlab.Set().
Expand All @@ -2825,7 +2903,6 @@ func (a *Array) Set(index uint64, value Value) (Storable, error) {
// Don't need to wrap error as external error because err is already categorized by Array.splitRoot().
return nil, err
}
return existingStorable, nil
}

if !a.root.IsData() {
Expand Down Expand Up @@ -2879,8 +2956,11 @@ func (a *Array) Insert(index uint64, value Value) error {
}

if a.root.IsFull() {
// Don't need to wrap error as external error because err is already categorized by Array.splitRoot().
return a.splitRoot()
err = a.splitRoot()
if err != nil {
// Don't need to wrap error as external error because err is already categorized by Array.splitRoot().
return err
}
}

a.incrementIndexFrom(index)
Expand Down Expand Up @@ -2913,6 +2993,34 @@ func (a *Array) Insert(index uint64, value Value) error {
}

func (a *Array) Remove(index uint64) (Storable, error) {
storable, err := a.remove(index)
if err != nil {
return nil, err
}

// If overwritten storable is an inlined slab, uninline the slab and store it in storage.
// This is to prevent potential data loss because the overwritten inlined slab was not in
// storage and any future changes to it would have been lost.
switch s := storable.(type) {
case *ArrayDataSlab:
err = s.uninline(a.Storage)
if err != nil {
return nil, err
}
storable = SlabIDStorable(s.header.slabID)

case *MapDataSlab:
err = s.uninline(a.Storage)
if err != nil {
return nil, err
}
storable = SlabIDStorable(s.header.slabID)
}

return storable, nil
}

func (a *Array) remove(index uint64) (Storable, error) {
storable, err := a.root.Remove(a.Storage, index)
if err != nil {
// Don't need to wrap error as external error because err is already categorized by ArraySlab.Remove().
Expand Down Expand Up @@ -3088,23 +3196,11 @@ func (a *Array) Storable(_ SlabStorage, _ Address, maxInlineSize uint64) (Storab
return nil, NewFatalError(fmt.Errorf("unexpected inlinable array slab type %T", a.root))
}

rootID := rootDataSlab.header.slabID

// Remove root slab from storage because it is going to be inlined.
err := a.Storage.Remove(rootID)
err := rootDataSlab.inline(a.Storage)
if err != nil {
// Wrap err as external error (if needed) because err is returned by SlabStorage interface.
return nil, wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", rootID))
return nil, err
}

// Update root data slab size as inlined slab.
rootDataSlab.header.size = rootDataSlab.header.size -
arrayRootDataSlabPrefixSize +
inlinedArrayDataSlabPrefixSize

// Update root data slab inlined status.
rootDataSlab.inlined = true

return rootDataSlab, nil

case !inlinable && inlined:
Expand All @@ -3119,19 +3215,9 @@ func (a *Array) Storable(_ SlabStorage, _ Address, maxInlineSize uint64) (Storab
return nil, NewFatalError(fmt.Errorf("unexpected inlined array slab type %T", a.root))
}

// Update root data slab size
rootDataSlab.header.size = rootDataSlab.header.size -
inlinedArrayDataSlabPrefixSize +
arrayRootDataSlabPrefixSize

// Update root data slab inlined status.
rootDataSlab.inlined = false

// Store root slab in storage
err := a.Storage.Store(rootDataSlab.header.slabID, a.root)
err := rootDataSlab.uninline(a.Storage)
if err != nil {
// Wrap err as external error (if needed) because err is returned by SlabStorage interface.
return nil, wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", a.SlabID()))
return nil, err
}

return SlabIDStorable(a.SlabID()), nil
Expand Down
Loading

0 comments on commit 042eb68

Please sign in to comment.