Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require replication factor to equal cluster size #2906

Merged
merged 1 commit into from
Jun 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- [#2900](https://github.com/influxdb/influxdb/pull/2900): Use openTSDB input defaults where necessary
- [#2886](https://github.com/influxdb/influxdb/issues/2886): Refactor backup & restore
- [#2804](https://github.com/influxdb/influxdb/pull/2804): BREAKING: change time literals to be single quoted in InfluxQL. Thanks @nvcook42!
- [#2906](https://github.com/influxdb/influxdb/pull/2906): Restrict replication factor to the cluster size

## v0.9.0-rc33 [2015-06-09]

Expand Down
2 changes: 2 additions & 0 deletions meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
// Validate retention policy.
if rpi.Name == "" {
return ErrRetentionPolicyNameRequired
} else if rpi.ReplicaN != len(data.Nodes) {
return ErrReplicationFactorMismatch
}

// Find database.
Expand Down
29 changes: 20 additions & 9 deletions meta/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestData_DropDatabase(t *testing.T) {

// Ensure a retention policy can be created.
func TestData_CreateRetentionPolicy(t *testing.T) {
var data meta.Data
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}, {ID: 2}}}
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -121,16 +121,28 @@ func TestData_CreateRetentionPolicy(t *testing.T) {

// Ensure that creating a policy without a name returns an error.
func TestData_CreateRetentionPolicy_ErrNameRequired(t *testing.T) {
var data meta.Data
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}}}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: ""}); err != meta.ErrRetentionPolicyNameRequired {
t.Fatalf("unexpected error: %s", err)
}
}

// Ensure that creating a policy with a replication factor that doesn't match
// the number of nodes in the cluster will return an error. This is a temporary
// restriction until v0.9.1 is released.
func TestData_CreateRetentionPolicy_ErrReplicationFactorMismatch(t *testing.T) {
data := meta.Data{
Nodes: []meta.NodeInfo{{ID: 1}, {ID: 2}, {ID: 3}},
}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2}); err != meta.ErrReplicationFactorMismatch {
t.Fatalf("unexpected error: %s", err)
}
}

// Ensure that creating a retention policy on a non-existent database returns an error.
func TestData_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
var data meta.Data
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != meta.ErrDatabaseNotFound {
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}}}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != meta.ErrDatabaseNotFound {
t.Fatalf("unexpected error: %s", err)
}
}
Expand Down Expand Up @@ -275,7 +287,7 @@ func TestData_CreateShardGroup(t *testing.T) {
t.Fatal(err)
} else if err = data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", Duration: 1 * time.Hour}); err != nil {
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}

Expand All @@ -290,8 +302,7 @@ func TestData_CreateShardGroup(t *testing.T) {
StartTime: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
EndTime: time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{ID: 1, OwnerIDs: []uint64{1}},
{ID: 2, OwnerIDs: []uint64{2}},
{ID: 1, OwnerIDs: []uint64{1, 2}},
},
}) {
t.Fatalf("unexpected shard group: %#v", sgi)
Expand All @@ -307,7 +318,7 @@ func TestData_ShardGroupExpiredDeleted(t *testing.T) {
t.Fatal(err)
} else if err = data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", Duration: 1 * time.Hour}); err != nil {
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -403,7 +414,7 @@ func TestData_DeleteShardGroup(t *testing.T) {
t.Fatal(err)
} else if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != nil {
t.Fatal(err)
} else if err := data.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
Expand Down
5 changes: 5 additions & 0 deletions meta/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ var (
// ErrRetentionPolicyDurationTooLow is returned when updating a retention
// policy that has a duration lower than the allowed minimum.
ErrRetentionPolicyDurationTooLow = errors.New("retention policy duration too low")

// ErrReplicationFactorMismatch is returned when the replication factor
// does not match the number of nodes in the cluster. This is a temporary
// restriction until v0.9.1 is released.
ErrReplicationFactorMismatch = errors.New("replication factor must match cluster size; this limitation will be lifted in v0.9.1")
)

var (
Expand Down
25 changes: 13 additions & 12 deletions meta/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ func TestStore_CreateRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()

// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
// Create an additional nodes and database.
if _, err := s.CreateNode("hostX"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -275,7 +277,7 @@ func TestStore_DropRetentionPolicy(t *testing.T) {

// Create policies.
for i := 0; i < 3; i++ {
if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: fmt.Sprintf("rp%d", i)}); err != nil {
if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: fmt.Sprintf("rp%d", i), ReplicaN: 1}); err != nil {
t.Fatal(err)
}
}
Expand All @@ -286,13 +288,13 @@ func TestStore_DropRetentionPolicy(t *testing.T) {
}

// Ensure remaining policies are correct.
if rpi, _ := s.RetentionPolicy("db0", "rp0"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp0", ShardGroupDuration: 7 * 24 * time.Hour}) {
if rpi, _ := s.RetentionPolicy("db0", "rp0"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, ShardGroupDuration: 7 * 24 * time.Hour}) {
t.Fatalf("unexpected policy(0): %#v", rpi)
}
if rpi, _ := s.RetentionPolicy("db0", "rp1"); rpi != nil {
t.Fatalf("unexpected policy(1): %#v", rpi)
}
if rpi, _ := s.RetentionPolicy("db0", "rp2"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp2", ShardGroupDuration: 7 * 24 * time.Hour}) {
if rpi, _ := s.RetentionPolicy("db0", "rp2"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp2", ReplicaN: 1, ShardGroupDuration: 7 * 24 * time.Hour}) {
t.Fatalf("unexpected policy(2): %#v", rpi)
}
}
Expand All @@ -306,7 +308,7 @@ func TestStore_SetDefaultRetentionPolicy(t *testing.T) {
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != nil {
t.Fatal(err)
}

Expand All @@ -330,15 +332,14 @@ func TestStore_UpdateRetentionPolicy(t *testing.T) {
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != nil {
t.Fatal(err)
}

// Update policy.
var rpu meta.RetentionPolicyUpdate
rpu.SetName("rp1")
rpu.SetDuration(10 * time.Hour)
rpu.SetReplicaN(3)
if err := s.UpdateRetentionPolicy("db0", "rp0", &rpu); err != nil {
t.Fatal(err)
}
Expand All @@ -350,7 +351,7 @@ func TestStore_UpdateRetentionPolicy(t *testing.T) {
Name: "rp1",
Duration: 10 * time.Hour,
ShardGroupDuration: 7 * 24 * time.Hour,
ReplicaN: 3,
ReplicaN: 1,
}) {
t.Fatalf("unexpected policy: %#v", rpi)
}
Expand All @@ -367,7 +368,7 @@ func TestStore_CreateShardGroup(t *testing.T) {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}

Expand All @@ -390,7 +391,7 @@ func TestStore_DeleteShardGroup(t *testing.T) {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
Expand All @@ -413,7 +414,7 @@ func TestStore_PrecreateShardGroup(t *testing.T) {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
Expand Down