diff --git a/sql/index/pilosa/driver.go b/sql/index/pilosa/driver.go index 6303c073b..9a5d30e87 100644 --- a/sql/index/pilosa/driver.go +++ b/sql/index/pilosa/driver.go @@ -63,6 +63,7 @@ type ( batch struct { bitBatches []*bitBatch fields []*pilosa.Field + exists []bool timePilosa time.Duration timeMapping time.Duration } @@ -227,11 +228,14 @@ func (d *Driver) savePartition( err error ) for i, e := range idx.Expressions() { - name := fieldName(idx.ID(), e, p) - pilosaIndex.DeleteField(name) + name := fieldName(e, p) field, err := pilosaIndex.CreateField(name, pilosa.OptFieldTypeDefault()) if err != nil { - return 0, err + if _, ok := err.(pilosa.ConflictError); !ok { + return 0, err + } + field = pilosaIndex.Field(name) + b.exists[i] = true } b.fields[i] = field b.bitBatches[i] = newBitBatch(sql.IndexBatchSize) @@ -335,6 +339,7 @@ func (d *Driver) Save( var b = batch{ fields: make([]*pilosa.Field, len(idx.Expressions())), bitBatches: make([]*bitBatch, len(idx.Expressions())), + exists: make([]bool, len(idx.Expressions())), } ctx.Context, idx.cancel = context.WithCancel(ctx.Context) @@ -396,6 +401,41 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error { return err } + var ( + root = filepath.Join(d.root, i.Database(), i.Table()) + ) + + dirs, err := ioutil.ReadDir(root) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + for _, dirinfo := range dirs { + if dirinfo.IsDir() && !strings.HasPrefix(dirinfo.Name(), ".") { + config := d.configFilePath(i.Database(), i.Table(), dirinfo.Name()) + if _, err := os.Stat(config); err != nil { + if os.IsNotExist(err) { + continue + } + return err + } + cfg, err := index.ReadConfigFile(config) + if err != nil { + return err + } + + for _, ex1 := range cfg.Expressions { + for _, ex2 := range idx.Expressions() { + if ex1 == ex2 { + return nil + } + } + } + } + } + for { p, err := partitions.Next() if err != nil { @@ -406,7 +446,7 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error { } for _, ex := range idx.Expressions() { - name := fieldName(idx.ID(), ex, p) + name := fieldName(ex, p) field := idx.index.Field(name) if field == nil { continue @@ -440,10 +480,12 @@ func (d *Driver) savePilosa(ctx *sql.Context, colID uint64, b *batch) error { start := time.Now() for i, fld := range b.fields { - err := fld.Import(b.bitBatches[i].rows, b.bitBatches[i].cols, nil) - if err != nil { - span.LogKV("error", err) - return err + if !b.exists[i] { + err := fld.Import(b.bitBatches[i].rows, b.bitBatches[i].cols, nil) + if err != nil { + span.LogKV("error", err) + return err + } } b.bitBatches[i].Clean() @@ -506,9 +548,8 @@ func indexName(db, table string) string { return fmt.Sprintf("%s-%x", IndexNamePrefix, h.Sum(nil)) } -func fieldName(id, ex string, p sql.Partition) string { +func fieldName(ex string, p sql.Partition) string { h := sha1.New() - io.WriteString(h, id) io.WriteString(h, ex) h.Write(p.Key()) return fmt.Sprintf("%s-%x", FieldNamePrefix, h.Sum(nil)) diff --git a/sql/index/pilosa/driver_test.go b/sql/index/pilosa/driver_test.go index 86954d01b..800e30bfc 100644 --- a/sql/index/pilosa/driver_test.go +++ b/sql/index/pilosa/driver_test.go @@ -974,13 +974,11 @@ func TestNegateIndex(t *testing.T) { {"3", []interface{}{int64(1)}}, {"4", []interface{}{int64(1)}}, {"5", []interface{}{int64(7)}}, - }, - }, - { - testPartition(1), - []kvfixture{ - {"1", []interface{}{int64(2)}}, - {"2", []interface{}{int64(7)}}, + {"6", []interface{}{int64(10)}}, + {"7", []interface{}{int64(5)}}, + {"8", []interface{}{int64(6)}}, + {"9", []interface{}{int64(4)}}, + {"10", []interface{}{int64(3)}}, }, }, }, @@ -989,23 +987,19 @@ func TestNegateIndex(t *testing.T) { err = d.Save(sql.NewEmptyContext(), idx, it) require.NoError(err) - fixtures := []kvfixture{ - {"1", []interface{}{int64(2), int64(6)}}, - {"2", []interface{}{int64(7), int64(5)}}, - {"3", []interface{}{int64(1), int64(2)}}, - {"4", []interface{}{int64(1), int64(3)}}, - {"5", []interface{}{int64(7), int64(6)}}, - {"6", []interface{}{int64(10), int64(6)}}, - {"7", []interface{}{int64(5), int64(1)}}, - {"8", []interface{}{int64(6), int64(2)}}, - {"9", []interface{}{int64(4), int64(0)}}, - {"10", []interface{}{int64(3), int64(5)}}, - } - multiIt := &fixturePartitionKeyValueIter{ fixtures: []partitionKeyValueFixture{ - {testPartition(0), fixtures}, - {testPartition(1), fixtures[4:]}, + {testPartition(0), []kvfixture{ + {"1", []interface{}{int64(2), int64(6)}}, + {"2", []interface{}{int64(7), int64(5)}}, + {"3", []interface{}{int64(1), int64(2)}}, + {"4", []interface{}{int64(1), int64(3)}}, + {"6", []interface{}{int64(10), int64(6)}}, + {"7", []interface{}{int64(5), int64(1)}}, + {"8", []interface{}{int64(6), int64(2)}}, + {"9", []interface{}{int64(4), int64(0)}}, + {"10", []interface{}{int64(3), int64(5)}}, + }}, }, } @@ -1018,7 +1012,7 @@ func TestNegateIndex(t *testing.T) { values, err := lookupValues(lookup) require.NoError(err) - expected := []string{"1", "2", "5"} + expected := []string{"1", "2", "5", "6", "7", "8", "9", "10"} require.Equal(expected, values) // test non existing values @@ -1028,7 +1022,7 @@ func TestNegateIndex(t *testing.T) { values, err = lookupValues(lookup) require.NoError(err) - expected = []string{"1", "2", "3", "4", "5"} + expected = []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"} require.Equal(expected, values) lookup, err = multiIdx.(sql.NegateIndex).Not(int64(1), int64(6)) @@ -1098,10 +1092,6 @@ func TestEqualAndLessIndex(t *testing.T) { {"4", []interface{}{int64(4)}}, {"5", []interface{}{int64(5)}}, {"6", []interface{}{int64(6)}}, - {"7", []interface{}{int64(7)}}, - {"8", []interface{}{int64(8)}}, - {"9", []interface{}{int64(9)}}, - {"10", []interface{}{int64(10)}}, }, }, }, @@ -1123,6 +1113,7 @@ func TestEqualAndLessIndex(t *testing.T) { expected = []string{"3", "4"} require.Equal(expected, values) } + func TestPilosaHolder(t *testing.T) { require := require.New(t) setup(t) diff --git a/sql/index/pilosa/index.go b/sql/index/pilosa/index.go index 00fd4db53..fb6b4896b 100644 --- a/sql/index/pilosa/index.go +++ b/sql/index/pilosa/index.go @@ -110,7 +110,7 @@ func (idx *pilosaIndex) Has(p sql.Partition, key ...interface{}) (bool, error) { } for i, expr := range idx.expressions { - name := fieldName(idx.ID(), expr, p) + name := fieldName(expr, p) val, err := idx.mapping.get(name, key[i]) if err != nil || val == nil { diff --git a/sql/index/pilosa/lookup.go b/sql/index/pilosa/lookup.go index 50bfa91f7..a9699391b 100644 --- a/sql/index/pilosa/lookup.go +++ b/sql/index/pilosa/lookup.go @@ -83,7 +83,7 @@ func (l *indexLookup) indexName() string { func (l *indexLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) { var row *pilosa.Row for i, expr := range l.expressions { - field := l.index.Field(fieldName(l.id, expr, p)) + field := l.index.Field(fieldName(expr, p)) rowID, err := l.mapping.rowID(field.Name(), l.keys[i]) if err == io.EOF { continue @@ -228,7 +228,7 @@ func (l *filteredLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, err var row *pilosa.Row for i, expr := range l.expressions { - field := l.index.Field(fieldName(l.id, expr, p)) + field := l.index.Field(fieldName(expr, p)) rows, err := l.mapping.filter(field.Name(), func(b []byte) (bool, error) { return l.filter(i, b) }) @@ -387,7 +387,7 @@ func (l *negateLookup) indexName() string { return l.index.Name() } func (l *negateLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) { var row *pilosa.Row for i, expr := range l.expressions { - field := l.index.Field(fieldName(l.id, expr, p)) + field := l.index.Field(fieldName(expr, p)) maxRowID, err := l.mapping.getMaxRowID(field.Name()) if err != nil {