Skip to content

Commit

Permalink
Merge pull request #2671 from hashicorp/f-fine-watch
Browse files Browse the repository at this point in the history
Adds support for fine-grained watches in blocking queries.
  • Loading branch information
slackpad authored Jan 26, 2017
2 parents 01ed547 + b787aa1 commit 4bd0da7
Show file tree
Hide file tree
Showing 54 changed files with 2,253 additions and 2,155 deletions.
2 changes: 1 addition & 1 deletion command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (s *HTTPServer) EventList(resp http.ResponseWriter, req *http.Request) (int
nameFilter = filt
}

// Lots of this logic is borrowed from consul/rpc.go:blockingRPC
// Lots of this logic is borrowed from consul/rpc.go:blockingQuery
// However we cannot use that directly since this code has some
// slight semantics differences...
var timeout <-chan time.Time
Expand Down
2 changes: 1 addition & 1 deletion consul/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *Server) aclLocalFault(id string) (string, string, error) {

// Query the state store.
state := s.fsm.State()
_, acl, err := state.ACLGet(id)
_, acl, err := state.ACLGet(nil, id)
if err != nil {
return "", "", err
}
Expand Down
22 changes: 9 additions & 13 deletions consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)

Expand Down Expand Up @@ -108,7 +110,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
return err
}

_, acl, err := state.ACLGet(args.ACL.ID)
_, acl, err := state.ACLGet(nil, args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
return err
Expand Down Expand Up @@ -144,13 +146,10 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
return fmt.Errorf(aclDisabled)
}

