Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) add cluster meta executor #5746

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,13 @@ message MapShardResponse {
repeated string TagSets = 4;
repeated string Fields = 5;
}

// ExecuteStatementRequest carries a statement, in its string form, together
// with a database name. Both fields are required.
message ExecuteStatementRequest {
// Statement is the text of the statement to execute.
required string Statement = 1;
// Database is the name of the database sent along with the statement.
required string Database = 2;
}

// ExecuteStatementResponse is the reply to an ExecuteStatementRequest.
message ExecuteStatementResponse {
// Code is a numeric status. The Go caller in cluster/meta_executor.go treats
// any non-zero value as an error.
required int32 Code = 1;
// Message is an optional text that the Go caller includes in the error it
// builds when Code is non-zero.
optional string Message = 2;
}
154 changes: 154 additions & 0 deletions cluster/meta_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package cluster

import (
"fmt"
"log"
"net"
"os"
"sync"
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)

// MetaExecutor executes meta queries on all data nodes.
//
// It obtains the list of data nodes from MetaClient and sends the statement
// to each of them over connections taken from pool.
type MetaExecutor struct {
// mu is not referenced by any method visible in this file.
// NOTE(review): presumably meant to guard pool creation in dial — confirm.
mu sync.RWMutex
// timeout is added to time.Now() to form the write and read deadlines in
// executeOnNode, and is also handed to the connection factory and to
// NewBoundedPool in dial.
timeout time.Duration
// pool is the client pool from which dial looks up, registers and draws
// connections by node ID.
pool *clientPool
// maxConnections is passed as the second argument to NewBoundedPool in dial.
// NOTE(review): presumably the per-node connection cap — confirm.
maxConnections int
// Logger is set by NewMetaExecutor to write to os.Stderr with the
// "[meta-executor] " prefix.
Logger *log.Logger
// Node is only referenced from commented-out code in executeOnNode.
// NOTE(review): presumably identifies the local node — confirm.
Node *influxdb.Node

// MetaClient supplies node information. ExecuteStatement calls DataNodes,
// and dial hands the client to the connection factory.
MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
DataNodes() ([]meta.NodeInfo, error)
}
}

// NewMetaExecutor builds a *MetaExecutor with its defaults filled in: the
// default write timeout, a fresh client pool, a limit of 1000 connections and
// a logger writing to stderr. MetaClient and Node are left unset.
func NewMetaExecutor() *MetaExecutor {
	executor := new(MetaExecutor)
	executor.timeout = DefaultWriteTimeout
	executor.pool = newClientPool()
	executor.maxConnections = 1000
	executor.Logger = log.New(os.Stderr, "[meta-executor] ", log.LstdFlags)
	return executor
}

// ExecuteStatement executes a single InfluxQL statement on all nodes in the
// cluster concurrently.
//
// The list of data nodes is fetched from MetaClient; one goroutine per node
// then runs executeOnNode. The call blocks until every node has answered (or
// failed) and returns one of the errors that occurred, or nil if there were
// none. An error from fetching the node list is returned as-is.
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error {
	// NOTE(review): debug tracing; it bypasses m.Logger.
	println("MetaExecutor.ExecuteStatement start")
	defer println("MetaExecutor.ExecuteStatement end")

	// Get a list of all nodes the query needs to be executed on.
	nodes, err := m.MetaClient.DataNodes()
	if err != nil {
		return err
	}

	// The channel is buffered with one slot per node so that a failing
	// goroutine can always deliver its error without blocking. With an
	// unbuffered channel the send would block before wg.Done() runs, and
	// wg.Wait() below would never return.
	errs := make(chan error, len(nodes))

	// Start a goroutine to execute the statement on each of the nodes.
	var wg sync.WaitGroup
	wg.Add(len(nodes))
	for _, node := range nodes {
		go func(node meta.NodeInfo) {
			defer wg.Done()
			if err := m.executeOnNode(stmt, database, &node); err != nil {
				errs <- err
			}
		}(node)
	}

	// Wait on all nodes to execute the statement and respond. Once Wait
	// returns there are no senders left, so closing the channel is safe.
	wg.Wait()
	close(errs)

	// Receiving from the closed channel yields a buffered error if any node
	// failed, and the zero value (nil) otherwise.
	return <-errs
}

