Skip to content

Commit

Permalink
Merge pull request #2028 from hashicorp/f-atomic-kv
Browse files Browse the repository at this point in the history
Adds support for atomic transactions spanning multiple KV entries.
  • Loading branch information
slackpad committed May 15, 2016
2 parents 0e34cc3 + 6533876 commit 0f5aabc
Show file tree
Hide file tree
Showing 26 changed files with 5,273 additions and 2,114 deletions.
156 changes: 156 additions & 0 deletions api/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@ type KVPair struct {
// KVPairs is a list of KVPair objects
type KVPairs []*KVPair

// KVOp constants give possible operations available in a KVTxn.
type KVOp string

const (
KVSet KVOp = "set"
KVDelete = "delete"
KVDeleteCAS = "delete-cas"
KVDeleteTree = "delete-tree"
KVCAS = "cas"
KVLock = "lock"
KVUnlock = "unlock"
KVGet = "get"
KVGetTree = "get-tree"
KVCheckSession = "check-session"
KVCheckIndex = "check-index"
)

// KVTxnOp defines a single operation inside a transaction.
type KVTxnOp struct {
Verb string
Key string
Value []byte
Flags uint64
Index uint64
Session string
}

// KVTxnOps defines a set of operations to be performed inside a single
// transaction.
type KVTxnOps []*KVTxnOp

// KVTxnResponse has the outcome of a transaction.
type KVTxnResponse struct {
Results []*KVPair
Errors TxnErrors
}

// KV is used to manipulate the K/V API
type KV struct {
c *Client
Expand Down Expand Up @@ -238,3 +275,122 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
res := strings.Contains(string(buf.Bytes()), "true")
return res, qm, nil
}

// TxnOp is the internal format we send to Consul. It's not specific to KV,
// though currently only KV operations are supported.
type TxnOp struct {
KV *KVTxnOp
}

// TxnOps is a list of transaction operations.
type TxnOps []*TxnOp

// TxnResult is the internal format we receive from Consul.
type TxnResult struct {
KV *KVPair
}

// TxnResults is a list of TxnResult objects.
type TxnResults []*TxnResult

// TxnError is used to return information about an operation in a transaction.
type TxnError struct {
OpIndex int
What string
}

// TxnErrors is a list of TxnError objects.
type TxnErrors []*TxnError

// TxnResponse is the internal format we receive from Consul.
type TxnResponse struct {
Results TxnResults
Errors TxnErrors
}

// Txn is used to apply multiple KV operations in a single, atomic transaction.
//
// Note that Go will perform the required base64 encoding on the values
// automatically because the type is a byte slice. Transactions are defined as a
// list of operations to perform, using the KVOp constants and KVTxnOp structure
// to define operations. If any operation fails, none of the changes are applied
// to the state store. Note that this hides the internal raw transaction interface
// and munges the input and output types into KV-specific ones for ease of use.
// If there are more non-KV operations in the future we may break out a new
// transaction API client, but it will be easy to keep this KV-specific variant
// supported.
//
// Even though this is generally a write operation, we take a QueryOptions input
// and return a QueryMeta output. If the transaction contains only read ops, then
// Consul will fast-path it to a different endpoint internally which supports
// consistency controls, but not blocking. If there are write operations then
// the request will always be routed through raft and any consistency settings
// will be ignored.
//
// Here's an example:
//
// ops := KVTxnOps{
// &KVTxnOp{
// Verb: KVLock,
// Key: "test/lock",
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
// Value: []byte("hello"),
// },
// &KVTxnOp{
// Verb: KVGet,
// Key: "another/key",
// },
// }
// ok, response, _, err := kv.Txn(&ops, nil)
//
// If there is a problem making the transaction request then an error will be
// returned. Otherwise, the ok value will be true if the transaction succeeded
// or false if it was rolled back. The response is a structured return value which
// will have the outcome of the transaction. Its Results member will have entries
// for each operation. Deleted keys will have a nil entry in the, and to save
// space, the Value of each key in the Results will be nil unless the operation
// is a KVGet. If the transaction was rolled back, the Errors member will have
// entries referencing the index of the operation that failed along with an error
// message.
func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) {
r := k.c.newRequest("PUT", "/v1/txn")
r.setQueryOptions(q)

// Convert into the internal format since this is an all-KV txn.
ops := make(TxnOps, 0, len(txn))
for _, kvOp := range txn {
ops = append(ops, &TxnOp{KV: kvOp})
}
r.obj = ops
rtt, resp, err := k.c.doRequest(r)
if err != nil {
return false, nil, nil, err
}
defer resp.Body.Close()

qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt

if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
var txnResp TxnResponse
if err := decodeBody(resp, &txnResp); err != nil {
return false, nil, nil, err
}

// Convert from the internal format.
kvResp := KVTxnResponse{
Errors: txnResp.Errors,
}
for _, result := range txnResp.Results {
kvResp.Results = append(kvResp.Results, result.KV)
}
return resp.StatusCode == http.StatusOK, &kvResp, qm, nil
}

