Skip to content

Commit

Permalink
Fix adding multiple search attributes of same type at once (#4273)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed May 12, 2023
1 parent 4ef165a commit caac9c2
Show file tree
Hide file tree
Showing 4 changed files with 399 additions and 48 deletions.
14 changes: 14 additions & 0 deletions common/searchattribute/test_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,28 @@ var (
"CustomBoolField": enumspb.INDEXED_VALUE_TYPE_BOOL,

"Int01": enumspb.INDEXED_VALUE_TYPE_INT,
"Int02": enumspb.INDEXED_VALUE_TYPE_INT,
"Int03": enumspb.INDEXED_VALUE_TYPE_INT,
"Text01": enumspb.INDEXED_VALUE_TYPE_TEXT,
"Keyword01": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
"Keyword02": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
"Keyword03": enumspb.INDEXED_VALUE_TYPE_KEYWORD,
"Datetime01": enumspb.INDEXED_VALUE_TYPE_DATETIME,
"Double01": enumspb.INDEXED_VALUE_TYPE_DOUBLE,
"Bool01": enumspb.INDEXED_VALUE_TYPE_BOOL,
"KeywordList01": enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
},
}

TestAliases = map[string]string{
"Int01": "CustomIntField",
"Text01": "CustomTextField",
"Keyword01": "CustomKeywordField",
"Datetime01": "CustomDatetimeField",
"Double01": "CustomDoubleField",
"Bool01": "CustomBoolField",
"KeywordList01": "CustomKeywordListField",
}
)

