diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index d95fc36f406..7e38d9a1168 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -53,8 +53,8 @@ type Node struct { OpenTSDBServer *opentsdb.Server // The OpenTSDB Server } -func (s *Node) ClusterAddr() net.Addr { - return s.clusterListener.Addr() +func (n *Node) ClusterAddr() net.Addr { + return n.clusterListener.Addr() } func (s *Node) ClusterURL() *url.URL { @@ -71,45 +71,46 @@ func (s *Node) ClusterURL() *url.URL { } } -func (s *Node) Close() error { - if err := s.closeClusterListener(); err != nil { +func (n *Node) Close() error { + + if err := n.closeClusterListener(); err != nil { return err } - if err := s.closeAPIListener(); err != nil { + if err := n.closeAPIListener(); err != nil { return err } - if err := s.closeAdminServer(); err != nil { + if err := n.closeAdminServer(); err != nil { return err } - for _, g := range s.GraphiteServers { + for _, g := range n.GraphiteServers { if err := g.Close(); err != nil { return err } } - if s.OpenTSDBServer != nil { - if err := s.OpenTSDBServer.Close(); err != nil { + if n.OpenTSDBServer != nil { + if err := n.OpenTSDBServer.Close(); err != nil { return err } } - if s.DataNode != nil { - if err := s.DataNode.Close(); err != nil { + if n.DataNode != nil { + if err := n.DataNode.Close(); err != nil { return err } } - if s.raftLog != nil { - if err := s.raftLog.Close(); err != nil { + if n.raftLog != nil { + if err := n.raftLog.Close(); err != nil { return err } } - if s.Broker != nil { - if err := s.Broker.Close(); err != nil { + if n.Broker != nil { + if err := n.Broker.Close(); err != nil { return err } } @@ -117,21 +118,42 @@ func (s *Node) Close() error { return nil } -func (s *Node) openAdminServer(port int) error { +func (n *Node) dropAndExit(config *Config) func() { + return func() { + + if err := n.Close(); err != nil { + log.Printf("error closing node: %s", err) + } + + if err := os.RemoveAll(config.Data.Dir); err != nil { + log.Printf("error removing %q: %s", config.Data.Dir, err) + } + + if err := os.RemoveAll(config.Broker.Dir); err != nil { + log.Printf("error removing %q: %s", config.Broker.Dir, err) + } + + log.Printf("successfully dropped node: %q exiting", n.hostname) + + os.Exit(0) + } +} + +func (n *Node) openAdminServer(port int) error { // Start the admin interface on the default port addr := net.JoinHostPort("", strconv.Itoa(port)) - s.adminServer = admin.NewServer(addr) - return s.adminServer.ListenAndServe() + n.adminServer = admin.NewServer(addr) + return n.adminServer.ListenAndServe() } -func (s *Node) closeAdminServer() error { - if s.adminServer != nil { - return s.adminServer.Close() +func (n *Node) closeAdminServer() error { + if n.adminServer != nil { + return n.adminServer.Close() } return nil } -func (s *Node) openListener(desc, addr string, h http.Handler) (net.Listener, error) { +func (n *Node) openListener(desc, addr string, h http.Handler) (net.Listener, error) { var err error listener, err := net.Listen("tcp", addr) if err != nil { @@ -153,38 +175,38 @@ func (s *Node) openListener(desc, addr string, h http.Handler) (net.Listener, er } -func (s *Node) openAPIListener(addr string, h http.Handler) error { +func (n *Node) openAPIListener(addr string, h http.Handler) error { var err error - s.apiListener, err = s.openListener("API", addr, h) + n.apiListener, err = n.openListener("API", addr, h) if err != nil { return err } return nil } -func (s *Node) closeAPIListener() error { +func (n *Node) closeAPIListener() error { var err error - if s.apiListener != nil { - err = s.apiListener.Close() - s.apiListener = nil + if n.apiListener != nil { + err = n.apiListener.Close() + n.apiListener = nil } return err } -func (s *Node) openClusterListener(addr string, h http.Handler) error { +func (n *Node) openClusterListener(addr string, h http.Handler) error { var err error - s.clusterListener, err = s.openListener("Cluster", addr, h) + n.clusterListener, err = n.openListener("Cluster", addr, h) if err != nil { return err } return nil } -func (s *Node) closeClusterListener() error { +func (n *Node) closeClusterListener() error { var err error - if s.clusterListener != nil { - err = s.clusterListener.Close() - s.clusterListener = nil + if n.clusterListener != nil { + err = n.clusterListener.Close() + n.clusterListener = nil } return err } @@ -334,6 +356,10 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node { //FIXME: Need to also pass in dataURLs to bootstrap a data node s = cmd.openServer(joinURLs) + + // Give the server reference to close the node + s.DropNode = cmd.node.dropAndExit(config) + cmd.node.DataNode = s s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled) log.Printf("authentication enabled: %v\n", cmd.config.Authentication.Enabled) diff --git a/commands.go b/commands.go index ede928ce088..b4ddb15b7bb 100644 --- a/commands.go +++ b/commands.go @@ -47,8 +47,15 @@ const ( // Privilege messages setPrivilegeMessageType = messaging.MessageType(0x90) + + // Server messages + dropServerMessageType = messaging.MessageType(0x100) ) +type dropServerCommand struct { + NodeID uint64 `json:"nodeid"` +} + type createDataNodeCommand struct { URL string `json:"url"` } diff --git a/influxdb.go b/influxdb.go index e076254636b..07257828812 100644 --- a/influxdb.go +++ b/influxdb.go @@ -21,9 +21,15 @@ var ( // ErrServerOpen is returned when opening an already open server. ErrServerOpen = errors.New("server already open") + // ErrDropServerConflict is returned when removing the server would result in data loss + ErrDropServerConflict = errors.New("removing this server would result in data loss") + // ErrServerClosed is returned when closing an already closed server. ErrServerClosed = errors.New("server already closed") + // ErrServernotFound is returned when removing/adding a server and it is not found + ErrServerNotFound = errors.New("server not found") + // ErrPathRequired is returned when opening a server without a path. ErrPathRequired = errors.New("path required") @@ -43,8 +49,8 @@ var ( // attempting to join another data node when no data nodes exist yet ErrDataNodeNotFound = errors.New("data node not found") - // ErrDataNodeRequired is returned when using a blank data node id. - ErrDataNodeRequired = errors.New("data node required") + // ErrServerNodeIDRequired is returned when using a zero server node id. + ErrServerNodeIDRequired = errors.New("server node id must be greater than 0") // ErrDatabaseNameRequired is returned when creating a database without a name. ErrDatabaseNameRequired = errors.New("database name required") diff --git a/influxql/ast.go b/influxql/ast.go index 0a140263494..a96eb70d3d9 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -71,6 +71,7 @@ func (*DropDatabaseStatement) node() {} func (*DropMeasurementStatement) node() {} func (*DropRetentionPolicyStatement) node() {} func (*DropSeriesStatement) node() {} +func (*DropServerStatement) node() {} func (*DropUserStatement) node() {} func (*GrantStatement) node() {} func (*ShowContinuousQueriesStatement) node() {} @@ -171,6 +172,7 @@ func (*DropDatabaseStatement) stmt() {} func (*DropMeasurementStatement) stmt() {} func (*DropRetentionPolicyStatement) stmt() {} func (*DropSeriesStatement) stmt() {} +func (*DropServerStatement) stmt() {} func (*DropUserStatement) stmt() {} func (*GrantStatement) stmt() {} func (*ShowContinuousQueriesStatement) stmt() {} @@ -1234,6 +1236,22 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges { return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}} } +// DropServerStatement represents a command for removing a server from the cluster. +type DropServerStatement struct { + // ID of the node to be dropped. + NodeID uint64 +} + +// String returns a string representation of the drop series statement. +func (s *DropServerStatement) String() string { + return fmt.Sprintf("DROP SERVER %d", s.NodeID) +} + +// RequiredPrivileges returns the privilege required to execute a DropServerStatement. +func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges { + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} +} + // ShowContinuousQueriesStatement represents a command for listing continuous queries. type ShowContinuousQueriesStatement struct{} diff --git a/influxql/parser.go b/influxql/parser.go index b50414f4e53..0c3318f8d6f 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -18,6 +18,13 @@ const ( // DateTimeFormat represents the format for date time literals. DateTimeFormat = "2006-01-02 15:04:05.999999" + + // Number bases + base10 = 10 + + // Bit sizes + bits32 = 32 + bits64 = 64 ) // Parser represents an InfluxQL parser. @@ -172,6 +179,8 @@ func (p *Parser) parseDropStatement() (Statement, error) { return p.parseDropRetentionPolicyStatement() } else if tok == USER { return p.parseDropUserStatement() + } else if tok == SERVER { + return p.parseDropServerStatement() } return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT"}, pos) @@ -372,20 +381,20 @@ func (p *Parser) parseInt(min, max int) (int, error) { return n, nil } -// parseUInt32 parses a string and returns a 32-bit unsigned integer literal. -func (p *Parser) parseUInt32() (uint32, error) { +// parseUint parses a string and returns a 64-bit unsigned integer. +func (p *Parser) parseUint(bitSize int) (uint64, error) { tok, pos, lit := p.scanIgnoreWhitespace() if tok != NUMBER { return 0, newParseError(tokstr(tok, lit), []string{"number"}, pos) } - // Convert string to unsigned 32-bit integer - n, err := strconv.ParseUint(lit, 10, 32) + // Convert string to unsigned 64-bit integer + n, err := strconv.ParseUint(lit, base10, bitSize) if err != nil { return 0, &ParseError{Message: err.Error(), Pos: pos} } - return uint32(n), nil + return n, nil } // parseUInt64 parses a string and returns a 64-bit unsigned integer literal. @@ -1039,7 +1048,7 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { // If they didn't provide a FROM or a WHERE, they need to provide the SeriesID if stmt.Condition == nil && stmt.Source == nil { - id, err := p.parseUInt64() + id, err := p.parseUint(bits64) if err != nil { return nil, err } @@ -1048,6 +1057,20 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { return stmt, nil } +// parseDropServerStatement parses a string and returns a DropServerStatement. +// This function assumes the "DROP SERVER" tokens have already been consumed. +func (p *Parser) parseDropServerStatement() (*DropServerStatement, error) { + s := &DropServerStatement{} + var err error + + // Parse the server's ID. + if s.NodeID, err = p.parseUint(bits64); err != nil { + return nil, err + } + + return s, nil +} + // parseShowContinuousQueriesStatement parses a string and returns a ShowContinuousQueriesStatement. // This function assumes the "SHOW CONTINUOUS" tokens have already been consumed. func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesStatement, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 1e6c7a7401e..794ea85ac20 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -584,6 +584,12 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // DROP SERVER statement + { + s: `DROP SERVER 123`, + stmt: &influxql.DropServerStatement{NodeID: 123}, + }, + // SHOW CONTINUOUS QUERIES statement { s: `SHOW CONTINUOUS QUERIES`, diff --git a/influxql/scanner_test.go b/influxql/scanner_test.go index e3a03a3532a..f025bcafb09 100644 --- a/influxql/scanner_test.go +++ b/influxql/scanner_test.go @@ -152,6 +152,8 @@ func TestScanner_Scan(t *testing.T) { {s: `REVOKE`, tok: influxql.REVOKE}, {s: `SELECT`, tok: influxql.SELECT}, {s: `SERIES`, tok: influxql.SERIES}, + {s: `SERVER`, tok: influxql.SERVER}, + {s: `SERVERS`, tok: influxql.SERVERS}, {s: `TAG`, tok: influxql.TAG}, {s: `TO`, tok: influxql.TO}, {s: `USER`, tok: influxql.USER}, diff --git a/influxql/token.go b/influxql/token.go index 729c214f85b..fd0f49f2801 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -104,6 +104,7 @@ const ( REVOKE SELECT SERIES + SERVER SERVERS SET SHOW @@ -209,6 +210,7 @@ var tokens = [...]string{ REVOKE: "REVOKE", SELECT: "SELECT", SERIES: "SERIES", + SERVER: "SERVER", SERVERS: "SERVERS", SET: "SET", SHOW: "SHOW", diff --git a/server.go b/server.go index 697be7e4f80..1a364bc11fc 100644 --- a/server.go +++ b/server.go @@ -68,6 +68,8 @@ type Server struct { index uint64 // highest broadcast index seen errors map[uint64]error // message errors + DropNode func() // give reference to shut down the node + meta *metastore // metadata store dataNodes map[uint64]*DataNode // data nodes by id @@ -264,7 +266,7 @@ func (s *Server) close() error { } if s.client != nil { - s.client.Close() + _ = s.client.Close() s.client = nil } @@ -324,7 +326,7 @@ func (s *Server) load() error { s.shards[sh.ID] = sh // Only open shards owned by the server. - if !sh.HasDataNodeID(s.id) { + if !sh.hasDataNodeID(s.id) { continue } @@ -391,7 +393,7 @@ func (s *Server) StartSelfMonitoring(database, retention string, interval time.D // Shard-level stats. tags["shardID"] = strconv.FormatUint(s.id, 10) for _, sh := range s.shards { - if !sh.HasDataNodeID(s.id) { + if !sh.hasDataNodeID(s.id) { // No stats for non-local shards. continue } @@ -892,6 +894,17 @@ func (s *Server) DataNodes() (a []*DataNode) { return } +// DropServer removes a server from the cluster +// Curently it will drop both a broker and/or a data node +func (s *Server) DropServer(nodeID uint64) error { + if nodeID == 0 { + return ErrServerNodeIDRequired + } + c := &dropServerCommand{NodeID: nodeID} + _, err := s.broadcast(dropServerMessageType, c) + return err +} + // CreateDataNode creates a new data node with a given URL. func (s *Server) CreateDataNode(u *url.URL) error { c := &createDataNodeCommand{URL: u.String()} @@ -995,6 +1008,51 @@ func (s *Server) CreateDatabaseIfNotExists(name string) error { if err := s.CreateDatabase(name); err != nil && err != ErrDatabaseExists { return err } + + return nil +} + +func (s *Server) applyDropServer(m *messaging.Message) error { + var c dropServerCommand + mustUnmarshalJSON(m.Data, &c) + + // Remove data node reference from every shard in meta store. + for _, database := range s.databases { + for _, policy := range database.policies { + for _, shardGroup := range policy.shardGroups { + for _, shard := range shardGroup.Shards { + shard.dropDataNodeID(c.NodeID) + // If there are no data nodes left, close the shard + if len(shard.DataNodeIDs) == 0 { + // TODO if this happens, we should replicate the data out or show a warning that we just lost data + // as this is a sign that no other data nodes have this shard + shard.close() + // Remove it from the top level map + delete(s.shards, shard.ID) + } else { + // Update the server reference + s.shards[shard.ID] = shard + } + } + } + } + } + + // Remove data node from the current server + delete(s.dataNodes, c.NodeID) + + // TODO: Update the meta store for each shard that a datanode has been removed + + // TODO: Persist these changes to the meta store + + // TODO: Brokes need to expire data node + + // am I the server being dropped? + if c.NodeID == s.id { + go s.DropNode() + return nil + } + return nil } @@ -1185,7 +1243,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) error { // Open shards assigned to this server. for _, sh := range g.Shards { // Ignore if this server is not assigned. - if !sh.HasDataNodeID(s.id) { + if !sh.hasDataNodeID(s.id) { continue } @@ -2219,6 +2277,8 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User, ch res = s.executeDropDatabaseStatement(stmt, user) case *influxql.ShowDatabasesStatement: res = s.executeShowDatabasesStatement(stmt, user) + case *influxql.DropServerStatement: + res = s.executeDropServerStatement(stmt, user) case *influxql.ShowServersStatement: res = s.executeShowServersStatement(stmt, user) case *influxql.CreateUserStatement: @@ -2497,6 +2557,10 @@ func (s *Server) executeShowDatabasesStatement(q *influxql.ShowDatabasesStatemen return &Result{Series: []*influxql.Row{row}} } +func (s *Server) executeDropServerStatement(q *influxql.DropServerStatement, user *User) *Result { + return &Result{Err: s.DropServer(q.NodeID)} +} + func (s *Server) executeShowServersStatement(q *influxql.ShowServersStatement, user *User) *Result { row := &influxql.Row{Columns: []string{"id", "url"}} for _, node := range s.DataNodes() { @@ -3380,7 +3444,7 @@ func (s *Server) DiagnosticsAsRows() []*influxql.Row { nodes = append(nodes, strconv.FormatUint(n, 10)) } var path string - if sh.HasDataNodeID(s.id) { + if sh.hasDataNodeID(s.id) { path = sh.store.Path() } shardsRow.Values = append(shardsRow.Values, []interface{}{now, strconv.FormatUint(sh.ID, 10), @@ -3429,6 +3493,8 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // Process message. var err error switch m.Type { + case dropServerMessageType: + err = s.applyDropServer(m) case createDataNodeMessageType: err = s.applyCreateDataNode(m) case deleteDataNodeMessageType: diff --git a/shard.go b/shard.go index e672137f82c..e9026a90161 100644 --- a/shard.go +++ b/shard.go @@ -63,7 +63,7 @@ func (sg *ShardGroup) initialize(index uint64, shardN, replicaN int, db *databas func (sg *ShardGroup) close(id uint64) error { for _, shard := range sg.Shards { // Ignore shards not on this server. - if !shard.HasDataNodeID(id) { + if !shard.hasDataNodeID(id) { continue } @@ -256,9 +256,23 @@ func (s *Shard) sync(index uint64) error { } } -// HasDataNodeID return true if the data node owns the shard. -func (s *Shard) HasDataNodeID(id uint64) bool { - for _, dataNodeID := range s.DataNodeIDs { +func (s *Shard) dropDataNodeID(id uint64) { + s.mu.Lock() + defer s.mu.Unlock() + + var dataNodeIds []uint64 + + for _, dataNodeId := range s.DataNodeIDs { + if id != dataNodeId { + dataNodeIds = append(dataNodeIds, dataNodeId) + } + } + + s.DataNodeIDs = dataNodeIds +} + +func (s *Shard) hasDataNodeID(id uint64) bool { + for _, dataNodeID := range s.readDataNodeIDs() { if dataNodeID == id { return true } @@ -266,6 +280,13 @@ func (s *Shard) HasDataNodeID(id uint64) bool { return false } +func (s *Shard) readDataNodeIDs() []uint64 { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.DataNodeIDs +} + // readSeries reads encoded series data from a shard. func (s *Shard) readSeries(seriesID uint64, timestamp int64) (values []byte, err error) { err = s.store.View(func(tx *bolt.Tx) error {