Skip to content

Commit

Permalink
(tae): update mock index file and some index codes (#15)
Browse files Browse the repository at this point in the history
Signed-off-by: asuka <312856403@qq.com>
  • Loading branch information
zzl200012 authored Apr 20, 2022
1 parent 4bbbb2f commit 3ca59c7
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 18 deletions.
36 changes: 36 additions & 0 deletions pkg/vm/engine/tae/dataio/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dataio

import (
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"

"github.com/RoaringBitmap/roaring"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/container/batch"
Expand Down Expand Up @@ -87,3 +88,38 @@ func (sf *mockSegmentFile) Destory() error {
}
return nil
}

type mockIndexFile struct {
counter int
data []byte
}

func MockIndexFile() *mockIndexFile {
return &mockIndexFile{
counter: 0,
data: make([]byte, 0),
}
}

func (file *mockIndexFile) Append(data []byte) (startOffset uint32, err error) {
startOffset = uint32(len(file.data))
file.data = append(file.data, data...)
return
}

func (file *mockIndexFile) Read(offset uint32, size uint32) (data []byte) {
return file.data[offset:offset+size]
}

func (file *mockIndexFile) AllocIndexNodeId() common.ID {
file.counter++
return common.ID{
TableID: 0,
SegmentID: 0,
BlockID: 0,
PartID: uint32(file.counter),
Idx: uint16(file.counter),
Iter: 0,
}
}

7 changes: 7 additions & 0 deletions pkg/vm/engine/tae/dataio/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ type BlockInfo interface {
Rows() uint32
TSRange() (uint64, uint64)
}

// IndexFile is only used for mocking
type IndexFile interface {
Append(data []byte) (startOffset uint32, err error)
Read(offset uint32, size uint32) (data []byte)
AllocIndexNodeId() common.ID
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package accessif
package acif

import "github.com/matrixorigin/matrixone/pkg/container/vector"

Expand Down
8 changes: 8 additions & 0 deletions pkg/vm/engine/tae/index/basic/zonemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ func NewZoneMap(typ types.Type, mutex *sync.RWMutex) *ZoneMap {
return zm
}

func NewZoneMapFromSource(data []byte) (*ZoneMap, error) {
zm := ZoneMap{}
if err := zm.Unmarshal(data); err != nil {
return nil, err
}
return &zm, nil
}

func (zm *ZoneMap) GetType() types.Type {
return zm.typ
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/vm/engine/tae/index/common/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ func Compress(raw []byte, ctyp CompressType) []byte {
return raw
}

func Decompress(compressed []byte, ctyp CompressType) []byte {
return compressed
func Decompress(src []byte, dst []byte, ctyp CompressType) error {
copy(dst, src)
return nil
}

type IndexMeta struct {
IdxType IndexType
CompType CompressType
ColIdx uint16
PartOffset uint32
//PartOffset uint32
StartOffset uint32
Size uint32
RawSize uint32
Expand All @@ -59,9 +60,9 @@ func (meta *IndexMeta) SetIndexedColumn(colIdx uint16) {
meta.ColIdx = colIdx
}

func (meta *IndexMeta) SetPartOffset(offset uint32) {
meta.PartOffset = offset
}
//func (meta *IndexMeta) SetPartOffset(offset uint32) {
// meta.PartOffset = offset
//}

func (meta *IndexMeta) SetStartOffset(offset uint32) {
meta.StartOffset = offset
Expand All @@ -77,7 +78,7 @@ func (meta *IndexMeta) Marshal() ([]byte, error) {
buf.Write(encoding.EncodeUint8(uint8(meta.IdxType)))
buf.Write(encoding.EncodeUint8(uint8(meta.CompType)))
buf.Write(encoding.EncodeUint16(meta.ColIdx))
buf.Write(encoding.EncodeUint32(meta.PartOffset))
//buf.Write(encoding.EncodeUint32(meta.PartOffset))
buf.Write(encoding.EncodeUint32(meta.StartOffset))
buf.Write(encoding.EncodeUint32(meta.Size))
buf.Write(encoding.EncodeUint32(meta.RawSize))
Expand All @@ -91,8 +92,8 @@ func (meta *IndexMeta) Unmarshal(buf []byte) error {
buf = buf[1:]
meta.ColIdx = encoding.DecodeUint16(buf[:2])
buf = buf[2:]
meta.PartOffset = encoding.DecodeUint32(buf[:4])
buf = buf[4:]
//meta.PartOffset = encoding.DecodeUint32(buf[:4])
//buf = buf[4:]
meta.StartOffset = encoding.DecodeUint32(buf[:4])
buf = buf[4:]
meta.Size = encoding.DecodeUint32(buf[:4])
Expand Down
154 changes: 151 additions & 3 deletions pkg/vm/engine/tae/index/io/zonemap_block.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,171 @@
package io

import (
"github.com/RoaringBitmap/roaring"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/buffer"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/buffer/base"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/dataio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/basic"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/common/errors"
)

type blockZoneMapIndexNode struct {
*buffer.Node
mgr base.INodeManager
host dataio.BlockFile
host dataio.IndexFile
meta *common.IndexMeta
inner *basic.ZoneMap
}

func newBlockZoneMapIndexNode(mgr base.INodeManager, host dataio.BlockFile, meta *common.IndexMeta) *blockZoneMapIndexNode {
func newBlockZoneMapIndexNode(mgr base.INodeManager, host dataio.IndexFile, meta *common.IndexMeta) *blockZoneMapIndexNode {
impl := new(blockZoneMapIndexNode)
impl.Node = buffer.NewNode(impl, mgr, host.AllocIndexNodeId(), uint64(meta.Size))
impl.LoadFunc = impl.OnLoad
impl.UnloadFunc = impl.OnUnload
impl.DestroyFunc = impl.OnDestroy
impl.host = host
impl.meta = meta
impl.mgr = mgr
mgr.RegisterNode(impl)
return impl
}

func (n *blockZoneMapIndexNode) OnLoad() {
if n.inner != nil {
// no-op
return
}
var err error
startOffset := n.meta.StartOffset
size := n.meta.Size
compressTyp := n.meta.CompType
data := n.host.Read(startOffset, size)
rawSize := n.meta.RawSize
buf := make([]byte, rawSize)
if err = common.Decompress(data, buf, compressTyp); err != nil {
panic(err)
}
n.inner, err = basic.NewZoneMapFromSource(buf)
if err != nil {
panic(err)
}
return
}

func (n *blockZoneMapIndexNode) OnUnload() {
if n.inner == nil {
// no-op
return
}
n.inner = nil
}

func (n *blockZoneMapIndexNode) OnDestroy() {
// no-op
}

func (n *blockZoneMapIndexNode) Close() error {
// no-op
return nil
}

type BlockZoneMapIndexReader struct {
inode *blockZoneMapIndexNode
}

func NewBlockZoneMapIndexReader() *BlockZoneMapIndexReader {
return &BlockZoneMapIndexReader{}
}

func (reader *BlockZoneMapIndexReader) Init(mgr base.INodeManager, host dataio.IndexFile, meta *common.IndexMeta) error {
reader.inode = newBlockZoneMapIndexNode(mgr, host, meta)
return nil
}

func (reader *BlockZoneMapIndexReader) MayContainsAnyKeys(keys *vector.Vector) (bool, *roaring.Bitmap, error) {
handle := reader.inode.mgr.Pin(reader.inode)
defer handle.Close()
return handle.GetNode().(*blockZoneMapIndexNode).inner.MayContainsAnyKeys(keys)
}

func (reader *BlockZoneMapIndexReader) MayContainsKey(key interface{}) (bool, error) {
handle := reader.inode.mgr.Pin(reader.inode)
defer handle.Close()
return handle.GetNode().(*blockZoneMapIndexNode).inner.MayContainsKey(key)
}

type BlockZoneMapIndexWriter struct {
cType common.CompressType
host dataio.IndexFile
inner *basic.ZoneMap
colIdx uint16
}

func NewBlockZoneMapIndexWriter() *BlockZoneMapIndexWriter {
return &BlockZoneMapIndexWriter{}
}

func (writer *BlockZoneMapIndexWriter) Init(host dataio.IndexFile, cType common.CompressType, colIdx uint16) error {
writer.host = host
writer.cType = cType
writer.colIdx = colIdx
return nil
}

// TODO: add more types of index node, and integrate with buffer manager
func (writer *BlockZoneMapIndexWriter) Finalize() (*common.IndexMeta, error) {
if writer.inner == nil {
panic("unexpected error")
}
appender := writer.host
meta := common.NewEmptyIndexMeta()
meta.SetIndexType(common.BlockZoneMapIndex)
meta.SetCompressType(writer.cType)
meta.SetIndexedColumn(writer.colIdx)

var startOffset uint32
iBuf, err := writer.inner.Marshal()
if err != nil {
return nil, err
}
rawSize := uint32(len(iBuf))
cBuf := common.Compress(iBuf, writer.cType)
exactSize := uint32(len(cBuf))
meta.SetSize(rawSize, exactSize)
startOffset, err = appender.Append(cBuf)
if err != nil {
return nil, err
}
meta.SetStartOffset(startOffset)
return meta, nil
}

func (writer *BlockZoneMapIndexWriter) AddValues(values *vector.Vector) error {
typ := values.Typ
if writer.inner == nil {
writer.inner = basic.NewZoneMap(typ, nil)
} else {
if writer.inner.GetType() != typ {
return errors.ErrTypeMismatch
}
}
if err := writer.inner.BatchUpdate(values, 0, -1); err != nil {
return err
}
return nil
}

func (writer *BlockZoneMapIndexWriter) SetMinMax(min, max interface{}, typ types.Type) error {
if writer.inner == nil {
writer.inner = basic.NewZoneMap(typ, nil)
} else {
if writer.inner.GetType() != typ {
return errors.ErrTypeMismatch
}
}
writer.inner.SetMin(min)
writer.inner.SetMax(max)
return nil
}
62 changes: 62 additions & 0 deletions pkg/vm/engine/tae/index/io/zonemap_block_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io

import (
"github.com/RoaringBitmap/roaring"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/buffer"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/dataio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/common"
"github.com/stretchr/testify/require"
"testing"
)

func TestBlockZoneMapIndex(t *testing.T) {
bufManager := buffer.NewNodeManager(1024 * 1024, nil)
file := dataio.MockIndexFile()
cType := common.Plain
typ := types.Type{Oid: types.T_int32}
pkColIdx := uint16(0)
var err error
var meta *common.IndexMeta
var res bool
var ans *roaring.Bitmap

writer := NewBlockZoneMapIndexWriter()
err = writer.Init(file, cType, pkColIdx)
require.NoError(t, err)

keys := common.MockVec(typ, 1000, 0)
err = writer.AddValues(keys)
require.NoError(t, err)

meta, err = writer.Finalize()
require.NoError(t, err)

reader := NewBlockZoneMapIndexReader()
err = reader.Init(bufManager, file, meta)
require.NoError(t, err)

//t.Log(bufManager.String())

res, err = reader.MayContainsKey(int32(500))
require.NoError(t, err)
require.True(t, res)

res, err = reader.MayContainsKey(int32(1000))
require.NoError(t, err)
require.False(t, res)

keys = common.MockVec(typ, 100, 1000)
res, ans, err = reader.MayContainsAnyKeys(keys)
require.NoError(t, err)
require.False(t, res)
require.Equal(t, uint64(0), ans.GetCardinality())

keys = common.MockVec(typ, 100, 0)
res, ans, err = reader.MayContainsAnyKeys(keys)
require.NoError(t, err)
require.True(t, res)
require.Equal(t, uint64(100), ans.GetCardinality())

//t.Log(bufManager.String())
}
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/tables/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/buffer/base"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/access/accessif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index/access/acif"
)

type blockAppender struct {
node *appendableNode
handle base.INodeHandle
indexAppender accessif.IAppendableBlockIndexHolder
indexAppender acif.IAppendableBlockIndexHolder
}

func newAppender(node *appendableNode, idxApd accessif.IAppendableBlockIndexHolder) *blockAppender {
func newAppender(node *appendableNode, idxApd acif.IAppendableBlockIndexHolder) *blockAppender {
appender := new(blockAppender)
appender.node = node
appender.handle = node.mgr.Pin(node)
Expand Down
Loading

0 comments on commit 3ca59c7

Please sign in to comment.