Skip to content

Commit

Permalink
add cluster meta executor
Browse files Browse the repository at this point in the history
  • Loading branch information
dgnorton authored and e-dard committed Feb 19, 2016
1 parent 2da8321 commit e54f91a
Show file tree
Hide file tree
Showing 9 changed files with 382 additions and 6 deletions.
59 changes: 59 additions & 0 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,13 @@ message MapShardResponse {
repeated string TagSets = 4;
repeated string Fields = 5;
}

message ExecuteStatementRequest {
required string Statement = 1;
required string Database = 2;
}

message ExecuteStatementResponse {
required int32 Code = 1;
optional string Message = 2;
}
153 changes: 153 additions & 0 deletions cluster/meta_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package cluster

import (
"fmt"
"log"
"net"
"os"
"sync"
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)

// MetaExecutor executes meta queries on all data nodes.
type MetaExecutor struct {
mu sync.RWMutex
timeout time.Duration
pool *clientPool
maxConnections int
Logger *log.Logger
Node *influxdb.Node

MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
DataNodes() ([]meta.NodeInfo, error)
}
}

// NewMetaExecutor returns a new initialized *MetaExecutor.
func NewMetaExecutor() *MetaExecutor {
return &MetaExecutor{
timeout: DefaultWriteTimeout,
pool: newClientPool(),
maxConnections: 1000,
Logger: log.New(os.Stderr, "[meta-executor] ", log.LstdFlags),
}
}

// ExecuteStatement executes a single InfluxQL statement on all nodes in the cluster concurrently.
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error {
println("MetaExecutor.ExecuteStatement start")
defer println("MetaExecutor.ExecuteStatement end")
// Get a list of all nodes the query needs to be executed on.
nodes, err := m.MetaClient.DataNodes()
if err != nil {
return err
}

// Start a goroutine to execute the statement on each of the remote nodes.
var wg sync.WaitGroup
wg.Add(len(nodes))
errs := make(chan error)
defer close(errs)

for _, node := range nodes {
go func() {
defer wg.Done()
if err := m.executeOnNode(stmt, database, &node); err != nil {
errs <- err
}
}()
}

// Wait on all nodes to execute the statement and respond.
wg.Wait()

select {
case err = <-errs:
return err
default:
return nil
}
}

// executeOnNode executes a single InfluxQL statement on a single node.
func (m *MetaExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
println("MetaExecutor.executeOnNode start")
defer println("MetaExecutor.executeOnNode end")
// Executing statement on the local node?
//if node.ID == m.Node.ID {
// panic("fix me !!!!!!!!!!!!!!!!!!!")
//}

// We're executing on a remote node so establish a connection.
c, err := m.dial(node.ID)
if err != nil {
return err
}

conn, ok := c.(*pooledConn)
if !ok {
panic("wrong connection type in MetaExecutor")
}
// Return connection to pool by "closing" it.
defer func(conn net.Conn) { conn.Close() }(conn)

// Build RPC request.
var request ExecuteStatementRequest
request.SetStatement(stmt.String())
request.SetDatabase(database)

// Marshal into protocol buffer.
buf, err := request.MarshalBinary()
if err != nil {
return err
}

// Send request.
conn.SetWriteDeadline(time.Now().Add(m.timeout))
if err := WriteTLV(conn, executeStatementRequestMessage, buf); err != nil {
conn.MarkUnusable()
return err
}

// Read the response.
conn.SetReadDeadline(time.Now().Add(m.timeout))
_, buf, err = ReadTLV(conn)
if err != nil {
conn.MarkUnusable()
return err
}

// Unmarshal response.
var response ExecuteStatementResponse
if err := response.UnmarshalBinary(buf); err != nil {
return err
}

if response.Code() != 0 {
return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
}

return nil
}

// dial returns a connection to a single node in the cluster.
func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
// If we don't have a connection pool for that addr yet, create one
_, ok := m.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, clientPool: m.pool, timeout: m.timeout}
factory.metaClient = m.MetaClient

p, err := NewBoundedPool(1, m.maxConnections, m.timeout, factory.dial)
if err != nil {
return nil, err
}
m.pool.setPool(nodeID, p)
}
return m.pool.conn(nodeID)
}
41 changes: 41 additions & 0 deletions cluster/meta_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package cluster

