Skip to content

Commit

Permalink
Merge branch 'replication_filter' of ssh://github.com/uber/cadence in…
Browse files Browse the repository at this point in the history
…to replication_filter
  • Loading branch information
yux0 committed Mar 24, 2021
2 parents 9767e24 + 7eeb0ea commit 09148e9
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,8 @@ func (m *nosqlDomainManager) CreateDomain(
err = m.db.InsertDomain(ctx, row)

if err != nil {
if m.db.IsConditionFailedError(err) {
return nil, &types.DomainAlreadyExistsError{
Message: fmt.Sprintf("CreateDomain operation failed because of conditional failure, %v", err),
}
if _, ok := err.(*types.DomainAlreadyExistsError); ok {
return nil, err
}
return nil, convertCommonErrors(m.db, "CreateDomain", err)
}
Expand Down
21 changes: 15 additions & 6 deletions common/persistence/nosql/nosqlplugin/cassandra/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,22 @@ func (db *cdb) InsertDomain(
db.logger.Warn("Unable to delete orphan domain record. Error", tag.Error(errDelete))
}

if domain, ok := previous["domain"].(map[string]interface{}); ok {
msg := fmt.Sprintf("Domain already exists. DomainId: %v", domain)
db.logger.Warn(msg)
return errConditionFailed
for {
// first iter MapScan is done inside MapExecuteBatchCAS
if domain, ok := previous["name"].(string); ok && domain == row.Info.Name {
db.logger.Warn("Domain already exists", tag.WorkflowDomainName(domain))
return &types.DomainAlreadyExistsError{
Message: fmt.Sprintf("Domain %v already exists", previous["domain"]),
}
}

previous = make(map[string]interface{})
if !iter.MapScan(previous) {
break
}
}

db.logger.Warn("CreateDomain operation failed because of conditional failure.")
db.logger.Warn("Create domain operation failed because of condition update failure on domain metadata record")
return errConditionFailed
}

Expand Down Expand Up @@ -309,7 +318,7 @@ func (db *cdb) UpdateDomain(
return err
}
if !applied {
return fmt.Errorf("UpdateDomain operation failed because of conditional failure")
return errConditionFailed
}
return nil
}
Expand Down
100 changes: 60 additions & 40 deletions common/persistence/persistence-tests/metadataPersistenceV2Test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,16 @@ func (m *MetadataPersistenceSuiteV2) TestConcurrentCreateDomain() {
ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
defer cancel()

id := uuid.New()
concurrency := 16
numDomains := 5
domainIDs := make([]string, numDomains)
names := make([]string, numDomains)
registered := make([]bool, numDomains)
for idx := range domainIDs {
domainIDs[idx] = uuid.New()
names[idx] = "concurrent-create-domain-test-name-" + strconv.Itoa(idx)
}

name := "concurrent-create-domain-test-name"
status := p.DomainStatusRegistered
description := "concurrent-create-domain-test-description"
owner := "create-domain-test-owner"
Expand Down Expand Up @@ -412,17 +419,17 @@ func (m *MetadataPersistenceSuiteV2) TestConcurrentCreateDomain() {
},
},
}
concurrency := 16
successCount := int32(0)
successCount := 0
var mutex sync.Mutex
var wg sync.WaitGroup
for i := 1; i <= concurrency; i++ {
newValue := fmt.Sprintf("v-%v", i)
wg.Add(1)
go func(data map[string]string) {
go func(idx int) {
data := map[string]string{"k0": fmt.Sprintf("v-%v", idx)}
_, err1 := m.CreateDomain(ctx,
&p.DomainInfo{
ID: id,
Name: name,
ID: domainIDs[idx%numDomains],
Name: names[idx%numDomains],
Status: status,
Description: description,
OwnerEmail: owner,
Expand All @@ -446,45 +453,58 @@ func (m *MetadataPersistenceSuiteV2) TestConcurrentCreateDomain() {
failoverVersion,
0,
)
mutex.Lock()
defer mutex.Unlock()
if err1 == nil {
atomic.AddInt32(&successCount, 1)
successCount++
registered[idx%numDomains] = true
}
if _, ok := err1.(*types.DomainAlreadyExistsError); ok {
registered[idx%numDomains] = true
}
wg.Done()
}(map[string]string{"k0": newValue})
}(i)
}
wg.Wait()
m.Equal(int32(1), successCount)
m.GreaterOrEqual(successCount, 1)

resp, err3 := m.GetDomain(ctx, "", name)
m.NoError(err3)
m.NotNil(resp)
m.Equal(name, resp.Info.Name)
m.Equal(status, resp.Info.Status)
m.Equal(description, resp.Info.Description)
m.Equal(owner, resp.Info.OwnerEmail)
m.Equal(retention, resp.Config.Retention)
m.Equal(emitMetric, resp.Config.EmitMetric)
m.Equal(historyArchivalStatus, resp.Config.HistoryArchivalStatus)
m.Equal(historyArchivalURI, resp.Config.HistoryArchivalURI)
m.Equal(visibilityArchivalStatus, resp.Config.VisibilityArchivalStatus)
m.Equal(visibilityArchivalURI, resp.Config.VisibilityArchivalURI)
m.Equal(testBinaries, resp.Config.BadBinaries)
m.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
m.Equal(len(clusters), len(resp.ReplicationConfig.Clusters))
for index := range clusters {
m.Equal(clusters[index], resp.ReplicationConfig.Clusters[index])
}
m.Equal(isGlobalDomain, resp.IsGlobalDomain)
m.Equal(configVersion, resp.ConfigVersion)
m.Equal(failoverVersion, resp.FailoverVersion)
m.Equal(common.InitialPreviousFailoverVersion, resp.PreviousFailoverVersion)
for i := 0; i != numDomains; i++ {
if !registered[i] {
continue
}

//check domain data
ss := strings.Split(resp.Info.Data["k0"], "-")
m.Equal(2, len(ss))
vi, err := strconv.Atoi(ss[1])
m.NoError(err)
m.Equal(true, vi > 0 && vi <= concurrency)
resp, err3 := m.GetDomain(ctx, "", names[i])
m.NoError(err3)
m.NotNil(resp)
m.Equal(domainIDs[i], resp.Info.ID)
m.Equal(names[i], resp.Info.Name)
m.Equal(status, resp.Info.Status)
m.Equal(description, resp.Info.Description)
m.Equal(owner, resp.Info.OwnerEmail)
m.Equal(retention, resp.Config.Retention)
m.Equal(emitMetric, resp.Config.EmitMetric)
m.Equal(historyArchivalStatus, resp.Config.HistoryArchivalStatus)
m.Equal(historyArchivalURI, resp.Config.HistoryArchivalURI)
m.Equal(visibilityArchivalStatus, resp.Config.VisibilityArchivalStatus)
m.Equal(visibilityArchivalURI, resp.Config.VisibilityArchivalURI)
m.Equal(testBinaries, resp.Config.BadBinaries)
m.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
m.Equal(len(clusters), len(resp.ReplicationConfig.Clusters))
for index := range clusters {
m.Equal(clusters[index], resp.ReplicationConfig.Clusters[index])
}
m.Equal(isGlobalDomain, resp.IsGlobalDomain)
m.Equal(configVersion, resp.ConfigVersion)
m.Equal(failoverVersion, resp.FailoverVersion)
m.Equal(common.InitialPreviousFailoverVersion, resp.PreviousFailoverVersion)

//check domain data
ss := strings.Split(resp.Info.Data["k0"], "-")
m.Equal(2, len(ss))
vi, err := strconv.Atoi(ss[1])
m.NoError(err)
m.Equal(true, vi > 0 && vi <= concurrency)
}
}

// TestConcurrentUpdateDomain test
Expand Down
Loading

0 comments on commit 09148e9

Please sign in to comment.