Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for fine-grained watches in blocking queries. #2671

Merged
merged 19 commits into from
Jan 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
10f3bdf
Updates go-immutable-radix and go-memdb to get fine-grained watches.
slackpad Jan 24, 2017
b21625a
Adds new variant of blocking query wrapper with WatchSet support.
slackpad Jan 13, 2017
05735e3
Removes some incorrect comments.
slackpad Jan 16, 2017
fefed7c
Don't do any watch tracking for non-blocking queries.
slackpad Jan 19, 2017
f2d9da2
Adds diff check for node and service parts of register requests.
slackpad Jan 20, 2017
b7b42d7
Adds fine-grained watches to catalog endpoints.
slackpad Jan 20, 2017
e4b8832
Fixes a race condition when updating the state store during a snapshot restore.
slackpad Jan 20, 2017
dcb55c7
Adds fine-grained watches to health endpoints.
slackpad Jan 24, 2017
eaa8fde
Updates a comment to point to new blockingQuery function.
slackpad Jan 24, 2017
ec90404
Adds fine-grained watch support to ACL endpoints.
slackpad Jan 24, 2017
3675e5c
Adds fine-grained watches to coordinate endpoints.
slackpad Jan 24, 2017
dfcffe0
Adds fine-grained watches to internal endpoints.
slackpad Jan 24, 2017
8b7977c
Adds fine-grained watches to prepared query endpoints.
slackpad Jan 24, 2017
1d39ddb
Adds fine-grained watches to session endpoints.
slackpad Jan 24, 2017
68e90d0
Adds a facility to notify when restores occur.
slackpad Jan 24, 2017
7da2f51
Cuts KVS endpoints over to new fine-grained watch plumbing.
slackpad Jan 24, 2017
d97c3c6
Guts all the old blocking query code.
slackpad Jan 24, 2017
75f2aa8
Pass state store pointer into the blocking query work function.
slackpad Jan 25, 2017
b787aa1
Tweaks a few comments.
slackpad Jan 25, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *HTTPServer) EventList(resp http.ResponseWriter, req *http.Request) (int
nameFilter = filt
}

// Lots of this logic is borrowed from consul/rpc.go:blockingRPC
// Lots of this logic is borrowed from consul/rpc.go:blockingQuery
// However we cannot use that directly since this code has some
// slight semantics differences...
var timeout <-chan time.Time
Expand Down
2 changes: 1 addition & 1 deletion consul/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *Server) aclLocalFault(id string) (string, string, error) {

// Query the state store.
state := s.fsm.State()
_, acl, err := state.ACLGet(id)
_, acl, err := state.ACLGet(nil, id)
if err != nil {
return "", "", err
}
Expand Down
22 changes: 9 additions & 13 deletions consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)

Expand Down Expand Up @@ -108,7 +110,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
return err
}

_, acl, err := state.ACLGet(args.ACL.ID)
_, acl, err := state.ACLGet(nil, args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
return err
Expand Down Expand Up @@ -144,13 +146,10 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
return fmt.Errorf(aclDisabled)
}