func NewTestProvider() *TestProvider {
Expand Down
48 changes: 31 additions & 17 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func (adh *AdminHandler) addSearchAttributesSQL(
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

customSearchAttributes := currentSearchAttributes.Custom()
dbCustomSearchAttributes := searchattribute.GetSqlDbIndexSearchAttributes().CustomSearchAttributes
cmCustomSearchAttributes := currentSearchAttributes.Custom()
upsertFieldToAliasMap := make(map[string]string)
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
Expand All @@ -364,15 +365,22 @@ func (adh *AdminHandler) addSearchAttributesSQL(
// find the first available field for the given type
targetFieldName := ""
cntUsed := 0
for fieldName, fieldType := range customSearchAttributes {
for fieldName, fieldType := range dbCustomSearchAttributes {
if fieldType != saType {
continue
}
if _, ok := fieldToAliasMap[fieldName]; !ok {
// make sure the pre-allocated custom search attributes are created in cluster metadata
if _, ok := cmCustomSearchAttributes[fieldName]; !ok {
continue
}
if _, ok := fieldToAliasMap[fieldName]; ok {
cntUsed++
} else if _, ok := upsertFieldToAliasMap[fieldName]; ok {
cntUsed++
} else {
targetFieldName = fieldName
break
}
cntUsed++
}
if targetFieldName == "" {
return serviceerror.NewInvalidArgument(
Expand Down Expand Up @@ -415,16 +423,20 @@ func (adh *AdminHandler) RemoveSearchAttributes(
indexName = adh.visibilityMgr.GetIndexName()
}

var err error
currentSearchAttributes, err := adh.saProvider.GetSearchAttributes(indexName, true)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}

// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
err = adh.removeSearchAttributesElasticsearch(ctx, request, indexName)
err = adh.removeSearchAttributesElasticsearch(ctx, request, indexName, currentSearchAttributes)
} else {
err = adh.removeSearchAttributesSQL(ctx, request)
err = adh.removeSearchAttributesSQL(ctx, request, currentSearchAttributes)
}

if err != nil {
Expand All @@ -437,12 +449,8 @@ func (adh *AdminHandler) removeSearchAttributesElasticsearch(
ctx context.Context,
request *adminservice.RemoveSearchAttributesRequest,
indexName string,
currentSearchAttributes searchattribute.NameTypeMap,
) error {
currentSearchAttributes, err := adh.saProvider.GetSearchAttributes(indexName, true)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}

newCustomSearchAttributes := maps.Clone(currentSearchAttributes.Custom())
for _, saName := range request.GetSearchAttributes() {
if !currentSearchAttributes.IsDefined(saName) {
Expand All @@ -454,7 +462,7 @@ func (adh *AdminHandler) removeSearchAttributesElasticsearch(
delete(newCustomSearchAttributes, saName)
}

err = adh.saManager.SaveSearchAttributes(ctx, indexName, newCustomSearchAttributes)
err := adh.saManager.SaveSearchAttributes(ctx, indexName, newCustomSearchAttributes)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToSaveSearchAttributesMessage, err))
}
Expand All @@ -464,6 +472,7 @@ func (adh *AdminHandler) removeSearchAttributesElasticsearch(
func (adh *AdminHandler) removeSearchAttributesSQL(
ctx context.Context,
request *adminservice.RemoveSearchAttributesRequest,
currentSearchAttributes searchattribute.NameTypeMap,
) error {
_, client, err := adh.clientFactory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
Expand All @@ -488,11 +497,16 @@ func (adh *AdminHandler) removeSearchAttributesSQL(
upsertFieldToAliasMap := make(map[string]string)
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
for _, saName := range request.GetSearchAttributes() {
fieldName, ok := aliasToFieldMap[saName]
if !ok {
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
if fieldName, ok := aliasToFieldMap[saName]; ok {
upsertFieldToAliasMap[fieldName] = ""
continue
}
if currentSearchAttributes.IsDefined(saName) {
return serviceerror.NewInvalidArgument(
fmt.Sprintf(errUnableToRemoveNonCustomSearchAttributesMessage, saName),
)
}
upsertFieldToAliasMap[fieldName] = ""
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Expand Down
39 changes: 24 additions & 15 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,14 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
if _, ok := cmCustomSearchAttributes[fieldName]; !ok {
continue
}
if _, ok := fieldToAliasMap[fieldName]; !ok {
if _, ok := fieldToAliasMap[fieldName]; ok {
cntUsed++
} else if _, ok := upsertFieldToAliasMap[fieldName]; ok {
cntUsed++
} else {
targetFieldName = fieldName
break
}
cntUsed++
}
if targetFieldName == "" {
return serviceerror.NewInvalidArgument(
Expand Down Expand Up @@ -349,17 +352,21 @@ func (h *OperatorHandlerImpl) RemoveSearchAttributes(
return nil, errSearchAttributesNotSet
}

var err error
indexName := h.visibilityMgr.GetIndexName()
currentSearchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}

// TODO (rodrigozhou): Remove condition `indexName == ""`.
// If indexName == "", then calling addSearchAttributesElasticsearch will
// register the search attributes in the cluster metadata if ES is up or if
// `skip-schema-update` is set. This is for backward compatibility using
// standard visibility.
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
err = h.removeSearchAttributesElasticsearch(ctx, request, indexName)
err = h.removeSearchAttributesElasticsearch(ctx, request, indexName, currentSearchAttributes)
} else {
err = h.removeSearchAttributesSQL(ctx, request)
err = h.removeSearchAttributesSQL(ctx, request, currentSearchAttributes)
}

if err != nil {
Expand All @@ -372,12 +379,8 @@ func (h *OperatorHandlerImpl) removeSearchAttributesElasticsearch(
ctx context.Context,
request *operatorservice.RemoveSearchAttributesRequest,
indexName string,
currentSearchAttributes searchattribute.NameTypeMap,
) error {
currentSearchAttributes, err := h.saProvider.GetSearchAttributes(indexName, true)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetSearchAttributesMessage, err))
}

newCustomSearchAttributes := maps.Clone(currentSearchAttributes.Custom())
for _, saName := range request.GetSearchAttributes() {
if !currentSearchAttributes.IsDefined(saName) {
Expand All @@ -391,7 +394,7 @@ func (h *OperatorHandlerImpl) removeSearchAttributesElasticsearch(
delete(newCustomSearchAttributes, saName)
}

err = h.saManager.SaveSearchAttributes(ctx, indexName, newCustomSearchAttributes)
err := h.saManager.SaveSearchAttributes(ctx, indexName, newCustomSearchAttributes)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToSaveSearchAttributesMessage, err))
}
Expand All @@ -401,6 +404,7 @@ func (h *OperatorHandlerImpl) removeSearchAttributesElasticsearch(
func (h *OperatorHandlerImpl) removeSearchAttributesSQL(
ctx context.Context,
request *operatorservice.RemoveSearchAttributesRequest,
currentSearchAttributes searchattribute.NameTypeMap,
) error {
_, client, err := h.clientFactory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
Expand All @@ -425,11 +429,16 @@ func (h *OperatorHandlerImpl) removeSearchAttributesSQL(
upsertFieldToAliasMap := make(map[string]string)
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
for _, saName := range request.GetSearchAttributes() {
fieldName, ok := aliasToFieldMap[saName]
if !ok {
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
if fieldName, ok := aliasToFieldMap[saName]; ok {
upsertFieldToAliasMap[fieldName] = ""
continue
}
if currentSearchAttributes.IsDefined(saName) {
return serviceerror.NewInvalidArgument(
fmt.Sprintf(errUnableToRemoveNonCustomSearchAttributesMessage, saName),
)
}
upsertFieldToAliasMap[fieldName] = ""
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Expand Down
Loading

0 comments on commit caac9c2

Please sign in to comment.