Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement compaction support in robustness test #17833

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions tests/robustness/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
callTime := time.Since(c.baseTime)
resp, err := c.client.Compact(ctx, rev)
returnTime := time.Since(c.baseTime)
c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err)
return resp, err
}

func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) {
c.kvMux.Lock()
defer c.kvMux.Unlock()
Expand Down
14 changes: 12 additions & 2 deletions tests/robustness/failpoint/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,22 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
}
_, err = cc.Compact(ctx, rev)
if err != nil && !connectionError(err) {
return nil, err
return nil, fmt.Errorf("failed to compact: %w", err)
}
return []report.ClientReport{cc.Report()}, nil
}

func (t triggerCompact) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
// Since introduction of compaction into traffic, injecting compaction failpoints started interfeering with peer proxy.
// TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an issue tracking this?

Copy link
Member Author

@serathius serathius Jun 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet, I think #16788 might be related.

I was also hoping that maybe this will be auto resolved with #17938 as major rewrite might change this.

if config.PeerProxy {
return false
}
// For multiBatchCompaction we need to guarantee that there are enough revisions between two compaction requests.
// With addition of compaction requests to traffic this might be hard if experimental-compaction-batch-limit is too high.
if t.multiBatchCompaction {
return config.ServerConfig.ExperimentalCompactionBatchLimit <= 10
}
return true
}

Expand Down
7 changes: 7 additions & 0 deletions tests/robustness/model/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
if response.Error != "" {
return fmt.Sprintf("err: %q", response.Error)
}
if response.ClientError != "" {
return fmt.Sprintf("err: %q", response.ClientError)
}
if response.PartialResponse {
return fmt.Sprintf("unknown, rev: %d", response.Revision)
}
Expand All @@ -38,6 +41,8 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin
return "ok"
}
return fmt.Sprintf("ok, rev: %d", response.Revision)
case Compact:
return "ok"
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down Expand Up @@ -67,6 +72,8 @@ func describeEtcdRequest(request EtcdRequest) string {
return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID)
case Defragment:
return fmt.Sprintf("defragment()")
case Compact:
return fmt.Sprintf("compact(%d)", request.Compact.Revision)
default:
return fmt.Sprintf("<! unknown request type: %q !>", request.Type)
}
Expand Down
48 changes: 39 additions & 9 deletions tests/robustness/model/deterministic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sort"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/server/v3/storage/mvcc"
)

// DeterministicModel assumes a deterministic execution of etcd requests. All
Expand Down Expand Up @@ -65,10 +67,11 @@ var DeterministicModel = porcupine.Model{
}

type EtcdState struct {
Revision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
Revision int64
CompactRevision int64
KeyValues map[string]ValueRevision
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
Expand All @@ -77,7 +80,10 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd
}

func (s EtcdState) DeepCopy() EtcdState {
newState := EtcdState{Revision: s.Revision}
newState := EtcdState{
Revision: s.Revision,
CompactRevision: s.CompactRevision,
}

newState.KeyValues = maps.Clone(s.KeyValues)
newState.KeyLeases = maps.Clone(s.KeyLeases)
Expand All @@ -92,10 +98,12 @@ func (s EtcdState) DeepCopy() EtcdState {

func freshEtcdState() EtcdState {
return EtcdState{
Revision: 1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
Revision: 1,
// Start from CompactRevision equal -1 as etcd allows client to compact revision 0 for some reason.
CompactRevision: -1,
KeyValues: map[string]ValueRevision{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
}

Expand All @@ -112,6 +120,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
if request.Range.Revision > newState.Revision {
return newState, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()}
}
if request.Range.Revision < newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
return newState, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: newState.Revision}}
case Txn:
failure := false
Expand Down Expand Up @@ -190,6 +201,14 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: newState.Revision, LeaseRevoke: &LeaseRevokeResponse{}}}
case Defragment:
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: newState.Revision}}
case Compact:
if request.Compact.Revision <= newState.CompactRevision {
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}}
}
newState.CompactRevision = request.Compact.Revision
// Set fake revision as compaction returns non-linearizable revision.
// TODO: Model non-linearizable response revision in model.
return newState, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: -1}}
default:
panic(fmt.Sprintf("Unknown request type: %v", request.Type))
}
Expand Down Expand Up @@ -249,6 +268,7 @@ const (
LeaseGrant RequestType = "leaseGrant"
LeaseRevoke RequestType = "leaseRevoke"
Defragment RequestType = "defragment"
Compact RequestType = "compact"
)

type EtcdRequest struct {
Expand All @@ -258,6 +278,7 @@ type EtcdRequest struct {
Range *RangeRequest
Txn *TxnRequest
Defragment *DefragmentRequest
Compact *CompactRequest
}

func (r *EtcdRequest) IsRead() bool {
Expand Down Expand Up @@ -349,6 +370,8 @@ type EtcdResponse struct {
LeaseGrant *LeaseGrantReponse
LeaseRevoke *LeaseRevokeResponse
Defragment *DefragmentResponse
Compact *CompactResponse
ClientError string
Revision int64
}

Expand Down Expand Up @@ -417,3 +440,10 @@ func ToValueOrHash(value string) ValueOrHash {
}
return v
}

type CompactResponse struct {
}

type CompactRequest struct {
Revision int64
}
27 changes: 27 additions & 0 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package model

import (
"fmt"
"strings"
"time"

"github.com/anishathalye/porcupine"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/tests/v3/robustness/identity"
)

Expand Down Expand Up @@ -259,6 +261,23 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli
h.appendSuccessful(request, start, end, defragmentResponse(revision))
}

