Fix ES bulk processor commit timeout #3696

Merged: 5 commits, Dec 20, 2022
14 changes: 8 additions & 6 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -199,20 +199,18 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRequests.GetMetricName()).Record(int64(len(requests)))
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorBulkSize.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(len(requests)))
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(p.mapToAckFuture.Len() - len(requests)))

for _, request := range requests {
visibilityTaskKey := p.extractVisibilityTaskKey(request)
if visibilityTaskKey == "" {
continue
}
_, _, _ = p.mapToAckFuture.GetAndDo(visibilityTaskKey, func(key interface{}, value interface{}) error {
future, ok := value.(*ackFuture)
ackF, ok := value.(*ackFuture)
if !ok {
p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.Value(key))
}
future.recordStart(p.metricsHandler)
ackF.recordStart(p.metricsHandler)
return nil
})
}
@@ -286,6 +284,10 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRetries.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status))
}
}

// Record how many documents are waiting to be flushed to Elasticsearch after this bulk is committed.
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(p.mapToAckFuture.Len()))
}

func (p *processorImpl) buildResponseIndex(response *elastic.BulkResponse) map[string]*elastic.BulkResponseItem {
@@ -307,12 +309,12 @@ func (p *processorImpl) buildResponseIndex(response *elastic.BulkResponse) map[s
func (p *processorImpl) notifyResult(visibilityTaskKey string, ack bool) {
// Use RemoveIf here to prevent race condition with de-dup logic in Add method.
_ = p.mapToAckFuture.RemoveIf(visibilityTaskKey, func(key interface{}, value interface{}) bool {
future, ok := value.(*ackFuture)
ackF, ok := value.(*ackFuture)
if !ok {
p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.ESKey(visibilityTaskKey))
}

future.done(ack, p.metricsHandler)
ackF.done(ack, p.metricsHandler)
return true
})
}
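Note on the hunks above: the queued-requests histogram moves from bulkBeforeAction to the end of bulkAfterAction, so the recorded value is the number of documents still waiting to be flushed after a commit completes rather than before it starts. For orientation, here is a minimal sketch of how such Before/After hooks attach to the olivere/elastic v7 BulkProcessor that Temporal wraps; the option values and the pendingCount/recordQueueDepth callbacks are illustrative placeholders, not the actual processorImpl wiring.

```go
// Sketch only: where bulk Before/After hooks plug into olivere/elastic's
// BulkProcessor. pendingCount and recordQueueDepth are hypothetical stand-ins
// for mapToAckFuture.Len() and the metrics handler in processorImpl.
package esprocessor

import (
	"context"
	"time"

	"github.com/olivere/elastic/v7"
)

func startBulkProcessor(
	ctx context.Context,
	esClient *elastic.Client,
	pendingCount func() int64,
	recordQueueDepth func(int64),
) (*elastic.BulkProcessor, error) {
	return esClient.BulkProcessor().
		Name("visibility-processor").
		Workers(1).                     // ESProcessorNumOfWorkers
		BulkActions(500).               // ESProcessorBulkActions
		BulkSize(16 * 1024 * 1024).     // ESProcessorBulkSize
		FlushInterval(1 * time.Second). // ESProcessorFlushInterval
		Before(func(executionID int64, requests []elastic.BulkableRequest) {
			// Per-request bookkeeping (recordStart in the PR) happens here,
			// right before the bulk is sent to Elasticsearch.
		}).
		After(func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
			// The PR records the queued-requests histogram here, after acks and
			// nacks are resolved, so it reflects what is still waiting to flush.
			recordQueueDepth(pendingCount())
		}).
		Do(ctx)
}
```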
@@ -244,6 +244,12 @@ func (s *processorSuite) TestBulkAfterAction_Ack() {
Items: []map[string]*elastic.BulkResponseItem{mSuccess},
}

queuedRequestHistogram := metrics.NewMockHistogramIface(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(0))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
@@ -287,6 +293,12 @@ func (s *processorSuite) TestBulkAfterAction_Nack() {
Items: []map[string]*elastic.BulkResponseItem{mFailed},
}

queuedRequestHistogram := metrics.NewMockHistogramIface(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(0))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
@@ -352,12 +364,6 @@ func (s *processorSuite) TestBulkBeforeAction() {
metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit(),
).Return(bulkSizeHistogram)
bulkSizeHistogram.EXPECT().Record(int64(1))
queuedRequestHistorgram := metrics.NewMockHistogramIface(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistorgram)
queuedRequestHistorgram.EXPECT().Record(int64(0))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitStartLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
@@ -484,7 +490,7 @@ func (s *processorSuite) TestErrorReasonFromResponse() {
func (s *processorSuite) Test_End2End() {
docsCount := 1000
parallelFactor := 10
version := int64(2208) //random
version := int64(2208) // random

request := &client.BulkableRequest{}
bulkIndexRequests := make([]elastic.BulkableRequest, docsCount)
@@ -49,11 +49,10 @@ import (
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/persistence/visibility/store/query"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
)

const (
persistenceName = "elasticsearch"
PersistenceName = "elasticsearch"

delimiter = "~"
pointInTimeKeepAliveInterval = "1m"
@@ -128,7 +127,7 @@ func (s *visibilityStore) Close() {
}

func (s *visibilityStore) GetName() string {
return persistenceName
return PersistenceName
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
@@ -232,32 +231,30 @@ func (s *visibilityStore) addBulkIndexRequestAndWait(
}

func (s *visibilityStore) addBulkRequestAndWait(
ctx context.Context,
_ context.Context,
bulkRequest *client.BulkableRequest,
visibilityTaskKey string,
) error {
s.checkProcessor()

// Add method is blocking. If bulk processor is busy flushing previous bulk, request will wait here.
// Therefore, ackTimeoutTimer in fact wait for request to be committed after it was added to bulk processor.
// TODO: this also means ctx is not respected if bulk processor is busy. Shall we make Add non-blocking or
// respecting the context?
future := s.processor.Add(bulkRequest, visibilityTaskKey)

ackTimeout := s.processorAckTimeout()
if deadline, ok := ctx.Deadline(); ok {
ackTimeout = util.Min(ackTimeout, time.Until(deadline))
}
subCtx, subCtxCancelFn := context.WithTimeout(context.Background(), ackTimeout)
defer subCtxCancelFn()

ack, err := future.Get(subCtx)
ackF := s.processor.Add(bulkRequest, visibilityTaskKey)

if errors.Is(err, context.DeadlineExceeded) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timedout waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
}
// processorAckTimeout is the maximum duration for the bulk processor to commit the bulk and unblock the `ackF`.
// The default value is 30s, and this timeout should never be hit
// because Elasticsearch must process a bulk within 30s.
// The parent context is not respected here because it has a shorter timeout (3s),
// which might have already expired due to the wait in the Add method above.
ctx, cancel := context.WithTimeout(context.Background(), s.processorAckTimeout())
defer cancel()
ack, err := ackF.Get(ctx)

if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
}
// Return a non-retryable Internal error here because these errors are unexpected.
// The visibility task processor retries all errors though, so a new request will be generated for the same task.
return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received error %v", visibilityTaskKey, err))
}

@@ -900,7 +897,7 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa
// Very important line. See finishParseJSONValue below.
d.UseNumber()
if err := d.Decode(&sourceMap); err != nil {
s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()) //.Record(1)
s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()).Record(1)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to unmarshal JSON from Elasticsearch document(%s): %v", docID, err))
}

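The addBulkRequestAndWait change above is the core of the fix: the ack wait is no longer bounded by the caller's context (which carries roughly a 3s deadline and may already be spent while Add blocks on a busy bulk), but by a fresh context limited only by processorAckTimeout. A minimal sketch of that detached-timeout wait pattern, using a hypothetical AckFuture interface in place of the processor's ackFuture:

```go
// Sketch of the "detached timeout" wait used in addBulkRequestAndWait:
// the ack wait is bounded by its own timeout derived from context.Background(),
// not by the (possibly already expired) caller context.
// AckFuture is a hypothetical stand-in for the processor's ackFuture.
package esprocessor

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type AckFuture interface {
	Get(ctx context.Context) (bool, error)
}

func waitForAck(f AckFuture, key string, ackTimeout time.Duration) error {
	// Deliberately derived from Background(): the caller's context may carry a
	// short deadline that was consumed while Add() blocked on a busy bulk.
	ctx, cancel := context.WithTimeout(context.Background(), ackTimeout)
	defer cancel()

	ack, err := f.Get(ctx)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return fmt.Errorf("visibility task %s timed out waiting for ACK after %v", key, ackTimeout)
		}
		return fmt.Errorf("visibility task %s received error: %w", key, err)
	}
	if !ack {
		return fmt.Errorf("visibility task %s was not acknowledged", key)
	}
	return nil
}
```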
4 changes: 2 additions & 2 deletions service/history/configs/config.go
@@ -498,14 +498,14 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 100),
ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 1),
// Should be not greater than NumberOfShards(512)/NumberOfHistoryNodes(4) * VisibilityTaskWorkerCount(10)/ESProcessorNumOfWorkers(1) divided by workflow distribution factor (2 at least).
// Should not be greater than the number of visibility task queue workers, VisibilityProcessorSchedulerWorkerCount (default 512).
// Otherwise, visibility queue processors won't be able to fill up the bulk with documents (even under heavy load), and the bulk will flush due to the interval, not the number of actions.
ESProcessorBulkActions: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkActions, 500),
// 16MB - just a sanity check. With ES document size ~1Kb it should never be reached.
ESProcessorBulkSize: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkSize, 16*1024*1024),
// Bulk processor will also flush on this interval, regardless of when the last flush due to bulk actions happened.
ESProcessorFlushInterval: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorFlushInterval, 1*time.Second),
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 1*time.Minute),
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 30*time.Second),

EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityEagerExecution: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityEagerExecution, false),
14 changes: 11 additions & 3 deletions service/history/visibilityQueueTaskExecutor.go
@@ -319,10 +319,10 @@ func (t *visibilityQueueTaskExecutor) upsertExecution(
}

func (t *visibilityQueueTaskExecutor) processCloseExecution(
ctx context.Context,
parentCtx context.Context,
task *tasks.CloseExecutionVisibilityTask,
) (retError error) {
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
ctx, cancel := context.WithTimeout(parentCtx, taskTimeout)
defer cancel()

namespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.GetNamespaceID()))
@@ -396,8 +396,13 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution(
return err
}

// The Elasticsearch bulk processor doesn't respect the context timeout,
// because under heavy load a bulk flush might take longer than taskTimeout.
// Therefore, the ctx timeout might have already expired,
// and parentCtx (which has no timeout) must be used everywhere below.

if t.enableCloseWorkflowCleanup(namespaceEntry.Name().String()) {
return t.cleanupExecutionInfo(ctx, task)
return t.cleanupExecutionInfo(parentCtx, task)
}
return nil
}
@@ -500,6 +505,9 @@ func (t *visibilityQueueTaskExecutor) cleanupExecutionInfo(
ctx context.Context,
task *tasks.CloseExecutionVisibilityTask,
) (retError error) {
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, task)
if err != nil {
return err
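The executor change above keeps two contexts around: the taskTimeout-bounded ctx for most of processCloseExecution, and the original parentCtx for the post-visibility cleanup, because the bulk add can outlive taskTimeout; cleanupExecutionInfo then derives a fresh timeout of its own from that parent. A minimal sketch of the two-context pattern, with hypothetical step functions standing in for the real calls:

```go
// Sketch of the two-context pattern in processCloseExecution / cleanupExecutionInfo.
// recordVisibility and cleanup are hypothetical stand-ins for the real steps.
package taskexec

import (
	"context"
	"time"
)

const taskTimeout = 3 * time.Second // illustrative value, not necessarily the real constant

func processClose(parentCtx context.Context, recordVisibility, cleanup func(context.Context) error) error {
	// Most of the task runs under a bounded child context...
	ctx, cancel := context.WithTimeout(parentCtx, taskTimeout)
	defer cancel()

	if err := recordVisibility(ctx); err != nil {
		return err
	}

	// ...but recording visibility may block longer than taskTimeout inside the
	// bulk processor, so the follow-up cleanup starts from parentCtx and takes
	// a fresh timeout of its own instead of reusing the possibly expired ctx.
	cleanupCtx, cleanupCancel := context.WithTimeout(parentCtx, taskTimeout)
	defer cleanupCancel()
	return cleanup(cleanupCtx)
}
```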
@@ -37,6 +37,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
"go.temporal.io/server/service/worker/deletenamespace/errors"
)

@@ -63,7 +64,7 @@ func NewActivities(
}
}
func (a *Activities) IsAdvancedVisibilityActivity(_ context.Context) (bool, error) {
return strings.Contains(a.visibilityManager.GetName(), "elasticsearch"), nil
return strings.Contains(a.visibilityManager.GetName(), elasticsearch.PersistenceName), nil
}

func (a *Activities) CountExecutionsAdvVisibilityActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) (int64, error) {