Skip to content

Commit

Permalink
Add index name to constructor of search attribute validator (#3834)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou authored Jan 26, 2023
1 parent c52848e commit 9d3fede
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 39 deletions.
7 changes: 5 additions & 2 deletions common/searchattribute/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
indexName string
}
)

Expand All @@ -53,19 +54,21 @@ func NewValidator(
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter,
indexName string,
) *Validator {
return &Validator{
searchAttributesProvider: searchAttributesProvider,
searchAttributesMapper: searchAttributesMapper,
searchAttributesNumberOfKeysLimit: searchAttributesNumberOfKeysLimit,
searchAttributesSizeOfValueLimit: searchAttributesSizeOfValueLimit,
searchAttributesTotalSizeLimit: searchAttributesTotalSizeLimit,
indexName: indexName,
}
}

// Validate search attributes are valid for writing.
// The search attributes must be unaliased before calling validation.
func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namespace string, indexName string) error {
func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namespace string) error {
if searchAttributes == nil {
return nil
}
Expand All @@ -81,7 +84,7 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
)
}

saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(indexName, false)
saTypeMap, err := v.searchAttributesProvider.GetSearchAttributes(v.indexName, false)
if err != nil {
return serviceerror.NewInvalidArgument(
fmt.Sprintf("unable to get search attributes from cluster metadata: %v", err),
Expand Down
42 changes: 25 additions & 17 deletions common/searchattribute/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() {
nil,
dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit))
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit),
"",
)

namespace := "namespace"
var attr *commonpb.SearchAttributes

err := saValidator.Validate(attr, namespace, "")
err := saValidator.Validate(attr, namespace)
s.NoError(err)

intPayload, err := payload.Encode(1)
Expand All @@ -69,7 +71,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() {
attr = &commonpb.SearchAttributes{
IndexedFields: fields,
}
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.NoError(err)

fields = map[string]*commonpb.Payload{
Expand All @@ -78,15 +80,15 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() {
"CustomBoolField": payload.EncodeString("true"),
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("number of search attributes 3 exceeds limit 2", err.Error())

fields = map[string]*commonpb.Payload{
"InvalidKey": payload.EncodeString("1"),
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("search attribute InvalidKey is not defined", err.Error())

Expand All @@ -95,7 +97,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() {
"CustomBoolField": payload.EncodeString("123"),
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("invalid value for search attribute CustomBoolField of type Bool: 123", err.Error())

Expand All @@ -105,14 +107,14 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() {
"CustomIntField": intArrayPayload,
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.NoError(err)

fields = map[string]*commonpb.Payload{
"StartTime": intPayload,
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("StartTime attribute can't be set in SearchAttributes", err.Error())
}
Expand All @@ -127,12 +129,14 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() {
&TestMapper{},
dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit))
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit),
"",
)

namespace := "test-namespace"
var attr *commonpb.SearchAttributes

err := saValidator.Validate(attr, namespace, "")
err := saValidator.Validate(attr, namespace)
s.Nil(err)

intPayload, err := payload.Encode(1)
Expand All @@ -143,7 +147,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() {
attr = &commonpb.SearchAttributes{
IndexedFields: fields,
}
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.NoError(err)

fields = map[string]*commonpb.Payload{
Expand All @@ -152,18 +156,18 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() {
attr = &commonpb.SearchAttributes{
IndexedFields: fields,
}
err = saValidator.Validate(attr, "test-namespace", "")
err = saValidator.Validate(attr, "test-namespace")
s.NoError(err)

fields = map[string]*commonpb.Payload{
"InvalidKey": payload.EncodeString("1"),
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("search attribute alias_of_InvalidKey is not defined", err.Error())

err = saValidator.Validate(attr, "error-namespace", "")
err = saValidator.Validate(attr, "error-namespace")
s.Error(err)
s.EqualError(err, "mapper error")

Expand All @@ -172,7 +176,7 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate_Mapper() {
"CustomBoolField": payload.EncodeString("123"),
}
attr.IndexedFields = fields
err = saValidator.Validate(attr, namespace, "")
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("invalid value for search attribute alias_of_CustomBoolField of type Bool: 123", err.Error())
}
Expand All @@ -187,7 +191,9 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize() {
nil,
dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit))
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit),
"",
)

namespace := "namespace"

Expand Down Expand Up @@ -223,7 +229,9 @@ func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize_Mapper
&TestMapper{},
dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit))
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit),
"",
)

