Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to copy shard via rpc calls. Remove deprecated copier service. #6502

Merged
merged 4 commits into from
May 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ With this release InfluxDB is moving to Go v1.6.
- [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3)
- [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol
- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service..

### Bugfixes

Expand Down
256 changes: 157 additions & 99 deletions cluster/internal/data.pb.go

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,24 @@ message RemoteMonitorRequest {
message RemoteMonitorResponse {
optional string Err = 1;
}

message BackupShardRequest {
required uint64 ShardID = 1;
optional int64 Since = 2;
}

message BackupShardResponse {
optional string Err = 2;
}

message CopyShardRequest {
required string Host = 1;
required string Database = 2;
required string Policy = 3;
required uint64 ShardID = 4;
optional int64 Since = 5;
}

message CopyShardResponse {
optional string Err = 2;
}
86 changes: 86 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,89 @@ func (r *RemoteMonitorResponse) UnmarshalBinary(data []byte) error {
}
return nil
}

// BackupShardRequest represents a request to stream a backup of a single shard.
type BackupShardRequest struct {
ShardID uint64
Since time.Time
}

// MarshalBinary encodes r to a binary format.
func (r *BackupShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.BackupShardRequest{
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}

// UnmarshalBinary decodes data into r.
func (r *BackupShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.BackupShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}

r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}

// CopyShardRequest represents a request to copy a shard from another host.
type CopyShardRequest struct {
Host string
Database string
Policy string
ShardID uint64
Since time.Time
}

// MarshalBinary encodes r to a binary format.
func (r *CopyShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.CopyShardRequest{
Host: proto.String(r.Host),
Database: proto.String(r.Database),
Policy: proto.String(r.Policy),
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}

// UnmarshalBinary decodes data into r.
func (r *CopyShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}

r.Host = pb.GetHost()
r.Database = pb.GetDatabase()
r.Policy = pb.GetPolicy()
r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}

// CopyShardResponse represents a response from a shard Copy.
type CopyShardResponse struct {
Err error
}

func (r *CopyShardResponse) MarshalBinary() ([]byte, error) {
var pb internal.CopyShardResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}

func (r *CopyShardResponse) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardResponse

if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
167 changes: 167 additions & 0 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/influxdata/influxdb/monitor"

Expand Down Expand Up @@ -40,6 +41,15 @@ const (

seriesKeysReq = "seriesKeysReq"
seriesKeysResp = "seriesKeysResp"

expandSourcesReq = "expandSourcesReq"
expandSourcesReqexpandSourcesResp = "expandSourcesResp"

backupShardReq = "backupShardReq"
backupShardReqbackupShardResp = "backupShardResp"

copyShardReq = "copyShardReq"
copyShardResp = "copyShardResp"
)

const (
Expand All @@ -60,8 +70,20 @@ const (

remoteMonitorRequestMessage
remoteMonitorResponseMessage

expandSourcesRequestMessage
expandSourcesResponseMessage

backupShardRequestMessage
backupShardResponseMessage

copyShardRequestMessage
copyShardResponseMessage
)

// BackupTimeout is the time before a connection times out when performing a backup.
const BackupTimeout = 30 * time.Second

// Service processes data received over raw TCP connections.
type Service struct {
mu sync.RWMutex
Expand Down Expand Up @@ -227,6 +249,18 @@ func (s *Service) handleConn(conn net.Conn) {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeRemoteMonitorResponse(conn, err)
case expandSourcesRequestMessage:
s.statMap.Add(expandSourcesReq, 1)
s.processExpandSourcesRequest(conn)
return
case backupShardRequestMessage:
s.statMap.Add(backupShardReq, 1)
s.processBackupShardRequest(conn)
return
case copyShardRequestMessage:
s.statMap.Add(copyShardReq, 1)
s.processCopyShardRequest(conn)
return
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
Expand Down Expand Up @@ -549,6 +583,139 @@ func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) {
}
}

func (s *Service) processExpandSourcesRequest(conn net.Conn) {
var sources influxql.Sources
if err := func() error {
// Parse request.
var req ExpandSourcesRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}

// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}

// Expand sources from all shards.
a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources)
if err != nil {
return err
}
sources = a

return nil
}(); err != nil {
s.Logger.Printf("error reading ExpandSources request: %s", err)
EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err})
return
}

// Encode success response.
if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{
Sources: sources,
}); err != nil {
s.Logger.Printf("error writing ExpandSources response: %s", err)
return
}
}

func (s *Service) processBackupShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req BackupShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}

// Backup from local shard to the connection.
if err := s.TSDBStore.BackupShard(req.ShardID, req.Since, conn); err != nil {
return err
}

return nil
}(); err != nil {
s.Logger.Printf("error processing BackupShardRequest: %s", err)
return
}
}

func (s *Service) processCopyShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req CopyShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}

// Begin streaming backup from remote server.
r, err := s.backupRemoteShard(req.Host, req.ShardID, req.Since)
if err != nil {
return err
}
defer r.Close()

// Create shard if it doesn't exist.
if err := s.TSDBStore.CreateShard(req.Database, req.Policy, req.ShardID); err != nil {
return err
}

// Restore to local shard.
if err := s.TSDBStore.RestoreShard(req.ShardID, r); err != nil {
return err
}

return nil
}(); err != nil {
s.Logger.Printf("error reading CopyShard request: %s", err)
EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{Err: err})
return
}

// Encode success response.
if err := EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{}); err != nil {
s.Logger.Printf("error writing CopyShard response: %s", err)
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return is redundant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I defensively added that in case code was added in the future after the if block. That's just my personal preference though.

}
}

// backupRemoteShard connects to a cluster service on a remote host and streams a shard.
func (s *Service) backupRemoteShard(host string, shardID uint64, since time.Time) (io.ReadCloser, error) {
conn, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(BackupTimeout))

if err := func() error {
// Write the cluster multiplexing header byte
if _, err := conn.Write([]byte{MuxHeader}); err != nil {
return err
}

// Write backup request.
if err := EncodeTLV(conn, backupShardResponseMessage, &BackupShardRequest{
ShardID: shardID,
Since: since,
}); err != nil {
return fmt.Errorf("error writing BackupShardRequest: %s", err)
}

return nil
}(); err != nil {
conn.Close()
return nil, err
}

// Return the connection which will stream the rest of the backup.
return conn, nil
}

// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
typ, err := ReadType(r)
Expand Down
5 changes: 5 additions & 0 deletions cluster/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"sort"
"strconv"
"time"
Expand Down Expand Up @@ -872,12 +873,16 @@ type TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []models.Point) error

RestoreShard(id uint64, r io.Reader) error
BackupShard(id uint64, since time.Time, w io.Writer) error

DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}

type LocalTSDBStore struct {
Expand Down
11 changes: 11 additions & 0 deletions cluster/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
WriteToShardFn func(shardID uint64, points []models.Point) error

RestoreShardFn func(id uint64, r io.Reader) error
BackupShardFn func(id uint64, since time.Time, w io.Writer) error

DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
Expand All @@ -226,6 +229,14 @@ func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error {
return s.WriteToShardFn(shardID, points)
}

func (s *TSDBStore) RestoreShard(id uint64, r io.Reader) error {
return s.RestoreShardFn(id, r)
}

func (s *TSDBStore) BackupShard(id uint64, since time.Time, w io.Writer) error {
return s.BackupShardFn(id, since, w)
}

func (s *TSDBStore) DeleteDatabase(name string) error {
return s.DeleteDatabaseFn(name)
}
Expand Down
Loading