// Get the local state
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ACLGet"),
func() error {
index, acl, err := state.ACLGet(args.ACL)
func(ws memdb.WatchSet, state *state.StateStore) error {
index, acl, err := state.ACLGet(ws, args.ACL)
if err != nil {
return err
}
Expand Down Expand Up @@ -224,13 +223,10 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
return permissionDeniedErr
}

// Get the local state
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ACLList"),
func() error {
index, acls, err := state.ACLList()
func(ws memdb.WatchSet, state *state.StateStore) error {
index, acls, err := state.ACLList(ws)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions consul/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestACLEndpoint_Apply(t *testing.T) {

// Verify
state := s1.fsm.State()
_, s, err := state.ACLGet(out)
_, s, err := state.ACLGet(nil, out)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -63,7 +63,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
}

// Verify
_, s, err = state.ACLGet(id)
_, s, err = state.ACLGet(nil, id)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {

// Verify
state := s1.fsm.State()
_, s, err := state.ACLGet(out)
_, s, err := state.ACLGet(nil, out)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion consul/acl_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.A

// FetchLocalACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
_, local, err := s.fsm.State().ACLList()
_, local, err := s.fsm.State().ACLList(nil)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions consul/acl_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,11 @@ func TestACLReplication(t *testing.T) {
}

checkSame := func() (bool, error) {
index, remote, err := s1.fsm.State().ACLList()
index, remote, err := s1.fsm.State().ACLList(nil)
if err != nil {
return false, err
}
_, local, err := s2.fsm.State().ACLList()
_, local, err := s2.fsm.State().ACLList(nil)
if err != nil {
return false, err
}
Expand Down
4 changes: 2 additions & 2 deletions consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,14 @@ func TestACL_Replication(t *testing.T) {

// Wait for replication to occur.
testutil.WaitForResult(func() (bool, error) {
_, acl, err := s2.fsm.State().ACLGet(id)
_, acl, err := s2.fsm.State().ACLGet(nil, id)
if err != nil {
return false, err
}
if acl == nil {
return false, nil
}
_, acl, err = s3.fsm.State().ACLGet(id)
_, acl, err = s3.fsm.State().ACLGet(nil, id)
if err != nil {
return false, err
}
Expand Down
46 changes: 18 additions & 28 deletions consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)

Expand Down Expand Up @@ -79,7 +81,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
// Check the complete register request against the given ACL policy.
if acl != nil && c.srv.config.ACLEnforceVersion8 {
state := c.srv.fsm.State()
_, ns, err := state.NodeServices(args.Node)
_, ns, err := state.NodeServices(nil, args.Node)
if err != nil {
return fmt.Errorf("Node lookup failed: %v", err)
}
Expand Down Expand Up @@ -162,20 +164,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err
}

// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Nodes"),
func() error {
func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64
var nodes structs.Nodes
var err error
if len(args.NodeMetaFilters) > 0 {
index, nodes, err = state.NodesByMeta(args.NodeMetaFilters)
index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
} else {
index, nodes, err = state.Nodes()
index, nodes, err = state.Nodes(ws)
}
if err != nil {
return err
Expand All @@ -195,20 +194,17 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}

// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Services"),
func() error {
func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64
var services structs.Services
var err error
if len(args.NodeMetaFilters) > 0 {
index, services, err = state.ServicesByNodeMeta(args.NodeMetaFilters)
index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters)
} else {
index, services, err = state.Services()
index, services, err = state.Services(ws)
}
if err != nil {
return err
Expand All @@ -230,20 +226,17 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return fmt.Errorf("Must provide service name")
}

// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingRPC(
err := c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ServiceNodes"),
func() error {
func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
} else {
index, services, err = state.ServiceNodes(args.ServiceName)
index, services, err = state.ServiceNodes(ws, args.ServiceName)
}
if err != nil {
return err
Expand Down Expand Up @@ -288,14 +281,11 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
return fmt.Errorf("Must provide node")
}

// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("NodeServices"),
func() error {
index, services, err := state.NodeServices(args.Node)
func(ws memdb.WatchSet, state *state.StateStore) error {
index, services, err := state.NodeServices(ws, args.Node)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions consul/coordinate_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"sync"
"time"

"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)

Expand Down Expand Up @@ -173,12 +175,10 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return err
}

state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Coordinates"),
func() error {
index, coords, err := state.Coordinates()
func(ws memdb.WatchSet, state *state.StateStore) error {
index, coords, err := state.Coordinates(ws)
if err != nil {
return err
}
Expand Down
23 changes: 22 additions & 1 deletion consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"sync"
"time"

"github.com/armon/go-metrics"
Expand All @@ -24,8 +25,15 @@ type consulFSM struct {
logOutput io.Writer
logger *log.Logger
path string

// stateLock is only used to protect outside callers to State() from
// racing with Restore(), which is called by Raft (it puts in a totally
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
stateLock sync.RWMutex
state *state.StateStore
gc *state.TombstoneGC

gc *state.TombstoneGC
}

// consulSnapshot is used to provide a snapshot of the current
Expand Down Expand Up @@ -60,6 +68,8 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {

// State is used to return a handle to the current state store. It holds the
// read side of stateLock while reading the pointer so that outside callers
// do not race with Restore(), which swaps in a totally new state store while
// holding the write side of the same lock.
func (c *consulFSM) State() *state.StateStore {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
return c.state
}

Expand Down Expand Up @@ -316,7 +326,18 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err != nil {
return err
}

// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
c.stateLock.Lock()
stateOld := c.state
c.state = stateNew
c.stateLock.Unlock()

// The old state store has been abandoned already since we've replaced
// it with an empty one, but we defer telling watchers about it until
// the restore is done, so they wake up once we have the latest data.
defer stateOld.Abandon()

// Set up a new restore transaction
restore := c.state.Restore()
Expand Down
Loading