Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Global metrics for PyHeron #1319

Merged
merged 2 commits into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion heron/common/src/python/utils/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'''Common heron metrics module'''
__all__ = ['metrics', 'metrics_helper']
__all__ = ['metrics', 'metrics_helper', 'global_metrics']

from .metrics import (IMetric,
CountMetric,
Expand Down
52 changes: 52 additions & 0 deletions heron/common/src/python/utils/metrics/global_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Singleton class which exposes a simple globally available counter for heron jobs.

It serves the same functionality as GlobalMetrics.java
"""
import threading
from .metrics import MultiCountMetric

metricsContainer = MultiCountMetric()
registered = False
root_name = '__auto__'

lock = threading.Lock()

def incr(key, to_add=1):
metricsContainer.incr(key, to_add)

def safe_incr(key, to_add=1):
with lock:
metricsContainer.incr(key, to_add)

def init(metrics_collector, metrics_bucket):
with lock:
global registered
if not registered:
metrics_collector.register_metric(root_name, metricsContainer, metrics_bucket)
registered = True
13 changes: 13 additions & 0 deletions heron/common/tests/python/utils/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,16 @@ pex_test(
size = "small",
)

pex_test(
name = "global_metrics_unittest",
srcs = ["global_metrics_unittest.py", "mock_generator.py"],
deps = [
"//heron/common/tests/python:pytest-py"
],
reqs = [
"py==1.4.27",
"pytest==2.6.4",
"unittest2==0.5.1",
],
size = "small",
)
62 changes: 62 additions & 0 deletions heron/common/tests/python/utils/global_metrics_unittest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=missing-docstring

import unittest
import threading

from heron.common.src.python.utils.metrics import global_metrics
import heron.common.tests.python.utils.mock_generator as mock_generator

class GlobalMetricsTest(unittest.TestCase):
def setUp(self):
self.metrics_collector = mock_generator.MockMetricsCollector()
global_metrics.init(self.metrics_collector, 10)
self.lock = threading.Lock()

def test_normal(self):
global_metrics.incr("mycounter_a")
global_metrics.incr("mycounter_b", 3)
global_metrics.safe_incr("mycounter_c", 5)
counter = global_metrics.metricsContainer
d = counter.get_value_and_reset()
self.assertTrue("mycounter_a" in d)
self.assertTrue("mycounter_b" in d)
self.assertTrue("mycounter_c" in d)
self.assertEqual(d["mycounter_a"], 1)
self.assertEqual(d["mycounter_b"], 3)
self.assertEqual(d["mycounter_c"], 5)

def concurrent_incr(self):
def incr_worker():
global_metrics.safe_incr("K")
global_metrics.safe_incr("K", 2)
global_metrics.safe_incr("K", 3)
threads = []
for i in range(10):
t = threading.Thread(target=incr_worker)
threads.append(t)
t.start()
for t in threads:
t.join()
counter = global_metrics.metricsContainer
d = counter.get_value_and_reset()
self.assertTrue("K" in d)
self.assertEqual(d["K"], 60)

def test_concurrent_incr(self):
for i in range(100):
global_metrics.metricsContainer.get_value_and_reset()
self.concurrent_incr()
2 changes: 2 additions & 0 deletions heron/examples/src/python/bolt/count_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""module for example bolt: CountBolt"""
from collections import Counter
from heron.pyheron.src.python import Bolt
from heron.common.src.python.utils.metrics import global_metrics

# pylint: disable=unused-argument
class CountBolt(Bolt):
Expand All @@ -36,6 +37,7 @@ def _increment(self, word, inc_by):
def process(self, tup):
word = tup.values[0]
self._increment(word, 10 if word == "heron" else 1)
global_metrics.safe_incr('count')

def process_tick(self, tup):
self.log("Got tick tuple!")
Expand Down
1 change: 0 additions & 1 deletion heron/examples/src/python/word_count_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import sys

from heron.pyheron.src.python import Grouping, TopologyBuilder, constants

from heron.examples.src.python.spout import WordSpout
from heron.examples.src.python.bolt import CountBolt

Expand Down
7 changes: 6 additions & 1 deletion heron/instance/src/python/basics/bolt_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from heron.common.src.python.utils.log import Log
from heron.common.src.python.utils.tuple import TupleHelper, HeronTuple
from heron.common.src.python.utils.metrics import BoltMetrics
from heron.common.src.python.utils.metrics import global_metrics, BoltMetrics
from heron.common.src.python.utils.misc import SerializerHelper
from heron.proto import tuple_pb2
from heron.pyheron.src.python import Stream
Expand Down Expand Up @@ -55,6 +55,11 @@ def start(self):
self.bolt_impl.initialize(config=context.get_cluster_config(), context=context)
context.invoke_hook_prepare()

# prepare global metrics
interval = float(self.sys_config[constants.HERON_METRICS_EXPORT_INTERVAL_SEC])
collector = context.get_metrics_collector()
global_metrics.init(collector, interval)

# prepare for custom grouping
self.pplan_helper.prepare_custom_grouping(context)

Expand Down
7 changes: 6 additions & 1 deletion heron/instance/src/python/basics/spout_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from heron.common.src.python.utils.log import Log
from heron.common.src.python.utils.tuple import TupleHelper
from heron.common.src.python.utils.metrics import SpoutMetrics
from heron.common.src.python.utils.metrics import global_metrics, SpoutMetrics
from heron.common.src.python.utils.misc import SerializerHelper
from heron.proto import topology_pb2, tuple_pb2
from heron.pyheron.src.python import Stream
Expand Down Expand Up @@ -65,6 +65,11 @@ def start(self):
self.spout_impl.initialize(config=context.get_cluster_config(), context=context)
context.invoke_hook_prepare()

# prepare global metrics
interval = float(self.sys_config[constants.HERON_METRICS_EXPORT_INTERVAL_SEC])
collector = context.get_metrics_collector()
global_metrics.init(collector, interval)

# prepare for custom grouping
self.pplan_helper.prepare_custom_grouping(context)

Expand Down
4 changes: 3 additions & 1 deletion tools/python/checkstyle.ini
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ disable=
deprecated-lambda,
duplicate-code,
bad-builtin,
consider-iterating-dictionary
consider-iterating-dictionary,
# module as singleton class
global-statement


[REPORTS]
Expand Down