Skip to content

Commit

Permalink
Merge pull request #212 from dkropachev/dk/have-a-test-for-paging-and…
Browse files Browse the repository at this point in the history
…-allow-filtering

Have a test to make sure paging feature works properly with allow filtering queries
  • Loading branch information
sylwiaszunejko authored Jul 11, 2024
2 parents cab9e85 + c212c76 commit 6af144e
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 3 deletions.
140 changes: 139 additions & 1 deletion cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math/big"
"net"
"reflect"
"sort"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -312,6 +313,143 @@ func TestPaging(t *testing.T) {
}
}

func TestPagingWithAllowFiltering(t *testing.T) {
session := createSession(t)

t.Cleanup(func() {
if err := session.Query("DROP TABLE gocql_test.pagging_with_allow_filtering").Exec(); err != nil {
t.Fatal("drop table:", err)
}
session.Close()
})

if session.cfg.ProtoVersion == 1 {
t.Skip("Paging not supported. Please use Cassandra >= 2.0")
}

const (
targetP1 = 50
targetP2 = 50
totalExpectedResults = 30
pageSize = 5
deletedRageStart = 10
deletedRageEnd = 20
// Some record range is being deleted, to test tombstones appearance
expectedCount = totalExpectedResults - (deletedRageEnd - deletedRageStart)
)

paginatedSelect := fmt.Sprintf("SELECT c1, f1 FROM gocql_test.pagging_with_allow_filtering WHERE p1 = %d AND p2 = %d AND f1 < %d ALLOW FILTERING;", targetP1, targetP2, totalExpectedResults)
validateResult := func(t *testing.T, results []int) {
if len(results) != expectedCount {
t.Fatalf("expected %d got %d: %d", expectedCount, len(results), results)
}

sort.Ints(results)

expect := make([]int, 0, expectedCount)
for i := 0; i < totalExpectedResults; i++ {
if i >= deletedRageStart && i < deletedRageEnd {
continue
}
expect = append(expect, i)
}

if !reflect.DeepEqual(results, expect) {
t.Fatalf("expected %v\ngot %v", expect, results)
}
}

t.Run("Prepare", func(t *testing.T) {
if err := createTable(session,
"CREATE TABLE gocql_test.pagging_with_allow_filtering (p1 int, p2 int, c1 int, f1 int, "+
"PRIMARY KEY ((p1, p2), c1)) WITH CLUSTERING ORDER BY (c1 DESC)"); err != nil {
t.Fatal("create table:", err)
}

// Insert extra records
for i := 0; i < 100; i++ {
if err := session.Query("INSERT INTO gocql_test.pagging_with_allow_filtering (p1,p2,c1,f1) VALUES (?,?,?,?)", i, i, i, i).Exec(); err != nil {
t.Fatal("insert:", err)
}
}

// Insert records to a target partition
for i := 0; i < 100; i++ {
if err := session.Query("INSERT INTO gocql_test.pagging_with_allow_filtering (p1,p2,c1,f1) VALUES (?,?,?,?)", targetP1, targetP2, i, i).Exec(); err != nil {
t.Fatal("insert:", err)
}
}

if err := session.Query("DELETE FROM gocql_test.pagging_with_allow_filtering WHERE p1 = ? AND p2 = ? AND c1 >= ? AND c1 < ?", targetP1, targetP2, deletedRageStart, deletedRageEnd).Exec(); err != nil {
t.Fatal("insert:", err)
}
})

t.Run("AutoPagination", func(t *testing.T) {
for _, c := range []Consistency{One, Quorum} {
t.Run(c.String(), func(t *testing.T) {
iter := session.Query(paginatedSelect).Consistency(c).PageSize(pageSize).Iter()

var c1, f1 int
var results []int

for iter.Scan(&c1, &f1) {
if c1 != f1 {
t.Fatalf("expected c1 and f1 values to be the same, but got c1=%d f1=%d", c1, f1)
}
results = append(results, f1)
}
if err := iter.Close(); err != nil {
t.Fatal("select:", err.Error())
}
validateResult(t, results)
})
}
})

t.Run("ManualPagination", func(t *testing.T) {
for _, c := range []Consistency{One, Quorum} {
t.Run(c.String(), func(t *testing.T) {

var c1, f1 int
var results []int
var currentPageState []byte

qry := session.Query(paginatedSelect).Consistency(c).PageSize(pageSize)

for {
iter := qry.PageState(currentPageState).Iter()

// Here we make sure that all iterator, but last one have some data in it
if !iter.LastPage() && iter.NumRows() == 0 {
t.Errorf("expected at least one row, but got 0")
}
for iter.Scan(&c1, &f1) {
if c1 != f1 {
t.Fatalf("expected c1 and f1 values to be the same, but got c1=%d f1=%d", c1, f1)
}
results = append(results, f1)
}
if err := iter.Close(); err != nil {
t.Fatal("select:", err.Error())
}
if iter.LastPage() {
break
}
newPageState := iter.PageState()
if len(currentPageState) == len(newPageState) && bytes.Compare(newPageState, currentPageState) == 0 {
t.Fatalf("page state did not change")
}
currentPageState = newPageState
}

validateResult(t, results)
})
}
})

}

func TestPagingWithBind(t *testing.T) {
session := createSession(t)
defer session.Close()
Expand Down Expand Up @@ -2225,7 +2363,7 @@ func TestManualQueryPaging(t *testing.T) {
fetched++
}

if len(iter.PageState()) > 0 {
if !iter.LastPage() {
// more pages
iter = query.PageState(iter.PageState()).Iter()
} else {
Expand Down
24 changes: 22 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1393,9 +1393,24 @@ func (q *Query) Iter() *Iter {
if isUseStatement(q.stmt) {
return &Iter{err: ErrUseStmt}
}
// if the query was specifically run on a connection then re-use that
// connection when fetching the next results

if !q.disableAutoPage {
return q.executeQuery()
}

// Retry on empty page if pagination is manual
iter := q.executeQuery()
for iter.err == nil && iter.numRows == 0 && !iter.LastPage() {
q.PageState(iter.PageState())
iter = q.executeQuery()
}
return iter
}

func (q *Query) executeQuery() *Iter {
if q.conn != nil {
// if the query was specifically run on a connection then re-use that
// connection when fetching the next results
return q.conn.executeQuery(q.Context(), q)
}
return q.session.executeQuery(q)
Expand Down Expand Up @@ -1771,6 +1786,11 @@ func (iter *Iter) PageState() []byte {
return iter.meta.pagingState
}

// LastPage returns true if there are no more pages to fetch.
func (iter *Iter) LastPage() bool {
return len(iter.meta.pagingState) == 0
}

// NumRows returns the number of rows in this pagination, it will update when new
// pages are fetched, it is not the value of the total number of rows this iter
// will return unless there is only a single page returned.
Expand Down

0 comments on commit 6af144e

Please sign in to comment.