Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Added python integration test core modules #1247

Merged
merged 2 commits into from
Aug 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions integration-test/src/python/integration_test/core/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package(default_visibility = ["//visibility:public"])

load("/tools/rules/pex_rules", "pex_library", "pex_binary")

pex_library(
name = "pyheron_integration_core",
srcs = glob(["**/*.py"]),
deps = [
"//heron/streamparse/src/python:pyheron_lib"
],
)
7 changes: 7 additions & 0 deletions integration-test/src/python/integration_test/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""Core modules for python topology integration tests"""
__all__ = ['aggregator_bolt', 'constants', 'integration_test_spout', 'integration_test_bolt',
'terminal_bolt', 'test_topology_builder', 'batch_bolt']

from .batch_bolt import BatchBolt
from .test_topology_builder import TestTopologyBuilder
from . import constants as integ_const
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# copyright 2016 twitter. all rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''aggregator_bolt.py'''
import httplib
import json
from urlparse import urlparse

from heron.common.src.python.utils.log import Log
from .terminal_bolt import TerminalBolt
from . import constants as integ_constants

class AggregatorBolt(TerminalBolt):
"""Aggregator Bolt: aggregates the result of integration tests and posts it to the server"""
# the last bolt has nothing to emit
outputs = []

# pylint: disable=unused-argument
def initialize(self, config, context):
self.http_post_url = config[integ_constants.HTTP_POST_URL_KEY]
self.result = []
Log.info("HTTP post url: %s" % self.http_post_url)
self.parsed_url = urlparse(self.http_post_url)

def process(self, tup):
self.result.append(tup.values[0])

def _post_result_to_server(self, json_result):
conn = httplib.HTTPConnection(self.parsed_url.netloc)
conn.request("POST", self.parsed_url.path, json_result)
response = conn.getresponse()
if response.status == 200:
Log.info("HTTP POST successful")
else:
Log.severe("HTTP POST failed, response code: %d, response: %s"
% (response.status, response.read()))
return response.status

def write_finished_data(self):
json_result = json.dumps(self.result)
Log.info("Actual result: %s" % json_result)
Log.info("Posting actual result to %s" % self.http_post_url)
try:
response_code = self._post_result_to_server(json_result)
if response_code != 200:
# try again
response_code = self._post_result_to_server(json_result)
if response_code != 200:
raise RuntimeError("Response code: %d" % response_code)
except Exception as e:
raise RuntimeError("Posting result to server failed with: %s" % e.message)
22 changes: 22 additions & 0 deletions integration-test/src/python/integration_test/core/batch_bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""interface for batch bolt"""
from abc import abstractmethod
from heron.streamparse.src.python import Bolt

