Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
No holder.
Browse files Browse the repository at this point in the history
Signed-off-by: kuba-- <[email protected]>
(cherry picked from commit 3d2c375)
  • Loading branch information
kuba-- authored and ajnavarro committed Oct 19, 2018
1 parent 2f288e1 commit b05455e
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 19 deletions.
46 changes: 31 additions & 15 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,15 @@ type (

// Driver implements sql.IndexDriver interface.
Driver struct {
root string
holder *pilosa.Holder
root string
}
)

// NewDriver returns a new instance of pilosa.Driver
// which satisfies sql.IndexDriver interface
func NewDriver(root string) *Driver {
h := pilosa.NewHolder()
h.Path = filepath.Join(root, "."+DriverID)
return &Driver{
root: root,
holder: h,
root: root,
}
}

Expand Down Expand Up @@ -116,16 +112,12 @@ func (d *Driver) Create(
return nil, err
}

idx, err := d.holder.CreateIndexIfNotExists(
indexName(db, table),
pilosa.IndexOptions{},
)
idx, err := d.newPilosaIndex(db, table)
if err != nil {
return nil, err
}

mapping := newMapping(d.mappingFilePath(db, table, id))

processingFile := d.processingFilePath(db, table, id)
if err := index.WriteProcessingFile(
processingFile,
Expand Down Expand Up @@ -173,11 +165,14 @@ func (d *Driver) LoadAll(db, table string) ([]sql.Index, error) {
}

func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
name := indexName(db, table)
idx := d.holder.Index(name)
if idx == nil {
return nil, errLoadingIndex.New(name)
idx, err := d.newPilosaIndex(db, table)
if err != nil {
return nil, err
}
if err := idx.Open(); err != nil {
return nil, err
}
defer idx.Close()

dir := filepath.Join(d.root, db, table, id)
config := d.configFilePath(db, table, id)
Expand Down Expand Up @@ -328,6 +323,12 @@ func (d *Driver) Save(
if !ok {
return errInvalidIndexType.New(i)
}

if err := idx.index.Open(); err != nil {
return err
}
defer idx.index.Close()

idx.wg.Add(1)
defer idx.wg.Done()

Expand Down Expand Up @@ -386,6 +387,11 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
idx.wg.Wait()
}

if err := idx.index.Open(); err != nil {
return err
}
defer idx.index.Close()

if err := os.RemoveAll(filepath.Join(d.root, i.Database(), i.Table(), i.ID())); err != nil {
return err
}
Expand Down Expand Up @@ -534,3 +540,13 @@ func (d *Driver) processingFilePath(db, table, id string) string {
func (d *Driver) mappingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, MappingFileName)
}

func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) {
name := indexName(db, table)
path := filepath.Join(d.root, "."+DriverID, name)
idx, err := pilosa.NewIndex(path, name)
if err != nil {
return nil, err
}
return idx, nil
}
221 changes: 221 additions & 0 deletions sql/index/pilosa/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,66 @@ func TestLoadAll(t *testing.T) {
require.NoError(err)
}

func TestLoadAllWithMultipleDrivers(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

d1 := NewDriver(tmpDir)
idx1, err := d1.Create("db", "table", "id1", makeExpressions("table", "hash1"), nil)
require.NoError(err)
it1 := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: idx1.Expressions(),
location: randLocation,
}
require.NoError(d1.Save(sql.NewEmptyContext(), idx1, it1))

d2 := NewDriver(tmpDir)
idx2, err := d2.Create("db", "table", "id2", makeExpressions("table", "hash1"), nil)
require.NoError(err)
it2 := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: idx2.Expressions(),
location: randLocation,
}
require.NoError(d2.Save(sql.NewEmptyContext(), idx2, it2))

d := NewDriver(tmpDir)
indexes, err := d.LoadAll("db", "table")
require.NoError(err)

require.Equal(2, len(indexes))
i1, ok := idx1.(*pilosaIndex)
require.True(ok)
i2, ok := idx2.(*pilosaIndex)
require.True(ok)

require.Equal(i1.index.Name(), i2.index.Name())

// Load index from another table. Previously this panicked as the same
// pilosa.Holder was used for all indexes.

d3 := NewDriver(tmpDir)
idx3, err := d3.Create("db", "table2", "id1", makeExpressions("table2", "hash1"), nil)
require.NoError(err)
it3 := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: idx3.Expressions(),
location: randLocation,
}
require.NoError(d3.Save(sql.NewEmptyContext(), idx3, it3))

indexes, err = d.LoadAll("db", "table2")
require.NoError(err)
}

type logLoc struct {
loc []byte
err error
Expand Down Expand Up @@ -214,6 +274,39 @@ func TestSaveAndGetAll(t *testing.T) {
require.True(errInvalidKeys.Is(err))
}

func TestSaveAndGetAllWithMultipleDrivers(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"
expressions := makeExpressions(table, "lang", "hash")

d1 := NewDriver(tmpDir)
sqlIdx, err := d1.Create(db, table, id, expressions, nil)
require.NoError(err)

it := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: sqlIdx.Expressions(),
location: randLocation,
}

err = d1.Save(sql.NewEmptyContext(), sqlIdx, it)
require.NoError(err)

d2 := NewDriver(tmpDir)
indexes, err := d2.LoadAll(db, table)
require.NoError(err)
require.Equal(1, len(indexes))

_, err = sqlIdx.Get()
require.Error(err)
require.True(errInvalidKeys.Is(err))
}

