diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd1d8777e0b..c342d98f759 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,7 @@
 - [#5841](https://github.com/influxdata/influxdb/pull/5841): Reduce tsm allocations by converting time.Time to int64
 - [#5842](https://github.com/influxdata/influxdb/issues/5842): Add SeriesList binary marshaling
 - [#5854](https://github.com/influxdata/influxdb/issues/5854): failures of tests in tsdb/engine/tsm1 when compiled with go master
+- [#5610](https://github.com/influxdata/influxdb/issues/5610): Write into fully-replicated cluster is not replicated across all shards
 
 ## v0.10.1 [2016-02-18]
 
diff --git a/cluster/internal/data.pb.go b/cluster/internal/data.pb.go
index f000aad978b..667cea9fb15 100644
--- a/cluster/internal/data.pb.go
+++ b/cluster/internal/data.pb.go
@@ -23,15 +23,19 @@ It has these top-level messages:
 package internal
 
 import proto "github.com/gogo/protobuf/proto"
+import fmt "fmt"
 import math "math"
 
 // Reference imports to suppress errors if they are not otherwise used.
 var _ = proto.Marshal
+var _ = fmt.Errorf
 var _ = math.Inf
 
 type WriteShardRequest struct {
-	ShardID          *uint64  `protobuf:"varint,1,req" json:"ShardID,omitempty"`
-	Points           [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"`
+	ShardID          *uint64  `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"`
+	Points           [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"`
+	Database         *string  `protobuf:"bytes,3,opt,name=Database" json:"Database,omitempty"`
+	RetentionPolicy  *string  `protobuf:"bytes,4,opt,name=RetentionPolicy" json:"RetentionPolicy,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
@@ -53,9 +57,23 @@ func (m *WriteShardRequest) GetPoints() [][]byte {
 	return nil
 }
 
+func (m *WriteShardRequest) GetDatabase() string {
+	if m != nil && m.Database != nil {
+		return *m.Database
+	}
+	return ""
+}
+
+func (m *WriteShardRequest) GetRetentionPolicy() string {
+	if m != nil && m.RetentionPolicy != nil {
+		return *m.RetentionPolicy
+	}
+	return ""
+}
+
 type WriteShardResponse struct {
-	Code             *int32  `protobuf:"varint,1,req" json:"Code,omitempty"`
-	Message          *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
+	Code             *int32  `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
+	Message          *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
 	XXX_unrecognized []byte  `json:"-"`
 }
 
@@ -78,8 +96,8 @@ func (m *WriteShardResponse) GetMessage() string {
 }
 
 type ExecuteStatementRequest struct {
-	Statement        *string `protobuf:"bytes,1,req" json:"Statement,omitempty"`
-	Database         *string `protobuf:"bytes,2,req" json:"Database,omitempty"`
+	Statement        *string `protobuf:"bytes,1,req,name=Statement" json:"Statement,omitempty"`
+	Database         *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"`
 	XXX_unrecognized []byte  `json:"-"`
 }
 
@@ -102,8 +120,8 @@ func (m *ExecuteStatementRequest) GetDatabase() string {
 }
 
 type ExecuteStatementResponse struct {
-	Code             *int32  `protobuf:"varint,1,req" json:"Code,omitempty"`
-	Message          *string `protobuf:"bytes,2,opt" json:"Message,omitempty"`
+	Code             *int32  `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"`
+	Message          *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"`
 	XXX_unrecognized []byte  `json:"-"`
 }
 
@@ -126,8 +144,8 @@ func (m *ExecuteStatementResponse) GetMessage() string {
 }
 
 type CreateIteratorRequest struct {
-	ShardIDs         []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
-	Opt              []byte   `protobuf:"bytes,2,req" json:"Opt,omitempty"`
+	ShardIDs         []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
+	Opt              []byte   `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
@@ -150,7 +168,7 @@ func (m *CreateIteratorRequest) GetOpt() []byte {
 }
 
 type CreateIteratorResponse struct {
-	Err              *string `protobuf:"bytes,1,opt" json:"Err,omitempty"`
+	Err              *string `protobuf:"bytes,1,opt,name=Err" json:"Err,omitempty"`
 	XXX_unrecognized []byte  `json:"-"`
 }
 
@@ -166,8 +184,8 @@ func (m *CreateIteratorResponse) GetErr() string {
 }
 
 type FieldDimensionsRequest struct {
-	ShardIDs         []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
-	Sources          []byte   `protobuf:"bytes,2,req" json:"Sources,omitempty"`
+	ShardIDs         []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
+	Sources          []byte   `protobuf:"bytes,2,req,name=Sources" json:"Sources,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
@@ -190,9 +208,9 @@ func (m *FieldDimensionsRequest) GetSources() []byte {
 }
 
 type FieldDimensionsResponse struct {
-	Fields           []string `protobuf:"bytes,1,rep" json:"Fields,omitempty"`
-	Dimensions       []string `protobuf:"bytes,2,rep" json:"Dimensions,omitempty"`
-	Err              *string  `protobuf:"bytes,3,opt" json:"Err,omitempty"`
+	Fields           []string `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"`
+	Dimensions       []string `protobuf:"bytes,2,rep,name=Dimensions" json:"Dimensions,omitempty"`
+	Err              *string  `protobuf:"bytes,3,opt,name=Err" json:"Err,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
@@ -222,8 +240,8 @@ func (m *FieldDimensionsResponse) GetErr() string {
 }
 
 type SeriesKeysRequest struct {
-	ShardIDs         []uint64 `protobuf:"varint,1,rep" json:"ShardIDs,omitempty"`
-	Opt              []byte   `protobuf:"bytes,2,req" json:"Opt,omitempty"`
+	ShardIDs         []uint64 `protobuf:"varint,1,rep,name=ShardIDs" json:"ShardIDs,omitempty"`
+	Opt              []byte   `protobuf:"bytes,2,req,name=Opt" json:"Opt,omitempty"`
 	XXX_unrecognized []byte   `json:"-"`
 }
 
@@ -246,8 +264,8 @@ func (m *SeriesKeysRequest) GetOpt() []byte {
 }
 
 type SeriesKeysResponse struct {
-	SeriesList       []byte  `protobuf:"bytes,1,opt" json:"SeriesList,omitempty"`
-	Err              *string `protobuf:"bytes,2,opt" json:"Err,omitempty"`
+	SeriesList       []byte  `protobuf:"bytes,1,opt,name=SeriesList" json:"SeriesList,omitempty"`
+	Err              *string `protobuf:"bytes,2,opt,name=Err" json:"Err,omitempty"`
 	XXX_unrecognized []byte  `json:"-"`
 }
 
@@ -270,4 +288,14 @@ func (m *SeriesKeysResponse) GetErr() string {
 }
 
 func init() {
+	proto.RegisterType((*WriteShardRequest)(nil), "internal.WriteShardRequest")
+	proto.RegisterType((*WriteShardResponse)(nil), "internal.WriteShardResponse")
+	proto.RegisterType((*ExecuteStatementRequest)(nil), "internal.ExecuteStatementRequest")
+	proto.RegisterType((*ExecuteStatementResponse)(nil), "internal.ExecuteStatementResponse")
+	proto.RegisterType((*CreateIteratorRequest)(nil), "internal.CreateIteratorRequest")
+	proto.RegisterType((*CreateIteratorResponse)(nil), "internal.CreateIteratorResponse")
+	proto.RegisterType((*FieldDimensionsRequest)(nil), "internal.FieldDimensionsRequest")
+	proto.RegisterType((*FieldDimensionsResponse)(nil), "internal.FieldDimensionsResponse")
+	proto.RegisterType((*SeriesKeysRequest)(nil), "internal.SeriesKeysRequest")
+	proto.RegisterType((*SeriesKeysResponse)(nil), "internal.SeriesKeysResponse")
 }
diff --git a/cluster/internal/data.proto b/cluster/internal/data.proto
index bae9d4855ad..97e686e306f 100644
--- a/cluster/internal/data.proto
+++ b/cluster/internal/data.proto
@@ -3,6 +3,8 @@ package internal;
 message WriteShardRequest {
     required uint64 ShardID = 1;
     repeated bytes  Points  = 2;
+    optional string Database = 3;
+    optional string RetentionPolicy = 4;
 }
 
 message WriteShardResponse {
diff --git a/cluster/query_executor_test.go b/cluster/query_executor_test.go
index c7bab87a4b1..b7347696dd7 100644
--- a/cluster/query_executor_test.go
+++ b/cluster/query_executor_test.go
@@ -202,6 +202,9 @@ type TSDBStore struct {
 }
 
 func (s *TSDBStore) CreateShard(database, policy string, shardID uint64) error {
+	if s.CreateShardFn == nil {
+		return nil
+	}
 	return s.CreateShardFn(database, policy, shardID)
 }
 
diff --git a/cluster/rpc.go b/cluster/rpc.go
index 28177441865..8710e899801 100644
--- a/cluster/rpc.go
+++ b/cluster/rpc.go
@@ -48,6 +48,14 @@ func (w *WriteShardRequest) SetShardID(id uint64) { w.pb.ShardID = &id }
 
 // ShardID gets the ShardID
 func (w *WriteShardRequest) ShardID() uint64 { return w.pb.GetShardID() }
 
+func (w *WriteShardRequest) SetDatabase(db string) { w.pb.Database = &db }
+
+func (w *WriteShardRequest) SetRetentionPolicy(rp string) { w.pb.RetentionPolicy = &rp }
+
+func (w *WriteShardRequest) Database() string { return w.pb.GetDatabase() }
+
+func (w *WriteShardRequest) RetentionPolicy() string { return w.pb.GetRetentionPolicy() }
+
 // Points returns the time series Points
 func (w *WriteShardRequest) Points() []models.Point { return w.unmarshalPoints() }
diff --git a/cluster/service.go b/cluster/service.go
index 6fe2de8f7f9..a24d569a1f0 100644
--- a/cluster/service.go
+++ b/cluster/service.go
@@ -250,22 +250,23 @@ func (s *Service) processWriteShardRequest(buf []byte) error {
 	// to check the metastore to determine what database and retention policy this
 	// shard should reside within.
 	if err == tsdb.ErrShardNotFound {
-
-		// Query the metastore for the owner of this shard
-		database, retentionPolicy, sgi := s.MetaClient.ShardOwner(req.ShardID())
-		if sgi == nil {
-			// If we can't find it, then we need to drop this request
-			// as it is no longer valid. This could happen if writes were queued via
-			// hinted handoff and delivered after a shard group was deleted.
-			s.Logger.Printf("drop write request: shard=%d. shard group does not exist or was deleted", req.ShardID())
+		db, rp := req.Database(), req.RetentionPolicy()
+		if db == "" || rp == "" {
+			s.Logger.Printf("drop write request: shard=%d. no database or retention policy received", req.ShardID())
 			return nil
 		}
 
-		err = s.TSDBStore.CreateShard(database, retentionPolicy, req.ShardID())
+		err = s.TSDBStore.CreateShard(req.Database(), req.RetentionPolicy(), req.ShardID())
 		if err != nil {
-			return err
+			s.statMap.Add(writeShardFail, 1)
+			return fmt.Errorf("create shard %d: %s", req.ShardID(), err)
+		}
+
+		err = s.TSDBStore.WriteToShard(req.ShardID(), points)
+		if err != nil {
+			s.statMap.Add(writeShardFail, 1)
+			return fmt.Errorf("write shard %d: %s", req.ShardID(), err)
 		}
-		return s.TSDBStore.WriteToShard(req.ShardID(), points)
 	}
 
 	if err != nil {
diff --git a/cluster/service_test.go b/cluster/service_test.go
index 68ecf43fc93..743632ba17f 100644
--- a/cluster/service_test.go
+++ b/cluster/service_test.go
@@ -23,10 +23,15 @@ func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) {
 	}, nil
 }
 
+func (m *metaClient) ShardOwner(shardID uint64) (db, rp string, sgi *meta.ShardGroupInfo) {
+	return "db", "rp", &meta.ShardGroupInfo{}
+}
+
 type testService struct {
-	nodeID uint64
-	ln     net.Listener
-	muxln  net.Listener
+	nodeID    uint64
+	ln        net.Listener
+	muxln     net.Listener
+	responses chan *serviceResponse
 
 	TSDBStore TSDBStore
 }
@@ -46,6 +51,7 @@ func newTestWriteService(f func(shardID uint64, points []models.Point) error) te
 		muxln:  muxln,
 	}
 	s.TSDBStore.WriteToShardFn = f
+	s.responses = make(chan *serviceResponse, 1024)
 	return s
 }
 
@@ -62,8 +68,8 @@ type serviceResponse struct {
 	points  []models.Point
 }
 
-func writeShardSuccess(shardID uint64, points []models.Point) error {
-	responses <- &serviceResponse{
+func (ts *testService) writeShardSuccess(shardID uint64, points []models.Point) error {
+	ts.responses <- &serviceResponse{
 		shardID: shardID,
 		points:  points,
 	}
@@ -79,13 +85,11 @@ func writeShardSlow(shardID uint64, points []models.Point) error {
 	return nil
 }
 
-var responses = make(chan *serviceResponse, 1024)
-
-func (testService) ResponseN(n int) ([]*serviceResponse, error) {
+func (ts *testService) ResponseN(n int) ([]*serviceResponse, error) {
 	var a []*serviceResponse
 	for {
 		select {
-		case r := <-responses:
+		case r := <-ts.responses:
 			a = append(a, r)
 			if len(a) == n {
 				return a, nil
diff --git a/cluster/shard_writer.go b/cluster/shard_writer.go
index 59e001a48f8..9a44137a8ee 100644
--- a/cluster/shard_writer.go
+++ b/cluster/shard_writer.go
@@ -34,6 +34,7 @@ type ShardWriter struct {
 
 	MetaClient interface {
 		DataNode(id uint64) (ni *meta.NodeInfo, err error)
+		ShardOwner(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo)
 	}
 }
 
@@ -61,9 +62,20 @@ func (w *ShardWriter) WriteShard(shardID, ownerID uint64, points []models.Point)
 		conn.Close() // return to pool
 	}(conn)
 
+	// Determine the location of this shard and whether it still exists
+	db, rp, sgi := w.MetaClient.ShardOwner(shardID)
+	if sgi == nil {
+		// If we can't get the shard group for this shard, then we need to drop this request
+		// as it is no longer valid. This could happen if writes were queued via
+		// hinted handoff and we're processing the queue after a shard group was deleted.
+		return nil
+	}
+
 	// Build write request.
 	var request WriteShardRequest
 	request.SetShardID(shardID)
+	request.SetDatabase(db)
+	request.SetRetentionPolicy(rp)
 	request.AddPoints(points)
 
 	// Marshal into protocol buffers.
diff --git a/cluster/shard_writer_test.go b/cluster/shard_writer_test.go
index b7fbfa266f0..ffbcbebaa1f 100644
--- a/cluster/shard_writer_test.go
+++ b/cluster/shard_writer_test.go
@@ -13,7 +13,8 @@ import (
 
 // Ensure the shard writer can successfully write a single request.
 func TestShardWriter_WriteShard_Success(t *testing.T) {
-	ts := newTestWriteService(writeShardSuccess)
+	ts := newTestWriteService(nil)
+	ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
 	s := cluster.NewService(cluster.Config{})
 	s.Listener = ts.muxln
 	s.TSDBStore = &ts.TSDBStore
@@ -60,7 +61,8 @@ func TestShardWriter_WriteShard_Success(t *testing.T) {
 
 // Ensure the shard writer can successful write a multiple requests.
 func TestShardWriter_WriteShard_Multiple(t *testing.T) {
-	ts := newTestWriteService(writeShardSuccess)
+	ts := newTestWriteService(nil)
+	ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
 	s := cluster.NewService(cluster.Config{})
 	s.Listener = ts.muxln
 	s.TSDBStore = &ts.TSDBStore
@@ -137,7 +139,8 @@ func TestShardWriter_WriteShard_Error(t *testing.T) {
 
 // Ensure the shard writer returns an error when dialing times out.
 func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
-	ts := newTestWriteService(writeShardSuccess)
+	ts := newTestWriteService(nil)
+	ts.TSDBStore.WriteToShardFn = ts.writeShardSuccess
 	s := cluster.NewService(cluster.Config{})
 	s.Listener = ts.muxln
 	s.TSDBStore = &ts.TSDBStore
diff --git a/services/meta/client.go b/services/meta/client.go
index c93c880e15f..32ab7900df4 100644
--- a/services/meta/client.go
+++ b/services/meta/client.go
@@ -256,7 +256,7 @@ func (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) {
 		return nil, err
 	}
 
-	n, err := c.DataNodeByHTTPHost(httpAddr)
+	n, err := c.DataNodeByTCPHost(tcpAddr)
 	if err != nil {
 		return nil, err
 	}
@@ -278,6 +278,18 @@ func (c *Client) DataNodeByHTTPHost(httpAddr string) (*NodeInfo, error) {
 	return nil, ErrNodeNotFound
 }
 
+// DataNodeByTCPHost returns the data node with the given tcp bind address
+func (c *Client) DataNodeByTCPHost(tcpAddr string) (*NodeInfo, error) {
+	nodes, _ := c.DataNodes()
+	for _, n := range nodes {
+		if n.TCPHost == tcpAddr {
+			return &n, nil
+		}
+	}
+
+	return nil, ErrNodeNotFound
+}
+
 // DeleteDataNode deletes a data node from the cluster.
 func (c *Client) DeleteDataNode(id uint64) error {
 	cmd := &internal.DeleteDataNodeCommand{
diff --git a/services/meta/data.go b/services/meta/data.go
index 1db34609086..92458316bd1 100644
--- a/services/meta/data.go
+++ b/services/meta/data.go
@@ -53,7 +53,7 @@ func (data *Data) DataNode(id uint64) *NodeInfo {
 func (data *Data) CreateDataNode(host, tcpHost string) error {
 	// Ensure a node with the same host doesn't already exist.
 	for _, n := range data.DataNodes {
-		if n.Host == host {
+		if n.TCPHost == tcpHost {
 			return ErrNodeExists
 		}
 	}
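
Note (not part of the patch): a minimal usage sketch, assuming only the exported cluster.WriteShardRequest accessors added in cluster/rpc.go above, of how the new Database/RetentionPolicy fields ride along with a remote shard write so the receiving node can recreate a missing shard without a meta-store lookup.

	package main

	import (
		"fmt"

		"github.com/influxdata/influxdb/cluster"
	)

	func main() {
		// Writer side: cluster/shard_writer.go now attaches the owning database and
		// retention policy (obtained from MetaClient.ShardOwner) to the request.
		var req cluster.WriteShardRequest
		req.SetShardID(1)
		req.SetDatabase("db")
		req.SetRetentionPolicy("rp")

		// Receiver side: cluster/service.go reads these back and calls
		// TSDBStore.CreateShard when the shard does not exist locally.
		fmt.Println(req.ShardID(), req.Database(), req.RetentionPolicy()) // 1 db rp
	}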