// executeOnNode executes a single InfluxQL statement on a single node.
//
// It obtains a connection to the node through dial, sends the statement text
// and database name as an ExecuteStatementRequest, and reads back an
// ExecuteStatementResponse. It returns an error if dialing, marshaling, the
// network exchange or unmarshaling fails, or if the response carries a
// non-zero code. A connection on which a deadline, write or read failed is
// marked unusable before it is handed back to the pool.
func (m *MetaExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
	// NOTE(review): debug tracing; it bypasses m.Logger.
	println("MetaExecutor.executeOnNode start")
	defer println("MetaExecutor.executeOnNode end")

	// NOTE(review): execution on the local node is not special-cased yet; the
	// original placeholder is kept below.
	// Executing statement on the local node?
	//if node.ID == m.Node.ID {
	//	panic("fix me !!!!!!!!!!!!!!!!!!!")
	//}

	// Establish a connection to the node.
	fmt.Printf("dialing: %v\n", node)
	c, err := m.dial(node.ID)
	if err != nil {
		return err
	}

	conn, ok := c.(*pooledConn)
	if !ok {
		// This runs inside a per-node goroutine, where a panic would take
		// down the whole process. Report the problem as an error instead and
		// release the connection.
		if c != nil {
			// Best-effort close: the type mismatch is the error worth reporting.
			_ = c.Close()
		}
		return fmt.Errorf("wrong connection type in MetaExecutor: %T", c)
	}
	// Return connection to pool by "closing" it.
	defer conn.Close()

	// Build RPC request.
	var request ExecuteStatementRequest
	request.SetStatement(stmt.String())
	request.SetDatabase(database)

	// Marshal into protocol buffer.
	buf, err := request.MarshalBinary()
	if err != nil {
		return err
	}

	// Send request.
	if err := conn.SetWriteDeadline(time.Now().Add(m.timeout)); err != nil {
		conn.MarkUnusable()
		return err
	}
	if err := WriteTLV(conn, executeStatementRequestMessage, buf); err != nil {
		conn.MarkUnusable()
		return err
	}

	// Read the response.
	if err := conn.SetReadDeadline(time.Now().Add(m.timeout)); err != nil {
		conn.MarkUnusable()
		return err
	}
	_, buf, err = ReadTLV(conn)
	if err != nil {
		conn.MarkUnusable()
		return err
	}

	// Unmarshal response.
	var response ExecuteStatementResponse
	if err := response.UnmarshalBinary(buf); err != nil {
		return err
	}

	if response.Code() != 0 {
		return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
	}

	return nil
}

// dial hands out a connection to the node with the given ID.
//
// When the client pool has no pool registered for the node yet, a bounded
// pool is created (backed by a connFactory for that node) and registered
// first. The connection itself always comes from m.pool.conn.
func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
	// Fast path: a pool for this node is already registered.
	if _, exists := m.pool.getPool(nodeID); exists {
		return m.pool.conn(nodeID)
	}

	// No pool for this node yet, so build one.
	factory := &connFactory{
		nodeID:     nodeID,
		clientPool: m.pool,
		timeout:    m.timeout,
		metaClient: m.MetaClient,
	}
	bounded, err := NewBoundedPool(1, m.maxConnections, m.timeout, factory.dial)
	if err != nil {
		return nil, err
	}
	m.pool.setPool(nodeID, bounded)

	return m.pool.conn(nodeID)
}
33 changes: 33 additions & 0 deletions cluster/meta_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package cluster

import (
"log"
"os"

"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
)

// MetaWriter writes meta query changes to the local tsdb store.
type MetaWriter struct {
// TSDBStore is the store that DropDatabase calls DeleteDatabase on. It is
// not set by NewMetaWriter and must be assigned before use.
TSDBStore *tsdb.Store
// Logger is set by NewMetaWriter to write to os.Stderr with the
// "[meta-writer] " prefix.
Logger *log.Logger

// MetaClient looks up database information by name. It is not called by any
// method visible in this file.
MetaClient interface {
Database(name string) (*meta.DatabaseInfo, error)
}
}

// NewMetaWriter builds a *MetaWriter whose Logger writes to stderr with the
// "[meta-writer] " prefix. TSDBStore and MetaClient are left unset.
func NewMetaWriter() *MetaWriter {
	writer := new(MetaWriter)
	writer.Logger = log.New(os.Stderr, "[meta-writer] ", log.LstdFlags)
	return writer
}

