Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Implement LargePut requests #15180

Merged
merged 1 commit into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,36 @@ var (
minimalQPS: 100,
maximalQPS: 200,
clientCount: 8,
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
{operation: Put, chance: 50},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
{operation: Defragment, chance: 10},
}},
traffic: traffic{
keyCount: 4,
leaseTTL: DefaultLeaseTTL,
largePutSize: 32769,
writes: []requestChance{
{operation: Put, chance: 50},
{operation: LargePut, chance: 5},
{operation: Delete, chance: 10},
{operation: PutWithLease, chance: 10},
{operation: LeaseRevoke, chance: 10},
{operation: CompareAndSet, chance: 10},
{operation: Defragment, chance: 5},
},
},
}
HighTraffic = trafficConfig{
name: "HighTraffic",
minimalQPS: 200,
maximalQPS: 1000,
clientCount: 12,
traffic: readWriteSingleKey{keyCount: 4, leaseTTL: DefaultLeaseTTL, writes: []requestChance{
{operation: Put, chance: 90},
{operation: Defragment, chance: 10},
}},
traffic: traffic{
keyCount: 4,
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 90},
{operation: LargePut, chance: 5},
{operation: Defragment, chance: 5},
},
},
}
defaultTraffic = LowTraffic
trafficList = []trafficConfig{
Expand Down
8 changes: 4 additions & 4 deletions tests/linearizability/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func getRequest(key string) EtcdRequest {
}

func getResponse(value string, revision int64) EtcdResponse {
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: value}}}, Revision: revision}
return EtcdResponse{Txn: &TxnResponse{OpsResult: []EtcdOperationResult{{Value: ToValueOrHash(value)}}}, Revision: revision}
}

func failedResponse(err error) EtcdResponse {
Expand All @@ -225,7 +225,7 @@ func unknownResponse(revision int64) EtcdResponse {
}

func putRequest(key, value string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value)}}}}
}

func putResponse(revision int64) EtcdResponse {
Expand All @@ -241,7 +241,7 @@ func deleteResponse(deleted int64, revision int64) EtcdResponse {
}

func txnRequest(key, expectValue, newValue string) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: expectValue}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: newValue}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Conds: []EtcdCondition{{Key: key, ExpectedValue: ToValueOrHash(expectValue)}}, Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(newValue)}}}}
}

func txnResponse(succeeded bool, revision int64) EtcdResponse {
Expand All @@ -253,7 +253,7 @@ func txnResponse(succeeded bool, revision int64) EtcdResponse {
}

func putWithLeaseRequest(key, value string, leaseID int64) EtcdRequest {
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: value, LeaseID: leaseID}}}}
return EtcdRequest{Type: Txn, Txn: &TxnRequest{Ops: []EtcdOperation{{Type: Put, Key: key, Value: ToValueOrHash(value), LeaseID: leaseID}}}}
}