import (
"log"
"os"

"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)

// MetaWriter writes meta query changes to the local tsdb store.
type MetaWriter struct {
TSDBStore *tsdb.Store
Logger *log.Logger

MetaClient interface {
Database(name string) (*meta.DatabaseInfo, error)
}
}

// NewMetaWriter returns a new initialized *MetaWriter.
func NewMetaWriter() *MetaWriter {
return &MetaWriter{
Logger: log.New(os.Stderr, "[meta-writer] ", log.LstdFlags),
}
}

// DropDatabase closes and deletes all local files for the database.
func (m *MetaWriter) DropDatabase(name string) error {
println("MetaWriter.DropDatabase start")
defer println("MetaWriter.DropDatabase end")
dbi, err := m.MetaClient.Database(name)
if err != nil {
return err
} else if dbi == nil {
return nil
}

// Remove the database from the local store
return m.TSDBStore.DeleteDatabase(name)
}
14 changes: 8 additions & 6 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type QueryExecutor struct {
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter

// Used for executing meta statements on all data nodes.
MetaExecutor *MetaExecutor

// Output of all logging.
// Defaults to discarding all log output.
LogOutput io.Writer
Expand Down Expand Up @@ -277,6 +280,8 @@ func (e *QueryExecutor) executeDropContinuousQueryStatement(q *influxql.DropCont
}

func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error {
println("cluster.QueryExecutor.executeDropDatabaseStatement start")
defer println("cluster.QueryExecutor.executeDropDatabaseStatement end")
dbi, err := e.MetaClient.Database(stmt.Name)
if err != nil {
return err
Expand All @@ -293,12 +298,9 @@ func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabase
return err
}

// Remove the database from the local store
if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil {
return err
}

return nil
// Tell all data nodes in the cluster to
// delete their local files for this database.
return e.MetaExecutor.ExecuteStatement(stmt, "")
}

// executeDropRetentionPolicy closes all local shards for the retention
Expand Down
62 changes: 62 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,65 @@ func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error {
}
return nil
}

// ExecuteStatementRequest represents the a request to execute a statement on a node.
type ExecuteStatementRequest struct {
pb internal.ExecuteStatementRequest
}

// Statement returns the InfluxQL statement.
func (r *ExecuteStatementRequest) Statement() string { return r.pb.GetStatement() }

// SetStatement sets the InfluxQL statement.
func (r *ExecuteStatementRequest) SetStatement(statement string) {
r.pb.Statement = proto.String(statement)
}

// Database returns the database name.
func (r *ExecuteStatementRequest) Database() string { return r.pb.GetDatabase() }

// SetDatabase sets the database name.
func (r *ExecuteStatementRequest) SetDatabase(database string) { r.pb.Database = proto.String(database) }

// MarshalBinary encodes the object to a binary format.
func (m *ExecuteStatementRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&m.pb)
}

// UnmarshalBinary populates ExecuteStatementRequest from a binary format.
func (m *ExecuteStatementRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &m.pb); err != nil {
return err
}
return nil
}

// ExecuteStatementResponse represents the response returned from a remote ExecuteStatementRequest call.
type ExecuteStatementResponse struct {
pb internal.WriteShardResponse
}

// Code returns the response code.
func (w *ExecuteStatementResponse) Code() int { return int(w.pb.GetCode()) }

// SetCode sets the Code
func (w *ExecuteStatementResponse) SetCode(code int) { w.pb.Code = proto.Int32(int32(code)) }

// Message returns the repsonse message.
func (w *ExecuteStatementResponse) Message() string { return w.pb.GetMessage() }

// SetMessage sets the Message
func (w *ExecuteStatementResponse) SetMessage(message string) { w.pb.Message = &message }

// MarshalBinary encodes the object to a binary format.
func (m *ExecuteStatementResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(&m.pb)
}

// UnmarshalBinary populates ExecuteStatementResponse from a binary format.
func (m *ExecuteStatementResponse) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &m.pb); err != nil {
return err
}
return nil
}
Loading

0 comments on commit e54f91a

Please sign in to comment.