diff --git a/tools/cli/admin.go b/tools/cli/admin.go index 6fefc030e8b..674a3593cec 100644 --- a/tools/cli/admin.go +++ b/tools/cli/admin.go @@ -475,7 +475,7 @@ func newAdminKafkaCommands() []cli.Command { { Name: "rereplicate", Aliases: []string{"rrp"}, - Usage: "Rereplicate replication tasks to target topic from history tables", + Usage: "Rereplicate replication tasks from history tables", Flags: append(getDBFlags(), cli.StringFlag{ Name: FlagSourceCluster, diff --git a/tools/cli/adminKafkaCommands.go b/tools/cli/adminKafkaCommands.go index b1ed9915885..3d460784b72 100644 --- a/tools/cli/adminKafkaCommands.go +++ b/tools/cli/adminKafkaCommands.go @@ -42,10 +42,7 @@ import ( "github.com/uber/cadence/client/admin" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" - "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common/persistence/cassandra" - "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/thrift" ) @@ -462,14 +459,9 @@ func doRereplicate( endEventID int64, endEventVersion int64, sourceCluster string, - cqlClient gocql.Client, - session gocql.Session, adminClient admin.Client, + exeMgr persistence.ExecutionManager, ) { - - exeM, _ := cassandra.NewWorkflowExecutionPersistence(shardID, cqlClient, session, loggerimpl.NewNopLogger()) - exeMgr := persistence.NewExecutionManagerImpl(exeM, loggerimpl.NewNopLogger()) - fmt.Printf("Start rereplicate for wid: %v, rid:%v \n", wid, rid) resp, err := exeMgr.GetWorkflowExecution(ctx, &persistence.GetWorkflowExecutionRequest{ DomainID: domainID, @@ -517,7 +509,6 @@ func AdminRereplicate(c *cli.Context) { ErrorAndExit("End event version is not defined", nil) } - client, session := connectToCassandra(c) adminClient := cFactory.ServerAdminClient(c) endEventID := c.Int64(FlagMaxEventID) endVersion := c.Int64(FlagEndEventVersion) @@ -526,6 +517,8 @@ func AdminRereplicate(c *cli.Context) { rid := getRequiredOption(c, FlagRunID) shardID := common.WorkflowIDToHistoryShard(wid, numberOfShards) contextTimeout := defaultResendContextTimeout + executionManager := initializeExecutionStore(c, shardID, 0) + defer executionManager.Close() if c.GlobalIsSet(FlagContextTimeout) { contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second } @@ -541,9 +534,8 @@ func AdminRereplicate(c *cli.Context) { endEventID, endVersion, sourceCluster, - client, - session, adminClient, + executionManager, ) }