func leaseGrantRequest(leaseID int64) EtcdRequest {
Expand Down
53 changes: 39 additions & 14 deletions tests/linearizability/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"github.com/anishathalye/porcupine"
"hash/fnv"
"reflect"
"strings"
)
Expand Down Expand Up @@ -76,13 +77,13 @@ type TxnRequest struct {

type EtcdCondition struct {
Key string
ExpectedValue string
ExpectedValue ValueOrHash
}

type EtcdOperation struct {
Type OperationType
Key string
Value string
Value ValueOrHash
LeaseID int64
}

Expand Down Expand Up @@ -120,7 +121,7 @@ func Match(r1, r2 EtcdResponse) bool {
}

type EtcdOperationResult struct {
Value string
Value ValueOrHash
Deleted int64
}

Expand All @@ -134,11 +135,28 @@ type PossibleStates []EtcdState

type EtcdState struct {
Revision int64
KeyValues map[string]string
KeyValues map[string]ValueOrHash
KeyLeases map[string]int64
Leases map[int64]EtcdLease
}

type ValueOrHash struct {
Value string
Hash uint32
}

func ToValueOrHash(value string) ValueOrHash {
v := ValueOrHash{}
if len(value) < 20 {
v.Value = value
} else {
h := fnv.New32a()
h.Write([]byte(value))
v.Hash = h.Sum32()
}
return v
}

func describeEtcdRequestResponse(request EtcdRequest, response EtcdResponse) string {
return fmt.Sprintf("%s -> %s", describeEtcdRequest(request), describeEtcdResponse(request, response))
}
Expand Down Expand Up @@ -181,7 +199,7 @@ func describeEtcdRequest(request EtcdRequest) string {
func describeEtcdConditions(conds []EtcdCondition) string {
opsDescription := make([]string, len(conds))
for i := range conds {
opsDescription[i] = fmt.Sprintf("%s==%q", conds[i].Key, conds[i].ExpectedValue)
opsDescription[i] = fmt.Sprintf("%s==%s", conds[i].Key, describeValueOrHash(conds[i].ExpectedValue))
}
return strings.Join(opsDescription, " && ")
}
Expand Down Expand Up @@ -211,9 +229,9 @@ func describeEtcdOperation(op EtcdOperation) string {
return fmt.Sprintf("get(%q)", op.Key)
case Put:
if op.LeaseID != 0 {
return fmt.Sprintf("put(%q, %q, %d)", op.Key, op.Value, op.LeaseID)
return fmt.Sprintf("put(%q, %s, %d)", op.Key, describeValueOrHash(op.Value), op.LeaseID)
}
return fmt.Sprintf("put(%q, %q, nil)", op.Key, op.Value)
return fmt.Sprintf("put(%q, %s, nil)", op.Key, describeValueOrHash(op.Value))
case Delete:
return fmt.Sprintf("delete(%q)", op.Key)
default:
Expand All @@ -224,10 +242,7 @@ func describeEtcdOperation(op EtcdOperation) string {
func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) string {
switch op {
case Get:
if resp.Value == "" {
return "nil"
}
return fmt.Sprintf("%q", resp.Value)
return describeValueOrHash(resp.Value)
case Put:
return fmt.Sprintf("ok")
case Delete:
Expand All @@ -237,6 +252,16 @@ func describeEtcdOperationResponse(op OperationType, resp EtcdOperationResult) s
}
}

func describeValueOrHash(value ValueOrHash) string {
if value.Hash != 0 {
return fmt.Sprintf("hash: %d", value.Hash)
}
if value.Value == "" {
return "nil"
}
return fmt.Sprintf("%q", value.Value)
}

func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bool, PossibleStates) {
if len(states) == 0 {
// states were not initialized
Expand All @@ -257,7 +282,7 @@ func step(states PossibleStates, request EtcdRequest, response EtcdResponse) (bo
func initState(request EtcdRequest, response EtcdResponse) EtcdState {
state := EtcdState{
Revision: response.Revision,
KeyValues: map[string]string{},
KeyValues: map[string]ValueOrHash{},
KeyLeases: map[string]int64{},
Leases: map[int64]EtcdLease{},
}
Expand All @@ -270,7 +295,7 @@ func initState(request EtcdRequest, response EtcdResponse) EtcdState {
opResp := response.Txn.OpsResult[i]
switch op.Type {
case Get:
if opResp.Value != "" {
if opResp.Value.Value != "" && opResp.Value.Hash == 0 {
state.KeyValues[op.Key] = opResp.Value
}
case Put:
Expand Down Expand Up @@ -319,7 +344,7 @@ func applyRequest(states PossibleStates, request EtcdRequest, response EtcdRespo

// applyRequestToSingleState handles a successful request, returning updated state and response it would generate.
func applyRequestToSingleState(s EtcdState, request EtcdRequest) (EtcdState, EtcdResponse) {
newKVs := map[string]string{}
newKVs := map[string]ValueOrHash{}
for k, v := range s.KeyValues {
newKVs[k] = v
}
Expand Down
21 changes: 21 additions & 0 deletions tests/linearizability/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ func TestModelStep(t *testing.T) {
{req: getRequest("key2"), resp: getResponse("12", 2)},
},
},
{
name: "Get response data should match large put",
operations: []testOperation{
{req: putRequest("key", "012345678901234567890"), resp: putResponse(1)},
{req: getRequest("key"), resp: getResponse("123456789012345678901", 1), failure: true},
{req: getRequest("key"), resp: getResponse("012345678901234567890", 1)},
{req: putRequest("key", "123456789012345678901"), resp: putResponse(2)},
{req: getRequest("key"), resp: getResponse("123456789012345678901", 2)},
{req: getRequest("key"), resp: getResponse("012345678901234567890", 2), failure: true},
},
},
{
name: "Put must increase revision by 1",
operations: []testOperation{
Expand Down Expand Up @@ -612,6 +623,11 @@ func TestModelDescribe(t *testing.T) {
resp: getResponse("2", 2),
expectDescribe: `get("key2") -> "2", rev: 2`,
},
{
req: getRequest("key2b"),
resp: getResponse("01234567890123456789", 2),
expectDescribe: `get("key2b") -> hash: 2945867837, rev: 2`,
},
{
req: putRequest("key3", "3"),
resp: putResponse(3),
Expand All @@ -622,6 +638,11 @@ func TestModelDescribe(t *testing.T) {
resp: putResponse(3),
expectDescribe: `put("key3b", "3b", 3) -> ok, rev: 3`,
},
{
req: putRequest("key3c", "01234567890123456789"),
resp: putResponse(3),
expectDescribe: `put("key3c", hash: 2945867837, nil) -> ok, rev: 3`,
},
{
req: putRequest("key4", "4"),
resp: failedResponse(errors.New("failed")),
Expand Down
30 changes: 22 additions & 8 deletions tests/linearizability/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"time"

"golang.org/x/time/rate"
Expand All @@ -36,6 +37,7 @@ type TrafficRequestType string
const (
Get TrafficRequestType = "get"
Put TrafficRequestType = "put"
LargePut TrafficRequestType = "largePut"
Delete TrafficRequestType = "delete"
PutWithLease TrafficRequestType = "putWithLease"
LeaseRevoke TrafficRequestType = "leaseRevoke"
Expand All @@ -47,18 +49,19 @@ type Traffic interface {
Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage)
}

type readWriteSingleKey struct {
keyCount int
writes []requestChance
leaseTTL int64
type traffic struct {
keyCount int
writes []requestChance
leaseTTL int64
largePutSize int
}

type requestChance struct {
operation TrafficRequestType
chance int
}

func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {
func (t traffic) Run(ctx context.Context, clientId int, c *recordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIdStorage) {

for {
select {
Expand All @@ -77,7 +80,7 @@ func (t readWriteSingleKey) Run(ctx context.Context, clientId int, c *recordingC
}
}

func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
func (t traffic) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
getCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
resp, err := c.Get(getCtx, key)
cancel()
Expand All @@ -87,13 +90,15 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite
return resp, err
}

func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
func (t traffic) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lm identity.LeaseIdStorage, cid int, lastValues []*mvccpb.KeyValue) error {
writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)

var err error
switch t.pickWriteRequest() {
case Put:
err = c.Put(writeCtx, key, newValue)
case LargePut:
err = c.Put(writeCtx, key, randString(t.largePutSize))
case Delete:
err = c.Delete(writeCtx, key)
case CompareAndSet:
Expand Down Expand Up @@ -137,7 +142,7 @@ func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limit
return err
}

func (t readWriteSingleKey) pickWriteRequest() TrafficRequestType {
func (t traffic) pickWriteRequest() TrafficRequestType {
sum := 0
for _, op := range t.writes {
sum += op.chance
Expand All @@ -151,3 +156,12 @@ func (t readWriteSingleKey) pickWriteRequest() TrafficRequestType {
}
panic("unexpected")
}

func randString(size int) string {
data := strings.Builder{}
data.Grow(size)
for i := 0; i < size; i++ {
data.WriteByte(byte(int('a') + rand.Intn(26)))
}
return data.String()
}
2 changes: 1 addition & 1 deletion tests/linearizability/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func collectMemberWatchEvents(ctx context.Context, t *testing.T, c *clientv3.Cli
Op: model.EtcdOperation{
Type: op,
Key: string(event.Kv.Key),
Value: string(event.Kv.Value),
Value: model.ToValueOrHash(string(event.Kv.Value)),
},
})
}
Expand Down