Skip to content

Commit

Permalink
Merge pull request #6502 from influxdata/copy-shard
Browse files Browse the repository at this point in the history
Add ability to copy shard via rpc calls.  Remove deprecated copier service.
  • Loading branch information
corylanou committed May 10, 2016
2 parents afde7ce + f9ec3c9 commit 639b0d7
Show file tree
Hide file tree
Showing 17 changed files with 620 additions and 629 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ With this release InfluxDB is moving to Go v1.6.
- [#6534](https://github.com/influxdata/influxdb/pull/6534): Move to Go v1.6.2 (over Go v1.4.3)
- [#6522](https://github.com/influxdata/influxdb/pull/6522): Dump TSM files to line protocol
- [#6585](https://github.com/influxdata/influxdb/pull/6585): Parallelize iterators
- [#6502](https://github.com/influxdata/influxdb/pull/6502): Add ability to copy shard via rpc calls. Remove deprecated copier service..

### Bugfixes

Expand Down
256 changes: 157 additions & 99 deletions cluster/internal/data.pb.go

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,24 @@ message RemoteMonitorRequest {
message RemoteMonitorResponse {
optional string Err = 1;
}

message BackupShardRequest {
required uint64 ShardID = 1;
optional int64 Since = 2;
}

message BackupShardResponse {
optional string Err = 2;
}

message CopyShardRequest {
required string Host = 1;
required string Database = 2;
required string Policy = 3;
required uint64 ShardID = 4;
optional int64 Since = 5;
}

message CopyShardResponse {
optional string Err = 2;
}
86 changes: 86 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,89 @@ func (r *RemoteMonitorResponse) UnmarshalBinary(data []byte) error {
}
return nil
}

// BackupShardRequest represents a request to stream a backup of a single shard.
type BackupShardRequest struct {
ShardID uint64
Since time.Time
}

// MarshalBinary encodes r to a binary format.
func (r *BackupShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.BackupShardRequest{
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}

// UnmarshalBinary decodes data into r.
func (r *BackupShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.BackupShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}

r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}

// CopyShardRequest represents a request to copy a shard from another host.
type CopyShardRequest struct {
Host string
Database string
Policy string
ShardID uint64
Since time.Time
}

// MarshalBinary encodes r to a binary format.
func (r *CopyShardRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(&internal.CopyShardRequest{
Host: proto.String(r.Host),
Database: proto.String(r.Database),
Policy: proto.String(r.Policy),
ShardID: proto.Uint64(r.ShardID),
Since: proto.Int64(r.Since.UnixNano()),
})
}

// UnmarshalBinary decodes data into r.
func (r *CopyShardRequest) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardRequest
if err := proto.Unmarshal(data, &pb); err != nil {
return err
}

r.Host = pb.GetHost()
r.Database = pb.GetDatabase()
r.Policy = pb.GetPolicy()
r.ShardID = pb.GetShardID()
r.Since = time.Unix(0, pb.GetSince())
return nil
}

// CopyShardResponse represents a response from a shard Copy.
type CopyShardResponse struct {
Err error
}

func (r *CopyShardResponse) MarshalBinary() ([]byte, error) {
var pb internal.CopyShardResponse
if r.Err != nil {
pb.Err = proto.String(r.Err.Error())
}
return proto.Marshal(&pb)
}

func (r *CopyShardResponse) UnmarshalBinary(data []byte) error {
var pb internal.CopyShardResponse

if err := proto.Unmarshal(data, &pb); err != nil {
return err
}
if pb.Err != nil {
r.Err = errors.New(pb.GetErr())
}
return nil
}
167 changes: 167 additions & 0 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/influxdata/influxdb/monitor"

Expand Down Expand Up @@ -40,6 +41,15 @@ const (

seriesKeysReq = "seriesKeysReq"
seriesKeysResp = "seriesKeysResp"

expandSourcesReq = "expandSourcesReq"
expandSourcesReqexpandSourcesResp = "expandSourcesResp"

backupShardReq = "backupShardReq"
backupShardReqbackupShardResp = "backupShardResp"

copyShardReq = "copyShardReq"
copyShardResp = "copyShardResp"
)

const (
Expand All @@ -60,8 +70,20 @@ const (

remoteMonitorRequestMessage
remoteMonitorResponseMessage

expandSourcesRequestMessage
expandSourcesResponseMessage

backupShardRequestMessage
backupShardResponseMessage

copyShardRequestMessage
copyShardResponseMessage
)

// BackupTimeout is the time before a connection times out when performing a backup.
const BackupTimeout = 30 * time.Second

// Service processes data received over raw TCP connections.
type Service struct {
mu sync.RWMutex
Expand Down Expand Up @@ -227,6 +249,18 @@ func (s *Service) handleConn(conn net.Conn) {
s.Logger.Printf("process write shard error: %s", err)
}
s.writeRemoteMonitorResponse(conn, err)
case expandSourcesRequestMessage:
s.statMap.Add(expandSourcesReq, 1)
s.processExpandSourcesRequest(conn)
return
case backupShardRequestMessage:
s.statMap.Add(backupShardReq, 1)
s.processBackupShardRequest(conn)
return
case copyShardRequestMessage:
s.statMap.Add(copyShardReq, 1)
s.processCopyShardRequest(conn)
return
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
Expand Down Expand Up @@ -549,6 +583,139 @@ func (s *Service) writeRemoteMonitorResponse(w io.Writer, e error) {
}
}

func (s *Service) processExpandSourcesRequest(conn net.Conn) {
var sources influxql.Sources
if err := func() error {
// Parse request.
var req ExpandSourcesRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}

// Collect iterator creators for each shard.
ics := make([]influxql.IteratorCreator, 0, len(req.ShardIDs))
for _, shardID := range req.ShardIDs {
ic := s.TSDBStore.ShardIteratorCreator(shardID)
if ic == nil {
return nil
}
ics = append(ics, ic)
}

// Expand sources from all shards.
a, err := influxql.IteratorCreators(ics).ExpandSources(req.Sources)
if err != nil {
return err
}
sources = a

return nil
}(); err != nil {
s.Logger.Printf("error reading ExpandSources request: %s", err)
EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{Err: err})
return
}

// Encode success response.
if err := EncodeTLV(conn, expandSourcesResponseMessage, &ExpandSourcesResponse{
Sources: sources,
}); err != nil {
s.Logger.Printf("error writing ExpandSources response: %s", err)
return
}
}

func (s *Service) processBackupShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req BackupShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}

// Backup from local shard to the connection.
if err := s.TSDBStore.BackupShard(req.ShardID, req.Since, conn); err != nil {
return err
}

return nil
}(); err != nil {
s.Logger.Printf("error processing BackupShardRequest: %s", err)
return
}
}

