Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Dec 10, 2022
1 parent 437d737 commit 51659ca
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRequests.GetMetricName()).Record(int64(len(requests)))
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorBulkSize.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(len(requests)))
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(p.mapToAckFuture.Len()))

for _, request := range requests {
visibilityTaskKey := p.extractVisibilityTaskKey(request)
Expand All @@ -220,6 +218,10 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq

// bulkAfterAction is triggered after bulk processor commit
func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
// Record how many documents are still waiting to be flushed to Elasticsearch.
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(), metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit()).
Record(int64(p.mapToAckFuture.Len()))

if err != nil {
const logFirstNRequests = 5
httpStatus := client.HttpStatus(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,12 @@ func (s *processorSuite) TestBulkAfterAction_Ack() {
Items: []map[string]*elastic.BulkResponseItem{mSuccess},
}

queuedRequestHistogram := metrics.NewMockHistogramMetric(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(1))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
Expand Down Expand Up @@ -287,6 +293,12 @@ func (s *processorSuite) TestBulkAfterAction_Nack() {
Items: []map[string]*elastic.BulkResponseItem{mFailed},
}

queuedRequestHistogram := metrics.NewMockHistogramMetric(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(1))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
Expand Down Expand Up @@ -327,6 +339,12 @@ func (s *processorSuite) TestBulkAfterAction_Error() {
Items: []map[string]*elastic.BulkResponseItem{mFailed},
}

queuedRequestHistogram := metrics.NewMockHistogramMetric(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(0))
counterMetric := metrics.NewMockCounterMetric(s.controller)
s.mockMetricHandler.EXPECT().Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Return(counterMetric)
counterMetric.EXPECT().Record(int64(1), metrics.HttpStatusTag(400))
Expand All @@ -352,12 +370,6 @@ func (s *processorSuite) TestBulkBeforeAction() {
metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit(),
).Return(bulkSizeHistogram)
bulkSizeHistogram.EXPECT().Record(int64(1))
queuedRequestHistorgram := metrics.NewMockHistogramMetric(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistorgram)
queuedRequestHistorgram.EXPECT().Record(int64(1))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitStartLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (s *visibilityStore) addBulkRequestAndWait(

if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timedout waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
}
// Returns non-retryable Internal error here because these errors are unexpected.
// Visibility task processor retries all errors though, therefore new request will be generated for the same task.
Expand Down
2 changes: 1 addition & 1 deletion service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 100),
ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 2),
// Should be not greater than number of visibility task queue workers VisibilityProcessorSchedulerWorkerCount (default 512)
// Should not be greater than number of visibility task queue workers VisibilityProcessorSchedulerWorkerCount (default 512)
// Otherwise, visibility queue processors won't be able to fill up bulk with documents (even under heavy load) and bulk will flush due to interval, not number of actions.
ESProcessorBulkActions: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkActions, 500),
// 16MB - just a sanity check. With ES document size ~1Kb it should never be reached.
Expand Down

0 comments on commit 51659ca

Please sign in to comment.