Skip to content

Commit

Permalink
Rename persistence files for consistency (#4256)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jun 10, 2021
1 parent ff5e37a commit 99fb216
Show file tree
Hide file tree
Showing 30 changed files with 168 additions and 157 deletions.
3 changes: 2 additions & 1 deletion common/archiver/historyIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -236,7 +237,7 @@ func (i *historyIterator) readHistory(ctx context.Context, firstEventID int64) (
PageSize: i.historyPageSize,
ShardID: common.IntPtr(i.request.ShardID),
}
historyBatches, _, _, err := persistence.ReadFullPageV2EventsByBatch(ctx, i.historyV2Manager, req)
historyBatches, _, _, err := persistenceutils.ReadFullPageV2EventsByBatch(ctx, i.historyV2Manager, req)
return historyBatches, err

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence
package cluster

import "github.com/uber/cadence/common/persistence"

// GetOrUseDefaultActiveCluster return the current cluster name or use the input if valid
func GetOrUseDefaultActiveCluster(currentClusterName string, activeClusterName string) string {
Expand All @@ -29,10 +31,10 @@ func GetOrUseDefaultActiveCluster(currentClusterName string, activeClusterName s
}

// GetOrUseDefaultClusters return the current cluster or use the input if valid
func GetOrUseDefaultClusters(currentClusterName string, clusters []*ClusterReplicationConfig) []*ClusterReplicationConfig {
func GetOrUseDefaultClusters(currentClusterName string, clusters []*persistence.ClusterReplicationConfig) []*persistence.ClusterReplicationConfig {
if len(clusters) == 0 {
return []*ClusterReplicationConfig{
&ClusterReplicationConfig{
return []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{
ClusterName: currentClusterName,
},
}
Expand Down
2 changes: 1 addition & 1 deletion common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (d *handlerImpl) RegisterDomain(
clusterName := clusterConfig.GetClusterName()
clusters = append(clusters, &persistence.ClusterReplicationConfig{ClusterName: clusterName})
}
clusters = persistence.GetOrUseDefaultClusters(activeClusterName, clusters)
clusters = cluster.GetOrUseDefaultClusters(activeClusterName, clusters)

currentHistoryArchivalState := neverEnabledState()
nextHistoryArchivalState := currentHistoryArchivalState
Expand Down
10 changes: 5 additions & 5 deletions common/domain/handler_GlobalDomainDisabled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_InvalidCl
func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_AllDefault() {
domainName := s.getRandomDomainName()
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestRegisterGetDomain_NoDefault
isGlobalDomain := false

var expectedClusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
expectedClusters = append(expectedClusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestUpdateGetDomain_NoAttrSet()
emitMetric := true
data := map[string]string{"some random key": "some random value"}
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -396,7 +396,7 @@ func (s *domainHandlerGlobalDomainDisabledSuite) TestUpdateGetDomain_AllAttrSet(
data := map[string]string{"some random key": "some random value"}

var expectedClusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
expectedClusters = append(expectedClusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -499,7 +499,7 @@ func setupLocalDomain(s suite.Suite, handler *handlerImpl, clusterMetadata clust
emitMetric := true
data := map[string]string{"some random key": "some random value"}
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(clusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(clusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down
12 changes: 6 additions & 6 deletions common/domain/handler_GlobalDomainEnabled_MasterCluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestRegisterGetDom
domainName := s.getRandomDomainName()
isGlobalDomain := false
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestRegisterGetDom
isGlobalDomain := false

var expectedClusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
expectedClusters = append(expectedClusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -276,7 +276,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestUpdateGetDomai
emitMetric := true
data := map[string]string{"some random key": "some random value"}
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestUpdateGetDomai
emitMetric := true
data := map[string]string{"some random key": "some random value"}
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -461,7 +461,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestRegisterGetDom
domainName := s.getRandomDomainName()
isGlobalDomain := true
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -881,7 +881,7 @@ func (s *domainHandlerGlobalDomainEnabledPrimaryClusterSuite) TestUpdateDomain_C
domainName := s.getRandomDomainName()
isGlobalDomain := true
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestRegisterGet
domainName := s.getRandomDomainName()
isGlobalDomain := false
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestRegisterGet
isGlobalDomain := false

var expectedClusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
expectedClusters = append(expectedClusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -247,7 +247,7 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestUpdateGetDo
emitMetric := true
data := map[string]string{"some random key": "some random value"}
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -337,7 +337,7 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestUpdateGetDo
emitMetric := true
data := map[string]string{"some random key": "some random value"}
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down Expand Up @@ -432,7 +432,7 @@ func (s *domainHandlerGlobalDomainEnabledNotPrimaryClusterSuite) TestRegisterGet
domainName := s.getRandomDomainName()
isGlobalDomain := true
var clusters []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down
2 changes: 1 addition & 1 deletion common/domain/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (s *domainHandlerCommonSuite) TestListDomain() {
isGlobalDomain1 := false
activeClusterName1 := s.ClusterMetadata.GetCurrentClusterName()
var cluster1 []*types.ClusterReplicationConfiguration
for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
for _, replicationConfig := range cluster.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) {
cluster1 = append(cluster1, &types.ClusterReplicationConfiguration{
ClusterName: replicationConfig.ClusterName,
})
Expand Down
7 changes: 4 additions & 3 deletions common/persistence/cassandra/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ func (h *nosqlHistoryManager) AppendHistoryNodes(
) error {

branchInfo := request.BranchInfo
beginNodeID := p.GetBeginNodeID(branchInfo)
beginNodeID := persistenceutils.GetBeginNodeID(branchInfo)

if request.NodeID < beginNodeID {
return &p.InvalidPersistenceRequestError{
Expand Down Expand Up @@ -253,7 +254,7 @@ func (h *nosqlHistoryManager) ForkHistoryBranch(
treeID := *forkB.TreeID
newAncestors := make([]*types.HistoryBranchRange, 0, len(forkB.Ancestors)+1)

beginNodeID := p.GetBeginNodeID(forkB)
beginNodeID := persistenceutils.GetBeginNodeID(forkB)
if beginNodeID >= request.ForkNodeID {
// this is the case that new branch's ancestors doesn't include the forking branch
for _, br := range forkB.Ancestors {
Expand Down Expand Up @@ -318,7 +319,7 @@ func (h *nosqlHistoryManager) DeleteHistoryBranch(
branch := request.BranchInfo
treeID := *branch.TreeID
brsToDelete := branch.Ancestors
beginNodeID := p.GetBeginNodeID(branch)
beginNodeID := persistenceutils.GetBeginNodeID(branch)
brsToDelete = append(brsToDelete, &types.HistoryBranchRange{
BranchID: branch.BranchID,
BeginNodeID: common.Int64Ptr(beginNodeID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -172,8 +173,8 @@ func (m *nosqlDomainManager) GetDomain(
if row.Info.Data == nil {
row.Info.Data = map[string]string{}
}
row.ReplicationConfig.ActiveClusterName = p.GetOrUseDefaultActiveCluster(m.currentClusterName, row.ReplicationConfig.ActiveClusterName)
row.ReplicationConfig.Clusters = p.GetOrUseDefaultClusters(m.currentClusterName, row.ReplicationConfig.Clusters)
row.ReplicationConfig.ActiveClusterName = cluster.GetOrUseDefaultActiveCluster(m.currentClusterName, row.ReplicationConfig.ActiveClusterName)
row.ReplicationConfig.Clusters = cluster.GetOrUseDefaultClusters(m.currentClusterName, row.ReplicationConfig.Clusters)

domainConfig, err := m.fromNoSQLInternalDomainConfig(row.Config)
if err != nil {
Expand Down Expand Up @@ -210,8 +211,8 @@ func (m *nosqlDomainManager) ListDomains(
if row.Info.Data == nil {
row.Info.Data = map[string]string{}
}
row.ReplicationConfig.ActiveClusterName = p.GetOrUseDefaultActiveCluster(m.currentClusterName, row.ReplicationConfig.ActiveClusterName)
row.ReplicationConfig.Clusters = p.GetOrUseDefaultClusters(m.currentClusterName, row.ReplicationConfig.Clusters)
row.ReplicationConfig.ActiveClusterName = cluster.GetOrUseDefaultActiveCluster(m.currentClusterName, row.ReplicationConfig.ActiveClusterName)
row.ReplicationConfig.Clusters = cluster.GetOrUseDefaultClusters(m.currentClusterName, row.ReplicationConfig.Clusters)

domainConfig, err := m.fromNoSQLInternalDomainConfig(row.Config)
if err != nil {
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,10 @@ func (m *historyV2ManagerImpl) serializeToken(
func (m *historyV2ManagerImpl) Close() {
m.persistence.Close()
}

func getShardID(shardID *int) (int, error) {
if shardID == nil {
return 0, fmt.Errorf("shardID is not set for persistence operation")
}
return *shardID, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistence
package persistenceutils

import (
"context"
"fmt"

"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)

Expand All @@ -32,8 +32,8 @@ import (
// of data read, the next page token, and an error if present.
func ReadFullPageV2Events(
ctx context.Context,
historyV2Mgr HistoryManager,
req *ReadHistoryBranchRequest,
historyV2Mgr persistence.HistoryManager,
req *persistence.ReadHistoryBranchRequest,
) ([]*types.HistoryEvent, int, []byte, error) {
historyEvents := []*types.HistoryEvent{}
size := int(0)
Expand All @@ -56,8 +56,8 @@ func ReadFullPageV2Events(
// of data read, the next page token, and an error if present.
func ReadFullPageV2EventsByBatch(
ctx context.Context,
historyV2Mgr HistoryManager,
req *ReadHistoryBranchRequest,
historyV2Mgr persistence.HistoryManager,
req *persistence.ReadHistoryBranchRequest,
) ([]*types.History, int, []byte, error) {
historyBatches := []*types.History{}
eventsRead := 0
Expand Down Expand Up @@ -92,7 +92,7 @@ func GetBeginNodeID(bi types.HistoryBranch) int64 {
// PaginateHistory return paged history
func PaginateHistory(
ctx context.Context,
historyV2Mgr HistoryManager,
historyV2Mgr persistence.HistoryManager,
byBatch bool,
branchToken []byte,
firstEventID int64,
Expand All @@ -107,7 +107,7 @@ func PaginateHistory(
var tokenOut []byte
var historySize int

req := &ReadHistoryBranchRequest{
req := &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: nextEventID,
Expand Down Expand Up @@ -140,10 +140,3 @@ func PaginateHistory(

return historyEvents, historyBatches, tokenOut, historySize, nil
}

func getShardID(shardID *int) (int, error) {
if shardID == nil {
return 0, fmt.Errorf("shardID is not set for persistence operation")
}
return *shardID, nil
}
2 changes: 1 addition & 1 deletion common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error) {
return nil, err
}

return newQueue(conn, f.logger, queueType)
return newQueueStore(conn, f.logger, queueType)
}

// Close closes the factory
Expand Down
Loading

0 comments on commit 99fb216

Please sign in to comment.