Skip to content

Commit

Permalink
fix: Remove duplication of block heads on delete (#3096)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #3085 #3089

Documents #3056 #3086 #3087 (I'm going to close these on merge, no need
to have them littering the backlog)

## Description

Removes the duplication of head links from delete blocks.

PR also includes the following to save the hassle of multiple test-cid
updates:
- Removes `fieldName` from composite block deltas
- Removes the magic `_head` link name, and extracts head links to a new,
optional prop
- Documents the reasons for duplicating various bits of data in the
blockstore blocks as discussed in standup

With the actions defined in
`TestQueryCommitsWithFieldIDFieldWithUpdate`, create block size has been
reduced by 4%, and update block size by 7%. These figures will vary a lot
depending on which fields are being updated; the test used for the
calculation was simply the first one I found that created one small doc
and updated a single field.

I recommend reviewing commit by commit. The test-cid changes have been
pulled out to their own commit.
  • Loading branch information
AndrewSisley authored Oct 3, 2024
1 parent e59f6d9 commit 4e5470c
Show file tree
Hide file tree
Showing 42 changed files with 431 additions and 359 deletions.
3 changes: 3 additions & 0 deletions docs/data_format_changes/i3085-block-trim.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Remove duplication of block heads on delete

The structure of blocks in the blockstore was reworked slightly - head links have been extracted to a separate property, and fieldName has been removed from composite blocks.
71 changes: 55 additions & 16 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ type DAGLink struct {
// Name is the name of the link.
//
// This will be the field name of the CRDT delta. Head links are not stored as named
// links; they are held separately in the block's `Heads` property.
//
// This field currently serves no purpose and is duplicating data already held on the target
// block. However we want to have this long term to enable some fancy P2P magic to allow users
// to configure the collection to only sync particular fields using
// [GraphSync](https://github.com/ipfs/go-graphsync) which will need to make use of this property.
Name string
// Link is the CID link to the object.
cidlink.Link
Expand Down Expand Up @@ -121,8 +126,21 @@ type Encryption struct {
type Block struct {
// Delta is the CRDT delta that is stored in the block.
Delta crdt.CRDT

// The previous block-CIDs that this block is based on.
//
// For example:
// - This will be empty for all 'create' blocks.
// - Most 'update' blocks will have a single head, however they will have multiple if the history has
// diverged and there were multiple blocks at the previous height.
Heads []cidlink.Link

// Links are the links to other blocks in the DAG.
//
// This does not include `Heads`. This will be empty for field-level blocks. It will never be empty
// for composite blocks (and will contain links to field-level blocks).
Links []DAGLink

// Encryption contains the encryption information for the block's delta.
// It needs to be a pointer so that it can be translated from and to `optional` in the IPLD schema.
Encryption *cidlink.Link
Expand All @@ -137,20 +155,25 @@ func (block *Block) IsEncrypted() bool {
func (block *Block) Clone() *Block {
// Build a new Block value rather than returning the receiver.
return &Block{
// The delta is copied via its own Clone method.
Delta: block.Delta.Clone(),
// Heads and Links are assigned as slices, not copied element by element, so the
// clone shares their backing arrays with the original block.
Heads: block.Heads,
Links: block.Links,
// Encryption is a pointer and is copied as-is; both blocks refer to the same link value.
Encryption: block.Encryption,
}
}

// GetHeadLinks returns the CIDs of the previous blocks. There can be more than 1 with multiple heads.
func (block *Block) GetHeadLinks() []cid.Cid {
var heads []cid.Cid
// AllLinks returns the block `Heads` and `Links` combined into a single set.
//
// All heads will be first in the set, followed by other links.
func (block *Block) AllLinks() []cidlink.Link {
result := make([]cidlink.Link, 0, len(block.Heads)+len(block.Links))

result = append(result, block.Heads...)

for _, link := range block.Links {
if link.Name == core.HEAD {
heads = append(heads, link.Cid)
}
result = append(result, link.Link)
}
return heads

return result
}

// IPLDSchemaBytes returns the IPLD schema representation for the block.
Expand All @@ -160,7 +183,8 @@ func (block *Block) IPLDSchemaBytes() []byte {
return []byte(`
type Block struct {
delta CRDT
links [DAGLink]
heads optional [Link]
links optional [DAGLink]
encryption optional Link
}
`)
Expand All @@ -181,20 +205,17 @@ func (enc *Encryption) IPLDSchemaBytes() []byte {

// New creates a new block with the given delta and links.
func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {
blockLinks := make([]DAGLink, 0, len(links)+len(heads))

// Sort the heads lexicographically by CID.
// We need to do this to ensure that the block is deterministic.
sort.Slice(heads, func(i, j int) bool {
return strings.Compare(heads[i].String(), heads[j].String()) < 0
})

headLinks := make([]cidlink.Link, 0, len(heads))
for _, head := range heads {
blockLinks = append(
blockLinks,
DAGLink{
Name: core.HEAD,
Link: cidlink.Link{Cid: head},
},
headLinks = append(
headLinks,
cidlink.Link{Cid: head},
)
}

Expand All @@ -204,9 +225,27 @@ func New(delta core.Delta, links []DAGLink, heads ...cid.Cid) *Block {
return strings.Compare(links[i].Cid.String(), links[j].Cid.String()) < 0
})

blockLinks := make([]DAGLink, 0, len(links))
blockLinks = append(blockLinks, links...)

if len(headLinks) == 0 {
// The encoding used for block serialization will consume space if an empty set is
// provided, but it will not consume space if nil is provided, so if empty we set it
// to nil. The would-be space consumed includes the property name, so this is not an
// insignificant amount.
headLinks = nil
}

if len(blockLinks) == 0 {
// The encoding used for block serialization will consume space if an empty set is
// provided, but it will not consume space if nil is provided, so if empty we set it
// to nil. The would-be space consumed includes the property name, so this is not an
// insignificant amount.
blockLinks = nil
}

return &Block{
Heads: headLinks,
Links: blockLinks,
Delta: crdt.NewCRDT(delta),
}
Expand Down
17 changes: 5 additions & 12 deletions internal/core/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/ipld/go-ipld-prime/storage/memstore"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/core/crdt"
)

Expand All @@ -47,7 +46,6 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Delta: crdt.CRDT{
CompositeDAGDelta: &crdt.CompositeDAGDelta{
DocID: []byte("docID"),
FieldName: "C",
Priority: 1,
SchemaVersionID: "schemaVersionID",
Status: 1,
Expand Down Expand Up @@ -75,11 +73,8 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Data: []byte("Johny"),
},
},
Links: []DAGLink{
{
Name: core.HEAD,
Link: fieldBlockLink.(cidlink.Link),
},
Heads: []cidlink.Link{
fieldBlockLink.(cidlink.Link),
},
}
fieldUpdateBlockLink, err := lsys.Store(ipld.LinkContext{}, GetLinkPrototype(), fieldUpdateBlock.GenerateNode())
Expand All @@ -91,17 +86,15 @@ func generateBlocks(lsys *linking.LinkSystem) (cidlink.Link, error) {
Delta: crdt.CRDT{
CompositeDAGDelta: &crdt.CompositeDAGDelta{
DocID: []byte("docID"),
FieldName: "C",
Priority: 2,
SchemaVersionID: "schemaVersionID",
Status: 1,
},
},
Heads: []cidlink.Link{
compositeBlockLink.(cidlink.Link),
},
Links: []DAGLink{
{
Name: core.HEAD,
Link: compositeBlockLink.(cidlink.Link),
},
{
Name: "name",
Link: fieldUpdateBlockLink.(cidlink.Link),
Expand Down
2 changes: 2 additions & 0 deletions internal/core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type baseCRDT struct {
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
fieldName string
}

Expand Down
22 changes: 15 additions & 7 deletions internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,23 @@ import (

// CompositeDAGDelta represents a delta-state update made of sub-MerkleCRDTs.
type CompositeDAGDelta struct {
DocID []byte
FieldName string
Priority uint64
// This property is duplicated from field-level blocks.
//
// We could remove this without much hassle from the composite, however long-term
// the ideal solution would be to remove it from the field-level commits *excluding*
// the initial field level commit where it must exist in order to scope it to a particular
// document. This would require a local index in order to handle field level commit-queries.
DocID []byte
Priority uint64
// SchemaVersionID is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
//
// This property is deliberately duplicated from field-level blocks as it makes the P2P code
// quite a lot easier - we can remove this from here at some point if we want to.
//
// Conversely we could remove this from the field-level commits and leave it on the composite,
// however that would complicate commit-queries and would require us to maintain an index elsewhere.
SchemaVersionID string
// Status represents the status of the document. By default it is `Active`.
// Alternatively, it can be set to `Deleted`.
Expand All @@ -47,7 +58,6 @@ func (delta *CompositeDAGDelta) IPLDSchemaBytes() []byte {
return []byte(`
type CompositeDAGDelta struct {
docID Bytes
fieldName String
priority Int
schemaVersionID String
status Int
Expand Down Expand Up @@ -75,9 +85,8 @@ func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
fieldName string,
) CompositeDAG {
return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, "")}
}

// Value is a no-op for a CompositeDAG.
Expand All @@ -89,7 +98,6 @@ func (c CompositeDAG) Value(ctx context.Context) ([]byte, error) {
func (c CompositeDAG) Set(status client.DocumentStatus) *CompositeDAGDelta {
return &CompositeDAGDelta{
DocID: []byte(c.key.DocID),
FieldName: c.fieldName,
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
Status: status,
}
Expand Down
3 changes: 0 additions & 3 deletions internal/core/crdt/ipld_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ func (c CRDT) GetFieldName() string {
switch {
case c.LWWRegDelta != nil:
return c.LWWRegDelta.FieldName
case c.CompositeDAGDelta != nil:
return c.CompositeDAGDelta.FieldName
case c.CounterDelta != nil:
return c.CounterDelta.FieldName
}
Expand Down Expand Up @@ -124,7 +122,6 @@ func (c CRDT) Clone() CRDT {
case c.CompositeDAGDelta != nil:
cloned.CompositeDAGDelta = &CompositeDAGDelta{
DocID: c.CompositeDAGDelta.DocID,
FieldName: c.CompositeDAGDelta.FieldName,
Priority: c.CompositeDAGDelta.Priority,
SchemaVersionID: c.CompositeDAGDelta.SchemaVersionID,
Status: c.CompositeDAGDelta.Status,
Expand Down
1 change: 0 additions & 1 deletion internal/core/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@ package core

const (
COMPOSITE_NAMESPACE = "C"
HEAD = "_head"
)
1 change: 0 additions & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,6 @@ func (c *collection) saveCompositeToMerkleCRDT(
txn,
core.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
dsKey,
"",
)

if status.IsDeleted() {
Expand Down
18 changes: 1 addition & 17 deletions internal/db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@ package db
import (
"context"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"

"github.com/sourcenetwork/defradb/acp"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

// DeleteWithFilter deletes using a filter to target documents for delete.
Expand Down Expand Up @@ -142,24 +139,11 @@ func (c *collection) applyDelete(

txn := mustGetContextTxn(ctx)
dsKey := primaryKey.ToDataStoreKey()
headset := clock.NewHeadSet(
txn.Headstore(),
dsKey.WithFieldID(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(),
)
cids, _, err := headset.List(ctx)
if err != nil {
return err
}

dagLinks := make([]coreblock.DAGLink, len(cids))
for i, cid := range cids {
dagLinks[i] = coreblock.NewDAGLink(core.HEAD, cidlink.Link{Cid: cid})
}

link, b, err := c.saveCompositeToMerkleCRDT(
ctx,
dsKey,
dagLinks,
[]coreblock.DAGLink{},
client.Deleted,
)
if err != nil {
Expand Down
15 changes: 2 additions & 13 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,20 +312,14 @@ func (vf *VersionedFetcher) seekNext(c cid.Cid, topParent bool) error {
}

// only seekNext on the parent if the block has at least one head (only the first head is followed)
l, ok := block.GetLinkByName(core.HEAD)
if ok {
err := vf.seekNext(l.Cid, true)
if len(block.Heads) != 0 {
err := vf.seekNext(block.Heads[0].Cid, true)
if err != nil {
return err
}
}

// loop over links and ignore head links
for _, l := range block.Links {
if l.Name == core.HEAD {
continue
}

err := vf.seekNext(l.Link.Cid, false)
if err != nil {
return err
Expand Down Expand Up @@ -362,12 +356,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

// handle subgraphs
// loop over links and ignore head links
for _, l := range block.Links {
if l.Name == core.HEAD {
continue
}

// get node
subBlock, err := vf.getDAGBlock(l.Link.Cid)
if err != nil {
Expand Down
Loading

0 comments on commit 4e5470c

Please sign in to comment.