Refactoring Cassandra shard persistence manager for NoSQL support
longquanzheng authored May 25, 2021
1 parent e2cde4a commit a49e2da
Showing 5 changed files with 533 additions and 319 deletions.
87 changes: 0 additions & 87 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
@@ -1435,93 +1435,6 @@ func updateBufferedEvents(
}
}

func createShardInfo(
	currentCluster string,
	rangeID int64,
	shard map[string]interface{},
) *p.InternalShardInfo {

	var pendingFailoverMarkersRawData []byte
	var pendingFailoverMarkersEncoding string
	var transferProcessingQueueStatesRawData []byte
	var transferProcessingQueueStatesEncoding string
	var timerProcessingQueueStatesRawData []byte
	var timerProcessingQueueStatesEncoding string
	info := &p.InternalShardInfo{}
	info.RangeID = rangeID
	for k, v := range shard {
		switch k {
		case "shard_id":
			info.ShardID = v.(int)
		case "owner":
			info.Owner = v.(string)
		case "stolen_since_renew":
			info.StolenSinceRenew = v.(int)
		case "updated_at":
			info.UpdatedAt = v.(time.Time)
		case "replication_ack_level":
			info.ReplicationAckLevel = v.(int64)
		case "transfer_ack_level":
			info.TransferAckLevel = v.(int64)
		case "timer_ack_level":
			info.TimerAckLevel = v.(time.Time)
		case "cluster_transfer_ack_level":
			info.ClusterTransferAckLevel = v.(map[string]int64)
		case "cluster_timer_ack_level":
			info.ClusterTimerAckLevel = v.(map[string]time.Time)
		case "transfer_processing_queue_states":
			transferProcessingQueueStatesRawData = v.([]byte)
		case "transfer_processing_queue_states_encoding":
			transferProcessingQueueStatesEncoding = v.(string)
		case "timer_processing_queue_states":
			timerProcessingQueueStatesRawData = v.([]byte)
		case "timer_processing_queue_states_encoding":
			timerProcessingQueueStatesEncoding = v.(string)
		case "domain_notification_version":
			info.DomainNotificationVersion = v.(int64)
		case "cluster_replication_level":
			info.ClusterReplicationLevel = v.(map[string]int64)
		case "replication_dlq_ack_level":
			info.ReplicationDLQAckLevel = v.(map[string]int64)
		case "pending_failover_markers":
			pendingFailoverMarkersRawData = v.([]byte)
		case "pending_failover_markers_encoding":
			pendingFailoverMarkersEncoding = v.(string)
		}
	}

	if info.ClusterTransferAckLevel == nil {
		info.ClusterTransferAckLevel = map[string]int64{
			currentCluster: info.TransferAckLevel,
		}
	}
	if info.ClusterTimerAckLevel == nil {
		info.ClusterTimerAckLevel = map[string]time.Time{
			currentCluster: info.TimerAckLevel,
		}
	}
	if info.ClusterReplicationLevel == nil {
		info.ClusterReplicationLevel = make(map[string]int64)
	}
	if info.ReplicationDLQAckLevel == nil {
		info.ReplicationDLQAckLevel = make(map[string]int64)
	}
	info.PendingFailoverMarkers = p.NewDataBlob(
		pendingFailoverMarkersRawData,
		common.EncodingType(pendingFailoverMarkersEncoding),
	)
	info.TransferProcessingQueueStates = p.NewDataBlob(
		transferProcessingQueueStatesRawData,
		common.EncodingType(transferProcessingQueueStatesEncoding),
	)
	info.TimerProcessingQueueStates = p.NewDataBlob(
		timerProcessingQueueStatesRawData,
		common.EncodingType(timerProcessingQueueStatesEncoding),
	)

	return info
}

func createWorkflowExecutionInfo(
	result map[string]interface{},
) *p.InternalWorkflowExecutionInfo {
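For context on the deleted code: createShardInfo decodes a Cassandra row delivered as a map[string]interface{} (for example via gocql's MapScan) into a typed shard record by switching on column names and type-asserting each value, then backfills the cluster-level ack maps for the current cluster when they are absent. The sketch below is a minimal, self-contained illustration of that decoding pattern only; ShardRow, decodeShardRow, and the sample values are hypothetical stand-ins, not Cadence types and not the code this commit replaces the function with.

package main

import (
	"fmt"
	"time"
)

// ShardRow is a simplified, hypothetical stand-in for p.InternalShardInfo,
// used only to illustrate the decoding pattern; it is not a Cadence type.
type ShardRow struct {
	ShardID          int
	Owner            string
	UpdatedAt        time.Time
	TransferAckLevel int64
}

// decodeShardRow mirrors the switch-on-column-name type assertions that
// createShardInfo performs on a Cassandra row map.
func decodeShardRow(row map[string]interface{}) *ShardRow {
	info := &ShardRow{}
	for k, v := range row {
		switch k {
		case "shard_id":
			info.ShardID = v.(int)
		case "owner":
			info.Owner = v.(string)
		case "updated_at":
			info.UpdatedAt = v.(time.Time)
		case "transfer_ack_level":
			info.TransferAckLevel = v.(int64)
		}
	}
	return info
}

func main() {
	// In the real persistence layer this map would come from the Cassandra
	// driver (for example gocql's MapScan); here it is hand-built sample data.
	row := map[string]interface{}{
		"shard_id":           7,
		"owner":              "host-1:7934",
		"updated_at":         time.Now(),
		"transfer_ack_level": int64(1024),
	}
	fmt.Printf("%+v\n", decodeShardRow(row))
}

A column that is missing from the row simply leaves the struct field at its zero value, which is why createShardInfo applies nil-map defaults after the loop (for example seeding ClusterTransferAckLevel with the current cluster's TransferAckLevel).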
