Skip to content

Commit

Permalink
Merge pull request #5879 from influxdata/jw-data-node
Browse files Browse the repository at this point in the history
Data node fixes
  • Loading branch information
jwilder committed Mar 2, 2016
2 parents e3fef55 + a54befb commit b39c35c
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
- [#5841](https://github.com/influxdata/influxdb/pull/5841): Reduce tsm allocations by converting time.Time to int64
- [#5842](https://github.com/influxdata/influxdb/issues/5842): Add SeriesList binary marshaling
- [#5854](https://github.com/influxdata/influxdb/issues/5854): failures of tests in tsdb/engine/tsm1 when compiled with go master
- [#5610](https://github.com/influxdata/influxdb/issues/5610): Write into fully-replicated cluster is not replicated across all shards

## v0.10.1 [2016-02-18]

Expand Down
68 changes: 48 additions & 20 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package internal;
message WriteShardRequest {
required uint64 ShardID = 1;
repeated bytes Points = 2;
optional string Database = 3;
optional string RetentionPolicy = 4;
}

message WriteShardResponse {
Expand Down
3 changes: 3 additions & 0 deletions cluster/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ type TSDBStore struct {
}

func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error {
if s.CreateShardFn == nil {
return nil
}
return s.CreateShardFn(database, policy, shardID)
}

Expand Down
8 changes: 8 additions & 0 deletions cluster/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id }
// ShardID gets the ShardID
func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }

func (w *WriteShardRequest) SetDatabase(db string) { w.pb.Database = &db }

func (w *WriteShardRequest) SetRetentionPolicy(rp string) { w.pb.RetentionPolicy = &rp }

func (w *WriteShardRequest) Database() string { return w.pb.GetDatabase() }

func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionPolicy() }

// Points returns the time series Points
func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }

Expand Down
23 changes: 12 additions & 11 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,22 +250,23 @@ func (s *Service) processWriteShardRequest(buf []byte) error {
// to check the metastore to determine what database and retention policy this
// shard should reside within.
if err == tsdb.ErrShardNotFound {

// Query the metastore for the owner of this shard
database, retentionPolicy, sgi := s.MetaClient.ShardOwner(req.ShardID())
if sgi == nil {
// If we can't find it, then we need to drop this request
// as it is no longer valid. This could happen if writes were queued via
// hinted handoff and delivered after a shard group was deleted.
s.Logger.Printf("drop write request: shard=%d. shard group does not exist or was deleted", req.ShardID())
db, rp := req.Database(), req.RetentionPolicy()
if db == "" || rp == "" {
s.Logger.Printf("drop write request: shard=%d. no database or rentention policy received", req.ShardID())
return nil
}

err = s.TSDBStore.CreateShard(database, retentionPolicy, req.ShardID())
err = s.TSDBStore.CreateShard(req.Database(), req.RetentionPolicy(), req.ShardID())
if err != nil {
return err
s.statMap.Add(writeShardFail, 1)
return fmt.Errorf("create shard %d: %s", req.ShardID(), err)
}

err = s.TSDBStore.WriteToShard(req.ShardID(), points)
if err != nil {
s.statMap.Add(writeShardFail, 1)
return fmt.Errorf("write shard %d: %s", req.ShardID(), err)
}
return s.TSDBStore.WriteToShard(req.ShardID(), points)
}

if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) {
}, nil
}

func (m *metaClient) ShardOwner(shardID uint64) (db, rp string, sgi *meta.ShardGroupInfo) {
return "db", "rp", &meta.ShardGroupInfo{}
}

type testService struct {
nodeID uint64
ln net.Listener
muxln net.Listener
nodeID uint64
ln net.Listener
muxln net.Listener
responses chan *serviceResponse

TSDBStore TSDBStore
}
Expand All @@ -46,6 +51,7 @@ func newTestWriteService(f func(shardID uint64, points []models.Point) error) te
muxln: muxln,
}
s.TSDBStore.WriteToShardFn = f
s.responses = make(chan *serviceResponse, 1024)
return s
}

