Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(query): Read just the latest value for scalar types #8966

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
48 changes: 48 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,54 @@ func TestAddMutation_mrjn1(t *testing.T) {
require.Equal(t, 0, ol.Length(txn.StartTs, 0))
}

// TestReadSingleValue exercises LocalCache.GetSinglePosting against a posting
// list that is repeatedly mutated, written to the store as deltas, and
// periodically rolled up.
func TestReadSingleValue(t *testing.T) {
	defer setMaxListSize(maxListSize)
	maxListSize = math.MaxInt32

	// We call pl.Iterate and then stop iterating in the first loop when we are reading
	// single values. This test confirms that reading a single posting through
	// LocalCache.GetSinglePosting yields a value that checkValue accepts for the
	// same list at the same read timestamp.

	key := x.DataKey(x.GalaxyAttr("value"), 1240)
	ol, err := getNew(key, ps, math.MaxUint64)
	require.NoError(t, err)

	const numWrites = 10000
	for i := 2; i <= numWrites; i += 2 {
		edge := &pb.DirectedEdge{
			Value: []byte("ho hey there" + strconv.Itoa(i)),
		}
		txn := Txn{StartTs: uint64(i)}
		addMutationHelper(t, ol, edge, Set, &txn)
		require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))

		// Persist the delta for this mutation so that reads going straight to
		// the store can see it. Both the write and the flush must succeed,
		// otherwise the reads below would be checking stale data.
		kData := ol.getMutation(uint64(i))
		writer := NewTxnWriter(pstore)
		require.NoError(t, writer.SetAt(key, kData, BitDeltaPosting, uint64(i)))
		require.NoError(t, writer.Flush())

		if i%10 == 0 {
			// Do frequent rollups, and store data in old timestamp
			kvs, err := ol.Rollup(nil, txn.StartTs-3)
			require.NoError(t, err)
			require.NoError(t, writePostingListToDisk(kvs))
			ol, err = getNew(key, ps, math.MaxUint64)
			require.NoError(t, err)
		}

		// Read at every timestamp from max(2, ol.minTs) up to a little past
		// the latest write.
		j := 2
		if j < int(ol.minTs) {
			j = int(ol.minTs)
		}
		for ; j < i+6; j++ {
			tx := NewTxn(uint64(j))
			k, err := tx.cache.GetSinglePosting(key)
			require.NoError(t, err)
			// GetSinglePosting may return (nil, nil); fail with a clear
			// message instead of panicking on the index below.
			require.NotNil(t, k, "no posting list returned at ts %d", j)
			require.NotEmpty(t, k.Postings, "empty posting list returned at ts %d", j)
			checkValue(t, ol, string(k.Postings[0].Value), uint64(j))
		}
	}
}

func TestRollupMaxTsIsSet(t *testing.T) {
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32
Expand Down
64 changes: 64 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,70 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
return lc.SetIfAbsent(skey, pl), nil
}

// GetSinglePosting retrieves the cached version of the first item in the list associated with the
// given key. This is used for retrieving the value of a scalar predicats.
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since its a public function could you add a description of what this function's doing/why its necessary? basically what you explained yesterday in text format would be helpful for future reference

getDeltas := func() *pb.PostingList {
lc.RLock()
defer lc.RUnlock()

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
err := pl.Unmarshal(delta)
if err != nil {
return pl
}
}

return nil
}

getPostings := func() (*pb.PostingList, error) {
pl := getDeltas()
if pl != nil {
return pl, nil
}

pl = &pb.PostingList{}
txn := pstore.NewTransactionAt(lc.startTs, false)
item, err := txn.Get(key)
if err != nil {
return nil, err
}

err = item.Value(func(val []byte) error {
if err := pl.Unmarshal(val); err != nil {
return err
}
return nil
})

return pl, err
}

pl, err := getPostings()
if err == badger.ErrKeyNotFound {
return nil, nil
}
if err != nil {
return nil, err
}

// Filter and remove STAR_ALL and OP_DELETE Postings
idx := 0
for _, postings := range pl.Postings {
if hasDeleteAll(postings) {
return nil, nil
}
if postings.Op != Del {
pl.Postings[idx] = postings
idx++
}
}
pl.Postings = pl.Postings[:idx]
return pl, nil
}

// Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) {
return lc.getInternal(key, true)
Expand Down
104 changes: 69 additions & 35 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
outputs := make([]*pb.Result, numGo)
listType := schema.State().IsList(q.Attr)

// In certain special cases we can get away with reading only the latest value.
// This does not work with Lang, because values in different languages are stored at
// different times, so the latest value we read might be in a different language.
// The same applies to DoCount, ExpandAll and Facets. List types are also not supported,
// because list items are stored by time and we combine the items from various timestamps.
hasLang := schema.State().HasLang(q.Attr)
getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil

calculate := func(start, end int) error {
x.AssertTrue(start%width == 0)
out := &pb.Result{}
Expand All @@ -391,49 +399,75 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er
key := x.DataKey(q.Attr, q.UidList.Uids[i])

// Get or create the posting list for an entity, attribute combination.
pl, err := qs.cache.Get(key)
if err != nil {
return err
}

// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
var vals []types.Val
fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored

if !getMultiplePosting {
pl, err := qs.cache.GetSinglePosting(key)
if err != nil {
return err
}
if pl == nil || len(pl.Postings) == 0 {
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
continue
}
vals = make([]types.Val, len(pl.Postings))
for i, p := range pl.Postings {
vals[i] = types.Val{
Tid: types.TypeID(p.ValType),
Value: p.Value,
}
}
} else {
pl, err := qs.cache.Get(key)
if err != nil {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}

vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType)
switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FacetMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
// If count is being requested, there is no need to populate value and facets matrix.
if q.DoCount {
count, err := countForValuePostings(args, pl, facetsTree, listType)
if err != nil && err != posting.ErrNoValue {
return err
}
out.Counts = append(out.Counts, uint32(count))
// Add an empty UID list to make later processing consistent.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
continue
}
continue
case err != nil:
return err
}

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType)

switch {
case err == posting.ErrNoValue || (err == nil && len(vals) == 0):
// This branch is taken when the value does not exist in the pl or
// the number of values retrieved is zero (there could still be facets).
// We add empty lists to the UidMatrix, FacetMatrix, ValueMatrix and
// LangMatrix so that all these data structure have predictable layouts.
out.UidMatrix = append(out.UidMatrix, &pb.List{})
out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{})
out.ValueMatrix = append(out.ValueMatrix,
&pb.ValueList{Values: []*pb.TaskValue{}})
if q.ExpandAll {
// To keep the cardinality same as that of ValueMatrix.
out.LangMatrix = append(out.LangMatrix, &pb.LangList{})
}
continue
case err != nil:
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})

if q.ExpandAll {
langTags, err := pl.GetLangTags(args.q.ReadTs)
if err != nil {
return err
}
out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags})
}
}

uidList := new(pb.List)
Expand Down