Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #550 from kuba--/fix-filteredindex
Browse files Browse the repository at this point in the history
Fix filtered index
  • Loading branch information
ajnavarro authored Nov 15, 2018
2 parents cd34c1a + 04ae740 commit 2da211e
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 38 deletions.
9 changes: 0 additions & 9 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,15 +498,6 @@ func (b *bitBatch) Add(row, col uint64) {
b.cols = append(b.cols, col)
}

func (b *bitBatch) NextRecord() (uint64, uint64, error) {
if b.pos >= uint64(len(b.rows)) {
return 0, 0, io.EOF
}

b.pos++
return b.rows[b.pos-1], b.cols[b.pos-1], nil
}

func indexName(db, table string) string {
h := sha1.New()
io.WriteString(h, db)
Expand Down
82 changes: 82 additions & 0 deletions sql/index/pilosa/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,88 @@ func TestNegateIndex(t *testing.T) {
require.Equal(expected, values)
}

func TestEqualAndLessIndex(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

ctx := sql.NewContext(context.Background())
db, table := "db_name", "table_name"
d := NewDriver(tmpDir)

idxEqA, err := d.Create(db, table, "idx_eq_a", makeExpressions(table, "a"), nil)
require.NoError(err)
pilosaIdxEqA, ok := idxEqA.(*pilosaIndex)
require.True(ok)
itA := &fixturePartitionKeyValueIter{
fixtures: []partitionKeyValueFixture{
{
testPartition(0),
[]kvfixture{
{"1", []interface{}{int64(2)}},
{"2", []interface{}{int64(7)}},
{"3", []interface{}{int64(1)}},
{"4", []interface{}{int64(1)}},
{"5", []interface{}{int64(1)}},
{"6", []interface{}{int64(10)}},
{"7", []interface{}{int64(5)}},
{"8", []interface{}{int64(6)}},
{"9", []interface{}{int64(4)}},
{"10", []interface{}{int64(1)}},
},
},
},
}
err = d.Save(ctx, pilosaIdxEqA, itA)
require.NoError(err)
eqALookup, err := pilosaIdxEqA.Get(int64(1))
require.NoError(err)

values, err := lookupValues(eqALookup)
require.NoError(err)
expected := []string{"3", "4", "5", "10"}
require.Equal(expected, values)

idxLessB, err := d.Create(db, table, "idx_less_b", makeExpressions(table, "b"), nil)
require.NoError(err)
pilosaIdxLessB, ok := idxLessB.(*pilosaIndex)
require.True(ok)
itB := &fixturePartitionKeyValueIter{
fixtures: []partitionKeyValueFixture{
{
testPartition(0),
[]kvfixture{
{"1", []interface{}{int64(1)}},
{"2", []interface{}{int64(2)}},
{"3", []interface{}{int64(3)}},
{"4", []interface{}{int64(4)}},
{"5", []interface{}{int64(5)}},
{"6", []interface{}{int64(6)}},
{"7", []interface{}{int64(7)}},
{"8", []interface{}{int64(8)}},
{"9", []interface{}{int64(9)}},
{"10", []interface{}{int64(10)}},
},
},
},
}
err = d.Save(ctx, pilosaIdxLessB, itB)
require.NoError(err)
lessB, err := pilosaIdxLessB.AscendLessThan(int64(5))
require.NoError(err)
lessBLookup := lessB.(*ascendLookup)

values, err = lookupValues(lessBLookup)
require.NoError(err)
expected = []string{"1", "2", "3", "4"}
require.Equal(expected, values)

interLookup := eqALookup.(sql.SetOperations).Intersection(lessBLookup)
values, err = lookupValues(interLookup)
require.NoError(err)
expected = []string{"3", "4"}
require.Equal(expected, values)
}
func TestPilosaHolder(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down
96 changes: 67 additions & 29 deletions sql/index/pilosa/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,7 @@ func (l *indexLookup) indexName() string {
return l.index.Name()
}

func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}
defer l.mapping.close()

if err := l.index.Open(); err != nil {
return nil, err
}
func (l *indexLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) {
var row *pilosa.Row
for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
Expand All @@ -107,7 +99,25 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {

row = intersect(row, r)
}
if err := l.index.Close(); err != nil {
return row, nil
}

func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}
defer l.mapping.close()

if err := l.index.Open(); err != nil {
return nil, err
}
row, err := l.intersectExpressions(p)
if e := l.index.Close(); e != nil {
if err == nil {
err = e
}
}
if err != nil {
return nil, err
}

Expand Down Expand Up @@ -213,14 +223,10 @@ func (l *filteredLookup) indexName() string {
return l.index.Name()
}

func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}
defer l.mapping.close()

// evaluate Intersection of bitmaps
// evaluate Intersection of bitmaps
func (l *filteredLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) {
var row *pilosa.Row

for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
rows, err := l.mapping.filter(field.Name(), func(b []byte) (bool, error) {
Expand All @@ -242,6 +248,28 @@ func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) {
row = intersect(row, r)
}

return row, nil
}

func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}
defer l.mapping.close()

if err := l.index.Open(); err != nil {
return nil, err
}
row, err := l.intersectExpressions(p)
if e := l.index.Close(); e != nil {
if err == nil {
err = e
}
}
if err != nil {
return nil, err
}

// evaluate composition of operations
for _, op := range l.operations {
var (
Expand Down Expand Up @@ -269,9 +297,6 @@ func (l *filteredLookup) values(p sql.Partition) (*pilosa.Row, error) {
}

func (l *filteredLookup) Values(p sql.Partition) (sql.IndexValueIter, error) {
l.index.Open()
defer l.index.Close()

row, err := l.values(p)
if err != nil {
return nil, err
Expand Down Expand Up @@ -359,12 +384,7 @@ type negateLookup struct {

func (l *negateLookup) indexName() string { return l.index.Name() }

func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}
defer l.mapping.close()

func (l *negateLookup) intersectExpressions(p sql.Partition) (*pilosa.Row, error) {
var row *pilosa.Row
for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
Expand Down Expand Up @@ -401,6 +421,27 @@ func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) {

row = intersect(row, r)
}
return row, nil
}

func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}
defer l.mapping.close()

if err := l.index.Open(); err != nil {
return nil, err
}
row, err := l.intersectExpressions(p)
if e := l.index.Close(); e != nil {
if err == nil {
err = e
}
}
if err != nil {
return nil, err
}

// evaluate composition of operations
for _, op := range l.operations {
Expand Down Expand Up @@ -431,9 +472,6 @@ func (l *negateLookup) values(p sql.Partition) (*pilosa.Row, error) {

// Values implements sql.IndexLookup.Values
func (l *negateLookup) Values(p sql.Partition) (sql.IndexValueIter, error) {
l.index.Open()
defer l.index.Close()

row, err := l.values(p)
if err != nil {
return nil, err
Expand Down

0 comments on commit 2da211e

Please sign in to comment.