Skip to content

Commit

Permalink
Reproduce #15271
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Feb 14, 2023
1 parent 6c439aa commit daff6c1
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linearizability-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
esac
- name: test-linearizability
run: |
EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability' RESULTS_DIR=/tmp/linearizability make test-linearizability
EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability/Snapshot' RESULTS_DIR=/tmp/linearizability make test-linearizability
- uses: actions/upload-artifact@v2
if: always()
with:
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.SnapshotCatchUpEntries > 0 {
if cfg.SnapshotCatchUpEntries > 0 && cfg.SnapshotCatchUpEntries != 10000 {
args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries))
}
envVars := map[string]string{}
Expand Down
14 changes: 9 additions & 5 deletions tests/linearizability/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
triggerTimeout = 5 * time.Second
triggerTimeout = 20 * time.Second
)

var (
Expand All @@ -53,7 +53,8 @@ var (
CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{}
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: false}
BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true}
DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
oneNodeClusterFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
Expand All @@ -77,7 +78,8 @@ var (
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
//RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
BlackholeUntilSnapshot,
}}
)

Expand Down Expand Up @@ -275,11 +277,13 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
return true
}

type blackholePeerNetworkFailpoint struct{}
type blackholePeerNetworkFailpoint struct {
waitTillSnapshot bool
}

func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
return triggerBlackhole(t, ctx, member, clus, false)
return triggerBlackhole(t, ctx, member, clus, f.waitTillSnapshot)
}

func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {
Expand Down
29 changes: 15 additions & 14 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ var (
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 90},
{operation: LargePut, chance: 5},
{operation: Put, chance: 100},
},
},
}
Expand Down Expand Up @@ -181,8 +180,15 @@ func TestLinearizability(t *testing.T) {
validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled)
longestHistory, remainingEvents := watchEventHistory(watchResponses)
validateEventsMatch(t, longestHistory, remainingEvents)

operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
checkOperationsAndPersistResults(t, lg, operations, clus)
path, err := testResultsDirectory(t)
if err != nil {
t.Error(err)
}
persistWatchResponses(t, lg, path, watchResponses)
persistWatchEvents(t, lg, path, append(remainingEvents, longestHistory))
checkOperationsAndPersistResults(t, lg, operations, clus, path)
})
}
}
Expand Down Expand Up @@ -400,17 +406,12 @@ func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]wa
length := len(other[i])
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
t.Errorf("Events in watches do not match, %s", diff)
t.Errorf("Events in watches do not match")
}
}
}

func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) {
path, err := testResultsDirectory(t)
if err != nil {
t.Error(err)
}

func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster, testResultsPath string) {
linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute)
if linearizable == porcupine.Illegal {
t.Error("Model is not linearizable")
Expand All @@ -419,13 +420,13 @@ func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations [
t.Error("Linearization timed out")
}
if linearizable != porcupine.Ok {
persistOperationHistory(t, lg, path, operations)
persistMemberDataDir(t, lg, clus, path)
persistOperationHistory(t, lg, testResultsPath, operations)
persistMemberDataDir(t, lg, clus, testResultsPath)
}

visualizationPath := filepath.Join(path, "history.html")
visualizationPath := filepath.Join(testResultsPath, "history.html")
lg.Info("Saving visualization", zap.String("path", visualizationPath))
err = porcupine.VisualizePath(model.Etcd, info, visualizationPath)
err := porcupine.VisualizePath(model.Etcd, info, visualizationPath)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
}
Expand Down
45 changes: 45 additions & 0 deletions tests/linearizability/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package linearizability

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -153,3 +157,44 @@ type watchEvent struct {
Revision int64
Time time.Time
}

func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses [][]watchResponse) {
for i, resps := range responses {
watchFilePath := filepath.Join(path, fmt.Sprintf("watch-responses-%d.json", i))
lg.Info("Saving watch responses", zap.String("path", watchFilePath))
file, err := os.OpenFile(watchFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
t.Errorf("Failed to save watch history: %v", err)
return
}
defer file.Close()
encoder := json.NewEncoder(file)
for _, resp := range resps {
err := encoder.Encode(resp)
if err != nil {
t.Errorf("Failed to encode response: %v", err)
}
}
}
}

func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events [][]watchEvent) {
for i, evs := range events {
eventsFilePath := filepath.Join(path, fmt.Sprintf("watch-events-%d.json", i))
lg.Info("Saving watch events", zap.String("path", eventsFilePath))
file, err := os.OpenFile(eventsFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
t.Errorf("Failed to save watch history: %v", err)
return
}
defer file.Close()
encoder := json.NewEncoder(file)
for _, event := range evs {
event.Time = time.Time{}
err := encoder.Encode(event)
if err != nil {
t.Errorf("Failed to encode response: %v", err)
}
}
}
}

0 comments on commit daff6c1

Please sign in to comment.