Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add pipeline, server, socket and strip key prefix options #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions cmd/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package main
import (
"flag"
"fmt"
pcgr "github.com/dgryski/go-pcgr"
mct "github.com/dormando/mctester"
"math/rand"
"os"
"runtime/pprof"
"time"

pcgr "github.com/dgryski/go-pcgr"
mct "github.com/feihu-stripe/mctester"
)

var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file")
Expand All @@ -31,6 +32,10 @@ func main() {
zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number")
valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss")
clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss")
pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) stack this many GET requests into the same syscall.")
server := flag.String("server", "127.0.0.1:11211", "ip and port to connect to")
socket := flag.String("socket", "", "domain socket to connect to")
stripKeyPrefix := flag.Bool("stripkeyprefix", false, "strip key prefix before comparing with response.")

flag.Parse()

Expand All @@ -53,7 +58,10 @@ func main() {
*/

bl := &BasicLoader{
servers: []string{"127.0.0.1:11211"},
servers: []string{*server},
socket: *socket,
pipelines: *pipelines,
stripKeyPrefix: *stripKeyPrefix,
desiredConnCount: *connCount,
requestsPerSleep: *reqPerSleep,
requestBundlesPerConn: *reqBundlePerConn,
Expand Down Expand Up @@ -100,6 +108,9 @@ func main() {
// - variances: how often to change item sizes
type BasicLoader struct {
servers []string
socket string
pipelines uint
stripKeyPrefix bool
stopAfter time.Time
desiredConnCount int
requestsPerSleep int
Expand Down Expand Up @@ -145,7 +156,7 @@ func (l *BasicLoader) Run() {
func (l *BasicLoader) Worker(doneChan chan<- int) {
// FIXME: selector.
host := l.servers[0]
mc := mct.NewClient(host)
mc := mct.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix)
bundles := l.requestBundlesPerConn

rs := pcgr.New(time.Now().UnixNano(), 0)
Expand Down
117 changes: 76 additions & 41 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io"
"net"
"strconv"
"strings"
"time"
)

Expand All @@ -36,7 +37,14 @@ type mcConn struct {
}

func (c *Client) connectToMc() (*mcConn, error) {
conn, err := net.DialTimeout("tcp", c.Host, c.ConnectTimeout)
var conn net.Conn
var err error
if c.socket != "" {
conn, err = net.DialTimeout("unix", c.socket, c.ConnectTimeout)
} else {
conn, err = net.DialTimeout("tcp", c.Host, c.ConnectTimeout)
}

if err != nil {
return nil, err
}
Expand All @@ -50,19 +58,28 @@ type Client struct {
// read or write timeout
NetTimeout time.Duration
Host string
socket string
cn *mcConn
WBufSize int
RBufSize int
// any necessary locks? channels?
// binprot structure cache.
binpkt *packet
opaque uint32 // just for binprot?

pipelines int
keyPrefix string
stripKeyPrefix bool
}

func NewClient(host string) (client *Client) {
func NewClient(host string, socket string, pipelines uint, keyPrefix string, stripKeyPrefix bool) (client *Client) {
client = &Client{
Host: host,
binpkt: &packet{},
Host: host,
socket: socket,
pipelines: int(pipelines),
keyPrefix: keyPrefix,
stripKeyPrefix: stripKeyPrefix,
binpkt: &packet{},
}
//client.rs = rand.NewSource(time.Now().UnixNano())
return client
Expand Down Expand Up @@ -319,57 +336,75 @@ func (c *Client) MetaDebug(key string) (err error) {
//////////////////////////////////////////////

func (c *Client) Get(key string) (flags uint64, value []byte, code McCode, err error) {
pipelines := c.pipelines
// Expected key from response
respKey := key
if c.stripKeyPrefix {
respKey = strings.TrimPrefix(key, c.keyPrefix)
}

err = c.runNow(key, len(key)+6, func() error {
b := c.cn.b
b.WriteString("get ")
b.WriteString(key)
b.WriteString("\r\n")
err = b.Flush()

if err != nil {
return err
for i := 0; i < pipelines; i++ {
b.WriteString("get ")
b.WriteString(key)
b.WriteString("\r\n")
}
err = b.Flush()

line, err := b.ReadBytes('\n')
if err != nil {
return err
}

if bytes.Equal(line, []byte("END\r\n")) {
code = McMISS
} else {
parts := bytes.Split(line[:len(line)-2], []byte(" "))
if !bytes.Equal(parts[0], []byte("VALUE")) {
// TODO: This should look for ERROR/SERVER_ERROR/etc
return ErrUnexpectedResponse
}
if len(parts) != 4 {
return ErrUnexpectedResponse
}
if !bytes.Equal(parts[1], []byte(key)) {
// FIXME: how do we embed the received vs expected in here?
// use the brand-new golang error wrapping thing?
return ErrKeyDoesNotMatch
}
flags, _ = ParseUint(parts[2])
size, _ := ParseUint(parts[3])

value = make([]byte, size+2)
_, err := io.ReadFull(b, value)
for i := 0; i < pipelines; i++ {
line, err := b.ReadBytes('\n')
if err != nil {
return err
}

if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) {
return ErrCorruptValue
if bytes.Equal(line, []byte("END\r\n")) {
code = McMISS
} else {
parts := bytes.Split(line[:len(line)-2], []byte(" "))
if !bytes.Equal(parts[0], []byte("VALUE")) {
// TODO: This should look for ERROR/SERVER_ERROR/etc
fmt.Print("Unexpected Reponse: ", string(line), "\n")
continue
}
if len(parts) != 4 {
fmt.Print("Unexpected Reponse: ", "parts not 4", "\n")
continue
}
if !bytes.Equal(parts[1], []byte(respKey)) {
fmt.Print("Unmatched Key: ", string(parts[1]), " and ", respKey, "\n")
// FIXME: how do we embed the received vs expected in here?
// use the brand-new golang error wrapping thing?
continue
}
flags, _ = ParseUint(parts[2])
size, _ := ParseUint(parts[3])

value = make([]byte, size+2)
_, err := io.ReadFull(b, value)
if err != nil {
fmt.Print("io ReadFull error, return", "\n")
return err
}

if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) {
fmt.Print("Unmatched Value", "\n")
continue
}
code = McHIT
value = value[:size]

line, err = b.ReadBytes('\n')
if !bytes.Equal(line, []byte("END\r\n")) {
fmt.Print("Unmatched Reponse: ", string(line), " is not END\r\n")
continue
}
}
code = McHIT
value = value[:size]

line, err = b.ReadBytes('\n')
if !bytes.Equal(line, []byte("END\r\n")) {
return ErrUnexpectedResponse
}
}

return nil
Expand Down