Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor visibility migration code and add support for OpenSearch visibility migration #6284

Merged
merged 11 commits into from
Sep 26, 2024
144 changes: 88 additions & 56 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,62 +215,7 @@ func (s *server) startService() common.Daemon {
}

if isAdvancedVisEnabled {
// verify config of advanced visibility store
advancedVisStoreKey := s.cfg.Persistence.AdvancedVisibilityStore
advancedVisStore, ok := s.cfg.Persistence.DataStores[advancedVisStoreKey]
if !ok {
log.Fatalf("not able to find advanced visibility store in config: %v", advancedVisStoreKey)
}

// there are 3 circumstances:
// 1. advanced visibility store == elasticsearch, use ESClient and visibilityDualManager
// 2. advanced visibility store == pinot and in process of migration, use ESClient, PinotClient and and visibilityTripleManager
// 3. advanced visibility store == pinot and not migrating, use PinotClient and visibilityDualManager
if params.PersistenceConfig.AdvancedVisibilityStore == common.PinotVisibilityStoreName {
if advancedVisStore.Pinot.Migration.Enabled {
esVisibilityStore, ok := s.cfg.Persistence.DataStores[common.ESVisibilityStoreName]
if !ok {
log.Fatalf("not able to find elasticsearch visibility store in config")
}
params.ESConfig = esVisibilityStore.ElasticSearch
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient

// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
}
} else {
params.ESClient = nil
}
params.PinotConfig = advancedVisStore.Pinot
pinotBroker := params.PinotConfig.Broker
pinotRawClient, err := pinot.NewFromBrokerList([]string{pinotBroker})
if err != nil || pinotRawClient == nil {
log.Fatalf("Creating Pinot visibility client failed: %v", err)
}
pinotClient := pnt.NewPinotClient(pinotRawClient, params.Logger, params.PinotConfig)
params.PinotClient = pinotClient
} else {
params.ESConfig = advancedVisStore.ElasticSearch
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient

// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
}
}
s.setupVisibilityClients(&params)
}

publicClientConfig := params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient)
Expand Down Expand Up @@ -337,3 +282,90 @@ func execute(d common.Daemon, doneC chan struct{}) {
d.Start()
close(doneC)
}

// there are multiple circumstances:
// 1. advanced visibility store == elasticsearch, use ESClient and visibilityDualManager
// 2. advanced visibility store == pinot and in process of migration, use ESClient, PinotClient and and visibilityTripleManager
// 3. advanced visibility store == pinot and not migrating, use PinotClient and visibilityDualManager
// 4. advanced visibility store == opensearch and not migrating, this performs the same as 1, just use different version ES client and visibilityDualManager
// 5. advanced visibility store == opensearch and in process of migration, use ESClient and visibilityTripleManager
func (s *server) setupVisibilityClients(params *resource.Params) {
advancedVisStoreKey := s.cfg.Persistence.AdvancedVisibilityStore
advancedVisStore, ok := s.cfg.Persistence.DataStores[advancedVisStoreKey]
if !ok {
log.Fatalf("Cannot find advanced visibility store in config: %v", advancedVisStoreKey)
}

// Handle advanced visibility store based on type and migration state
switch advancedVisStoreKey {
case common.PinotVisibilityStoreName:
s.setupPinotClient(params, advancedVisStore)
case common.OSVisibilityStoreName:
s.setupOSClient(params, advancedVisStore)
default: // Assume Elasticsearch by default
s.setupESClient(params)
}
}

func (s *server) setupPinotClient(params *resource.Params, advancedVisStore config.DataStore) {
params.PinotConfig = advancedVisStore.Pinot
pinotBroker := params.PinotConfig.Broker
pinotRawClient, err := pinot.NewFromBrokerList([]string{pinotBroker})
if err != nil || pinotRawClient == nil {
log.Fatalf("Creating Pinot visibility client failed: %v", err)
}
params.PinotClient = pnt.NewPinotClient(pinotRawClient, params.Logger, params.PinotConfig)
if advancedVisStore.Pinot.Migration.Enabled {
s.setupESClient(params)
}
}

func (s *server) setupESClient(params *resource.Params) {
esVisibilityStore, ok := s.cfg.Persistence.DataStores[common.ESVisibilityStoreName]
if !ok {
log.Fatalf("Cannot find Elasticsearch visibility store in config")
}

params.ESConfig = esVisibilityStore.ElasticSearch
params.ESConfig.SetUsernamePassword()

esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("Error creating Elasticsearch client: %v", err)
}
params.ESClient = esClient

validateIndex(params.ESConfig)
}

func (s *server) setupOSClient(params *resource.Params, advancedVisStore config.DataStore) {
// OpenSearch client setup (same structure as Elasticsearch, just version difference)
// This is only for migration purposes
params.OSConfig = advancedVisStore.ElasticSearch
params.OSConfig.SetUsernamePassword()

osClient, err := elasticsearch.NewGenericClient(params.OSConfig, params.Logger)
if err != nil {
log.Fatalf("Error creating OpenSearch client: %v", err)
}
params.OSClient = osClient

validateIndex(params.OSConfig)

if advancedVisStore.ElasticSearch.Migration.Enabled {
s.setupESClient(params)
} else {
// this should not happen since when it is not in migration node
// we will use es-visibility and set the version to os2 instead of using os-visibility
neil-xie marked this conversation as resolved.
Show resolved Hide resolved
params.ESConfig = advancedVisStore.ElasticSearch
params.ESConfig.SetUsernamePassword()
params.ESClient = osClient
}
}

