Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proto): low cardinality improvements #203

Merged
merged 2 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions proto/col_low_cardinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (c *ColLowCardinality[T]) Prepare() error {
c.keys = append(c.keys[:0], make([]int, len(c.Values))...)
if c.kv == nil {
c.kv = map[T]int{}
c.index.Reset()
}

// Fill keys with value indexes.
Expand Down
157 changes: 157 additions & 0 deletions proto/col_low_cardinality_raw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package proto

import "github.com/go-faster/errors"

// ColLowCardinalityRaw is non-generic version of ColLowCardinality.
//
// The column is stored as a dictionary (Index) plus a sequence of keys,
// where each key is the position of a value inside the dictionary.
// Key selects which of the KeysN fields is the active keys column;
// see Keys and AppendKey.
type ColLowCardinalityRaw struct {
Index Column // dictionary of values; keys refer to its rows by position
Key CardinalityKey // width of the active keys column (selects Keys8..Keys64)

// Keeping all key column variants as fields to reuse
// memory more efficiently.
// Only the variant selected by Key is read or written by Keys,
// AppendKey, EncodeColumn and DecodeColumn; Reset clears all four.

Keys8 ColUInt8
Keys16 ColUInt16
Keys32 ColUInt32
Keys64 ColUInt64
}

// DecodeState reads the key serialization version from r and verifies that
// it equals sharedDictionariesWithAdditionalKeys. If the dictionary column
// implements StateDecoder, its state is decoded from r afterwards.
func (c *ColLowCardinalityRaw) DecodeState(r *Reader) error {
	version, err := r.Int64()
	switch {
	case err != nil:
		return errors.Wrap(err, "version")
	case version != int64(sharedDictionariesWithAdditionalKeys):
		return errors.Errorf("got version %d, expected %d",
			version, sharedDictionariesWithAdditionalKeys,
		)
	}

	// The dictionary is not required to carry state of its own.
	dec, ok := c.Index.(StateDecoder)
	if !ok {
		return nil
	}
	if err := dec.DecodeState(r); err != nil {
		return errors.Wrap(err, "state")
	}
	return nil
}

// EncodeState writes the key serialization version to b, followed by the
// state of the dictionary column when that column implements StateEncoder.
func (c ColLowCardinalityRaw) EncodeState(b *Buffer) {
	// The version always goes first; DecodeState validates it on read.
	b.PutInt64(int64(sharedDictionariesWithAdditionalKeys))

	enc, stateful := c.Index.(StateEncoder)
	if !stateful {
		return
	}
	enc.EncodeState(b)
}

// AppendKey appends dictionary position i to the keys column selected by
// c.Key, converting i to that column's integer width.
//
// Panics if c.Key is not one of the supported key types.
func (c *ColLowCardinalityRaw) AppendKey(i int) {
	switch c.Key {
	case KeyUInt8:
		c.Keys8 = append(c.Keys8, uint8(i))
		return
	case KeyUInt16:
		c.Keys16 = append(c.Keys16, uint16(i))
		return
	case KeyUInt32:
		c.Keys32 = append(c.Keys32, uint32(i))
		return
	case KeyUInt64:
		c.Keys64 = append(c.Keys64, uint64(i))
		return
	}
	// No known key width matched.
	panic("invalid key type")
}

// Keys returns the keys column that corresponds to c.Key as a Column.
// The result points at the receiver's own field, so changes made through
// it are visible on c.
//
// Panics if c.Key is not one of the supported key types.
func (c *ColLowCardinalityRaw) Keys() Column {
	var active Column
	switch c.Key {
	case KeyUInt8:
		active = &c.Keys8
	case KeyUInt16:
		active = &c.Keys16
	case KeyUInt32:
		active = &c.Keys32
	case KeyUInt64:
		active = &c.Keys64
	default:
		panic("invalid key type")
	}
	return active
}

// Type returns the low cardinality column type parametrized by the type
// of the dictionary column.
func (c ColLowCardinalityRaw) Type() ColumnType {
	elem := c.Index.Type()
	return ColumnTypeLowCardinality.Sub(elem)
}

// Rows returns the number of rows in the active keys column.
//
// Panics if c.Key is not one of the supported key types (see Keys).
func (c ColLowCardinalityRaw) Rows() int {
	active := c.Keys()
	return active.Rows()
}

// DecodeColumn reads a low cardinality column from r.
//
// When rows is zero nothing is read and nil is returned, mirroring
// EncodeColumn, which writes nothing for an empty column. Otherwise the
// following values are read in order:
//
//   - meta (int64): flag bits combined with the key type;
//   - dictionary row count (int64) and the dictionary column itself;
//   - keys row count (int64) and the keys column of the decoded key type.
//
// c.Key is set to the key type found in meta before the keys are decoded.
// An error is returned on read failure, when the global dictionary flag is
// set, when the additional keys flag is missing, when the key type is not
// a valid CardinalityKey, or when a row count is rejected by checkRows.
func (c *ColLowCardinalityRaw) DecodeColumn(r *Reader, rows int) error {
	if rows == 0 {
		// Skipping entirely if there are no rows.
		return nil
	}
	meta, err := r.Int64()
	if err != nil {
		return errors.Wrap(err, "meta")
	}
	// Flags are tested against zero: masking with a single-bit flag yields
	// either zero or the flag itself, so comparing the result with 1 only
	// works when the flag happens to be the lowest bit.
	if (meta & cardinalityNeedGlobalDictionaryBit) != 0 {
		return errors.New("global dictionary is not supported")
	}
	if (meta & cardinalityHasAdditionalKeysBit) == 0 {
		return errors.New("additional keys bit is missing")
	}

	key := CardinalityKey(meta & cardinalityKeyMask)
	if !key.IsACardinalityKey() {
		return errors.Errorf("invalid low cardinality keys type %d", key)
	}
	c.Key = key

	// Dictionary.
	indexRows, err := r.Int64()
	if err != nil {
		return errors.Wrap(err, "index size")
	}
	if err := checkRows(int(indexRows)); err != nil {
		return errors.Wrap(err, "index size")
	}
	if err := c.Index.DecodeColumn(r, int(indexRows)); err != nil {
		return errors.Wrap(err, "index column")
	}

	// Keys, i.e. positions of values in the dictionary.
	keyRows, err := r.Int64()
	if err != nil {
		return errors.Wrap(err, "keys size")
	}
	if err := checkRows(int(keyRows)); err != nil {
		return errors.Wrap(err, "keys size")
	}
	if err := c.Keys().DecodeColumn(r, int(keyRows)); err != nil {
		return errors.Wrap(err, "keys column")
	}

	return nil
}

// Reset resets the dictionary column and every keys column variant,
// regardless of which key type is currently selected.
func (c *ColLowCardinalityRaw) Reset() {
	c.Index.Reset()
	for _, keys := range []Column{&c.Keys8, &c.Keys16, &c.Keys32, &c.Keys64} {
		keys.Reset()
	}
}

// EncodeColumn writes the column to b: meta, dictionary row count and
// dictionary, then keys row count and keys. Nothing is written when the
// column has no rows.
func (c ColLowCardinalityRaw) EncodeColumn(b *Buffer) {
	keys := c.Keys()
	if keys.Rows() == 0 {
		// Empty columns are not encoded at all.
		return
	}

	// Meta tells the reader to update low cardinality metadata and
	// carries the type of the keys column.
	b.PutInt64(cardinalityUpdateAll | int64(c.Key))

	// Dictionary: row count followed by values.
	dict := c.Index
	b.PutInt64(int64(dict.Rows()))
	dict.EncodeColumn(b)

	// Keys: row count followed by positions in the dictionary.
	b.PutInt64(int64(keys.Rows()))
	keys.EncodeColumn(b)
}
100 changes: 100 additions & 0 deletions proto/col_low_cardinality_raw_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package proto

import (
"bytes"
"io"
"testing"

"github.com/stretchr/testify/require"

"github.com/ClickHouse/ch-go/internal/gold"
)

// TestColLowCardinalityRaw_DecodeColumn covers encoding and decoding of
// ColLowCardinalityRaw: a round trip over a string dictionary with uint8
// keys (including golden bytes, EOF and short reads), the empty column
// case, and several malformed inputs that must be rejected.
func TestColLowCardinalityRaw_DecodeColumn(t *testing.T) {
t.Run("Str", func(t *testing.T) {
const rows = 25
// Dictionary of three distinct strings.
var data ColStr
for _, v := range []string{
"neo",
"trinity",
"morpheus",
} {
data.Append(v)
}
col := &ColLowCardinalityRaw{
Index: &data,
Key: KeyUInt8,
}
// Keys cycle through the dictionary positions 0..data.Rows()-1.
for i := 0; i < rows; i++ {
col.AppendKey(i % data.Rows())
}

// buf is shared by the subtests below and must not be modified by them.
var buf Buffer
col.EncodeColumn(&buf)
t.Run("Golden", func(t *testing.T) {
gold.Bytes(t, buf.Buf, "col_low_cardinality_i_str_k_8")
})
t.Run("Ok", func(t *testing.T) {
br := bytes.NewReader(buf.Buf)
r := NewReader(br)
// NOTE(review): dec uses the same &data as col, so the dictionary is
// decoded into the column it was encoded from and the equality check
// below compares identical Index pointers — confirm this is intended.
dec := &ColLowCardinalityRaw{
Index: &data,
}
require.NoError(t, dec.DecodeColumn(r, rows))
require.Equal(t, col, dec)
require.Equal(t, rows, dec.Rows())
dec.Reset()
require.Equal(t, 0, dec.Rows())
require.Equal(t, ColumnTypeLowCardinality.Sub(ColumnTypeString), dec.Type())
})
t.Run("EOF", func(t *testing.T) {
// Decoding from an empty reader must surface io.EOF.
r := NewReader(bytes.NewReader(nil))
dec := &ColLowCardinalityRaw{
Index: &data,
}
require.ErrorIs(t, dec.DecodeColumn(r, rows), io.EOF)
})
t.Run("NoShortRead", func(t *testing.T) {
dec := &ColLowCardinalityRaw{
Index: &data,
}
requireNoShortRead(t, buf.Buf, colAware(dec, rows))
})
})
t.Run("Blank", func(t *testing.T) {
// Blank columns (i.e. row count is zero) are not encoded.
var data ColStr
col := &ColLowCardinalityRaw{
Index: &data,
Key: KeyUInt8,
}
var buf Buffer
col.EncodeColumn(&buf)

// With zero rows DecodeColumn returns before touching Index,
// so a zero-value decoder (nil Index) is sufficient here.
var dec ColLowCardinalityRaw
require.NoError(t, dec.DecodeColumn(buf.Reader(), col.Rows()))
})
t.Run("InvalidVersion", func(t *testing.T) {
// NOTE(review): DecodeColumn does not read a version (DecodeState does);
// the value 2 is consumed as meta here, so the expected error presumably
// comes from the meta flag checks rather than a version check — confirm.
var buf Buffer
buf.PutInt64(2)
var dec ColLowCardinalityRaw
// Zero rows: nothing is read, so the input is never inspected.
require.NoError(t, dec.DecodeColumn(buf.Reader(), 0))
require.Error(t, dec.DecodeColumn(buf.Reader(), 1))
})
t.Run("InvalidMeta", func(t *testing.T) {
// NOTE(review): the first int64 (1) is what DecodeColumn reads as meta;
// the following 0 is only reached if the meta checks pass — confirm the
// leading value is intended.
var buf Buffer
buf.PutInt64(1)
buf.PutInt64(0)
var dec ColLowCardinalityRaw
require.NoError(t, dec.DecodeColumn(buf.Reader(), 0))
require.Error(t, dec.DecodeColumn(buf.Reader(), 1))
})
t.Run("InvalidKeyType", func(t *testing.T) {
// NOTE(review): DecodeColumn reads the leading 1 as meta, so the value
// with the out-of-range key type written second may never be examined
// and the key type validation may not be the check that fails — confirm.
var buf Buffer
buf.PutInt64(1)
buf.PutInt64(cardinalityUpdateAll | int64(KeyUInt64+1))
var dec ColLowCardinalityRaw
require.NoError(t, dec.DecodeColumn(buf.Reader(), 0))
require.Error(t, dec.DecodeColumn(buf.Reader(), 1))
})
}