From daff6c1990c9569b84ae9852e0128d52baa1ebe3 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 10 Feb 2023 00:00:15 +0100 Subject: [PATCH] Reproduce #15271 Signed-off-by: Marek Siarkowicz --- .../workflows/linearizability-template.yaml | 2 +- tests/framework/e2e/cluster.go | 2 +- tests/linearizability/failpoints.go | 14 +++--- tests/linearizability/linearizability_test.go | 29 ++++++------ tests/linearizability/watch.go | 45 +++++++++++++++++++ 5 files changed, 71 insertions(+), 21 deletions(-) diff --git a/.github/workflows/linearizability-template.yaml b/.github/workflows/linearizability-template.yaml index 5f97a0010c85..5540fba3a5f5 100644 --- a/.github/workflows/linearizability-template.yaml +++ b/.github/workflows/linearizability-template.yaml @@ -45,7 +45,7 @@ jobs: esac - name: test-linearizability run: | - EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability' RESULTS_DIR=/tmp/linearizability make test-linearizability + EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability/Snapshot' RESULTS_DIR=/tmp/linearizability make test-linearizability - uses: actions/upload-artifact@v2 if: always() with: diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 442921e60295..83503999c8df 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -581,7 +581,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.WatchProcessNotifyInterval != 0 { args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) } - if cfg.SnapshotCatchUpEntries > 0 { + if cfg.SnapshotCatchUpEntries > 0 && cfg.SnapshotCatchUpEntries != 10000 { args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries)) } envVars := map[string]string{} diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go index 0175dcb5015d..6e357d06ad72 100644 --- a/tests/linearizability/failpoints.go +++ b/tests/linearizability/failpoints.go @@ -29,7 +29,7 @@ import ( ) const ( - triggerTimeout = 5 * time.Second + triggerTimeout = 20 * time.Second ) var ( @@ -53,7 +53,8 @@ var ( CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember} CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember} RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader} - BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{} + BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: false} + BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true} DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond} oneNodeClusterFailpoints = []Failpoint{ KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, @@ -77,7 +78,8 @@ var ( RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower} RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower} RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{ - RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, + //RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, + BlackholeUntilSnapshot, }} ) @@ -275,11 +277,13 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool { return true } -type blackholePeerNetworkFailpoint struct{} +type blackholePeerNetworkFailpoint struct { + waitTillSnapshot bool +} func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { member := clus.Procs[rand.Int()%len(clus.Procs)] - return triggerBlackhole(t, ctx, member, clus, false) + return triggerBlackhole(t, ctx, member, clus, f.waitTillSnapshot) } func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error { diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index aa2791542a19..5e1e87ce79c9 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -73,8 +73,7 @@ var ( largePutSize: 32769, leaseTTL: DefaultLeaseTTL, writes: []requestChance{ - {operation: Put, chance: 90}, - {operation: LargePut, chance: 5}, + {operation: Put, chance: 100}, }, }, } @@ -181,8 +180,15 @@ func TestLinearizability(t *testing.T) { validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled) longestHistory, remainingEvents := watchEventHistory(watchResponses) validateEventsMatch(t, longestHistory, remainingEvents) + operations = patchOperationBasedOnWatchEvents(operations, longestHistory) - checkOperationsAndPersistResults(t, lg, operations, clus) + path, err := testResultsDirectory(t) + if err != nil { + t.Error(err) + } + persistWatchResponses(t, lg, path, watchResponses) + persistWatchEvents(t, lg, path, append(remainingEvents, longestHistory)) + checkOperationsAndPersistResults(t, lg, operations, clus, path) }) } } @@ -400,17 +406,12 @@ func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]wa length := len(other[i]) // We compare prefix of watch events, as we are not guaranteed to collect all events from each node. if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" { - t.Errorf("Events in watches do not match, %s", diff) + t.Errorf("Events in watches do not match") } } } -func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) { - path, err := testResultsDirectory(t) - if err != nil { - t.Error(err) - } - +func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster, testResultsPath string) { linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute) if linearizable == porcupine.Illegal { t.Error("Model is not linearizable") @@ -419,13 +420,13 @@ func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations [ t.Error("Linearization timed out") } if linearizable != porcupine.Ok { - persistOperationHistory(t, lg, path, operations) - persistMemberDataDir(t, lg, clus, path) + persistOperationHistory(t, lg, testResultsPath, operations) + persistMemberDataDir(t, lg, clus, testResultsPath) } - visualizationPath := filepath.Join(path, "history.html") + visualizationPath := filepath.Join(testResultsPath, "history.html") lg.Info("Saving visualization", zap.String("path", visualizationPath)) - err = porcupine.VisualizePath(model.Etcd, info, visualizationPath) + err := porcupine.VisualizePath(model.Etcd, info, visualizationPath) if err != nil { t.Errorf("Failed to visualize, err: %v", err) } diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go index 4466a4c3ab98..2675b087a67c 100644 --- a/tests/linearizability/watch.go +++ b/tests/linearizability/watch.go @@ -16,6 +16,10 @@ package linearizability import ( "context" + "encoding/json" + "fmt" + "os" + "path/filepath" "sync" "testing" "time" @@ -153,3 +157,44 @@ type watchEvent struct { Revision int64 Time time.Time } + +func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses [][]watchResponse) { + for i, resps := range responses { + watchFilePath := filepath.Join(path, fmt.Sprintf("watch-responses-%d.json", i)) + lg.Info("Saving watch responses", zap.String("path", watchFilePath)) + file, err := os.OpenFile(watchFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + t.Errorf("Failed to save watch history: %v", err) + return + } + defer file.Close() + encoder := json.NewEncoder(file) + for _, resp := range resps { + err := encoder.Encode(resp) + if err != nil { + t.Errorf("Failed to encode response: %v", err) + } + } + } +} + +func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events [][]watchEvent) { + for i, evs := range events { + eventsFilePath := filepath.Join(path, fmt.Sprintf("watch-events-%d.json", i)) + lg.Info("Saving watch events", zap.String("path", eventsFilePath)) + file, err := os.OpenFile(eventsFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + t.Errorf("Failed to save watch history: %v", err) + return + } + defer file.Close() + encoder := json.NewEncoder(file) + for _, event := range evs { + event.Time = time.Time{} + err := encoder.Encode(event) + if err != nil { + t.Errorf("Failed to encode response: %v", err) + } + } + } +}