func validateIndex(config *config.ElasticSearchConfig) {
indexName, ok := config.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("Visibility index is missing in config")
}
}
7 changes: 6 additions & 1 deletion common/config/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ type (
// optional to use Signed Certificates over https
TLS TLS `yaml:"tls"`
// optional to add custom headers
CustomHeaders map[string]string `yaml:"customHeaders,omitempty"`
CustomHeaders map[string]string `yaml:"customHeaders,omitempty"`
Migration VisibilityMigration `yaml:"migration"`
}

// AWSSigning contains config to enable signing,
Expand Down Expand Up @@ -84,6 +85,10 @@ type (
AWSEnvironmentCredential struct {
Region string `yaml:"region"`
}

VisibilityMigration struct {
Enabled bool `yaml:"enabled"`
}
)

// GetVisibilityIndex return visibility index name
Expand Down
14 changes: 5 additions & 9 deletions common/config/pinot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,10 @@ package config
// PinotVisibilityConfig for connecting to Pinot
type (
PinotVisibilityConfig struct {
Cluster string `yaml:"cluster"` //nolint:govet
Broker string `yaml:"broker"` //nolint:govet
Table string `yaml:"table"` //nolint:govet
ServiceName string `yaml:"serviceName"` //nolint:govet
Migration PinotMigration `yaml:"migration"` //nolint:govet
}

PinotMigration struct {
Enabled bool `yaml:"enabled"` //nolint:govet
Cluster string `yaml:"cluster"` //nolint:govet
Broker string `yaml:"broker"` //nolint:govet
Table string `yaml:"table"` //nolint:govet
ServiceName string `yaml:"serviceName"` //nolint:govet
Migration VisibilityMigration `yaml:"migration"` //nolint:govet
}
)
18 changes: 16 additions & 2 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const (
ESVisibilityStoreName = "es-visibility"
// PinotVisibilityStoreName is used to find pinot advanced visibility store
PinotVisibilityStoreName = "pinot-visibility"
// OSVisibilityStoreName is used to find opensearch advanced visibility store
OSVisibilityStoreName = "os-visibility"
)

// This was flagged by salus as potentially hardcoded credentials. This is a false positive by the scanner and should be
Expand Down Expand Up @@ -157,8 +159,20 @@ const (
AdvancedVisibilityWritingModeOn = "on"
// AdvancedVisibilityWritingModeDual means write to both normal visibility and advanced visibility store
AdvancedVisibilityWritingModeDual = "dual"
// AdvancedVisibilityWritingModeTriple means write to both normal visibility and advanced visibility store, includes ES and Pinot
AdvancedVisibilityWritingModeTriple = "triple"
)

// enum for dynamic config AdvancedVisibilityMigrationWritingMode
const (
// AdvancedVisibilityMigrationWritingModeOff means do not write to advanced visibility store
AdvancedVisibilityMigrationWritingModeOff = "off"
// AdvancedVisibilityMigrationWritingModeTriple means write to normal visibility and advanced visibility store
AdvancedVisibilityMigrationWritingModeTriple = "triple"
// AdvancedVisibilityMigrationWritingModeDual means write to both advanced visibility stores
AdvancedVisibilityMigrationWritingModeDual = "dual"
// AdvancedVisibilityMigrationWritingModeTriple means write to source visibility store during migration
AdvancedVisibilityMigrationWritingModeSource = "source"
// AdvancedVisibilityMigrationWritingModeDestination means write to destination visibility store during migration
AdvancedVisibilityMigrationWritingModeDestination = "destination"
)

const (
Expand Down
11 changes: 11 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2325,6 +2325,12 @@ const (
// Default value: "on"
// Allowed filters: N/A
AdvancedVisibilityWritingMode
// AdvancedVisibilityMigrationWritingMode is key for how to write to advanced visibility during migration.
// KeyName: system.AdvancedVisibilityMigrationWritingMode
// Value type: String enum: "dual"(means writing to both source and destination advanced visibility, "source" (means writing to source visibility only), "destination" (means writing to destination visibility only) or "off" (means writing to db visibility only)
// Default value: "dual"
// Allowed filters: N/A
AdvancedVisibilityMigrationWritingMode
// HistoryArchivalStatus is key for the status of history archival to override the value from static config.
// KeyName: system.historyArchivalStatus
// Value type: string enum: "enabled" or "disabled"
Expand Down Expand Up @@ -4681,6 +4687,11 @@ var StringKeys = map[StringKey]DynamicString{
Description: "AdvancedVisibilityWritingMode is key for how to write to advanced visibility. The most useful option is dual, which can be used for seamless migration from db visibility to advanced visibility, usually using with EnableReadVisibilityFromES",
DefaultValue: "on",
},
AdvancedVisibilityMigrationWritingMode: {
KeyName: "system.advancedVisibilityMigrationWritingMode",
Description: "AdvancedVisibilityMigrationWritingMode is key for how to write to advanced visibility. The most useful option is dual, which can be used for seamless migration from advanced visibility to another",
DefaultValue: "dual",
},
HistoryArchivalStatus: {
KeyName: "system.historyArchivalStatus",
Description: "HistoryArchivalStatus is key for the status of history archival to override the value from static config.",
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/client/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type (
ESConfig *config.ElasticSearchConfig
PinotConfig *config.PinotVisibilityConfig
PinotClient pinot.GenericClient
OSClient es.GenericClient
OSConfig *config.ElasticSearchConfig
}
)

Expand Down
Loading
Loading