Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) support dropping server from the cluster #2486

Closed
wants to merge 11 commits into from
94 changes: 60 additions & 34 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type Node struct {
OpenTSDBServer *opentsdb.Server // The OpenTSDB Server
}

func (s *Node) ClusterAddr() net.Addr {
return s.clusterListener.Addr()
func (n *Node) ClusterAddr() net.Addr {
return n.clusterListener.Addr()
}

func (s *Node) ClusterURL() *url.URL {
Expand All @@ -71,67 +71,89 @@ func (s *Node) ClusterURL() *url.URL {
}
}

func (s *Node) Close() error {
if err := s.closeClusterListener(); err != nil {
func (n *Node) Close() error {

if err := n.closeClusterListener(); err != nil {
return err
}

if err := s.closeAPIListener(); err != nil {
if err := n.closeAPIListener(); err != nil {
return err
}

if err := s.closeAdminServer(); err != nil {
if err := n.closeAdminServer(); err != nil {
return err
}

for _, g := range s.GraphiteServers {
for _, g := range n.GraphiteServers {
if err := g.Close(); err != nil {
return err
}
}

if s.OpenTSDBServer != nil {
if err := s.OpenTSDBServer.Close(); err != nil {
if n.OpenTSDBServer != nil {
if err := n.OpenTSDBServer.Close(); err != nil {
return err
}
}

if s.DataNode != nil {
if err := s.DataNode.Close(); err != nil {
if n.DataNode != nil {
if err := n.DataNode.Close(); err != nil {
return err
}
}

if s.raftLog != nil {
if err := s.raftLog.Close(); err != nil {
if n.raftLog != nil {
if err := n.raftLog.Close(); err != nil {
return err
}
}

if s.Broker != nil {
if err := s.Broker.Close(); err != nil {
if n.Broker != nil {
if err := n.Broker.Close(); err != nil {
return err
}
}

return nil
}

func (s *Node) openAdminServer(port int) error {
func (n *Node) dropAndExit(config *Config) func() {
return func() {

if err := n.Close(); err != nil {
log.Printf("error closing node: %s", err)
}

if err := os.RemoveAll(config.Data.Dir); err != nil {
log.Printf("error removing %q: %s", config.Data.Dir, err)
}

if err := os.RemoveAll(config.Broker.Dir); err != nil {
log.Printf("error removing %q: %s", config.Broker.Dir, err)
}

log.Printf("successfully dropped node: %q exiting", n.hostname)

os.Exit(0)
}
}

func (n *Node) openAdminServer(port int) error {
// Start the admin interface on the default port
addr := net.JoinHostPort("", strconv.Itoa(port))
s.adminServer = admin.NewServer(addr)
return s.adminServer.ListenAndServe()
n.adminServer = admin.NewServer(addr)
return n.adminServer.ListenAndServe()
}

func (s *Node) closeAdminServer() error {
if s.adminServer != nil {
return s.adminServer.Close()
func (n *Node) closeAdminServer() error {
if n.adminServer != nil {
return n.adminServer.Close()
}
return nil
}

func (s *Node) openListener(desc, addr string, h http.Handler) (net.Listener, error) {
func (n *Node) openListener(desc, addr string, h http.Handler) (net.Listener, error) {
var err error
listener, err := net.Listen("tcp", addr)
if err != nil {
Expand All @@ -153,38 +175,38 @@ func (s *Node) openListener(desc, addr string, h http.Handler) (net.Listener, er

}

func (s *Node) openAPIListener(addr string, h http.Handler) error {
func (n *Node) openAPIListener(addr string, h http.Handler) error {
var err error
s.apiListener, err = s.openListener("API", addr, h)
n.apiListener, err = n.openListener("API", addr, h)
if err != nil {
return err
}
return nil
}

func (s *Node) closeAPIListener() error {
func (n *Node) closeAPIListener() error {
var err error
if s.apiListener != nil {
err = s.apiListener.Close()
s.apiListener = nil
if n.apiListener != nil {
err = n.apiListener.Close()
n.apiListener = nil
}
return err
}

func (s *Node) openClusterListener(addr string, h http.Handler) error {
func (n *Node) openClusterListener(addr string, h http.Handler) error {
var err error
s.clusterListener, err = s.openListener("Cluster", addr, h)
n.clusterListener, err = n.openListener("Cluster", addr, h)
if err != nil {
return err
}
return nil
}

func (s *Node) closeClusterListener() error {
func (n *Node) closeClusterListener() error {
var err error
if s.clusterListener != nil {
err = s.clusterListener.Close()
s.clusterListener = nil
if n.clusterListener != nil {
err = n.clusterListener.Close()
n.clusterListener = nil
}
return err
}
Expand Down Expand Up @@ -334,6 +356,10 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {

//FIXME: Need to also pass in dataURLs to bootstrap a data node
s = cmd.openServer(joinURLs)

// Give the server reference to close the node
s.DropNode = cmd.node.dropAndExit(config)

cmd.node.DataNode = s
s.SetAuthenticationEnabled(cmd.config.Authentication.Enabled)
log.Printf("authentication enabled: %v\n", cmd.config.Authentication.Enabled)
Expand Down
7 changes: 7 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ const (

// Privilege messages
setPrivilegeMessageType = messaging.MessageType(0x90)

// Server messages
dropServerMessageType = messaging.MessageType(0x100)
)

type dropServerCommand struct {
NodeID uint64 `json:"nodeid"`
}

type createDataNodeCommand struct {
URL string `json:"url"`
}
Expand Down
10 changes: 8 additions & 2 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ var (
// ErrServerOpen is returned when opening an already open server.
ErrServerOpen = errors.New("server already open")

// ErrDropServerConflict is returned when removing the server would result in data loss
ErrDropServerConflict = errors.New("removing this server would result in data loss")

// ErrServerClosed is returned when closing an already closed server.
ErrServerClosed = errors.New("server already closed")

// ErrServernotFound is returned when removing/adding a server and it is not found
ErrServerNotFound = errors.New("server not found")

// ErrPathRequired is returned when opening a server without a path.
ErrPathRequired = errors.New("path required")

Expand All @@ -43,8 +49,8 @@ var (
// attempting to join another data node when no data nodes exist yet
ErrDataNodeNotFound = errors.New("data node not found")

// ErrDataNodeRequired is returned when using a blank data node id.
ErrDataNodeRequired = errors.New("data node required")
// ErrServerNodeIDRequired is returned when using a zero server node id.
ErrServerNodeIDRequired = errors.New("server node id must be greater than 0")

// ErrDatabaseNameRequired is returned when creating a database without a name.
ErrDatabaseNameRequired = errors.New("database name required")
Expand Down
18 changes: 18 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (*DropDatabaseStatement) node() {}
func (*DropMeasurementStatement) node() {}
func (*DropRetentionPolicyStatement) node() {}
func (*DropSeriesStatement) node() {}
func (*DropServerStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
func (*ShowContinuousQueriesStatement) node() {}
Expand Down Expand Up @@ -171,6 +172,7 @@ func (*DropDatabaseStatement) stmt() {}
func (*DropMeasurementStatement) stmt() {}
func (*DropRetentionPolicyStatement) stmt() {}
func (*DropSeriesStatement) stmt() {}
func (*DropServerStatement) stmt() {}
func (*DropUserStatement) stmt() {}
func (*GrantStatement) stmt() {}
func (*ShowContinuousQueriesStatement) stmt() {}
Expand Down Expand Up @@ -1234,6 +1236,22 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: WritePrivilege}}
}

// DropServerStatement represents a command for removing a server from the cluster.
type DropServerStatement struct {
// ID of the node to be dropped.
NodeID uint64
}

// String returns a string representation of the drop series statement.
func (s *DropServerStatement) String() string {
return fmt.Sprintf("DROP SERVER %d", s.NodeID)
}

// RequiredPrivileges returns the privilege required to execute a DropServerStatement.
func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}

// ShowContinuousQueriesStatement represents a command for listing continuous queries.
type ShowContinuousQueriesStatement struct{}

Expand Down
35 changes: 29 additions & 6 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ const (

// DateTimeFormat represents the format for date time literals.
DateTimeFormat = "2006-01-02 15:04:05.999999"

// Number bases
base10 = 10

// Bit sizes
bits32 = 32
bits64 = 64
)

// Parser represents an InfluxQL parser.
Expand Down Expand Up @@ -172,6 +179,8 @@ func (p *Parser) parseDropStatement() (Statement, error) {
return p.parseDropRetentionPolicyStatement()
} else if tok == USER {
return p.parseDropUserStatement()
} else if tok == SERVER {
return p.parseDropServerStatement()
}

return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT"}, pos)
Expand Down Expand Up @@ -372,20 +381,20 @@ func (p *Parser) parseInt(min, max int) (int, error) {
return n, nil
}

// parseUInt32 parses a string and returns a 32-bit unsigned integer literal.
func (p *Parser) parseUInt32() (uint32, error) {
// parseUint parses a string and returns a 64-bit unsigned integer.
func (p *Parser) parseUint(bitSize int) (uint64, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok != NUMBER {
return 0, newParseError(tokstr(tok, lit), []string{"number"}, pos)
}

// Convert string to unsigned 32-bit integer
n, err := strconv.ParseUint(lit, 10, 32)
// Convert string to unsigned 64-bit integer
n, err := strconv.ParseUint(lit, base10, bitSize)
if err != nil {
return 0, &ParseError{Message: err.Error(), Pos: pos}
}

return uint32(n), nil
return n, nil
}

// parseUInt64 parses a string and returns a 64-bit unsigned integer literal.
Expand Down Expand Up @@ -1039,7 +1048,7 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {

// If they didn't provide a FROM or a WHERE, they need to provide the SeriesID
if stmt.Condition == nil && stmt.Source == nil {
id, err := p.parseUInt64()
id, err := p.parseUint(bits64)
if err != nil {
return nil, err
}
Expand All @@ -1048,6 +1057,20 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
return stmt, nil
}

// parseDropServerStatement parses a string and returns a DropServerStatement.
// This function assumes the "DROP SERVER" tokens have already been consumed.
func (p *Parser) parseDropServerStatement() (*DropServerStatement, error) {
s := &DropServerStatement{}
var err error

// Parse the server's ID.
if s.NodeID, err = p.parseUint(bits64); err != nil {
return nil, err
}

return s, nil
}

// parseShowContinuousQueriesStatement parses a string and returns a ShowContinuousQueriesStatement.
// This function assumes the "SHOW CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesStatement, error) {
Expand Down
6 changes: 6 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,12 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// DROP SERVER statement
{
s: `DROP SERVER 123`,
stmt: &influxql.DropServerStatement{NodeID: 123},
},

// SHOW CONTINUOUS QUERIES statement
{
s: `SHOW CONTINUOUS QUERIES`,
Expand Down
2 changes: 2 additions & 0 deletions influxql/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ func TestScanner_Scan(t *testing.T) {
{s: `REVOKE`, tok: influxql.REVOKE},
{s: `SELECT`, tok: influxql.SELECT},
{s: `SERIES`, tok: influxql.SERIES},
{s: `SERVER`, tok: influxql.SERVER},
{s: `SERVERS`, tok: influxql.SERVERS},
{s: `TAG`, tok: influxql.TAG},
{s: `TO`, tok: influxql.TO},
{s: `USER`, tok: influxql.USER},
Expand Down
2 changes: 2 additions & 0 deletions influxql/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ const (
REVOKE
SELECT
SERIES
SERVER
SERVERS
SET
SHOW
Expand Down Expand Up @@ -209,6 +210,7 @@ var tokens = [...]string{
REVOKE: "REVOKE",
SELECT: "SELECT",
SERIES: "SERIES",
SERVER: "SERVER",
SERVERS: "SERVERS",
SET: "SET",
SHOW: "SHOW",
Expand Down
Loading