namespace := "test-namespace"

Expand Down
6 changes: 4 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ func NewWorkflowHandler(
saMapper,
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit),
config.SearchAttributesTotalSizeLimit,
config.ESIndexName,
),
archivalMetadata: archivalMetadata,
healthServer: healthServer,
overrides: NewOverrides(),
Expand Down Expand Up @@ -4271,7 +4273,7 @@ func (wh *WorkflowHandler) processOutgoingSearchAttributes(events []*historypb.H
}

func (wh *WorkflowHandler) validateSearchAttributes(searchAttributes *commonpb.SearchAttributes, namespaceName namespace.Name) error {
if err := wh.saValidator.Validate(searchAttributes, namespaceName.String(), wh.config.ESIndexName); err != nil {
if err := wh.saValidator.Validate(searchAttributes, namespaceName.String()); err != nil {
return err
}
if err := wh.saValidator.ValidateSize(searchAttributes, namespaceName.String()); err != nil {
Expand Down
9 changes: 3 additions & 6 deletions service/history/commandChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ func (v *commandAttrValidator) validateSignalExternalWorkflowExecutionAttributes
func (v *commandAttrValidator) validateUpsertWorkflowSearchAttributes(
namespace namespace.Name,
attributes *commandpb.UpsertWorkflowSearchAttributesCommandAttributes,
visibilityIndexName string,
) (enumspb.WorkflowTaskFailedCause, error) {

const failedCause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES
Expand All @@ -563,7 +562,7 @@ func (v *commandAttrValidator) validateUpsertWorkflowSearchAttributes(
if len(attributes.GetSearchAttributes().GetIndexedFields()) == 0 {
return failedCause, serviceerror.NewInvalidArgument("IndexedFields is empty on command.")
}
if err := v.searchAttributesValidator.Validate(attributes.GetSearchAttributes(), namespace.String(), visibilityIndexName); err != nil {
if err := v.searchAttributesValidator.Validate(attributes.GetSearchAttributes(), namespace.String()); err != nil {
return failedCause, err
}

Expand Down Expand Up @@ -600,7 +599,6 @@ func (v *commandAttrValidator) validateContinueAsNewWorkflowExecutionAttributes(
namespace namespace.Name,
attributes *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes,
executionInfo *persistencespb.WorkflowExecutionInfo,
visibilityIndexName string,
) (enumspb.WorkflowTaskFailedCause, error) {

const failedCause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_CONTINUE_AS_NEW_ATTRIBUTES
Expand Down Expand Up @@ -664,7 +662,7 @@ func (v *commandAttrValidator) validateContinueAsNewWorkflowExecutionAttributes(
return failedCause, err
}

if err = v.searchAttributesValidator.Validate(attributes.GetSearchAttributes(), namespace.String(), visibilityIndexName); err != nil {
if err = v.searchAttributesValidator.Validate(attributes.GetSearchAttributes(), namespace.String()); err != nil {
return enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, err
}

Expand All @@ -678,7 +676,6 @@ func (v *commandAttrValidator) validateStartChildExecutionAttributes(
attributes *commandpb.StartChildWorkflowExecutionCommandAttributes,
parentInfo *persistencespb.WorkflowExecutionInfo,
defaultWorkflowTaskTimeoutFn dynamicconfig.DurationPropertyFnWithNamespaceFilter,
visibilityIndexName string,
) (enumspb.WorkflowTaskFailedCause, error) {

const failedCause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES
Expand Down Expand Up @@ -733,7 +730,7 @@ func (v *commandAttrValidator) validateStartChildExecutionAttributes(
return failedCause, err
}

if err := v.searchAttributesValidator.Validate(attributes.GetSearchAttributes(), targetNamespace.String(), visibilityIndexName); err != nil {
if err := v.searchAttributesValidator.Validate(attributes.GetSearchAttributes(), targetNamespace.String()); err != nil {
return enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, err
}

Expand Down
10 changes: 5 additions & 5 deletions service/history/commandChecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (s *commandAttrValidatorSuite) SetupTest() {
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit,
"index-name",
))
}

Expand Down Expand Up @@ -190,17 +191,17 @@ func (s *commandAttrValidatorSuite) TestValidateUpsertWorkflowSearchAttributes()
namespace := namespace.Name("tests.Namespace")
var attributes *commandpb.UpsertWorkflowSearchAttributesCommandAttributes

fc, err := s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes, "index-name")
fc, err := s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes)
s.EqualError(err, "UpsertWorkflowSearchAttributesCommandAttributes is not set on command.")
s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, fc)

attributes = &commandpb.UpsertWorkflowSearchAttributesCommandAttributes{}
fc, err = s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes, "index-name")
fc, err = s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes)
s.EqualError(err, "SearchAttributes is not set on command.")
s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, fc)

attributes.SearchAttributes = &commonpb.SearchAttributes{}
fc, err = s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes, "index-name")
fc, err = s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes)
s.EqualError(err, "IndexedFields is empty on command.")
s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SEARCH_ATTRIBUTES, fc)