func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) {
request := compactRequest(rev)
if err != nil {
if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) {
h.appendSuccessful(request, start, end, MaybeEtcdResponse{
EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()},
})
return
}
h.appendFailed(request, start, end, err)
return
}
// Set fake revision as compaction returns non-linearizable revision.
// TODO: Model non-linearizable response revision in model.
h.appendSuccessful(request, start, end, compactResponse(-1))
}

func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) {
op := porcupine.Operation{
ClientId: h.streamID,
Expand Down Expand Up @@ -444,6 +463,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}}
}

func compactRequest(rev int64) EtcdRequest {
return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}}
}

func compactResponse(revision int64) MaybeEtcdResponse {
return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}}
}

type History struct {
operations []porcupine.Operation
}
Expand Down
6 changes: 6 additions & 0 deletions tests/robustness/options/server_config_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func WithSnapshotCount(input ...uint64) e2e.EPClusterOption {
}
}

func WithCompactionBatchLimit(input ...int) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.ExperimentalCompactionBatchLimit = input[internalRand.Intn(len(input))]
}
}

func WithSnapshotCatchUpEntries(input ...uint64) e2e.EPClusterOption {
return func(c *e2e.EtcdProcessClusterConfig) {
c.ServerConfig.SnapshotCatchUpEntries = input[internalRand.Intn(len(input))]
Expand Down
6 changes: 5 additions & 1 deletion tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.ClusterVersionSet != nil:
return nil, nil
case raftReq.Compaction != nil:
return nil, nil
request := model.EtcdRequest{
Type: model.Compact,
Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision},
}
return &request, nil
case raftReq.Txn != nil:
txn := model.TxnRequest{
Conditions: []model.EtcdCondition{},
Expand Down
3 changes: 2 additions & 1 deletion tests/robustness/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
options.WithSnapshotCount(50, 100, 1000),
options.WithSubsetOptions(randomizableOptions...),
e2e.WithGoFailEnabled(true),
e2e.WithCompactionBatchLimit(100),
// Set low minimal compaction batch limit to allow for triggering multi batch compaction failpoints.
options.WithCompactionBatchLimit(10, 100, 1000),
e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
}

Expand Down
15 changes: 12 additions & 3 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 23},
{choice: LargePut, weight: 2},
{choice: Delete, weight: 5},
{choice: MultiOpTxn, weight: 5},
{choice: PutWithLease, weight: 5},
{choice: LeaseRevoke, weight: 5},
{choice: CompareAndSet, weight: 5},
{choice: Put, weight: 15},
{choice: LargePut, weight: 5},
{choice: Compact, weight: 5},
},
}
EtcdPut = etcdTraffic{
Expand All @@ -56,9 +57,10 @@ var (
{choice: List, weight: 15},
{choice: StaleGet, weight: 10},
{choice: StaleList, weight: 10},
{choice: Put, weight: 40},
{choice: MultiOpTxn, weight: 5},
{choice: LargePut, weight: 5},
{choice: Put, weight: 35},
{choice: Compact, weight: 5},
},
}
)
Expand Down Expand Up @@ -89,6 +91,7 @@ const (
LeaseRevoke etcdRequestType = "leaseRevoke"
CompareAndSet etcdRequestType = "compareAndSet"
Defragment etcdRequestType = "defragment"
Compact etcdRequestType = "compact"
)

func (t etcdTraffic) Name() string {
Expand Down Expand Up @@ -266,6 +269,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
if resp != nil {
rev = resp.Header.Revision
}
case Compact:
var resp *clientv3.CompactResponse
resp, err = c.client.Compact(opCtx, lastRev)
if resp != nil {
rev = resp.Header.Revision
}
default:
panic("invalid choice")
}
Expand Down
17 changes: 13 additions & 4 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ var (
resource: "pods",
namespace: "default",
writeChoices: []choiceWeight[KubernetesRequestType]{
{choice: KubernetesUpdate, weight: 90},
{choice: KubernetesUpdate, weight: 85},
{choice: KubernetesDelete, weight: 5},
{choice: KubernetesCreate, weight: 5},
{choice: KubernetesCompact, weight: 5},
},
}
)
Expand Down Expand Up @@ -168,6 +169,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
case KubernetesCompact:
err = kc.Compact(writeCtx, rev)
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
Expand Down Expand Up @@ -213,9 +216,10 @@ func (t kubernetesTraffic) generateKey() string {
type KubernetesRequestType string

const (
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesDelete KubernetesRequestType = "delete"
KubernetesUpdate KubernetesRequestType = "update"
KubernetesCreate KubernetesRequestType = "create"
KubernetesCompact KubernetesRequestType = "compact"
)

type kubernetesClient struct {
Expand Down Expand Up @@ -254,6 +258,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error {
return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
}

func (k kubernetesClient) Compact(ctx context.Context, rev int64) error {
_, err := k.client.Compact(ctx, rev)
return err
}

// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
Expand Down
2 changes: 2 additions & 0 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand All @@ -218,6 +219,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand Down