Merge pull request #3913 from benbjohnson/owner
Convert meta shard owners to objects
benbjohnson committed Sep 1, 2015
2 parents a8ebac9 + 767307e commit 9067664
Showing 10 changed files with 221 additions and 67 deletions.
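
Taken together, the change is small: `meta.ShardInfo` stops carrying a flat list of owner node IDs and instead carries owner objects, presumably to leave room for additional per-owner fields later. A minimal sketch of the resulting types and the ownership check, collected from the hunks below (the old field was `OwnerIDs []uint64`):

```go
package meta

// ShardInfo represents metadata about a shard. Before this commit the
// owners were stored as a flat OwnerIDs []uint64 field; they are now
// wrapped in ShardOwner values.
type ShardInfo struct {
	ID     uint64
	Owners []ShardOwner
}

// ShardOwner represents a node that owns a shard.
type ShardOwner struct {
	NodeID uint64
}

// OwnedBy reports whether nodeID is one of the shard's owners.
func (si ShardInfo) OwnedBy(nodeID uint64) bool {
	for _, so := range si.Owners {
		if so.NodeID == nodeID {
			return true
		}
	}
	return false
}
```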
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ With this release InfluxDB is moving to Go 1.5.
- [#3868](https://github.com/influxdb/influxdb/pull/3868): Add shell option to start the daemon on CentOS. Thanks @SwannCroiset.
- [#3886](https://github.com/influxdb/influxdb/pull/3886): Prevent write timeouts due to lock contention in WAL
- [#3574](https://github.com/influxdb/influxdb/issues/3574): Querying data node causes panic
- [#3913](https://github.com/influxdb/influxdb/issues/3913): Convert meta shard owners to objects

## v0.9.3 [2015-08-26]

20 changes: 10 additions & 10 deletions cluster/points_writer.go
@@ -224,7 +224,7 @@ func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPolicy string,
consistency ConsistencyLevel, points []tsdb.Point) error {
// The required number of writes to achieve the requested consistency level
required := len(shard.OwnerIDs)
required := len(shard.Owners)
switch consistency {
case ConsistencyLevelAny, ConsistencyLevelOne:
required = 1
@@ -233,11 +233,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
}

// response channel for each shard writer go routine
ch := make(chan error, len(shard.OwnerIDs))
ch := make(chan error, len(shard.Owners))

for _, nodeID := range shard.OwnerIDs {
go func(shardID, nodeID uint64, points []tsdb.Point) {
if w.MetaStore.NodeID() == nodeID {
for _, owner := range shard.Owners {
go func(shardID uint64, owner meta.ShardOwner, points []tsdb.Point) {
if w.MetaStore.NodeID() == owner.NodeID {
err := w.TSDBStore.WriteToShard(shardID, points)
// If we've written to shard that should exist on the current node, but the store has
// not actually created this shard, tell it to create it and retry the write
@@ -253,10 +253,10 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
return
}

err := w.ShardWriter.WriteShard(shardID, nodeID, points)
err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
if err != nil && tsdb.IsRetryable(err) {
// The remote write failed so queue it via hinted handoff
hherr := w.HintedHandoff.WriteShard(shardID, nodeID, points)
hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)

// If the write consistency level is ANY, then a successful hinted handoff can
// be considered a successful write so send nil to the response channel
@@ -268,13 +268,13 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
}
ch <- err

}(shard.ID, nodeID, points)
}(shard.ID, owner, points)
}

var wrote int
timeout := time.After(w.WriteTimeout)
var writeError error
for _, nodeID := range shard.OwnerIDs {
for _, owner := range shard.Owners {
select {
case <-w.closing:
return ErrWriteFailed
@@ -284,7 +284,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
case err := <-ch:
// If the write returned an error, continue to the next response
if err != nil {
w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, nodeID, err)
w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, owner.NodeID, err)

// Keep track of the first error we see to return back to the client
if writeError == nil {
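
A condensed sketch of the fan-out pattern in `writeToShard` as it reads after this change: one goroutine per entry in `shard.Owners`, acknowledgments collected on a channel, and success declared once the count required by the consistency level is reached. Hinted handoff, the write timeout, the closing channel, and error bookkeeping are omitted; the helper name `writeToOwners` is hypothetical and the `tsdb` import path is assumed from the repository layout:

```go
package cluster

import (
	"errors"

	"github.com/influxdb/influxdb/meta"
	"github.com/influxdb/influxdb/tsdb"
)

// writeToOwners is a simplified, hypothetical stand-in for the body of
// PointsWriter.writeToShard: fan the write out to every owner of the
// shard and wait until `required` of them acknowledge it.
func writeToOwners(shard *meta.ShardInfo, points []tsdb.Point, required int,
	write func(shardID, nodeID uint64, points []tsdb.Point) error) error {

	// One buffered slot per owner so the goroutines never block on send.
	ch := make(chan error, len(shard.Owners))

	for _, owner := range shard.Owners {
		go func(shardID uint64, owner meta.ShardOwner) {
			ch <- write(shardID, owner.NodeID, points)
		}(shard.ID, owner)
	}

	var wrote int
	for range shard.Owners {
		if err := <-ch; err != nil {
			continue // the real method records the first error and logs the rest
		}
		if wrote++; wrote >= required {
			return nil
		}
	}
	return errors.New("write did not reach the required number of owners")
}
```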
56 changes: 42 additions & 14 deletions cluster/points_writer_test.go
@@ -51,8 +51,16 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
ms := MetaStore{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
})
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
})

ms.NodeIDFn = func() uint64 { return 1 }
ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
@@ -249,13 +257,25 @@ func TestPointsWriter_WritePoints(t *testing.T) {
theTest := test
sm := cluster.NewShardMapping()
sm.MapPoint(
&meta.ShardInfo{ID: uint64(1), OwnerIDs: []uint64{uint64(1), uint64(2), uint64(3)}},
&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}},
pr.Points[0])
sm.MapPoint(
&meta.ShardInfo{ID: uint64(2), OwnerIDs: []uint64{uint64(1), uint64(2), uint64(3)}},
&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}},
pr.Points[1])
sm.MapPoint(
&meta.ShardInfo{ID: uint64(2), OwnerIDs: []uint64{uint64(1), uint64(2), uint64(3)}},
&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
}},
pr.Points[2])

// Local cluster.Node ShardWriter
@@ -334,8 +354,16 @@ func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64
func NewMetaStore() *MetaStore {
ms := &MetaStore{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
AttachShardGroupInfo(rp, []uint64{1, 2, 3})
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
})
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
{NodeID: 3},
})

ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
return rp, nil
@@ -380,15 +408,15 @@ func (m MetaStore) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupI

func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
shards := []meta.ShardInfo{}
ownerIDs := []uint64{}
owners := []meta.ShardOwner{}
for i := 1; i <= nodeCount; i++ {
ownerIDs = append(ownerIDs, uint64(i))
owners = append(owners, meta.ShardOwner{NodeID: uint64(i)})
}

// each node is fully replicated with each other
shards = append(shards, meta.ShardInfo{
ID: nextShardID(),
OwnerIDs: ownerIDs,
ID: nextShardID(),
Owners: owners,
})

rp := &meta.RetentionPolicyInfo{
@@ -408,7 +436,7 @@ func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *met
return rp
}

func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, ownerIDs []uint64) {
func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, owners []meta.ShardOwner) {
var startTime, endTime time.Time
if len(rp.ShardGroups) == 0 {
startTime = time.Unix(0, 0)
@@ -423,8 +451,8 @@ func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, ownerIDs []uint64) {
EndTime: endTime,
Shards: []meta.ShardInfo{
meta.ShardInfo{
ID: nextShardID(),
OwnerIDs: ownerIDs,
ID: nextShardID(),
Owners: owners,
},
},
}
2 changes: 1 addition & 1 deletion cluster/shard_mapper.go
@@ -47,7 +47,7 @@ func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, c

if !sh.OwnedBy(s.MetaStore.NodeID()) || s.ForceRemoteMapping {
// Pick a node in a pseudo-random manner.
conn, err := s.dial(sh.OwnerIDs[rand.Intn(len(sh.OwnerIDs))])
conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID)
if err != nil {
return nil, err
}
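
When the local node does not own the shard (or remote mapping is forced), `CreateMapper` dials one owner chosen pseudo-randomly. A tiny standalone illustration of that selection; `pickOwner` is a hypothetical helper, not a function in this commit:

```go
package cluster

import (
	"math/rand"

	"github.com/influxdb/influxdb/meta"
)

// pickOwner chooses one of the shard's owners pseudo-randomly, mirroring
// the selection sh.Owners[rand.Intn(len(sh.Owners))] used in CreateMapper.
func pickOwner(sh meta.ShardInfo) (meta.ShardOwner, bool) {
	if len(sh.Owners) == 0 {
		return meta.ShardOwner{}, false // nothing to dial
	}
	return sh.Owners[rand.Intn(len(sh.Owners))], true
}
```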
73 changes: 61 additions & 12 deletions meta/data.go
@@ -344,7 +344,7 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
si := &sgi.Shards[i]
for j := 0; j < replicaN; j++ {
nodeID := data.Nodes[nodeIndex%len(data.Nodes)].ID
si.OwnerIDs = append(si.OwnerIDs, nodeID)
si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})
nodeIndex++
}
}
@@ -942,14 +942,14 @@ func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) {

// ShardInfo represents metadata about a shard.
type ShardInfo struct {
ID uint64
OwnerIDs []uint64
ID uint64
Owners []ShardOwner
}

// OwnedBy returns whether the shard's owner IDs includes nodeID.
func (si ShardInfo) OwnedBy(nodeID uint64) bool {
for _, id := range si.OwnerIDs {
if id == nodeID {
for _, so := range si.Owners {
if so.NodeID == nodeID {
return true
}
}
@@ -960,9 +960,11 @@ func (si ShardInfo) OwnedBy(nodeID uint64) bool {
func (si ShardInfo) clone() ShardInfo {
other := si

if si.OwnerIDs != nil {
other.OwnerIDs = make([]uint64, len(si.OwnerIDs))
copy(other.OwnerIDs, si.OwnerIDs)
if si.Owners != nil {
other.Owners = make([]ShardOwner, len(si.Owners))
for i := range si.Owners {
other.Owners[i] = si.Owners[i].clone()
}
}

return other
@@ -974,17 +976,64 @@ func (si ShardInfo) marshal() *internal.ShardInfo {
ID: proto.Uint64(si.ID),
}

pb.OwnerIDs = make([]uint64, len(si.OwnerIDs))
copy(pb.OwnerIDs, si.OwnerIDs)
pb.Owners = make([]*internal.ShardOwner, len(si.Owners))
for i := range si.Owners {
pb.Owners[i] = si.Owners[i].marshal()
}

return pb
}

// UnmarshalBinary decodes the object from a binary format.
func (si *ShardInfo) UnmarshalBinary(buf []byte) error {
var pb internal.ShardInfo
if err := proto.Unmarshal(buf, &pb); err != nil {
return err
}
si.unmarshal(&pb)
return nil
}

// unmarshal deserializes from a protobuf representation.
func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) {
si.ID = pb.GetID()
si.OwnerIDs = make([]uint64, len(pb.GetOwnerIDs()))
copy(si.OwnerIDs, pb.GetOwnerIDs())

// If deprecated "OwnerIDs" exists then convert it to "Owners" format.
if len(pb.GetOwnerIDs()) > 0 {
si.Owners = make([]ShardOwner, len(pb.GetOwnerIDs()))
for i, x := range pb.GetOwnerIDs() {
si.Owners[i].unmarshal(&internal.ShardOwner{
NodeID: proto.Uint64(x),
})
}
} else if len(pb.GetOwners()) > 0 {
si.Owners = make([]ShardOwner, len(pb.GetOwners()))
for i, x := range pb.GetOwners() {
si.Owners[i].unmarshal(x)
}
}
}

// ShardOwner represents a node that owns a shard.
type ShardOwner struct {
NodeID uint64
}

// clone returns a deep copy of so.
func (so ShardOwner) clone() ShardOwner {
return so
}

// marshal serializes to a protobuf representation.
func (so ShardOwner) marshal() *internal.ShardOwner {
return &internal.ShardOwner{
NodeID: proto.Uint64(so.NodeID),
}
}

// unmarshal deserializes from a protobuf representation.
func (so *ShardOwner) unmarshal(pb *internal.ShardOwner) {
so.NodeID = pb.GetNodeID()
}

// ContinuousQueryInfo represents metadata about a continuous query.
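
Two details in `meta/data.go` are easy to miss in the hunks above. First, `CreateShardGroup` now appends `ShardOwner` values while walking the node list round-robin, so ownership still spreads evenly across the cluster. Second, `ShardInfo.unmarshal` keeps old snapshots readable by converting the deprecated `OwnerIDs` field into `Owners`. A simplified sketch of the first point, with the hypothetical helper `assignOwners` standing in for the loop inside `CreateShardGroup`:

```go
package meta

// assignOwners gives each shard replicaN owners, cycling through the
// available node IDs so consecutive replicas land on different nodes.
func assignOwners(shards []ShardInfo, nodeIDs []uint64, replicaN int) {
	nodeIndex := 0
	for i := range shards {
		si := &shards[i]
		for j := 0; j < replicaN; j++ {
			nodeID := nodeIDs[nodeIndex%len(nodeIDs)]
			si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})
			nodeIndex++
		}
	}
}
```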
60 changes: 53 additions & 7 deletions meta/data_test.go
@@ -9,8 +9,10 @@ import (
"time"

"github.com/davecgh/go-spew/spew"
"github.com/gogo/protobuf/proto"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/meta/internal"
)

// Ensure a node can be created.
@@ -299,7 +301,13 @@ func TestData_CreateShardGroup(t *testing.T) {
StartTime: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
EndTime: time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{ID: 1, OwnerIDs: []uint64{1, 2}},
{
ID: 1,
Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 2},
},
},
},
}) {
t.Fatalf("unexpected shard group: %#v", sgi)
@@ -570,8 +578,12 @@ func TestData_Clone(t *testing.T) {
EndTime: time.Date(2000, time.February, 1, 0, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{
ID: 200,
OwnerIDs: []uint64{1, 3, 4},
ID: 200,
Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 3},
{NodeID: 4},
},
},
},
},
@@ -605,8 +617,8 @@
}

// Ensure that changing data in the clone does not affect the original.
other.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].OwnerIDs[1] = 9
if v := data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].OwnerIDs[1]; v != 3 {
other.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].Owners[1].NodeID = 9
if v := data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].Owners[1].NodeID; v != 3 {
t.Fatalf("editing clone changed original: %v", v)
}
}
@@ -637,8 +649,12 @@ func TestData_MarshalBinary(t *testing.T) {
EndTime: time.Date(2000, time.February, 1, 0, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{
ID: 200,
OwnerIDs: []uint64{1, 3, 4},
ID: 200,
Owners: []meta.ShardOwner{
{NodeID: 1},
{NodeID: 3},
{NodeID: 4},
},
},
},
},
@@ -682,3 +698,33 @@ t.Fatalf("unexpected users: %#v", other.Users)
t.Fatalf("unexpected users: %#v", other.Users)
}
}

// Ensure shards with deprecated "OwnerIDs" can be decoded.
func TestShardInfo_UnmarshalBinary_OwnerIDs(t *testing.T) {
// Encode deprecated form to bytes.
buf, err := proto.Marshal(&internal.ShardInfo{
ID: proto.Uint64(1),
OwnerIDs: []uint64{10, 20, 30},
})
if err != nil {
t.Fatal(err)
}

// Decode deprecated form.
var si meta.ShardInfo
if err := si.UnmarshalBinary(buf); err != nil {
t.Fatal(err)
}

// Verify data is migrated correctly.
if !reflect.DeepEqual(si, meta.ShardInfo{
ID: 1,
Owners: []meta.ShardOwner{
{NodeID: 10},
{NodeID: 20},
{NodeID: 30},
},
}) {
t.Fatalf("unexpected shard info: %s", spew.Sdump(si))
}
}
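
For the non-deprecated path, a hypothetical companion test (not part of this commit) would encode the new `Owners` field of `internal.ShardInfo` directly and check that decoding produces the same owners; names follow the generated protobuf types used in the test above:

```go
package meta_test

import (
	"reflect"
	"testing"

	"github.com/davecgh/go-spew/spew"
	"github.com/gogo/protobuf/proto"
	"github.com/influxdb/influxdb/meta"
	"github.com/influxdb/influxdb/meta/internal"
)

// Ensure shards encoded with the new "Owners" field decode unchanged.
// Hypothetical companion to the deprecated-OwnerIDs test; not part of
// this commit.
func TestShardInfo_UnmarshalBinary_Owners(t *testing.T) {
	// Encode the new form to bytes.
	buf, err := proto.Marshal(&internal.ShardInfo{
		ID: proto.Uint64(2),
		Owners: []*internal.ShardOwner{
			{NodeID: proto.Uint64(10)},
			{NodeID: proto.Uint64(20)},
		},
	})
	if err != nil {
		t.Fatal(err)
	}

	// Decode the new form.
	var si meta.ShardInfo
	if err := si.UnmarshalBinary(buf); err != nil {
		t.Fatal(err)
	}

	// Verify the owners round-trip unchanged.
	if !reflect.DeepEqual(si, meta.ShardInfo{
		ID: 2,
		Owners: []meta.ShardOwner{
			{NodeID: 10},
			{NodeID: 20},
		},
	}) {
		t.Fatalf("unexpected shard info: %s", spew.Sdump(si))
	}
}
```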