Expand All @@ -209,7 +210,7 @@ func (s *commandAttrValidatorSuite) TestValidateUpsertWorkflowSearchAttributes()
attributes.SearchAttributes.IndexedFields = map[string]*commonpb.Payload{
"CustomKeywordField": saPayload,
}
fc, err = s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes, "index-name")
fc, err = s.validator.validateUpsertWorkflowSearchAttributes(namespace, attributes)
s.NoError(err)
s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNSPECIFIED, fc)
}
Expand Down Expand Up @@ -238,7 +239,6 @@ func (s *commandAttrValidatorSuite) TestValidateContinueAsNewWorkflowExecutionAt
tests.Namespace,
attributes,
executionInfo,
"index-name",
)
s.NoError(err)
s.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNSPECIFIED, fc)
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func NewEngineWithShardContext(
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit,
config.DefaultVisibilityIndexName,
)

historyEngImpl.workflowTaskHandler = newWorkflowTaskHandlerCallback(historyEngImpl)
Expand Down
1 change: 1 addition & 0 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (s *engine2Suite) SetupTest() {
s.config.SearchAttributesNumberOfKeysLimit,
s.config.SearchAttributesSizeOfValueLimit,
s.config.SearchAttributesTotalSizeLimit,
s.config.DefaultVisibilityIndexName,
),
workflowConsistencyChecker: api.NewWorkflowConsistencyChecker(mockShard, s.workflowCache),
}
Expand Down
8 changes: 1 addition & 7 deletions service/history/workflowTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,6 @@ func (handler *workflowTaskHandlerImpl) handleCommandContinueAsNewWorkflow(
namespaceName,
attr,
handler.mutableState.GetExecutionInfo(),
handler.config.DefaultVisibilityIndexName,
)
},
); err != nil || handler.stopProcessing {
Expand Down Expand Up @@ -899,7 +898,6 @@ func (handler *workflowTaskHandlerImpl) handleCommandStartChildWorkflow(
attr,
handler.mutableState.GetExecutionInfo(),
handler.config.DefaultWorkflowTaskTimeout,
handler.config.DefaultVisibilityIndexName,
)
},
); err != nil || handler.stopProcessing {
Expand Down Expand Up @@ -1038,11 +1036,7 @@ func (handler *workflowTaskHandlerImpl) handleCommandUpsertWorkflowSearchAttribu
// valid search attributes for upsert
if err := handler.validateCommandAttr(
func() (enumspb.WorkflowTaskFailedCause, error) {
return handler.attrValidator.validateUpsertWorkflowSearchAttributes(
namespace,
attr,
handler.config.DefaultVisibilityIndexName,
)
return handler.attrValidator.validateUpsertWorkflowSearchAttributes(namespace, attr)
},
); err != nil || handler.stopProcessing {
return err
Expand Down
1 change: 1 addition & 0 deletions service/history/workflowTaskHandlerCallbacks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (s *WorkflowTaskHandlerCallbackSuite) SetupTest() {
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit,
config.DefaultVisibilityIndexName,
),
workflowConsistencyChecker: api.NewWorkflowConsistencyChecker(mockShard, workflowCache),
}
Expand Down

0 comments on commit 9d3fede

Please sign in to comment.