Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote shard mapping over TCP #3376

Merged
merged 2 commits into from
Jul 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ const (

// DefaultShardWriterTimeout is the default timeout set on shard writers.
DefaultShardWriterTimeout = 5 * time.Second

// DefaultShardMapperTimeout is the default timeout set on shard mappers.
DefaultShardMapperTimeout = 5 * time.Second
)

// Config represents the configuration for the the clustering service.
// Config represents the configuration for the clustering service.
type Config struct {
WriteTimeout toml.Duration `toml:"write-timeout"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
ForceRemoteShardMapping bool `toml:"force-remote-mapping"`
WriteTimeout toml.Duration `toml:"write-timeout"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"`
}

// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
WriteTimeout: toml.Duration(DefaultWriteTimeout),
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout),
}
}
74 changes: 74 additions & 0 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ message WriteShardResponse {
optional string Message = 2;
}

message MapShardRequest {
required uint64 ShardID = 1;
required string Query = 2;
required int32 ChunkSize = 3;
}

message MapShardResponse {
required int32 Code = 1;
optional string Message = 2;
optional bytes Data = 3;
repeated string TagSets = 4;
}
61 changes: 61 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,67 @@ import (

//go:generate protoc --gogo_out=. internal/data.proto

// MapShardRequest represents the request to map a remote shard for a query.
type MapShardRequest struct {
pb internal.MapShardRequest
}

func (m *MapShardRequest) ShardID() uint64 { return m.pb.GetShardID() }
func (m *MapShardRequest) Query() string { return m.pb.GetQuery() }
func (m *MapShardRequest) ChunkSize() int32 { return m.pb.GetChunkSize() }

func (m *MapShardRequest) SetShardID(id uint64) { m.pb.ShardID = &id }
func (m *MapShardRequest) SetQuery(query string) { m.pb.Query = &query }
func (m *MapShardRequest) SetChunkSize(chunkSize int32) { m.pb.ChunkSize = &chunkSize }

// MarshalBinary encodes the object to a binary format.
func (m *MapShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&m.pb)
}

// UnmarshalBinary populates MapShardRequest from a binary format.
func (m *MapShardRequest) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &m.pb); err != nil {
return err
}
return nil
}

// MapShardResponse represents the response returned from a remote MapShardRequest call
type MapShardResponse struct {
pb internal.MapShardResponse
}

func NewMapShardResponse(code int, message string) *MapShardResponse {
m := &MapShardResponse{}
m.SetCode(code)
m.SetMessage(message)
return m
}

func (r *MapShardResponse) Code() int { return int(r.pb.GetCode()) }
func (r *MapShardResponse) Message() string { return r.pb.GetMessage() }
func (r *MapShardResponse) TagSets() []string { return r.pb.GetTagSets() }
func (r *MapShardResponse) Data() []byte { return r.pb.GetData() }

func (r *MapShardResponse) SetCode(code int) { r.pb.Code = proto.Int32(int32(code)) }
func (r *MapShardResponse) SetMessage(message string) { r.pb.Message = &message }
func (r *MapShardResponse) SetTagSets(tagsets []string) { r.pb.TagSets = tagsets }
func (r *MapShardResponse) SetData(data []byte) { r.pb.Data = data }

// MarshalBinary encodes the object to a binary format.
func (r *MapShardResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(&r.pb)
}

// UnmarshalBinary populates WritePointRequest from a binary format.
func (r *MapShardResponse) UnmarshalBinary(buf []byte) error {
if err := proto.Unmarshal(buf, &r.pb); err != nil {
return err
}
return nil
}

// WritePointsRequest represents a request to write point data to the cluster
type WritePointsRequest struct {
Database string
Expand Down
72 changes: 72 additions & 0 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -36,6 +37,7 @@ type Service struct {
TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []tsdb.Point) error
CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
}

Logger *log.Logger
Expand Down Expand Up @@ -144,6 +146,14 @@ func (s *Service) handleConn(conn net.Conn) {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeShardResponse(conn, err)
case mapShardRequestMessage:
err := s.processMapShardRequest(conn, buf)
if err != nil {
s.Logger.Printf("process map shard error: %s", err)
if err := writeMapShardResponseMessage(conn, NewMapShardResponse(1, err.Error())); err != nil {
s.Logger.Printf("process map shard error writing response: %s", err.Error())
}
}
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
Expand Down Expand Up @@ -213,6 +223,68 @@ func (s *Service) writeShardResponse(w io.Writer, e error) {
}
}

func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
// Decode request
var req MapShardRequest
if err := req.UnmarshalBinary(buf); err != nil {
return err
}

m, err := s.TSDBStore.CreateMapper(req.ShardID(), req.Query(), int(req.ChunkSize()))
if err != nil {
return fmt.Errorf("create mapper: %s", err)
}
if m == nil {
return writeMapShardResponseMessage(w, NewMapShardResponse(0, ""))
}

if err := m.Open(); err != nil {
return fmt.Errorf("mapper open: %s", err)
}
defer m.Close()

var tagSetsSent bool
for {
var resp MapShardResponse

if !tagSetsSent {
resp.SetTagSets(m.TagSets())
tagSetsSent = true
}

chunk, err := m.NextChunk()
if err != nil {
return fmt.Errorf("next chunk: %s", err)
}
if chunk != nil {
b, err := json.Marshal(chunk)
if err != nil {
return fmt.Errorf("encoding: %s", err)
}
resp.SetData(b)
}

// Write to connection.
resp.SetCode(0)
if err := writeMapShardResponseMessage(w, &resp); err != nil {
return err
}

if chunk == nil {
// All mapper data sent.
return nil
}
}
}

func writeMapShardResponseMessage(w io.Writer, msg *MapShardResponse) error {
buf, err := msg.MarshalBinary()
if err != nil {
return err
}
return WriteTLV(w, mapShardResponseMessage, buf)
}

// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
var typ [1]byte
Expand Down
15 changes: 10 additions & 5 deletions cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
}

type testService struct {
nodeID uint64
ln net.Listener
muxln net.Listener
writeShardFunc func(shardID uint64, points []tsdb.Point) error
createShardFunc func(database, policy string, shardID uint64) error
nodeID uint64
ln net.Listener
muxln net.Listener
writeShardFunc func(shardID uint64, points []tsdb.Point) error
createShardFunc func(database, policy string, shardID uint64) error
createMapperFunc func(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error)
}

func newTestWriteService(f func(shardID uint64, points []tsdb.Point) error) testService {
Expand Down Expand Up @@ -68,6 +69,10 @@ func (t testService) CreateShard(database, policy string, shardID uint64) error
return t.createShardFunc(database, policy, shardID)
}

func (t testService) CreateMapper(shardID uint64, query string, chunkSize int) (tsdb.Mapper, error) {
return t.createMapperFunc(shardID, query, chunkSize)
}

func writeShardSuccess(shardID uint64, points []tsdb.Point) error {
responses <- &serviceResponse{
shardID: shardID,
Expand Down
Loading