Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Fix 261/multiindex #547

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 51 additions & 10 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
batch struct {
bitBatches []*bitBatch
fields []*pilosa.Field
exists []bool
timePilosa time.Duration
timeMapping time.Duration
}
Expand Down Expand Up @@ -227,11 +228,14 @@ func (d *Driver) savePartition(
err error
)
for i, e := range idx.Expressions() {
name := fieldName(idx.ID(), e, p)
pilosaIndex.DeleteField(name)
name := fieldName(e, p)
field, err := pilosaIndex.CreateField(name, pilosa.OptFieldTypeDefault())
if err != nil {
return 0, err
if _, ok := err.(pilosa.ConflictError); !ok {
return 0, err
}
field = pilosaIndex.Field(name)
b.exists[i] = true
}
b.fields[i] = field
b.bitBatches[i] = newBitBatch(sql.IndexBatchSize)
Expand Down Expand Up @@ -335,6 +339,7 @@ func (d *Driver) Save(
var b = batch{
fields: make([]*pilosa.Field, len(idx.Expressions())),
bitBatches: make([]*bitBatch, len(idx.Expressions())),
exists: make([]bool, len(idx.Expressions())),
}

ctx.Context, idx.cancel = context.WithCancel(ctx.Context)
Expand Down Expand Up @@ -396,6 +401,41 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
return err
}

var (
root = filepath.Join(d.root, i.Database(), i.Table())
)

dirs, err := ioutil.ReadDir(root)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
for _, dirinfo := range dirs {
if dirinfo.IsDir() && !strings.HasPrefix(dirinfo.Name(), ".") {
config := d.configFilePath(i.Database(), i.Table(), dirinfo.Name())
if _, err := os.Stat(config); err != nil {
if os.IsNotExist(err) {
continue
}
return err
}
cfg, err := index.ReadConfigFile(config)
if err != nil {
return err
}

for _, ex1 := range cfg.Expressions {
for _, ex2 := range idx.Expressions() {
if ex1 == ex2 {
return nil
}
}
}
}
}

for {
p, err := partitions.Next()
if err != nil {
Expand All @@ -406,7 +446,7 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
}

for _, ex := range idx.Expressions() {
name := fieldName(idx.ID(), ex, p)
name := fieldName(ex, p)
field := idx.index.Field(name)
if field == nil {
continue
Expand Down Expand Up @@ -440,10 +480,12 @@ func (d *Driver) savePilosa(ctx *sql.Context, colID uint64, b *batch) error {
start := time.Now()

for i, fld := range b.fields {
err := fld.Import(b.bitBatches[i].rows, b.bitBatches[i].cols, nil)
if err != nil {
span.LogKV("error", err)
return err
if !b.exists[i] {
err := fld.Import(b.bitBatches[i].rows, b.bitBatches[i].cols, nil)
if err != nil {
span.LogKV("error", err)
return err
}
}

b.bitBatches[i].Clean()
Expand Down Expand Up @@ -506,9 +548,8 @@ func indexName(db, table string) string {
return fmt.Sprintf("%s-%x", IndexNamePrefix, h.Sum(nil))
}

func fieldName(id, ex string, p sql.Partition) string {
func fieldName(ex string, p sql.Partition) string {
h := sha1.New()
io.WriteString(h, id)
io.WriteString(h, ex)
h.Write(p.Key())
return fmt.Sprintf("%s-%x", FieldNamePrefix, h.Sum(nil))
Expand Down
47 changes: 19 additions & 28 deletions sql/index/pilosa/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,13 +974,11 @@ func TestNegateIndex(t *testing.T) {
{"3", []interface{}{int64(1)}},
{"4", []interface{}{int64(1)}},
{"5", []interface{}{int64(7)}},
},
},
{
testPartition(1),
[]kvfixture{
{"1", []interface{}{int64(2)}},
{"2", []interface{}{int64(7)}},
{"6", []interface{}{int64(10)}},
{"7", []interface{}{int64(5)}},
{"8", []interface{}{int64(6)}},
{"9", []interface{}{int64(4)}},
{"10", []interface{}{int64(3)}},
},
},
},
Expand All @@ -989,23 +987,19 @@ func TestNegateIndex(t *testing.T) {
err = d.Save(sql.NewEmptyContext(), idx, it)
require.NoError(err)

fixtures := []kvfixture{
{"1", []interface{}{int64(2), int64(6)}},
{"2", []interface{}{int64(7), int64(5)}},
{"3", []interface{}{int64(1), int64(2)}},
{"4", []interface{}{int64(1), int64(3)}},
{"5", []interface{}{int64(7), int64(6)}},
{"6", []interface{}{int64(10), int64(6)}},
{"7", []interface{}{int64(5), int64(1)}},
{"8", []interface{}{int64(6), int64(2)}},
{"9", []interface{}{int64(4), int64(0)}},
{"10", []interface{}{int64(3), int64(5)}},
}

multiIt := &fixturePartitionKeyValueIter{
fixtures: []partitionKeyValueFixture{
{testPartition(0), fixtures},
{testPartition(1), fixtures[4:]},
{testPartition(0), []kvfixture{
{"1", []interface{}{int64(2), int64(6)}},
{"2", []interface{}{int64(7), int64(5)}},
{"3", []interface{}{int64(1), int64(2)}},
{"4", []interface{}{int64(1), int64(3)}},
{"6", []interface{}{int64(10), int64(6)}},
{"7", []interface{}{int64(5), int64(1)}},
{"8", []interface{}{int64(6), int64(2)}},
{"9", []interface{}{int64(4), int64(0)}},
{"10", []interface{}{int64(3), int64(5)}},
}},
},
}

Expand All @@ -1018,7 +1012,7 @@ func TestNegateIndex(t *testing.T) {
values, err := lookupValues(lookup)
require.NoError(err)

expected := []string{"1", "2", "5"}
expected := []string{"1", "2", "5", "6", "7", "8", "9", "10"}
require.Equal(expected, values)

// test non existing values
Expand All @@ -1028,7 +1022,7 @@ func TestNegateIndex(t *testing.T) {
values, err = lookupValues(lookup)
require.NoError(err)

expected = []string{"1", "2", "3", "4", "5"}
expected = []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
require.Equal(expected, values)

lookup, err = multiIdx.(sql.NegateIndex).Not(int64(1), int64(6))
Expand Down Expand Up @@ -1098,10 +1092,6 @@ func TestEqualAndLessIndex(t *testing.T) {
{"4", []interface{}{int64(4)}},
{"5", []interface{}{int64(5)}},
{"6", []interface{}{int64(6)}},
{"7", []interface{}{int64(7)}},
{"8", []interface{}{int64(8)}},
{"9", []interface{}{int64(9)}},
{"10", []interface{}{int64(10)}},
},
},
},
Expand All @@ -1123,6 +1113,7 @@ func TestEqualAndLessIndex(t *testing.T) {
expected = []string{"3", "4"}
require.Equal(expected, values)
}

func TestPilosaHolder(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down
2 changes: 1 addition & 1 deletion sql/index/pilosa/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (idx *pilosaIndex) Has(p sql.Partition, key ...interface{}) (bool, error) {
}

for i, expr := range idx.expressions {
name := fieldName(idx.ID(), expr, p)
name := fieldName(expr, p)

val, err := idx.mapping.get(name, key[i])
if err != nil || val == nil {
Expand Down
6 changes: 3 additions & 3 deletions sql/index/pilosa/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (l *indexLookup) indexName() string {
func (l *indexLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) {
var row *pilosa.Row
for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
field := l.index.Field(fieldName(expr, p))
rowID, err := l.mapping.rowID(field.Name(), l.keys[i])
if err == io.EOF {
continue
Expand Down Expand Up @@ -228,7 +228,7 @@ func (l *filteredLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, err
var row *pilosa.Row

for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
field := l.index.Field(fieldName(expr, p))
rows, err := l.mapping.filter(field.Name(), func(b []byte) (bool, error) {
return l.filter(i, b)
})
Expand Down Expand Up @@ -387,7 +387,7 @@ func (l *negateLookup) indexName() string { return l.index.Name() }
func (l *negateLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) {
var row *pilosa.Row
for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
field := l.index.Field(fieldName(expr, p))

maxRowID, err := l.mapping.getMaxRowID(field.Name())
if err != nil {
Expand Down