func TestLoadCorruptedIndex(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down Expand Up @@ -254,6 +347,58 @@ func TestDelete(t *testing.T) {
require.NoError(err)
}

func TestDeleteWithMultipleDrivers(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"

expressions := []sql.Expression{
expression.NewGetFieldWithTable(0, sql.Int64, table, "lang", true),
expression.NewGetFieldWithTable(1, sql.Int64, table, "field", true),
}

d := NewDriver(tmpDir)
sqlIdx, err := d.Create(db, table, id, expressions, nil)
require.NoError(err)

d = NewDriver(tmpDir)
err = d.Delete(sqlIdx, new(partitionIter))
require.NoError(err)
}

func TestDeleteAndLoadAll(t *testing.T) {
require := require.New(t)
setup(t)
defer cleanup(t)

db, table, id := "db_name", "table_name", "index_id"
expressions := makeExpressions(table, "lang", "hash")

d := NewDriver(tmpDir)
sqlIdx, err := d.Create(db, table, id, expressions, nil)
require.NoError(err)

it := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 64,
expressions: sqlIdx.Expressions(),
location: randLocation,
}

err = d.Save(sql.NewEmptyContext(), sqlIdx, it)
require.NoError(err)

err = d.Delete(sqlIdx, new(partitionIter))
require.NoError(err)

indexes, err := d.LoadAll(db, table)
require.NoError(err)
require.Equal(0, len(indexes))
}

func TestDeleteInProgress(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down Expand Up @@ -449,6 +594,82 @@ func TestIntersection(t *testing.T) {
require.NoError(interIt.Close())
}

func TestIntersectionWithMultipleDrivers(t *testing.T) {
ctx := sql.NewContext(context.Background())
require := require.New(t)
setup(t)
defer cleanup(t)

db, table := "db_name", "table_name"
idxLang, expLang := "idx_lang", makeExpressions(table, "lang")
idxPath, expPath := "idx_path", makeExpressions(table, "path")

d1 := NewDriver(tmpDir)
sqlIdxLang, err := d1.Create(db, table, idxLang, expLang, nil)
require.NoError(err)

d2 := NewDriver(tmpDir)
sqlIdxPath, err := d2.Create(db, table, idxPath, expPath, nil)
require.NoError(err)

itLang := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 10,
expressions: sqlIdxLang.Expressions(),
location: offsetLocation,
}

itPath := &partitionKeyValueIter{
partitions: 2,
offset: 0,
total: 10,
expressions: sqlIdxPath.Expressions(),
location: offsetLocation,
}

err = d1.Save(ctx, sqlIdxLang, itLang)
require.NoError(err)

err = d2.Save(ctx, sqlIdxPath, itPath)
require.NoError(err)

lookupLang, err := sqlIdxLang.Get(itLang.records[0][0].values...)
require.NoError(err)
lookupPath, err := sqlIdxPath.Get(itPath.records[0][itPath.total-1].values...)
require.NoError(err)

m, ok := lookupLang.(sql.Mergeable)
require.True(ok)
require.True(m.IsMergeable(lookupPath))

interLookup, ok := lookupLang.(sql.SetOperations)
require.True(ok)
interIt, err := interLookup.Intersection(lookupPath).Values(testPartition(0))
require.NoError(err)
loc, err := interIt.Next()

require.True(err == io.EOF)
require.NoError(interIt.Close())

lookupLang, err = sqlIdxLang.Get(itLang.records[0][0].values...)
require.NoError(err)
lookupPath, err = sqlIdxPath.Get(itPath.records[0][0].values...)
require.NoError(err)

interLookup, ok = lookupPath.(sql.SetOperations)
require.True(ok)
interIt, err = interLookup.Intersection(lookupLang).Values(testPartition(0))
require.NoError(err)
loc, err = interIt.Next()
require.NoError(err)
require.Equal(loc, itPath.records[0][0].location)
_, err = interIt.Next()
require.True(err == io.EOF)

require.NoError(interIt.Close())
}

func TestUnion(t *testing.T) {
require := require.New(t)
setup(t)
Expand Down
10 changes: 6 additions & 4 deletions sql/index/pilosa/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {
if err := l.mapping.open(); err != nil {
return nil, err
}

defer l.mapping.close()

if err := l.index.Open(); err != nil {
return nil, err
}
var row *pilosa.Row
for i, expr := range l.expressions {
field := l.index.Field(fieldName(l.id, expr, p))
Expand All @@ -105,6 +107,9 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {

row = intersect(row, r)
}
if err := l.index.Close(); err != nil {
return nil, err
}

// evaluate composition of operations
for _, op := range l.operations {
Expand All @@ -131,9 +136,6 @@ func (l *indexLookup) values(p sql.Partition) (*pilosa.Row, error) {

// Values implements sql.IndexLookup.Values
func (l *indexLookup) Values(p sql.Partition) (sql.IndexValueIter, error) {
l.index.Open()
defer l.index.Close()

row, err := l.values(p)
if err != nil {
return nil, err
Expand Down

0 comments on commit b05455e

Please sign in to comment.