Expand All @@ -62,8 +68,8 @@ type serviceResponse struct {
points []models.Point
}

func writeShardSuccess(shardID uint64, points []models.Point) error {
responses <- &serviceResponse{
func (ts *testService) writeShardSuccess(shardID uint64, points []models.Point) error {
ts.responses <- &serviceResponse{
shardID: shardID,
points: points,
}
Expand All @@ -79,13 +85,11 @@ func writeShardSlow(shardID uint64, points []models.Point) error {
return nil
}

var responses = make(chan *serviceResponse, 1024)

func (testService) ResponseN(n int) ([]*serviceResponse, error) {
func (ts *testService) ResponseN(n int) ([]*serviceResponse, error) {
var a []*serviceResponse
for {
select {
case r := <-responses:
case r := <-ts.responses:
a = append(a, r)
if len(a) == n {
return a, nil
Expand Down
12 changes: 12 additions & 0 deletions cluster/shard_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ShardWriter struct {

MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
ShardOwner(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo)
}
}

Expand Down Expand Up @@ -61,9 +62,20 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point)
conn.Close() // return to pool
}(conn)

// Determine the location of this shard and whether it still exists
db, rp, sgi := w.MetaClient.ShardOwner(shardID)
if sgi == nil {
// If we can't get the shard group for this shard, then we need to drop this request
// as it is no longer valid. This could happen if writes were queued via
// hinted handoff and we're processing the queue after a shard group was deleted.
return nil
}

// Build write request.
var request WriteShardRequest
request.SetShardID(shardID)
request.SetDatabase(db)
request.SetRetentionPolicy(rp)
request.AddPoints(points)

// Marshal into protocol buffers.
Expand Down
9 changes: 6 additions & 3 deletions cluster/shard_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

// Ensure the shard writer can successfully write a single request.
func TestShardWriter_WriteShard_Success(t *testing.T) {
ts := newTestWriteService(writeShardSuccess)
ts := newTestWriteService(nil)
ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = &ts.TSDBStore
Expand Down Expand Up @@ -60,7 +61,8 @@ func TestShardWriter_WriteShard_Success(t *testing.T) {

// Ensure the shard writer can successful write a multiple requests.
func TestShardWriter_WriteShard_Multiple(t *testing.T) {
ts := newTestWriteService(writeShardSuccess)
ts := newTestWriteService(nil)
ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = &ts.TSDBStore
Expand Down Expand Up @@ -137,7 +139,8 @@ func TestShardWriter_WriteShard_Error(t *testing.T) {

// Ensure the shard writer returns an error when dialing times out.
func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
ts := newTestWriteService(writeShardSuccess)
ts := newTestWriteService(nil)
ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = &ts.TSDBStore
Expand Down
14 changes: 13 additions & 1 deletion services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) {
return nil, err
}

n, err := c.DataNodeByHTTPHost(httpAddr)
n, err := c.DataNodeByTCPHost(tcpAddr)
if err != nil {
return nil, err
}
Expand All @@ -278,6 +278,18 @@ func (c *Client) DataNodeByHTTPHost(httpAddr string) (*NodeInfo, error) {
return nil, ErrNodeNotFound
}

// DataNodeByTCPHost returns the data node with the give http bind address
func (c *Client) DataNodeByTCPHost(tcpAddr string) (*NodeInfo, error) {
nodes, _ := c.DataNodes()
for _, n := range nodes {
if n.TCPHost == tcpAddr {
return &n, nil
}
}

return nil, ErrNodeNotFound
}

// DeleteDataNode deletes a data node from the cluster.
func (c *Client) DeleteDataNode(id uint64) error {
cmd := &internal.DeleteDataNodeCommand{
Expand Down
2 changes: 1 addition & 1 deletion services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (data *Data) DataNode(id uint64) *NodeInfo {
func (data *Data) CreateDataNode(host, tcpHost string) error {
// Ensure a node with the same host doesn't already exist.
for _, n := range data.DataNodes {
if n.Host == host {
if n.TCPHost == tcpHost {
return ErrNodeExists
}
}
Expand Down

0 comments on commit b39c35c

Please sign in to comment.