class BatchBolt(Bolt):
"""Batch bolt interface for integration test"""
@abstractmethod
def finish_batch(self):
pass
27 changes: 27 additions & 0 deletions integration-test/src/python/integration_test/core/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''constants.py: constants for integration test for pyheron'''
INTEGRATION_TEST_MOCK_MESSAGE_ID = "__integration_test_mock_message_id"
INTEGRATION_TEST_TERMINAL = "__integration_test_mock_terminal"
INTEGRATION_TEST_CONTROL_STREAM_ID = "__integration_test_control_stream_id"

# internal config key
MAX_EXECUTIONS = 10
HTTP_POST_URL_KEY = "http.post.url"

# user defined config key
USER_SPOUT_CLASSPATH = "user.spout.classpath"
USER_BOLT_CLASSPATH = "user.bolt.classpath"
# user defined max executions
USER_MAX_EXECUTIONS = "user.max.exec"
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base bolt for integration tests"""
import copy

from heron.common.src.python.utils.log import Log
from heron.streamparse.src.python import Bolt, Stream
from heron.streamparse.src.python.component import HeronComponentSpec
import heron.common.src.python.pex_loader as pex_loader

from .batch_bolt import BatchBolt
from . import constants as integ_const

# pylint: disable=missing-docstring
class IntegrationTestBolt(Bolt):
"""Base bolt for integration test

Every bolt of integration test topology consists of this instance, each delegating user's bolt.
"""
outputs = [Stream(fields=[integ_const.INTEGRATION_TEST_TERMINAL],
name=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)]

@classmethod
def spec(cls, name, par, inputs, config, user_bolt_classpath, user_output_fields=None):
python_class_path = "%s.%s" % (cls.__module__, cls.__name__)
config[integ_const.USER_BOLT_CLASSPATH] = user_bolt_classpath
# avoid modification to cls.outputs
_outputs = copy.copy(cls.outputs)
if user_output_fields is not None:
_outputs.extend(user_output_fields)
return HeronComponentSpec(name, python_class_path, is_spout=False, par=par,
inputs=inputs, outputs=_outputs, config=config)

def initialize(self, config, context):
user_bolt_classpath = config.get(integ_const.USER_BOLT_CLASSPATH, None)
if user_bolt_classpath is None:
raise RuntimeError("User defined integration bolt was not found")
user_bolt_cls = self._load_user_bolt(context.get_topology_pex_path(), user_bolt_classpath)
self.user_bolt = user_bolt_cls(delegate=self)

upstream_components = set()
self.terminal_to_receive = 0
for streamId in context.get_this_sources().keys():
# streamId is topology_pb2.StreamId protobuf message
upstream_components.add(streamId.component_name)
for comp_name in upstream_components:
self.terminal_to_receive += len(context.get_component_tasks(comp_name))

self.tuple_received = 0
self.tuples_processed = 0
self.current_tuple_processing = None

Log.info("Terminals to receive: %d" % self.terminal_to_receive)
self.user_bolt.initialize(config, context)

@staticmethod
def _load_user_bolt(pex_file, classpath):
pex_loader.load_pex(pex_file)
cls = pex_loader.import_and_get_class(pex_file, classpath)
return cls

@property
def is_done(self):
return self.terminal_to_receive == 0

def process(self, tup):
self.tuple_received += 1
stream_id = tup.stream

Log.info("Received a tuple: %s from %s" % (tup, stream_id))
if stream_id == integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID:
self.terminal_to_receive -= 1
if self.is_done:
if isinstance(self.user_bolt, BatchBolt):
Log.info("Invoke bolt to do finish batch")
self.user_bolt.finish_batch()

Log.info("Populating the terminals to downstream")
super(IntegrationTestBolt, self).emit(
[integ_const.INTEGRATION_TEST_TERMINAL],
stream=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)
else:
self.current_tuple_processing = tup
self.user_bolt.process(tup)
self.ack(tup)

def emit(self, tup, stream=Stream.DEFAULT_STREAM_ID, anchors=None,
direct_task=None, need_task_ids=False):
if tup is None:
super(IntegrationTestBolt, self).emit(list(self.current_tuple_processing),
stream=stream, anchors=anchors,
direct_task=direct_task, need_task_ids=need_task_ids)
else:
super(IntegrationTestBolt, self).emit(tup, stream, anchors, direct_task, need_task_ids)

def ack(self, tup):
Log.info("Trying to do an ack. tuples processed: %d, received: %d"
% (self.tuples_processed, self.tuple_received))
if self.tuples_processed < self.tuple_received:
super(IntegrationTestBolt, self).ack(tup)
self.tuples_processed += 1

def fail(self, tup):
Log.info("Trying to do a fail. tuples processed: %d, received: %d"
% (self.tuples_processed, self.tuple_received))
if self.tuples_processed < self.tuple_received:
super(IntegrationTestBolt, self).fail(tup)
self.tuples_processed += 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base spout for integration tests"""
import copy
from heron.common.src.python.utils.log import Log
from heron.streamparse.src.python import Spout, Stream
from heron.streamparse.src.python.component import HeronComponentSpec
import heron.common.src.python.pex_loader as pex_loader

from . import constants as integ_const

class IntegrationTestSpout(Spout):
"""Base spout for integration test

Every spout of integration test topology consists of this instance, each delegating user's spout.
"""
outputs = [Stream(fields=[integ_const.INTEGRATION_TEST_TERMINAL],
name=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)]

@classmethod
def spec(cls, name, par, config, user_spout_classpath, user_output_fields=None):
python_class_path = "%s.%s" % (cls.__module__, cls.__name__)

config[integ_const.USER_SPOUT_CLASSPATH] = user_spout_classpath
# avoid modification to cls.outputs
_outputs = copy.copy(cls.outputs)
if user_output_fields is not None:
_outputs.extend(user_output_fields)
return HeronComponentSpec(name, python_class_path, is_spout=True, par=par,
inputs=None, outputs=_outputs, config=config)

def initialize(self, config, context):
user_spout_classpath = config.get(integ_const.USER_SPOUT_CLASSPATH, None)
if user_spout_classpath is None:
raise RuntimeError("User defined integration test spout was not found")
user_spout_cls = self._load_user_spout(context.get_topology_pex_path(), user_spout_classpath)
self.user_spout = user_spout_cls(delegate=self)

self.max_executions = config.get(integ_const.USER_MAX_EXECUTIONS, integ_const.MAX_EXECUTIONS)
assert isinstance(self.max_executions, int) and self.max_executions > 0
Log.info("Max executions: %d" % self.max_executions)
self.tuples_to_complete = 0

self.user_spout.initialize(config, context)

@staticmethod
def _load_user_spout(pex_file, classpath):
pex_loader.load_pex(pex_file)
cls = pex_loader.import_and_get_class(pex_file, classpath)
return cls

@property
def is_done(self):
return self.max_executions == 0

def next_tuple(self):
if self.is_done:
return

self.max_executions -= 1
Log.info("max executions: %d" % self.max_executions)

self.user_spout.next_tuple()

if self.is_done:
self._emit_terminal_if_needed()
Log.info("This topology is finished.")

def ack(self, tup_id):
Log.info("Received an ack with tuple id: %s" % str(tup_id))
self.tuples_to_complete -= 1
if tup_id != integ_const.INTEGRATION_TEST_MOCK_MESSAGE_ID:
self.user_spout.ack(tup_id)
self._emit_terminal_if_needed()

def fail(self, tup_id):
Log.info("Received a fail message with tuple id: %s" % str(tup_id))
self.tuples_to_complete -= 1
if tup_id != integ_const.INTEGRATION_TEST_MOCK_MESSAGE_ID:
self.user_spout.fail(tup_id)
self._emit_terminal_if_needed()

def emit(self, tup, tup_id=None, stream=Stream.DEFAULT_STREAM_ID,
direct_task=None, need_task_ids=None):
"""Emits from this integration test spout

Overriden method which will be called when user's spout calls emit()
"""
# if is_control True -> control stream should not count
self.tuples_to_complete += 1

if tup_id is None:
Log.info("Add tup_id for tuple: %s" % str(tup))
_tup_id = integ_const.INTEGRATION_TEST_MOCK_MESSAGE_ID
else:
_tup_id = tup_id

super(IntegrationTestSpout, self).emit(tup, _tup_id, stream, direct_task, need_task_ids)

def _emit_terminal_if_needed(self):
Log.info("is_done: %s, tuples_to_complete: %s" % (self.is_done, self.tuples_to_complete))
if self.is_done and self.tuples_to_complete == 0:
Log.info("Emitting terminals to downstream")
super(IntegrationTestSpout, self).emit([integ_const.INTEGRATION_TEST_TERMINAL],
stream=integ_const.INTEGRATION_TEST_CONTROL_STREAM_ID)
25 changes: 25 additions & 0 deletions integration-test/src/python/integration_test/core/terminal_bolt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2016 Twitter. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''interface for terminal bolt'''
from abc import abstractmethod
from .batch_bolt import BatchBolt

class TerminalBolt(BatchBolt):
"""Terminal bolt interface for integration test"""
def finish_batch(self):
self.write_finished_data()

@abstractmethod
def write_finished_data(self):
pass
Loading