Skip to content

Commit

Permalink
tests: Allow dynamic number of clients
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Dec 5, 2022
1 parent 5baf837 commit 4409a95
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
14 changes: 10 additions & 4 deletions tests/linearizability/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,26 @@ package linearizability

import (
"context"
"sync/atomic"
"time"

"github.com/anishathalye/porcupine"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

var (
clientId atomic.Uint32
)

type recordingClient struct {
client clientv3.Client
id int
id uint32

operations []porcupine.Operation
}

func NewClient(endpoints []string, id int) (*recordingClient, error) {
func NewClient(endpoints []string) (*recordingClient, error) {
cc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
Expand All @@ -40,6 +45,7 @@ func NewClient(endpoints []string, id int) (*recordingClient, error) {
if err != nil {
return nil, err
}
id := clientId.Add(1) - 1
return &recordingClient{
client: *cc,
id: id,
Expand All @@ -63,7 +69,7 @@ func (c *recordingClient) Get(ctx context.Context, key string) error {
readData = string(resp.Kvs[0].Value)
}
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
ClientId: int(c.id),
Input: etcdRequest{op: Get, key: key},
Call: callTime.UnixNano(),
Output: etcdResponse{getData: readData, revision: resp.Header.Revision},
Expand All @@ -81,7 +87,7 @@ func (c *recordingClient) Put(ctx context.Context, key, value string) error {
revision = resp.Header.Revision
}
c.operations = append(c.operations, porcupine.Operation{
ClientId: c.id,
ClientId: int(c.id),
Input: etcdRequest{op: Put, key: key, putData: value},
Call: callTime.UnixNano(),
Output: etcdResponse{err: err, revision: revision},
Expand Down
4 changes: 3 additions & 1 deletion tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func TestLinearizability(t *testing.T) {
}

func testLinearizability(ctx context.Context, t *testing.T, config e2e.EtcdProcessClusterConfig, failpoint FailpointConfig, traffic trafficConfig) {
clientId.Store(0)
requestId.Store(0)
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&config))
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -151,7 +153,7 @@ func simulateTraffic(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessClu
for i := 0; i < config.clientCount; i++ {
wg.Add(1)
endpoints := []string{endpoints[i%len(endpoints)]}
c, err := NewClient(endpoints, i)
c, err := NewClient(endpoints)
if err != nil {
t.Fatal(err)
}
Expand Down
14 changes: 5 additions & 9 deletions tests/linearizability/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"context"
"fmt"
"math/rand"
"sync/atomic"
"time"

"golang.org/x/time/rate"
)

var (
DefaultTraffic Traffic = readWriteSingleKey{key: "key", writes: []opChance{{operation: Put, chance: 100}}}
requestId atomic.Uint64
)

type Traffic interface {
Expand All @@ -42,11 +44,7 @@ type opChance struct {
}

func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter *rate.Limiter) {
maxOperationsPerClient := 1000000
minId := maxOperationsPerClient * c.id
maxId := maxOperationsPerClient * (c.id + 1)

for writeId := minId; writeId < maxId; {
for {
select {
case <-ctx.Done():
return
Expand All @@ -58,10 +56,8 @@ func (t readWriteSingleKey) Run(ctx context.Context, c *recordingClient, limiter
continue
}
// Provide each write with unique id to make it easier to validate operation history.
t.Write(ctx, c, limiter, writeId)
writeId++
t.Write(ctx, c, limiter, requestId.Add(1))
}
return
}

func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limiter *rate.Limiter) error {
Expand All @@ -74,7 +70,7 @@ func (t readWriteSingleKey) Read(ctx context.Context, c *recordingClient, limite
return err
}

func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id int) error {
func (t readWriteSingleKey) Write(ctx context.Context, c *recordingClient, limiter *rate.Limiter, id uint64) error {
putCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)

var err error
Expand Down

0 comments on commit 4409a95

Please sign in to comment.