// DropDatabase removes the named database from the local store by delegating
// to TSDBStore.DeleteDatabase, and returns whatever error that call reports.
func (m *MetaWriter) DropDatabase(name string) error {
	println("MetaWriter.DropDatabase start")
	defer println("MetaWriter.DropDatabase end")

	err := m.TSDBStore.DeleteDatabase(name)
	return err
}
14 changes: 8 additions & 6 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type QueryExecutor struct {
// Used for rewriting points back into system for SELECT INTO statements.
PointsWriter *PointsWriter

// Used for executing meta statements on all data nodes.
MetaExecutor *MetaExecutor

// Output of all logging.
// Defaults to discarding all log output.
LogOutput io.Writer
Expand Down Expand Up @@ -277,6 +280,8 @@ func (e *QueryExecutor) executeDropContinuousQueryStatement(q *influxql.DropCont
}

func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabaseStatement) error {
println("cluster.QueryExecutor.executeDropDatabaseStatement start")
defer println("cluster.QueryExecutor.executeDropDatabaseStatement end")
dbi, err := e.MetaClient.Database(stmt.Name)
if err != nil {
return err
Expand All @@ -293,12 +298,9 @@ func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabase
return err
}

// Remove the database from the local store
if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil {
return err
}

return nil
// Tell all data nodes in the cluster to
// delete their local files for this database.
return e.MetaExecutor.ExecuteStatement(stmt, "")
}

// executeDropRetentionPolicy closes all local shards for the retention
Expand Down
62 changes: 62 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,65 @@ func (w *WriteShardResponse) UnmarshalBinary(buf []byte) error {
}
return nil
}

// ExecuteStatementRequest represents a request to execute a statement on a node.
// It wraps the generated protobuf message and exposes it through accessors.
type ExecuteStatementRequest struct {
// pb is the underlying protobuf message that the accessors read and write.
pb internal.ExecuteStatementRequest
}

// Statement reports the InfluxQL statement text stored in the request.
func (r *ExecuteStatementRequest) Statement() string {
	text := r.pb.GetStatement()
	return text
}

// SetStatement stores the given InfluxQL statement text in the request.
func (r *ExecuteStatementRequest) SetStatement(statement string) {
	value := proto.String(statement)
	r.pb.Statement = value
}

// Database reports the database name stored in the request.
func (r *ExecuteStatementRequest) Database() string {
	name := r.pb.GetDatabase()
	return name
}

// SetDatabase stores the given database name in the request.
func (r *ExecuteStatementRequest) SetDatabase(database string) {
	value := proto.String(database)
	r.pb.Database = value
}

// MarshalBinary serializes the request's protobuf message to its binary form.
func (r *ExecuteStatementRequest) MarshalBinary() ([]byte, error) {
	data, err := proto.Marshal(&r.pb)
	return data, err
}

// UnmarshalBinary decodes buf into the request's protobuf message and returns
// the decoder's error, if any.
func (r *ExecuteStatementRequest) UnmarshalBinary(buf []byte) error {
	return proto.Unmarshal(buf, &r.pb)
}

// ExecuteStatementResponse represents the response returned from a remote
// ExecuteStatementRequest call. It wraps the generated protobuf message and
// exposes it through accessors.
type ExecuteStatementResponse struct {
	// pb is the underlying protobuf message. It is the ExecuteStatementResponse
	// message declared in data.proto, not WriteShardResponse, so the bytes on
	// the wire follow this RPC's own schema.
	pb internal.ExecuteStatementResponse
}

// Code reports the response code as an int.
func (r *ExecuteStatementResponse) Code() int {
	code := r.pb.GetCode()
	return int(code)
}

// SetCode stores the given code in the response, narrowed to int32.
func (r *ExecuteStatementResponse) SetCode(code int) {
	narrowed := int32(code)
	r.pb.Code = proto.Int32(narrowed)
}

// Message reports the message text stored in the response.
func (r *ExecuteStatementResponse) Message() string {
	text := r.pb.GetMessage()
	return text
}

// SetMessage stores the given message text in the response.
func (r *ExecuteStatementResponse) SetMessage(message string) {
	r.pb.Message = &message
}

// MarshalBinary serializes the response's protobuf message to its binary form.
func (r *ExecuteStatementResponse) MarshalBinary() ([]byte, error) {
	data, err := proto.Marshal(&r.pb)
	return data, err
}

// UnmarshalBinary decodes buf into the response's protobuf message and returns
// the decoder's error, if any.
func (r *ExecuteStatementResponse) UnmarshalBinary(buf []byte) error {
	return proto.Unmarshal(buf, &r.pb)
}
Loading