func (s *Service) processCopyShardRequest(conn net.Conn) {
if err := func() error {
// Parse request.
var req CopyShardRequest
if err := DecodeLV(conn, &req); err != nil {
return err
}

// Begin streaming backup from remote server.
r, err := s.backupRemoteShard(req.Host, req.ShardID, req.Since)
if err != nil {
return err
}
defer r.Close()

// Create shard if it doesn't exist.
if err := s.TSDBStore.CreateShard(req.Database, req.Policy, req.ShardID); err != nil {
return err
}

// Restore to local shard.
if err := s.TSDBStore.RestoreShard(req.ShardID, r); err != nil {
return err
}

return nil
}(); err != nil {
s.Logger.Printf("error reading CopyShard request: %s", err)
EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{Err: err})
return
}

// Encode success response.
if err := EncodeTLV(conn, copyShardResponseMessage, &CopyShardResponse{}); err != nil {
s.Logger.Printf("error writing CopyShard response: %s", err)
return
}
}

// backupRemoteShard connects to a cluster service on a remote host and streams a shard.
func (s *Service) backupRemoteShard(host string, shardID uint64, since time.Time) (io.ReadCloser, error) {
conn, err := net.Dial("tcp", host)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Now().Add(BackupTimeout))

if err := func() error {
// Write the cluster multiplexing header byte
if _, err := conn.Write([]byte{MuxHeader}); err != nil {
return err
}

// Write backup request.
if err := EncodeTLV(conn, backupShardResponseMessage, &BackupShardRequest{
ShardID: shardID,
Since: since,
}); err != nil {
return fmt.Errorf("error writing BackupShardRequest: %s", err)
}

return nil
}(); err != nil {
conn.Close()
return nil, err
}

// Return the connection which will stream the rest of the backup.
return conn, nil
}

// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
typ, err := ReadType(r)
Expand Down
5 changes: 5 additions & 0 deletions cluster/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"io"
"sort"
"strconv"
"time"
Expand Down Expand Up @@ -872,12 +873,16 @@ type TSDBStore interface {
CreateShard(database, policy string, shardID uint64) error
WriteToShard(shardID uint64, points []models.Point) error

RestoreShard(id uint64, r io.Reader) error
BackupShard(id uint64, since time.Time, w io.Writer) error

DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
ShardIteratorCreator(id uint64) influxql.IteratorCreator
}

type LocalTSDBStore struct {
Expand Down
11 changes: 11 additions & 0 deletions cluster/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ type TSDBStore struct {
CreateShardFn func(database, policy string, shardID uint64) error
WriteToShardFn func(shardID uint64, points []models.Point) error

RestoreShardFn func(id uint64, r io.Reader) error
BackupShardFn func(id uint64, since time.Time, w io.Writer) error

DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
Expand All @@ -226,6 +229,14 @@ func (s *TSDBStore) WriteToShard(shardID uint64, points []models.Point) error {
return s.WriteToShardFn(shardID, points)
}

func (s *TSDBStore) RestoreShard(id uint64, r io.Reader) error {
return s.RestoreShardFn(id, r)
}

func (s *TSDBStore) BackupShard(id uint64, since time.Time, w io.Writer) error {
return s.BackupShardFn(id, since, w)
}

func (s *TSDBStore) DeleteDatabase(name string) error {
return s.DeleteDatabaseFn(name)
}
Expand Down
Loading

0 comments on commit 639b0d7

Please sign in to comment.