Skip to content

Commit

Permalink
rename domain version to db_version, move failover_version to top lev…
Browse files Browse the repository at this point in the history
…el (#582)

* rename domain version to db_version, move failover_version to top level
  • Loading branch information
wxing1292 authored Feb 26, 2018
1 parent 9f82f5a commit d914e07
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 48 deletions.
35 changes: 19 additions & 16 deletions common/persistence/cassandraMetadataPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const (

templateDomainReplicationConfigType = `{` +
`active_cluster_name: ?, ` +
`failover_version: ?, ` +
`clusters: ? ` +
`}`

Expand All @@ -56,27 +55,29 @@ const (
`VALUES(?, {name: ?}) IF NOT EXISTS`

templateCreateDomainByNameQuery = `INSERT INTO domains_by_name (` +
`name, domain, config, replication_config) ` +
`VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `) IF NOT EXISTS`
`name, domain, config, replication_config, failover_version) ` +
`VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?) IF NOT EXISTS`

templateGetDomainQuery = `SELECT domain.name ` +
`FROM domains ` +
`WHERE id = ?`

templateGetDomainByNameQuery = `SELECT domain.id, domain.name, domain.status, domain.description, ` +
`domain.owner_email, config.retention, config.emit_metric, ` +
`replication_config.active_cluster_name, replication_config.failover_version, replication_config.clusters, ` +
`version ` +
`replication_config.active_cluster_name, replication_config.clusters, ` +
`failover_version, ` +
`db_version ` +
`FROM domains_by_name ` +
`WHERE name = ?`

templateUpdateDomainByNameQuery = `UPDATE domains_by_name ` +
`SET domain = ` + templateDomainType + `, ` +
`config = ` + templateDomainConfigType + `, ` +
`replication_config = ` + templateDomainReplicationConfigType + `, ` +
`version = ? ` +
`failover_version = ? ,` +
`db_version = ? ` +
`WHERE name = ? ` +
`IF version = ? `
`IF db_version = ? `

templateDeleteDomainQuery = `DELETE FROM domains ` +
`WHERE id = ?`
Expand Down Expand Up @@ -145,8 +146,8 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest
request.Config.Retention,
request.Config.EmitMetric,
request.ReplicationConfig.ActiveClusterName,
request.ReplicationConfig.FailoverVersion,
serializeClusterConfigs(request.ReplicationConfig.Clusters),
request.FailoverVersion,
)

previous := make(map[string]interface{})
Expand Down Expand Up @@ -186,7 +187,8 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
config := &DomainConfig{}
replicationConfig := &DomainReplicationConfig{}
var replicationClusters []map[string]interface{}
var version int64
var dbVersion int64
var failoverVersion int64

if len(request.ID) > 0 && len(request.Name) > 0 {
return nil, &workflow.BadRequestError{
Expand Down Expand Up @@ -232,9 +234,9 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
&config.Retention,
&config.EmitMetric,
&replicationConfig.ActiveClusterName,
&replicationConfig.FailoverVersion,
&replicationClusters,
&version,
&failoverVersion,
&dbVersion,
)

if err != nil {
Expand All @@ -249,16 +251,17 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
Version: version,
FailoverVersion: failoverVersion,
DBVersion: dbVersion,
}, nil
}

func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest) error {
var nextVersion int64 = 1
var currentVersion *int64
if request.Version > 0 {
nextVersion = request.Version + 1
currentVersion = &request.Version
if request.DBVersion > 0 {
nextVersion = request.DBVersion + 1
currentVersion = &request.DBVersion
}
query := m.session.Query(templateUpdateDomainByNameQuery,
request.Info.ID,
Expand All @@ -269,8 +272,8 @@ func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest
request.Config.Retention,
request.Config.EmitMetric,
request.ReplicationConfig.ActiveClusterName,
request.ReplicationConfig.FailoverVersion,
serializeClusterConfigs(request.ReplicationConfig.Clusters),
request.FailoverVersion,
nextVersion,
request.Info.Name,
currentVersion,
Expand Down
44 changes: 25 additions & 19 deletions common/persistence/cassandraMetadataPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
EmitMetric: emitMetric,
},
&DomainReplicationConfig{},
0,
)

m.Nil(err0)
Expand All @@ -104,10 +105,10 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
m.Equal(retention, resp1.Config.Retention)
m.Equal(emitMetric, resp1.Config.EmitMetric)
m.Equal(testCurrentClusterName, resp1.ReplicationConfig.ActiveClusterName)
m.Equal(int64(0), resp1.ReplicationConfig.FailoverVersion)
m.Equal(1, len(resp1.ReplicationConfig.Clusters))
m.Equal(int64(0), resp1.FailoverVersion)
m.True(resp1.ReplicationConfig.Clusters[0].ClusterName == testCurrentClusterName)
m.Equal(int64(0), resp1.Version)
m.Equal(int64(0), resp1.DBVersion)

resp2, err2 := m.CreateDomain(
&DomainInfo{
Expand All @@ -121,6 +122,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
EmitMetric: false,
},
&DomainReplicationConfig{},
0,
)
m.NotNil(err2)
m.IsType(&gen.DomainAlreadyExistsError{}, err2)
Expand Down Expand Up @@ -165,9 +167,9 @@ func (m *metadataPersistenceSuite) TestGetDomain() {
},
&DomainReplicationConfig{
ActiveClusterName: clusterActive,
FailoverVersion: failoverVersion,
Clusters: clusters,
},
failoverVersion,
)
m.Nil(err1)
m.NotNil(resp1)
Expand All @@ -186,12 +188,12 @@ func (m *metadataPersistenceSuite) TestGetDomain() {
m.Equal(retention, resp2.Config.Retention)
m.Equal(emitMetric, resp2.Config.EmitMetric)
m.Equal(clusterActive, resp2.ReplicationConfig.ActiveClusterName)
m.Equal(failoverVersion, resp2.ReplicationConfig.FailoverVersion)
m.Equal(len(clusters), len(resp2.ReplicationConfig.Clusters))
for index := range clusters {
m.Equal(clusters[index], resp2.ReplicationConfig.Clusters[index])
}
m.Equal(int64(0), resp2.Version)
m.Equal(failoverVersion, resp2.FailoverVersion)
m.Equal(int64(0), resp2.DBVersion)

resp3, err3 := m.GetDomain("", name)
m.Nil(err3)
Expand All @@ -204,12 +206,12 @@ func (m *metadataPersistenceSuite) TestGetDomain() {
m.Equal(retention, resp3.Config.Retention)
m.Equal(emitMetric, resp3.Config.EmitMetric)
m.Equal(clusterActive, resp3.ReplicationConfig.ActiveClusterName)
m.Equal(failoverVersion, resp3.ReplicationConfig.FailoverVersion)
m.Equal(len(clusters), len(resp3.ReplicationConfig.Clusters))
for index := range clusters {
m.Equal(clusters[index], resp3.ReplicationConfig.Clusters[index])
}
m.Equal(int64(0), resp3.Version)
m.Equal(failoverVersion, resp3.FailoverVersion)
m.Equal(int64(0), resp3.DBVersion)

resp4, err4 := m.GetDomain(id, name)
m.NotNil(err4)
Expand Down Expand Up @@ -250,9 +252,9 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() {
},
&DomainReplicationConfig{
ActiveClusterName: clusterActive,
FailoverVersion: failoverVersion,
Clusters: clusters,
},
failoverVersion,
)
m.Nil(err1)

Expand Down Expand Up @@ -293,10 +295,10 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() {
},
&DomainReplicationConfig{
ActiveClusterName: updateClusterActive,
FailoverVersion: updateFailoverVersion,
Clusters: updateClusters,
},
resp2.Version,
updateFailoverVersion,
resp2.DBVersion,
)

m.Nil(err3)
Expand All @@ -312,12 +314,12 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() {
m.Equal(updatedRetention, resp4.Config.Retention)
m.Equal(updatedEmitMetric, resp4.Config.EmitMetric)
m.Equal(updateClusterActive, resp4.ReplicationConfig.ActiveClusterName)
m.Equal(updateFailoverVersion, resp4.ReplicationConfig.FailoverVersion)
m.Equal(len(updateClusters), len(resp4.ReplicationConfig.Clusters))
for index := range clusters {
m.Equal(updateClusters[index], resp4.ReplicationConfig.Clusters[index])
}
m.Equal(resp2.Version+1, resp4.Version)
m.Equal(updateFailoverVersion, resp4.FailoverVersion)
m.Equal(resp2.DBVersion+1, resp4.DBVersion)

resp5, err5 := m.GetDomain("", name)
m.Nil(err5)
Expand All @@ -330,12 +332,12 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() {
m.Equal(updatedRetention, resp5.Config.Retention)
m.Equal(updatedEmitMetric, resp5.Config.EmitMetric)
m.Equal(updateClusterActive, resp5.ReplicationConfig.ActiveClusterName)
m.Equal(updateFailoverVersion, resp5.ReplicationConfig.FailoverVersion)
m.Equal(len(updateClusters), len(resp5.ReplicationConfig.Clusters))
for index := range clusters {
m.Equal(updateClusters[index], resp5.ReplicationConfig.Clusters[index])
}
m.Equal(resp2.Version+1, resp5.Version)
m.Equal(updateFailoverVersion, resp5.FailoverVersion)
m.Equal(resp2.DBVersion+1, resp5.DBVersion)
}

func (m *metadataPersistenceSuite) TestDeleteDomain() {
Expand Down Expand Up @@ -371,9 +373,9 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {
},
&DomainReplicationConfig{
ActiveClusterName: clusterActive,
FailoverVersion: failoverVersion,
Clusters: clusters,
},
failoverVersion,
)
m.Nil(err1)
id := resp1.ID
Expand Down Expand Up @@ -409,9 +411,9 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {
},
&DomainReplicationConfig{
ActiveClusterName: clusterActive,
FailoverVersion: failoverVersion,
Clusters: clusters,
},
failoverVersion,
)
m.Nil(err6)
id = resp6.ID
Expand All @@ -431,14 +433,16 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {
m.Nil(resp9)
}

func (m *metadataPersistenceSuite) CreateDomain(info *DomainInfo, config *DomainConfig, replicationConfig *DomainReplicationConfig) (*CreateDomainResponse, error) {
func (m *metadataPersistenceSuite) CreateDomain(info *DomainInfo, config *DomainConfig,
replicationConfig *DomainReplicationConfig, failoverVersion int64) (*CreateDomainResponse, error) {
return m.MetadataManager.CreateDomain(&CreateDomainRequest{
Name: info.Name,
Status: info.Status,
Description: info.Description,
OwnerEmail: info.OwnerEmail,
Config: config,
ReplicationConfig: replicationConfig,
FailoverVersion: failoverVersion,
})
}

Expand All @@ -449,12 +453,14 @@ func (m *metadataPersistenceSuite) GetDomain(id, name string) (*GetDomainRespons
})
}

func (m *metadataPersistenceSuite) UpdateDomain(info *DomainInfo, config *DomainConfig, replicationConfig *DomainReplicationConfig, version int64) error {
func (m *metadataPersistenceSuite) UpdateDomain(info *DomainInfo, config *DomainConfig, replicationConfig *DomainReplicationConfig,
failoverVersion int64, dbVersion int64) error {
return m.MetadataManager.UpdateDomain(&UpdateDomainRequest{
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
Version: version,
FailoverVersion: failoverVersion,
DBVersion: dbVersion,
})
}

Expand Down
8 changes: 5 additions & 3 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,6 @@ type (
// DomainReplicationConfig describes the cross DC domain replication configuration
DomainReplicationConfig struct {
ActiveClusterName string
FailoverVersion int64
Clusters []*ClusterReplicationConfig
}

Expand All @@ -668,6 +667,7 @@ type (
OwnerEmail string
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
FailoverVersion int64
}

// CreateDomainResponse is the response for CreateDomain
Expand All @@ -686,15 +686,17 @@ type (
Info *DomainInfo
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
Version int64
FailoverVersion int64
DBVersion int64
}

// UpdateDomainRequest is used to update domain
UpdateDomainRequest struct {
Info *DomainInfo
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
Version int64
FailoverVersion int64
DBVersion int64
}

// DeleteDomainRequest is used to delete domain entry from domains table
Expand Down
2 changes: 1 addition & 1 deletion host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,9 @@ func (s *integrationSuite) TestIntegrationUpdateGetDomain_Failover() {
},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: activeClusterName,
FailoverVersion: failoverVersion,
Clusters: persistenceClusters,
},
FailoverVersion: failoverVersion,
})

// when doing the failover, the only thing can be updated is the active cluster
Expand Down
4 changes: 2 additions & 2 deletions schema/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ CREATE TYPE cluster_replication_config (

CREATE TYPE domain_replication_config (
active_cluster_name text,
failover_version bigint,
clusters list<frozen<cluster_replication_config>>
);

Expand Down Expand Up @@ -248,7 +247,8 @@ CREATE TABLE domains_by_name (
domain frozen<domain>,
config frozen<domain_config>,
replication_config frozen<domain_replication_config>, -- indicating active cluster and standby cluster used for replication
version bigint, --indicate the version of the record, used for update
failover_version bigint, -- indicating the version of active domain only, used for domain failover
db_version bigint, -- indicate the version of the record, used for update
PRIMARY KEY (name)
) WITH COMPACTION = {
'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'
Expand Down
4 changes: 2 additions & 2 deletions schema/cadence/versioned/v0.5/add_replication_config.cql
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ CREATE TYPE cluster_replication_config (

CREATE TYPE domain_replication_config (
active_cluster_name text,
failover_version bigint,
clusters list<frozen<cluster_replication_config>>
);

ALTER TABLE domains_by_name ADD replication_config frozen<domain_replication_config>;
ALTER TABLE domains_by_name ADD version bigint;
ALTER TABLE domains_by_name ADD failover_version bigint;
ALTER TABLE domains_by_name ADD db_version bigint;
Loading

0 comments on commit d914e07

Please sign in to comment.