Skip to content

Commit

Permalink
tests: Support multiple keys in linearizability tests
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Dec 10, 2022
1 parent 142fa76 commit b4e9dc1
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 66 deletions.
109 changes: 58 additions & 51 deletions tests/linearizability/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type EtcdResponse struct {
}

type EtcdState struct {
Key string
PossibleValues []ValueRevision
Revision int64
KeyValues map[string]string
}

type ValueRevision struct {
Expand All @@ -57,15 +57,15 @@ type ValueRevision struct {
}

var etcdModel = porcupine.Model{
Init: func() interface{} { return "{}" },
Init: func() interface{} { return "[]" },
Step: func(st interface{}, in interface{}, out interface{}) (bool, interface{}) {
var state EtcdState
err := json.Unmarshal([]byte(st.(string)), &state)
var states []EtcdState
err := json.Unmarshal([]byte(st.(string)), &states)
if err != nil {
panic(err)
}
ok, state := step(state, in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(state)
ok, states := step(states, in.(EtcdRequest), out.(EtcdResponse))
data, err := json.Marshal(states)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -105,92 +105,99 @@ var etcdModel = porcupine.Model{
},
}

func step(state EtcdState, request EtcdRequest, response EtcdResponse) (bool, EtcdState) {
if len(state.PossibleValues) == 0 {
state.Key = request.Key
if ok, v := initValueRevision(request, response); ok {
state.PossibleValues = append(state.PossibleValues, v)
func step(states []EtcdState, request EtcdRequest, response EtcdResponse) (bool, []EtcdState) {
if len(states) == 0 {
if ok, v := initState(request, response); ok {
states = append(states, v)
}
return true, state
}
if state.Key != request.Key {
panic("multiple keys not supported")
return true, states
}
if response.Err != nil {
for _, v := range state.PossibleValues {
newV, _ := stepValue(v, request)
state.PossibleValues = append(state.PossibleValues, newV)
for _, s := range states {
newState, _ := stepState(s, request)
states = append(states, newState)
}
} else {
var i = 0
for _, v := range state.PossibleValues {
newV, expectedResponse := stepValue(v, request)
for _, s := range states {
newState, expectedResponse := stepState(s, request)
if expectedResponse == response {
state.PossibleValues[i] = newV
states[i] = newState
i++
}
}
state.PossibleValues = state.PossibleValues[:i]
states = states[:i]
}
return len(state.PossibleValues) > 0, state
return len(states) > 0, states
}

func initValueRevision(request EtcdRequest, response EtcdResponse) (ok bool, v ValueRevision) {
func initState(request EtcdRequest, response EtcdResponse) (ok bool, s EtcdState) {
if response.Err != nil {
return false, ValueRevision{}
return false, EtcdState{}
}
kvs := map[string]string{}
switch request.Op {
case Get:
return true, ValueRevision{
Value: response.GetData,
Revision: response.Revision,
if response.GetData != "" {
kvs[request.Key] = response.GetData
}
return true, EtcdState{
Revision: response.Revision,
KeyValues: kvs,
}
case Put:
return true, ValueRevision{
Value: request.PutData,
Revision: response.Revision,
kvs[request.Key] = request.PutData
return true, EtcdState{
Revision: response.Revision,
KeyValues: kvs,
}
case Delete:
return true, ValueRevision{
Value: "",
Revision: response.Revision,
return true, EtcdState{
Revision: response.Revision,
KeyValues: kvs,
}
case Txn:
if response.TxnSucceeded {
return true, ValueRevision{
Value: request.TxnNewData,
Revision: response.Revision,
kvs[request.Key] = request.TxnNewData
return true, EtcdState{
Revision: response.Revision,
KeyValues: kvs,
}
}
return false, ValueRevision{}
return false, EtcdState{}
default:
panic("Unknown operation")
}
}

func stepValue(v ValueRevision, request EtcdRequest) (ValueRevision, EtcdResponse) {
func stepState(s EtcdState, request EtcdRequest) (EtcdState, EtcdResponse) {
newKVs := map[string]string{}
for k, v := range s.KeyValues {
newKVs[k] = v
}
s.KeyValues = newKVs
resp := EtcdResponse{}
switch request.Op {
case Get:
resp.GetData = v.Value
resp.GetData = s.KeyValues[request.Key]
case Put:
v.Value = request.PutData
v.Revision += 1
s.KeyValues[request.Key] = request.PutData
s.Revision += 1
case Delete:
if v.Value != "" {
v.Value = ""
v.Revision += 1
if _, ok := s.KeyValues[request.Key]; ok {
delete(s.KeyValues, request.Key)
s.Revision += 1
resp.Deleted = 1
}
case Txn:
if v.Value == request.TxnExpectData {
v.Value = request.TxnNewData
v.Revision += 1
if val := s.KeyValues[request.Key]; val == request.TxnExpectData {
s.KeyValues[request.Key] = request.TxnNewData
s.Revision += 1
resp.TxnSucceeded = true
}
default:
panic("unsupported operation")
}
resp.Revision = v.Revision
return v, resp
resp.Revision = s.Revision
return s, resp
}
4 changes: 2 additions & 2 deletions tests/linearizability/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestModel(t *testing.T) {
{
name: "Put can fail and be lost before delete",
operations: []testOperation{
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 1}},
},
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestModel(t *testing.T) {
name: "Put can fail but be persisted and increase revision before delete",
operations: []testOperation{
// One failed request, one persisted.
{req: EtcdRequest{Op: Get, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Revision: 1}},
{req: EtcdRequest{Op: Put, Key: "key", PutData: "2"}, resp: EtcdResponse{Err: errors.New("failed")}},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 1}, failure: true},
{req: EtcdRequest{Op: Delete, Key: "key"}, resp: EtcdResponse{Deleted: 1, Revision: 2}, failure: true},
Expand Down
26 changes: 13 additions & 13 deletions tests/linearizability/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ import (
)

var (
DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 90}, {operation: Delete, chance: 5}, {operation: Txn, chance: 5}}}
DefaultTraffic Traffic = readWriteSingleKey{writes: []opChance{{operation: Put, chance: 40}, {operation: Delete, chance: 20}, {operation: Txn, chance: 40}}}
)

type Traffic interface {
Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter, ids idProvider)
}

type readWriteSingleKey struct {
key string
writes []opChance
}

Expand All @@ -50,41 +49,42 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
return
default:
}
key := fmt.Sprintf("%d", rand.Int()%10)
// Execute one read per one write to avoid operation history include too many failed writes when etcd is down.
resp, err := t.Read(ctx, c, limiter)
resp, err := t.Read(ctx, c, limiter, key)
if err != nil {
continue
}
// Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, ids.RequestId(), resp)
t.Write(ctx, c, limiter, key, fmt.Sprintf("%d", ids.RequestId()), resp)
}
}

func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) ([]*mvccpb.KeyValue, error) {
func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string) ([]*mvccpb.KeyValue, error) {
getCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
resp, err := c.Get(getCtx, t.key)
resp, err := c.Get(getCtx, key)
cancel()
if err == nil {
limiter.Wait(ctx)
}
return resp, err
}

func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int, kvs []*mvccpb.KeyValue) error {
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, key string, newValue string, lastValues []*mvccpb.KeyValue) error {
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)

var err error
switch t.pickWriteOperation() {
case Put:
err = c.Put(putCtx, t.key, fmt.Sprintf("%d", id))
err = c.Put(putCtx, key, newValue)
case Delete:
err = c.Delete(putCtx, t.key)
err = c.Delete(putCtx, key)
case Txn:
var value string
if len(kvs) != 0 {
value = string(kvs[0].Value)
var expectValue string
if len(lastValues) != 0 {
expectValue = string(lastValues[0].Value)
}
err = c.Txn(putCtx, t.key, value, fmt.Sprintf("%d", id))
err = c.Txn(putCtx, key, expectValue, newValue)
default:
panic("invalid operation")
}
Expand Down

0 comments on commit b4e9dc1

Please sign in to comment.