var buf bytes.Buffer
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
}
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
}
118 changes: 118 additions & 0 deletions api/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"bytes"
"path"
"strings"
"testing"
"time"
)
Expand Down Expand Up @@ -445,3 +446,120 @@ func TestClient_AcquireRelease(t *testing.T) {
t.Fatalf("unexpected value: %#v", meta)
}
}

func TestClient_Txn(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()

session := c.Session()
kv := c.KV()

// Make a session.
id, _, err := session.CreateNoChecks(nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
defer session.Destroy(id, nil)

// Acquire and get the key via a transaction, but don't supply a valid
// session.
key := testKey()
value := []byte("test")
txn := KVTxnOps{
&KVTxnOp{
Verb: KVLock,
Key: key,
Value: value,
},
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err := kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if ok {
t.Fatalf("transaction should have failed")
}

if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 {
t.Fatalf("bad: %v", ret)
}
if ret.Errors[0].OpIndex != 0 ||
!strings.Contains(ret.Errors[0].What, "missing session") ||
!strings.Contains(ret.Errors[1].What, "doesn't exist") {
t.Fatalf("bad: %v", ret.Errors[0])
}

// Now poke in a real session and try again.
txn[0].Session = id
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}

if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 2 {
t.Fatalf("bad: %v", ret)
}
for i, result := range ret.Results {
var expected []byte
if i == 1 {
expected = value
}

if result.Key != key ||
!bytes.Equal(result.Value, expected) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}

// Run a read-only transaction.
txn = KVTxnOps{
&KVTxnOp{
Verb: KVGet,
Key: key,
},
}
ok, ret, _, err = kv.Txn(txn, nil)
if err != nil {
t.Fatalf("err: %v", err)
} else if !ok {
t.Fatalf("transaction failure")
}

if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 1 {
t.Fatalf("bad: %v", ret)
}
for _, result := range ret.Results {
if result.Key != key ||
!bytes.Equal(result.Value, value) ||
result.Session != id ||
result.LockIndex != 1 {
t.Fatalf("bad: %v", result)
}
}

// Sanity check using the regular GET API.
pair, meta, err := kv.Get(key, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if pair == nil {
t.Fatalf("expected value: %#v", pair)
}
if pair.LockIndex != 1 {
t.Fatalf("Expected lock: %v", pair)
}
if pair.Session != id {
t.Fatalf("Expected lock: %v", pair)
}
if meta.LastIndex == 0 {
t.Fatalf("unexpected value: %#v", meta)
}
}
33 changes: 23 additions & 10 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.mux.HandleFunc("/v1/query/", s.wrap(s.PreparedQuerySpecific))

s.mux.HandleFunc("/v1/txn", s.wrap(s.Txn))

if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
Expand Down Expand Up @@ -342,28 +344,39 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
return
}

prettyPrint := false
if _, ok := req.URL.Query()["pretty"]; ok {
prettyPrint = true
}
// Write out the JSON object
if obj != nil {
var buf []byte
if prettyPrint {
buf, err = json.MarshalIndent(obj, "", " ")
} else {
buf, err = json.Marshal(obj)
}
buf, err = s.marshalJSON(req, obj)
if err != nil {
goto HAS_ERR
}

resp.Header().Set("Content-Type", "application/json")
resp.Write(buf)
}
}
return f
}

// marshalJSON marshals the object into JSON, respecting the user's pretty-ness
// configuration.
func (s *HTTPServer) marshalJSON(req *http.Request, obj interface{}) ([]byte, error) {
if _, ok := req.URL.Query()["pretty"]; ok {
buf, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return nil, err
}
buf = append(buf, "\n"...)
return buf, nil
}

buf, err := json.Marshal(obj)
if err != nil {
return nil, err
}
return buf, err
}

// Returns true if the UI is enabled.
func (s *HTTPServer) IsUIEnabled() bool {
return s.uiDir != "" || s.agent.config.EnableUi
Expand Down
1 change: 1 addition & 0 deletions command/agent/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func testPrettyPrint(pretty string, t *testing.T) {
srv.wrap(handler)(resp, req)

expected, _ := json.MarshalIndent(r, "", " ")
expected = append(expected, "\n"...)
actual, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatalf("err: %s", err)
Expand Down
Loading

0 comments on commit 0f5aabc

Please sign in to comment.