Skip to content

Commit

Permalink
feat: data partition by domain ID
Browse files Browse the repository at this point in the history
This commit adds domain id as part of data key. Now the key looks like

  [ prefix ][ resolution ][ view ][ domain ][ field ][ container ]

WHich allows future deletion of old data by domain. Also speeds up dashboard
queries, since we  only read existence bitmap resulting in a a single
container load per view( when no filter is provided).

We avoid,

- Computing ids for matching domain per view.
- Keeping any book keeping per shard/view relating to domains. We skip
entrire shard if we can't seek to valid domain existence containers.

All of this is iterator based, majority of operations are seek followed by reads.
Ensuring only relevant blocks will be read.
  • Loading branch information
gernest committed Nov 26, 2024
1 parent 85a26cf commit 585b2be
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 157 deletions.
9 changes: 6 additions & 3 deletions internal/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
bmPrefix = 0
bmResolution = bmPrefix + 1
bmView = bmResolution + 1
bmField = bmView + 8
bmDomain = bmView + 8
bmField = bmDomain + 8
bmContainer = bmField + 1
BitmapKeySize = bmContainer + 8
)
Expand All @@ -43,18 +44,20 @@ func From(a []byte) *Key {
return (*Key)(unsafe.Pointer(&a[0]))
}

func (k *Key) WriteData(res Resolution, field models.Field, view, co uint64) {
func (k *Key) WriteData(res Resolution, field models.Field, view, domainId, co uint64) {
k[0] = keys.DataPrefix[0]
k[bmResolution] = byte(res)
binary.BigEndian.PutUint64(k[bmView:], view)
binary.BigEndian.PutUint64(k[bmDomain:], domainId)
k[bmField] = byte(field)
binary.BigEndian.PutUint64(k[bmContainer:], co)
}

func (k *Key) WriteExistence(res Resolution, field models.Field, view, co uint64) {
func (k *Key) WriteExistence(res Resolution, field models.Field, view, domainId, co uint64) {
k[0] = keys.DataExistsPrefix[0]
k[bmResolution] = byte(res)
binary.BigEndian.PutUint64(k[bmView:], view)
binary.BigEndian.PutUint64(k[bmDomain:], domainId)
k[bmField] = byte(field)
binary.BigEndian.PutUint64(k[bmContainer:], co)
}
Expand Down
37 changes: 12 additions & 25 deletions internal/shards/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import (
"time"

"github.com/cockroachdb/pebble"
"github.com/gernest/roaring"
"github.com/vinceanalytics/vince/internal/compute"
"github.com/vinceanalytics/vince/internal/encoding"
"github.com/vinceanalytics/vince/internal/models"
"github.com/vinceanalytics/vince/internal/ro2"
"github.com/vinceanalytics/vince/internal/timeseries/cursor"
"github.com/vinceanalytics/vince/internal/util/assert"
Expand Down Expand Up @@ -83,40 +81,40 @@ func (db *DB) Iter(
domainId uint64,
re encoding.Resolution,
start, end time.Time,
filters []models.Field,
f func(cu *cursor.Cursor, shard, view uint64, match *ro2.Bitmap, exists map[models.Field]*ro2.Bitmap) error) error {
f func(cu *cursor.Cursor, shard, view uint64, match *ro2.Bitmap) error) error {
db.shards.RLock()
defer db.shards.RUnlock()

views := slices.Collect(compute.Range(re, start, end))
if len(views) == 0 {
return nil
}
slices.Reverse(views)

cu := new(cursor.Cursor)
defer cu.Release()

for i := uint64(0); i <= db.shards.max; i++ {
sh := db.shards.data[i]
if sh == nil {
continue
}

it, err := sh.DB.NewIter(nil)
if err != nil {
return err
}
cu.SetIter(it)

cu.SetIter(it, domainId)
if !cu.SeekToDomainShard(re, views[0], views[len(views)-1]) {
// No data for the given domainId were observed for this shard.
// It is safe to skip the entire shard.
continue
}
for _, view := range views {
if !cu.ResetData(re, models.Field_domain, view) {
continue
}
m := ro2.Row(cu, sh.ID, domainId)
if !m.Any() {
match := cu.DomainExistence(re, sh.ID, view)
if !match.Any() {
continue
}
exists := readExistence(cu, re, filters, sh.ID, view)
err := f(cu, sh.ID, view, m, exists)
err := f(cu, sh.ID, view, match)
if err != nil {
it.Close()
return err
Expand All @@ -127,17 +125,6 @@ func (db *DB) Iter(
return nil
}

func readExistence(cu *cursor.Cursor, re encoding.Resolution, fields []models.Field, shard, view uint64) (m map[models.Field]*ro2.Bitmap) {
m = make(map[models.Field]*roaring.Bitmap)
for _, f := range fields {
if !cu.ResetExistence(re, f, view) {
continue
}
m[f] = ro2.Existence(cu, shard)
}
return
}

func (db *DB) Shard(shard uint64) *Shard {
db.shards.RLock()
sh := db.shards.data[shard]
Expand Down
34 changes: 27 additions & 7 deletions internal/timeseries/cursor/timeseries_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ import (
)

type Cursor struct {
it *pebble.Iterator
lo, hi encoding.Key
it *pebble.Iterator
lo, hi encoding.Key
domainId uint64
}

func (cu *Cursor) Release() {
*cu = Cursor{}
}

func (cu *Cursor) SetIter(it *pebble.Iterator) {
func (cu *Cursor) SetIter(it *pebble.Iterator, domainId uint64) {
cu.it = it
cu.domainId = domainId
}

func (cu *Cursor) Reset() {
Expand All @@ -34,18 +36,36 @@ func (cu *Cursor) Reset() {

func (cu *Cursor) ResetData(res encoding.Resolution, field models.Field, view uint64) bool {
cu.Reset()
cu.lo.WriteData(res, field, view, 0)
cu.hi.WriteData(res, field, view, math.MaxUint64)
cu.lo.WriteData(res, field, view, cu.domainId, 0)
cu.hi.WriteData(res, field, view, cu.domainId, math.MaxUint64)
return cu.it.SeekGE(cu.lo[:]) && cu.Valid()
}

func (cu *Cursor) ResetExistence(res encoding.Resolution, field models.Field, view uint64) bool {
cu.Reset()
cu.lo.WriteExistence(res, field, view, 0)
cu.hi.WriteExistence(res, field, view, math.MaxUint64)
cu.lo.WriteExistence(res, field, view, cu.domainId, 0)
cu.hi.WriteExistence(res, field, view, cu.domainId, math.MaxUint64)
return cu.it.SeekGE(cu.lo[:]) && cu.Valid()
}

func (cu *Cursor) SeekToDomainShard(res encoding.Resolution, lo, hi uint64) bool {
cu.Reset()
cu.lo.WriteExistence(res, models.Field_domain, lo, cu.domainId, 0)
cu.hi.WriteExistence(res, models.Field_domain, hi, cu.domainId, math.MaxUint64)
return cu.it.SeekGE(cu.lo[:]) && cu.Valid()
}

func (cu *Cursor) DomainExistence(res encoding.Resolution, shard, view uint64) *ro2.Bitmap {
cu.Reset()
cu.lo.WriteExistence(res, models.Field_domain, view, cu.domainId, 0)
cu.hi.WriteExistence(res, models.Field_domain, view, cu.domainId, math.MaxUint64)
ok := cu.it.SeekGE(cu.lo[:]) && cu.Valid()
if !ok {
return ro2.NewBitmap()
}
return ro2.Existence(cu, shard)
}

func (cu *Cursor) Valid() bool {
return cu.it.Valid() &&
bytes.Compare(cu.it.Key(), cu.hi[:]) == -1
Expand Down
31 changes: 30 additions & 1 deletion internal/timeseries/timeseries_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

type Key struct {
View uint64
Domain uint64
Resolution encoding.Resolution
Field models.Field
Existence bool
Expand All @@ -31,6 +32,7 @@ func (k *Key) Encode(co uint64, b []byte) []byte {
}
b = append(b, byte(k.Resolution))
b = binary.BigEndian.AppendUint64(b, k.View)
b = binary.BigEndian.AppendUint64(b, k.Domain)
b = append(b, byte(k.Field))
b = binary.BigEndian.AppendUint64(b, co)
return b
Expand All @@ -45,6 +47,8 @@ type batch struct {
events uint64
id uint64
shard uint64

domainId uint64
}

func newbatch(db *shards.DB, tr *translation) *batch {
Expand All @@ -67,6 +71,10 @@ func (b *batch) setTs(timestamp int64) {
b.views[encoding.Day] = uint64(compute.Date(ts).UnixMilli())
}

func (b *batch) setDomain(m *models.Model) {

}

// saves only current timestamp.
func (b *batch) save() error {
if b.events == 0 {
Expand Down Expand Up @@ -161,7 +169,11 @@ func (b *batch) add(m *models.Model) error {
b.set(models.Field_browser_version, id, m.BrowserVersion)
b.set(models.Field_country, id, m.Country)
b.set(models.Field_device, id, m.Device)
b.set(models.Field_domain, id, m.Domain)

// domain is stored as part of the key, we only save existence bit
b.domainId = b.tr(models.Field_domain, m.Domain)
b.mxExixtenceOnly(models.Field_domain, id)

b.set(models.Field_entry_page, id, m.EntryPage)
b.set(models.Field_event, id, m.Event)
b.set(models.Field_exit_page, id, m.ExitPage)
Expand All @@ -187,6 +199,7 @@ func (b *batch) bs(field models.Field, id uint64, value int64) {
Resolution: encoding.Resolution(i),
Field: field,
View: b.views[i],
Domain: b.domainId,
}), id, value)
}
}
Expand All @@ -197,6 +210,7 @@ func (b *batch) boolean(field models.Field, id uint64, value bool) {
Resolution: encoding.Resolution(i),
Field: field,
View: b.views[i],
Domain: b.domainId,
}), id, value)
}
}
Expand All @@ -208,18 +222,33 @@ func (b *batch) set(field models.Field, id uint64, value []byte) {
b.mx(field, id, b.tr(field, value))

}

func (b *batch) mx(field models.Field, id uint64, value uint64) {

for i := range b.views {
ro2.WriteMutex(b.ra(Key{
Resolution: encoding.Resolution(i),
Field: field,
View: b.views[i],
Domain: b.domainId,
}), id, value)
b.ra(Key{
Resolution: encoding.Resolution(i),
Field: field,
View: b.views[i],
Domain: b.domainId,
Existence: true,
}).DirectAdd(id % shardwidth.ShardWidth)
}
}

func (b *batch) mxExixtenceOnly(field models.Field, id uint64) {
for i := range b.views {
b.ra(Key{
Resolution: encoding.Resolution(i),
Field: field,
View: b.views[i],
Domain: b.domainId,
Existence: true,
}).DirectAdd(id % shardwidth.ShardWidth)
}
Expand Down
Loading

0 comments on commit 585b2be

Please sign in to comment.