Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New memdb-based state store #1291

Merged
merged 110 commits into from
Oct 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
633a270
consul/state: starting on new state store
ryanuber Aug 22, 2015
89f87b7
consul/state: round out schema
ryanuber Aug 22, 2015
8181be1
consul/state: start tests for schema
ryanuber Aug 22, 2015
8203919
consul/state: node registration and retrieval works
ryanuber Aug 22, 2015
b2d9c10
consul/state: working on service registration storage
ryanuber Aug 22, 2015
0085512
consul/state: more tests for EnsureNode/GetNode
ryanuber Aug 22, 2015
f9823a2
consul/state: read transactions don't block writes
ryanuber Aug 22, 2015
a52ed3c
consul/state: querying node services works
ryanuber Aug 23, 2015
8671c5f
consul/state: track highest index when querying services
ryanuber Aug 24, 2015
66b3250
consul/state: add function for returning all nodes
ryanuber Aug 25, 2015
8c38210
consul/state: working on node deletion
ryanuber Aug 25, 2015
8a8aef0
consul/state: add service delete functions
ryanuber Aug 25, 2015
1d4a6ea
consul/state: test for index modification during deletes
ryanuber Aug 25, 2015
cf569f4
consul/state: persisting health checks works
ryanuber Aug 25, 2015
77f14af
consul/state: basic health check retrieval works
ryanuber Aug 25, 2015
8c6f40f
consul/state: negative tests
ryanuber Aug 25, 2015
6ebed23
consul/state: set index if we have an existing health check
ryanuber Aug 25, 2015
b0132b1
consul/state: add check deletion method
ryanuber Aug 28, 2015
0809a7e
consul/state: remove checks during service deregistration
ryanuber Aug 28, 2015
aa14ca3
consul/state: services are removed with their associated nodes
ryanuber Aug 29, 2015
04d7e58
consul/state: node checks are removed with their associated nodes
ryanuber Aug 29, 2015
fa8e015
consul/state: test helpers
ryanuber Aug 29, 2015
e148bb5
consul/state: better tests for index table updates
ryanuber Aug 29, 2015
9f281cb
consul/state: cleanup
ryanuber Aug 29, 2015
a4a73c3
consul/state: return highest index for queries with compound results
ryanuber Aug 29, 2015
2cde299
consul/state: support check lookups by service name
ryanuber Aug 29, 2015
f54a96f
consul/state: filter checks by state
ryanuber Sep 1, 2015
26f717f
consul/state: fetch node/check sets by service ID
ryanuber Sep 1, 2015
3b36744
consul/state: testing service registration update
ryanuber Sep 1, 2015
f05a322
consul/state: adding node dump methods
ryanuber Sep 1, 2015
08d4122
consul/state: basic k/v operations
ryanuber Sep 1, 2015
a0fd9fe
consul/state: adding shallow delete for kvs store
ryanuber Sep 1, 2015
4ba89ad
consul/state: adding KVSList for listing a given prefix
ryanuber Sep 2, 2015
8a70ba2
consul/state: initial pass at CAS delete operation for kvs
ryanuber Sep 2, 2015
a0dc2de
consul/state: add CAS method for kv set
ryanuber Sep 2, 2015
8b29bfa
consul/state: fix for maxIndex and better tests
ryanuber Sep 2, 2015
291fbe0
consul/state: list keys from the kv with a prefix/separator
ryanuber Sep 3, 2015
b0856c2
consul/state: adding tree delete for kvs store
ryanuber Sep 3, 2015
a4c202a
consul/state: adding session registration
ryanuber Sep 4, 2015
bd0de2c
consul/state: add session list method
ryanuber Sep 4, 2015
a93e341
consul/state: session lookup by node id works
ryanuber Sep 4, 2015
876aa66
consul/state: refactor some tests
ryanuber Sep 4, 2015
8fa8261
consul/state: more tests
ryanuber Sep 4, 2015
391d4ee
consul/state: basic session destroy works
ryanuber Sep 6, 2015
16188e7
consul/state: basic acl set/get/delete
ryanuber Sep 7, 2015
9219129
consul/state: implement acl delete
ryanuber Sep 7, 2015
7bf7ba6
consul/state: list acls
ryanuber Sep 7, 2015
d57431e
Gets new structs changes to compile, adds some corner case handling a…
Sep 10, 2015
009fd7d
Integrates new state store for ACLs.
Sep 20, 2015
30736ba
Completes state store for KV, sessions, tombstones, and nodes/service…
Sep 25, 2015
21bc8e0
Adds a watch tester helper that helps cut the cruft.
Oct 7, 2015
bc34ae2
Adds ACL snapshot/restore test.
Oct 7, 2015
e339ebf
Adds reap tombstone test (and fixes bugs).
Oct 7, 2015
f1ee05e
Adds ensure registration unit test (and fixes bugs).
Oct 7, 2015
537fd67
Cleans up unit tests for consistency.
Oct 7, 2015
c8d0d09
De-generalizes graveyard since that ended up as a YAGNI (only useful …
Oct 7, 2015
005a7e0
Adds node snap and watch tests as well as a general watch test.
Oct 7, 2015
72f15ce
Adds snap and watch tests for nodes, services, and checks.
Oct 8, 2015
75f9cd5
Adds kvs lock/unlock tests.
Oct 8, 2015
18d60f9
Allows lock holder to re-lock and set a KV, adds tests for corner cas…
Oct 8, 2015
930780e
Adds snapshot/restore and watch tests for KVS.
Oct 9, 2015
ad246e0
Makes sure we don't create a full table watch for tombstones.
Oct 9, 2015
4afd9a8
Adds tombstone tests and gets rid of unused logger.
Oct 9, 2015
bde2495
Adds session snapshot/restore and basic watch tests (and fixes some b…
Oct 9, 2015
87bb81b
Beefs up node and service watch tests for multi-table triggers.
Oct 9, 2015
c3513b1
Adds a note about updating sessions after they are created.
Oct 9, 2015
91ff525
Ports over session invalidation tests (and fixes some bugs).
Oct 9, 2015
dc05fb7
Updates docs about new "acquire when you already have it" behavior.
Oct 9, 2015
c791f2a
Adds twiddling of the real state store in snapshot tests.
Oct 9, 2015
de00a2f
Cuts FSM unit tests over to new state store.
Oct 9, 2015
41338c9
Integrates KVS endopint with new state store (changes KVSList to matc…
Oct 9, 2015
5b6502c
Integrates new state store into leader and catalog/health endpoints.
Oct 12, 2015
b82d492
Integrates new state store into session endpoint; returns table index…
Oct 12, 2015
0959b87
Abstracts the table names away from the RPC call sites.
Oct 13, 2015
c83a9e0
Integrates new state store into internal endpoint.
Oct 13, 2015
7729b66
Integrates session TTL tests with new state store.
Oct 13, 2015
6ba70be
Nukes old state store's connection to FSM and RPC.
Oct 13, 2015
0c90bdc
Knocks out the Raft indexes before doing compare.
Oct 13, 2015
4ee43e9
Deletes the old state store and all its accoutrements.
Oct 13, 2015
51600fa
Gets rid of the transitional "New" suffix on state store and RPC.
Oct 13, 2015
cc6a7a2
Gets rid of todo that we discussed is ok.
Oct 13, 2015
682b011
Cleans up some go vet warnings.
Oct 13, 2015
834c6c1
Makes all delete loops use a separate slice to protect the iterator.
Oct 13, 2015
ca3a84e
Makes session invalidate loops use a separate slice to protect the it…
Oct 13, 2015
b728c6f
Fixes up verify_no_uuid checks for new path.
Oct 13, 2015
181c216
Fixes index management for KVS.
Oct 13, 2015
b6f9aee
Fixes remaining non-KV index calclulations and adds a general getWatc…
Oct 14, 2015
d2f4a5f
Adds clone for service nodes so we don't twiddle the database's object.
Oct 14, 2015
d29673e
Adds tests for GC.
Oct 14, 2015
6f7f163
Adds watch unit tests and does some related watch cleanup.
Oct 14, 2015
57c102a
Adds a delay test.
Oct 14, 2015
fab8672
Does some go fmt after latest round of changes.
Oct 14, 2015
be9f071
Adds unit tests for the graveyard.
Oct 14, 2015
fc541fa
Removes a todo that's no longer needed.
Oct 14, 2015
a1cb9b8
Adds benchmarks back in to the state store.
Oct 14, 2015
ef52331
Adds unit tests for new structs clone functions.
Oct 14, 2015
655967d
Runs go fmt after latest changes.
Oct 14, 2015
2c36c4f
Fixes a typo.
Oct 15, 2015
8d444f2
Ports a couple of new RPC calls to the updated codec mechanism.
Oct 15, 2015
4fb8d10
Gets rid of non-idomatic "state_store" alias in FSM.
Oct 16, 2015
503c552
Converts nodes, services, checks to iterators duing dumps; fixes tag …
Oct 19, 2015
8371c87
Converts KVS snapshot over to iterator.
Oct 19, 2015
ffe531c
Converts sessions and ACLs over to iterators.
Oct 19, 2015
3782fc5
Makes the iterator naming more consistent.
Oct 19, 2015
7d1179c
Adds a special case for fill KVS listings to avoid a tombstone scan.
Oct 19, 2015
ee52f5e
Switches sessions over to UUIDFieldIndex.
Oct 20, 2015
52c373b
Gets rid of unique constraint on sessions index in session_checks.
Oct 20, 2015
f3a95bf
Puts all restore operations into a single transaction and optimizes w…
Oct 20, 2015
785bf66
Gets rid of error prefixing in leader.go.
Oct 20, 2015
ff1eec0
Gets rid of LMDB reference in the FAQ.
Oct 20, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 33 additions & 25 deletions api/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestLock_LockUnlock(t *testing.T) {
t.Fatalf("err: %v", err)
}

// Should loose leadership
// Should lose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
Expand Down Expand Up @@ -105,32 +105,40 @@ func TestLock_DeleteKey(t *testing.T) {
c, s := makeClient(t)
defer s.Stop()

lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}

// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()
// This uncovered some issues around special-case handling of low index
// numbers where it would work with a low number but fail for higher
// ones, so we loop this a bit to sweep the index up out of that
// territory.
for i := 0; i < 10; i++ {
func() {
lock, err := c.LockKey("test/lock")
if err != nil {
t.Fatalf("err: %v", err)
}

go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.Delete("test/lock", nil)
}()
// Should work
leaderCh, err := lock.Lock(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if leaderCh == nil {
t.Fatalf("not leader")
}
defer lock.Unlock()

// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
go func() {
// Nuke the key, simulate an operator intervention
kv := c.KV()
kv.Delete("test/lock", nil)
}()

// Should loose leadership
select {
case <-leaderCh:
case <-time.After(time.Second):
t.Fatalf("should not be leader")
}
}()
}
}

Expand Down
3 changes: 2 additions & 1 deletion command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
Expand Down Expand Up @@ -94,7 +95,7 @@ type Agent struct {
eventBuf []*UserEvent
eventIndex int
eventLock sync.RWMutex
eventNotify consul.NotifyGroup
eventNotify state.NotifyGroup

shutdown bool
shutdownCh chan struct{}
Expand Down
6 changes: 3 additions & 3 deletions command/agent/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ RPC:
}

// Add the node record
records := d.formatNodeRecord(&out.NodeServices.Node, out.NodeServices.Node.Address,
records := d.formatNodeRecord(out.NodeServices.Node, out.NodeServices.Node.Address,
req.Question[0].Name, qType, d.config.NodeTTL)
if records != nil {
resp.Answer = append(resp.Answer, records...)
Expand Down Expand Up @@ -585,7 +585,7 @@ func (d *DNSServer) serviceNodeRecords(nodes structs.CheckServiceNodes, req, res
handled[addr] = struct{}{}

// Add the node record
records := d.formatNodeRecord(&node.Node, addr, qName, qType, ttl)
records := d.formatNodeRecord(node.Node, addr, qName, qType, ttl)
if records != nil {
resp.Answer = append(resp.Answer, records...)
}
Expand Down Expand Up @@ -626,7 +626,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
}

// Add the extra record
records := d.formatNodeRecord(&node.Node, addr, srvRec.Target, dns.TypeANY, ttl)
records := d.formatNodeRecord(node.Node, addr, srvRec.Target, dns.TypeANY, ttl)
if records != nil {
resp.Extra = append(resp.Extra, records...)
}
Expand Down
4 changes: 4 additions & 0 deletions command/agent/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
if !reflect.DeepEqual(serv, srv1) {
Expand Down Expand Up @@ -236,6 +237,7 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "svc_id1":
if serv.ID != "svc_id1" ||
Expand Down Expand Up @@ -455,6 +457,7 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {

// All the services should match
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql":
t.Fatalf("should not be permitted")
Expand Down Expand Up @@ -581,6 +584,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {

// All the checks should match
for _, chk := range checks.HealthChecks {
chk.CreateIndex, chk.ModifyIndex = 0, 0
switch chk.CheckID {
case "mysql":
if !reflect.DeepEqual(chk, chk1) {
Expand Down
20 changes: 14 additions & 6 deletions consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,20 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ACLGet"),
state.GetQueryWatch("ACLGet"),
func() error {
index, acl, err := state.ACLGet(args.ACL)
if err != nil {
return err
}

reply.Index = index
if acl != nil {
reply.ACLs = structs.ACLs{acl}
} else {
reply.ACLs = nil
}
return err
return nil
})
}

Expand Down Expand Up @@ -194,10 +198,14 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ACLList"),
state.GetQueryWatch("ACLList"),
func() error {
var err error
reply.Index, reply.ACLs, err = state.ACLList()
return err
index, acls, err := state.ACLList()
if err != nil {
return err
}

reply.Index, reply.ACLs = index, acls
return nil
})
}
8 changes: 4 additions & 4 deletions consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func TestACL_filterServices(t *testing.T) {
func TestACL_filterServiceNodes(t *testing.T) {
// Create some service nodes
nodes := structs.ServiceNodes{
structs.ServiceNode{
&structs.ServiceNode{
Node: "node1",
ServiceName: "foo",
},
Expand All @@ -748,7 +748,7 @@ func TestACL_filterServiceNodes(t *testing.T) {
func TestACL_filterNodeServices(t *testing.T) {
// Create some node services
services := structs.NodeServices{
Node: structs.Node{
Node: &structs.Node{
Node: "node1",
},
Services: map[string]*structs.NodeService{
Expand Down Expand Up @@ -778,10 +778,10 @@ func TestACL_filterCheckServiceNodes(t *testing.T) {
// Create some nodes
nodes := structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: structs.Node{
Node: &structs.Node{
Node: "node1",
},
Service: structs.NodeService{
Service: &structs.NodeService{
ID: "foo",
Service: "foo",
},
Expand Down
55 changes: 40 additions & 15 deletions consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,19 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err
}

// Get the local state
// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Nodes"),
state.GetQueryWatch("Nodes"),
func() error {
reply.Index, reply.Nodes = state.Nodes()
index, nodes, err := state.Nodes()
if err != nil {
return err
}

reply.Index, reply.Nodes = index, nodes
return nil
})
}
Expand All @@ -136,13 +142,19 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}

// Get the current nodes
// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("Services"),
state.GetQueryWatch("Services"),
func() error {
reply.Index, reply.Services = state.Services()
index, services, err := state.Services()
if err != nil {
return err
}

reply.Index, reply.Services = index, services
return c.srv.filterACL(args.Token, reply)
})
}
Expand All @@ -160,15 +172,23 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru

// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingRPC(&args.QueryOptions,
err := c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("ServiceNodes"),
state.GetQueryWatch("ServiceNodes"),
func() error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
reply.Index, reply.ServiceNodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
reply.Index, reply.ServiceNodes = state.ServiceNodes(args.ServiceName)
index, services, err = state.ServiceNodes(args.ServiceName)
}
if err != nil {
return err
}
reply.Index, reply.ServiceNodes = index, services
return c.srv.filterACL(args.Token, reply)
})

Expand Down Expand Up @@ -198,11 +218,16 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs

// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingRPC(
&args.QueryOptions,
&reply.QueryMeta,
state.QueryTables("NodeServices"),
state.GetQueryWatch("NodeServices"),
func() error {
reply.Index, reply.NodeServices = state.NodeServices(args.Node)
index, services, err := state.NodeServices(args.Node)
if err != nil {
return err
}
reply.Index, reply.NodeServices = index, services
return c.srv.filterACL(args.Token, reply)
})
}
Loading