diff --git a/cassandra_test.go b/cassandra_test.go index 67e5d0161..f7539ded4 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -12,6 +12,7 @@ import ( "math/big" "net" "reflect" + "sort" "strconv" "strings" "testing" @@ -312,6 +313,143 @@ func TestPaging(t *testing.T) { } } +func TestPagingWithAllowFiltering(t *testing.T) { + session := createSession(t) + + t.Cleanup(func() { + if err := session.Query("DROP TABLE gocql_test.pagging_with_allow_filtering").Exec(); err != nil { + t.Fatal("drop table:", err) + } + session.Close() + }) + + if session.cfg.ProtoVersion == 1 { + t.Skip("Paging not supported. Please use Cassandra >= 2.0") + } + + const ( + targetP1 = 50 + targetP2 = 50 + totalExpectedResults = 30 + pageSize = 5 + deletedRageStart = 10 + deletedRageEnd = 20 + // Some record range is being deleted, to test tombstones appearance + expectedCount = totalExpectedResults - (deletedRageEnd - deletedRageStart) + ) + + paginatedSelect := fmt.Sprintf("SELECT c1, f1 FROM gocql_test.pagging_with_allow_filtering WHERE p1 = %d AND p2 = %d AND f1 < %d ALLOW FILTERING;", targetP1, targetP2, totalExpectedResults) + validateResult := func(t *testing.T, results []int) { + if len(results) != expectedCount { + t.Fatalf("expected %d got %d: %d", expectedCount, len(results), results) + } + + sort.Ints(results) + + expect := make([]int, 0, expectedCount) + for i := 0; i < totalExpectedResults; i++ { + if i >= deletedRageStart && i < deletedRageEnd { + continue + } + expect = append(expect, i) + } + + if !reflect.DeepEqual(results, expect) { + t.Fatalf("expected %v\ngot %v", expect, results) + } + } + + t.Run("Prepare", func(t *testing.T) { + if err := createTable(session, + "CREATE TABLE gocql_test.pagging_with_allow_filtering (p1 int, p2 int, c1 int, f1 int, "+ + "PRIMARY KEY ((p1, p2), c1)) WITH CLUSTERING ORDER BY (c1 DESC)"); err != nil { + t.Fatal("create table:", err) + } + + // Insert extra records + for i := 0; i < 100; i++ { + if err := session.Query("INSERT INTO gocql_test.pagging_with_allow_filtering (p1,p2,c1,f1) VALUES (?,?,?,?)", i, i, i, i).Exec(); err != nil { + t.Fatal("insert:", err) + } + } + + // Insert records to a target partition + for i := 0; i < 100; i++ { + if err := session.Query("INSERT INTO gocql_test.pagging_with_allow_filtering (p1,p2,c1,f1) VALUES (?,?,?,?)", targetP1, targetP2, i, i).Exec(); err != nil { + t.Fatal("insert:", err) + } + } + + if err := session.Query("DELETE FROM gocql_test.pagging_with_allow_filtering WHERE p1 = ? AND p2 = ? AND c1 >= ? AND c1 < ?", targetP1, targetP2, deletedRageStart, deletedRageEnd).Exec(); err != nil { + t.Fatal("insert:", err) + } + }) + + t.Run("AutoPagination", func(t *testing.T) { + for _, c := range []Consistency{One, Quorum} { + t.Run(c.String(), func(t *testing.T) { + iter := session.Query(paginatedSelect).Consistency(c).PageSize(pageSize).Iter() + + var c1, f1 int + var results []int + + for iter.Scan(&c1, &f1) { + if c1 != f1 { + t.Fatalf("expected c1 and f1 values to be the same, but got c1=%d f1=%d", c1, f1) + } + results = append(results, f1) + } + if err := iter.Close(); err != nil { + t.Fatal("select:", err.Error()) + } + validateResult(t, results) + }) + } + }) + + t.Run("ManualPagination", func(t *testing.T) { + for _, c := range []Consistency{One, Quorum} { + t.Run(c.String(), func(t *testing.T) { + + var c1, f1 int + var results []int + var currentPageState []byte + + qry := session.Query(paginatedSelect).Consistency(c).PageSize(pageSize) + + for { + iter := qry.PageState(currentPageState).Iter() + + // Here we make sure that all iterator, but last one have some data in it + if !iter.LastPage() && iter.NumRows() == 0 { + t.Errorf("expected at least one row, but got 0") + } + for iter.Scan(&c1, &f1) { + if c1 != f1 { + t.Fatalf("expected c1 and f1 values to be the same, but got c1=%d f1=%d", c1, f1) + } + results = append(results, f1) + } + if err := iter.Close(); err != nil { + t.Fatal("select:", err.Error()) + } + if iter.LastPage() { + break + } + newPageState := iter.PageState() + if len(currentPageState) == len(newPageState) && bytes.Compare(newPageState, currentPageState) == 0 { + t.Fatalf("page state did not change") + } + currentPageState = newPageState + } + + validateResult(t, results) + }) + } + }) + +} + func TestPagingWithBind(t *testing.T) { session := createSession(t) defer session.Close() @@ -2225,7 +2363,7 @@ func TestManualQueryPaging(t *testing.T) { fetched++ } - if len(iter.PageState()) > 0 { + if !iter.LastPage() { // more pages iter = query.PageState(iter.PageState()).Iter() } else { diff --git a/session.go b/session.go index b70eefa43..bbfc8d862 100644 --- a/session.go +++ b/session.go @@ -1393,9 +1393,24 @@ func (q *Query) Iter() *Iter { if isUseStatement(q.stmt) { return &Iter{err: ErrUseStmt} } - // if the query was specifically run on a connection then re-use that - // connection when fetching the next results + + if !q.disableAutoPage { + return q.executeQuery() + } + + // Retry on empty page if pagination is manual + iter := q.executeQuery() + for iter.err == nil && iter.numRows == 0 && !iter.LastPage() { + q.PageState(iter.PageState()) + iter = q.executeQuery() + } + return iter +} + +func (q *Query) executeQuery() *Iter { if q.conn != nil { + // if the query was specifically run on a connection then re-use that + // connection when fetching the next results return q.conn.executeQuery(q.Context(), q) } return q.session.executeQuery(q) @@ -1771,6 +1786,11 @@ func (iter *Iter) PageState() []byte { return iter.meta.pagingState } +// LastPage returns true if there are no more pages to fetch. +func (iter *Iter) LastPage() bool { + return len(iter.meta.pagingState) == 0 +} + // NumRows returns the number of rows in this pagination, it will update when new // pages are fetched, it is not the value of the total number of rows this iter // will return unless there is only a single page returned.