Skip to content

Commit

Permalink
Merge branch 'master' into ingester
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Oct 31, 2018
2 parents 6585329 + 2be7b0f commit 5fe7382
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 11 deletions.
2 changes: 2 additions & 0 deletions ADOPTERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,7 @@
* [UBER](http://uber.com)
* https://eng.uber.com/distributed-tracing/
* [Under Armour](https://www.underarmour.com)
* [Vistar Media](https://www.vistarmedia.com)
* http://labs.vistarmedia.com/2018/10/31/deploying-jaeger-with-cloudformation-via-bazel.html
* [Weave](https://www.getweave.com)
* [Zenly](https://zen.ly/)
2 changes: 1 addition & 1 deletion CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@

* @yurishkuro @black-adder @vprithvi @pavolloffay @jpkrohling @tiffon
* @black-adder @jpkrohling @objectiser @pavolloffay @tiffon @vprithvi @yurishkuro

4 changes: 4 additions & 0 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Consumer struct {
deadlockDetector deadlockDetector

partitionIDToState map[int32]*consumerState
partitionsHeld metrics.Counter
}

type consumerState struct {
Expand All @@ -63,6 +64,7 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeld: partitionsHeld(params.MetricsFactory),
}, nil
}

Expand Down Expand Up @@ -100,6 +102,8 @@ func (c *Consumer) Close() error {

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionsHeld.Inc(1)
defer c.partitionsHeld.Inc(-1)
c.partitionIDToState[pc.Partition()].wg.Add(1)
defer c.partitionIDToState[pc.Partition()].wg.Done()
defer c.closePartition(pc)
Expand Down
8 changes: 7 additions & 1 deletion cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/uber/jaeger-lib/metrics"
)

const consumerNamespace = "sarama-consumer"

type msgMetrics struct {
counter metrics.Counter
offsetGauge metrics.Gauge
Expand All @@ -36,7 +38,7 @@ type partitionMetrics struct {
}

func (c *Consumer) namespace(partition int32) metrics.Factory {
return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return c.metricsFactory.Namespace(consumerNamespace, map[string]string{"partition": strconv.Itoa(int(partition))})
}

func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
Expand All @@ -58,3 +60,7 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}
}

func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
return metricsFactory.Namespace(consumerNamespace, nil).Counter("partitions-held", nil)
}
21 changes: 16 additions & 5 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
)

func TestConstructor(t *testing.T) {
newConsumer, err := New(Params{})
newConsumer, err := New(Params{MetricsFactory: metrics.NullFactory})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}
Expand Down Expand Up @@ -83,23 +83,24 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer)
}

func newConsumer(
factory metrics.Factory,
metricsFactory metrics.Factory,
topic string,
processor processor.SpanProcessor,
consumer consumer.Consumer) *Consumer {

logger, _ := zap.NewDevelopment()
return &Consumer{
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
deadlockDetector: newDeadlockDetector(factory, logger, time.Second),
partitionsHeld: partitionsHeld(metricsFactory),
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),

processorFactory: ProcessorFactory{
topic: topic,
consumer: consumer,
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: processor,
parallelism: 1,
Expand Down Expand Up @@ -152,12 +153,22 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
mc.YieldMessage(msg)
isProcessed.Wait()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 1,
})

mp.AssertExpectations(t)
// Ensure that the partition consumer was updated in the map
assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(),
undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset())
undertest.Close()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 0,
})

partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.messages",
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,9 @@ func loadAndParseJSON(t *testing.T, path string, object interface{}) {
// required, because we want to only query on recent traces, so we replace all the dates with recent dates.
func correctTime(json []byte) []byte {
jsonString := string(json)
today := time.Now().Format("2006-01-02")
yesterday := time.Now().AddDate(0, 0, -1).Format("2006-01-02")
now := time.Now().UTC()
today := now.Format("2006-01-02")
yesterday := now.AddDate(0, 0, -1).Format("2006-01-02")
retString := strings.Replace(jsonString, "2017-01-26", today, -1)
retString = strings.Replace(retString, "2017-01-25", yesterday, -1)
return []byte(retString)
Expand Down
4 changes: 2 additions & 2 deletions scripts/travis/es-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

set -e

docker pull docker.elastic.co/elasticsearch/elasticsearch:5.4.0
CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" docker.elastic.co/elasticsearch/elasticsearch:5.4.0)
docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.12
CID=$(docker run -d -p 9200:9200 -e "http.host=0.0.0.0" -e "transport.host=127.0.0.1" docker.elastic.co/elasticsearch/elasticsearch:5.6.12)
export STORAGE=elasticsearch
make storage-integration-test
docker kill $CID

0 comments on commit 5fe7382

Please sign in to comment.