// Get the local state
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ACLGet"),
func() error {
index, acl, err := state.ACLGet(args.ACL)
func(ws memdb.WatchSet, state *state.StateStore) error {
index, acl, err := state.ACLGet(ws, args.ACL)
if err != nil {
return err
}
Expand Down Expand Up @@ -224,13 +223,10 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
return permissionDeniedErr
}

// Get the local state
state := a.srv.fsm.State()
return a.srv.blockingRPC(&args.QueryOptions,
return a.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ACLList"),
func() error {
index, acls, err := state.ACLList()
func(ws memdb.WatchSet, state *state.StateStore) error {
index, acls, err := state.ACLList(ws)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions consul/acl_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestACLEndpoint_Apply(t *testing.T) {

// Verify
state := s1.fsm.State()
_, s, err := state.ACLGet(out)
_, s, err := state.ACLGet(nil, out)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -63,7 +63,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
}

// Verify
_, s, err = state.ACLGet(id)
_, s, err = state.ACLGet(nil, id)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {

// Verify
state := s1.fsm.State()
_, s, err := state.ACLGet(out)
_, s, err := state.ACLGet(nil, out)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion consul/acl_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func reconcileACLs(local, remote structs.ACLs, lastRemoteIndex uint64) structs.A

// FetchLocalACLs returns the ACLs in the local state store.
func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
_, local, err := s.fsm.State().ACLList()
_, local, err := s.fsm.State().ACLList(nil)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions consul/acl_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,11 @@ func TestACLReplication(t *testing.T) {
}

checkSame := func() (bool, error) {
index, remote, err := s1.fsm.State().ACLList()
index, remote, err := s1.fsm.State().ACLList(nil)
if err != nil {
return false, err
}
_, local, err := s2.fsm.State().ACLList()
_, local, err := s2.fsm.State().ACLList(nil)
if err != nil {
return false, err
}
Expand Down
4 changes: 2 additions & 2 deletions consul/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,14 +688,14 @@ func TestACL_Replication(t *testing.T) {

// Wait for replication to occur.
testutil.WaitForResult(func() (bool, error) {
_, acl, err := s2.fsm.State().ACLGet(id)
_, acl, err := s2.fsm.State().ACLGet(nil, id)
if err != nil {
return false, err
}
if acl == nil {
return false, nil
}
_, acl, err = s3.fsm.State().ACLGet(id)
_, acl, err = s3.fsm.State().ACLGet(nil, id)
if err != nil {
return false, err
}
Expand Down
46 changes: 18 additions & 28 deletions consul/catalog_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)

Expand Down Expand Up @@ -79,7 +81,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
// Check the complete register request against the given ACL policy.
if acl != nil && c.srv.config.ACLEnforceVersion8 {
state := c.srv.fsm.State()
_, ns, err := state.NodeServices(args.Node)
_, ns, err := state.NodeServices(nil, args.Node)
if err != nil {
return fmt.Errorf("Node lookup failed: %v", err)
}
Expand Down Expand Up @@ -162,20 +164,17 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err
}

// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Nodes"),
func() error {
func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64
var nodes structs.Nodes
var err error
if len(args.NodeMetaFilters) > 0 {
index, nodes, err = state.NodesByMeta(args.NodeMetaFilters)
index, nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
} else {
index, nodes, err = state.Nodes()
index, nodes, err = state.Nodes(ws)
}
if err != nil {
return err
Expand All @@ -195,20 +194,17 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}

// Get the list of services and their tags.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Services"),
func() error {
func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64
var services structs.Services
var err error
if len(args.NodeMetaFilters) > 0 {
index, services, err = state.ServicesByNodeMeta(args.NodeMetaFilters)
index, services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters)
} else {
index, services, err = state.Services()
index, services, err = state.Services(ws)
}
if err != nil {
return err
Expand All @@ -230,20 +226,17 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return fmt.Errorf("Must provide service name")
}

// Get the nodes
state := c.srv.fsm.State()
err := c.srv.blockingRPC(
err := c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("ServiceNodes"),
func() error {
func(ws memdb.WatchSet, state *state.StateStore) error {
var index uint64
var services structs.ServiceNodes
var err error
if args.TagFilter {
index, services, err = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
} else {
index, services, err = state.ServiceNodes(args.ServiceName)
index, services, err = state.ServiceNodes(ws, args.ServiceName)
}
if err != nil {
return err
Expand Down Expand Up @@ -288,14 +281,11 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
return fmt.Errorf("Must provide node")
}

// Get the node services
state := c.srv.fsm.State()
return c.srv.blockingRPC(
return c.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("NodeServices"),
func() error {
index, services, err := state.NodeServices(args.Node)
func(ws memdb.WatchSet, state *state.StateStore) error {
index, services, err := state.NodeServices(ws, args.Node)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions consul/coordinate_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"sync"
"time"

"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
)

Expand Down Expand Up @@ -173,12 +175,10 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return err
}

state := c.srv.fsm.State()
return c.srv.blockingRPC(&args.QueryOptions,
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
state.GetQueryWatch("Coordinates"),
func() error {
index, coords, err := state.Coordinates()
func(ws memdb.WatchSet, state *state.StateStore) error {
index, coords, err := state.Coordinates(ws)
if err != nil {
return err
}
Expand Down
23 changes: 22 additions & 1 deletion consul/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"sync"
"time"

"github.com/armon/go-metrics"
Expand All @@ -24,8 +25,15 @@ type consulFSM struct {
logOutput io.Writer
logger *log.Logger
path string

// stateLock is only used to protect outside callers to State() from
// racing with Restore(), which is called by Raft (it puts in a totally
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
stateLock sync.RWMutex
state *state.StateStore
gc *state.TombstoneGC

gc *state.TombstoneGC
}

// consulSnapshot is used to provide a snapshot of the current
Expand Down Expand Up @@ -60,6 +68,8 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {

// State is used to return a handle to the current state
func (c *consulFSM) State() *state.StateStore {
c.stateLock.RLock()
defer c.stateLock.RUnlock()
return c.state
}

Expand Down Expand Up @@ -316,7 +326,18 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err != nil {
return err
}

// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
c.stateLock.Lock()
stateOld := c.state
c.state = stateNew
c.stateLock.Unlock()

// The old state store has been abandoned already since we've replaced
// it with an empty one, but we defer telling watchers about it until
// the restore is done, so they wake up once we have the latest data.
defer stateOld.Abandon()

// Set up a new restore transaction
restore := c.state.Restore()
Expand Down
Loading

0 comments on commit 4bd0da7

Please sign in to comment.