Skip to content

Commit

Permalink
Merge pull request #3376 from influxdb/mapping_over_tcp
Browse files Browse the repository at this point in the history
Remote shard mapping over TCP
  • Loading branch information
otoolep committed Jul 20, 2015
2 parents 5206c0e + 10eecb4 commit 30345f5
Show file tree
Hide file tree
Showing 15 changed files with 469 additions and 236 deletions.
12 changes: 9 additions & 3 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ const (

// DefaultShardWriterTimeout is the default timeout set on shard writers.
DefaultShardWriterTimeout = 5 * time.Second

// DefaultShardMapperTimeout is the default timeout set on shard mappers.
DefaultShardMapperTimeout = 5 * time.Second
)

// Config represents the configuration for the the clustering service.
// Config represents the configuration for the clustering service.
type Config struct {
WriteTimeout toml.Duration `toml:"write-timeout"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
ForceRemoteShardMapping bool `toml:"force-remote-mapping"`
WriteTimeout toml.Duration `toml:"write-timeout"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"`
}

// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
WriteTimeout: toml.Duration(DefaultWriteTimeout),
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout),
}
}
74 changes: 74 additions & 0 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ message WriteShardResponse {
optional string Message = 2;
}

message MapShardRequest {
required uint64 ShardID = 1;
required string Query = 2;
required int32 ChunkSize = 3;
}

message MapShardResponse {
required int32 Code = 1;
optional string Message = 2;
optional bytes Data = 3;
repeated string TagSets = 4;
}
61 changes: 61 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,67 @@ import (

//go:generate protoc --gogo_out=. internal/data.proto

// MapShardRequest represents the request to map a remote shard for a query.
type MapShardRequest struct {
pb internal.MapShardRequest
}

func (m *MapShardRequest) ShardID() uint64 { return m.pb.GetShardID() }
func (m *MapShardRequest) Query() string { return m.pb.GetQuery() }
func (m *MapShardRequest) ChunkSize() int32 { return m.pb.GetChunkSize() }

func (m *MapShardRequest) SetShardID(id uint64) { m.pb.ShardID = &id }
func (m *MapShardRequest) SetQuery(query string) { m.pb.Query = &query }
func (m *MapShardRequest) SetChunkSize(chunkSize int32) { m.pb.ChunkSize = &chunkSize }

// MarshalBinary encodes the object to a binary format.
func (m *MapShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&m.pb)
}

// UnmarshalBinary populates MapShardRequest from a binary format.
func (m *MapShardRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &m.pb); err != nil {
return err
}
return nil
}

// MapShardResponse represents the response returned from a remote MapShardRequest call
type MapShardResponse struct {
pb internal.MapShardResponse
}

func NewMapShardResponse(code int, message string) *MapShardResponse {
m := &MapShardResponse{}
m.SetCode(code)
m.SetMessage(message)
return m
}

func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) }
func (r *MapShardResponse) Message() string { return r.pb.GetMessage() }
func (r *MapShardResponse) TagSets() []string { return r.pb.GetTagSets() }
func (r *MapShardResponse) Data() []byte { return r.pb.GetData() }

func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) }
func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message }
func (r *MapShardResponse) SetTagSets(tagsets []string) { r.pb.TagSets = tagsets }
func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data }

// MarshalBinary encodes the object to a binary format.
func (r *MapShardResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(&r.pb)
}

// UnmarshalBinary populates WritePointRequest from a binary format.
func (r *MapShardResponse) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &r.pb); err != nil {
return err
}
return nil
}

// WritePointsRequest represents a request to write point data to the cluster
type WritePointsRequest struct {
Database string
Expand Down
72 changes: 72 additions & 0 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -36,6 +37,7 @@ type Service struct {
TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []tsdb.Point) error
CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
}

Logger *log.Logger
Expand Down Expand Up @@ -144,6 +146,14 @@ func (s *Service) handleConn(conn net.Conn) {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeShardResponse(conn, err)
case mapShardRequestMessage:
err := s.processMapShardRequest(conn, buf)
if err != nil {
s.Logger.Printf("process map shard error: %s", err)
if err := writeMapShardResponseMessage(conn, NewMapShardResponse(1, err.Error())); err != nil {
s.Logger.Printf("process map shard error writing response: %s", err.Error())
}
}
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
Expand Down Expand Up @@ -213,6 +223,68 @@ func (s *Service) writeShardResponse(w io.Writer, e error) {
}
}

func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
// Decode request
var req MapShardRequest
if err := req.UnmarshalBinary(buf); err != nil {
return err
}

m, err := s.TSDBStore.CreateMapper(req.ShardID(), req.Query(), int(req.ChunkSize()))
if err != nil {
return fmt.Errorf("create mapper: %s", err)
}
if m == nil {
return writeMapShardResponseMessage(w, NewMapShardResponse(0, ""))
}

if err := m.Open(); err != nil {
return fmt.Errorf("mapper open: %s", err)
}
defer m.Close()

var tagSetsSent bool
for {
var resp MapShardResponse

if !tagSetsSent {
resp.SetTagSets(m.TagSets())
tagSetsSent = true
}

chunk, err := m.NextChunk()
if err != nil {
return fmt.Errorf("next chunk: %s", err)
}
if chunk != nil {
b, err := json.Marshal(chunk)
if err != nil {
return fmt.Errorf("encoding: %s", err)
}
resp.SetData(b)
}

// Write to connection.
resp.SetCode(0)
if err := writeMapShardResponseMessage(w, &resp); err != nil {
return err
}

if chunk == nil {
// All mapper data sent.
return nil
}
}
}

func writeMapShardResponseMessage(w io.Writer, msg *MapShardResponse) error {
buf, err := msg.MarshalBinary()
if err != nil {
return err
}
return WriteTLV(w, mapShardResponseMessage, buf)
}

// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
var typ [1]byte
Expand Down
15 changes: 10 additions & 5 deletions cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
}

type testService struct {
nodeID uint64
ln net.Listener
muxln net.Listener
writeShardFunc func(shardID uint64, points []tsdb.Point) error
createShardFunc func(database, policy string, shardID uint64) error
nodeID uint64
ln net.Listener
muxln net.Listener
writeShardFunc func(shardID uint64, points []tsdb.Point) error
createShardFunc func(database, policy string, shardID uint64) error
createMapperFunc func(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
}

func newTestWriteService(f func(shardID uint64, points []tsdb.Point) error) testService {
Expand Down Expand Up @@ -68,6 +69,10 @@ func (t testService) CreateShard(database, policy string, shardID uint64) error
return t.createShardFunc(database, policy, shardID)
}

func (t testService) CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) {
return t.createMapperFunc(shardID, query, chunkSize)
}

func writeShardSuccess(shardID uint64, points []tsdb.Point) error {
responses <- &serviceResponse{
shardID: shardID,
Expand Down
Loading

0 comments on commit 30345f5

Please sign in to comment.