Skip to content

Commit

Permalink
Fixed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Harshil Goel committed Oct 12, 2023
1 parent 3b32245 commit e893711
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 17 deletions.
1 change: 0 additions & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,6 @@ func TestReadSingleValue(t *testing.T) {
require.NoError(t, err)
checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
}

}
}

Expand Down
27 changes: 19 additions & 8 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,24 +195,35 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
getDeltas := func() *pb.PostingList {
lc.RLock()
defer lc.RUnlock()

getPostings := func() (*pb.PostingList, error) {
pl := &pb.PostingList{}
lc.RLock()
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
if err != nil {
lc.RUnlock()
return pl, nil
return pl
}
}
lc.RUnlock()

return nil
}

getPostings := func() (*pb.PostingList, error) {
pl := getDeltas()
if pl != nil {
return pl, nil
}

pl = &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
item, err := txn.Get(key)
if err != nil {
return pl, err
return nil, err
}

err = item.Value(func(val []byte) error {
Expand All @@ -227,10 +238,10 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {

pl, err := getPostings()
if err == badger.ErrKeyNotFound {
err = nil
return nil, nil
}
if err != nil {
return pl, err
return nil, err
}

// Filter and remove STAR_ALL and OP_DELETE Postings
Expand Down
19 changes: 11 additions & 8 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er

outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

// These are certain special cases where we can get away with reading only the latest value
// Lang doesn't work because we would be storing various different languages at various
// time. So when we go to read the latest value, we might get a different language.
// Similarly with DoCount and ExpandAll and Facets. List types are also not supported
// because list is stored by time, and we combine all the list items at various timestamps.
hasLang := schema.State().HasLang(q.Attr)
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang
//getMultiplePosting := true
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil

calculate := func(start, end int) error {
x.AssertTrue(start%width == 0)
Expand All @@ -399,7 +404,10 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, _ := qs.cache.GetSinglePosting(key)
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
Expand All @@ -413,11 +421,6 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
Tid: types.TypeID(p.ValType),
Value: p.Value,
}

// TODO Apply facet tree before
if q.FacetParam != nil {
fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)})
}
}
} else {
pl, err := qs.cache.Get(key)
Expand Down

0 comments on commit e893711

Please sign in to comment.