diff --git a/local/run_mite.sh b/local/run_mite.sh
index 1965a1ee..808220eb 100755
--- a/local/run_mite.sh
+++ b/local/run_mite.sh
@@ -10,7 +10,7 @@ docker-compose -f docker_compose_monitoring.yml up -d
 # Run mite stack w/o controller
 mite runner --controller-socket=tcp://127.0.0.1:14301 --message-socket=tcp://127.0.0.1:14302 &
 mite duplicator --message-socket=tcp://0.0.0.0:14302 tcp://0.0.0.0:14303 &
-mite stats --stats-in-socket=tcp://127.0.0.1:14303 --stats-out-socket=tcp://0.0.0.0:14305 --stats-include-processors=mite,mite_http &
+mite stats --stats-in-socket=tcp://127.0.0.1:14303 --stats-out-socket=tcp://0.0.0.0:14305 --stats-include-processors=mite,mite_http,mite_kafka &
 mite prometheus_exporter --stats-out-socket=tcp://127.0.0.1:14305 --web-address=0.0.0.0:9301 &
 
 # Start a mock web server
diff --git a/mite_kafka/__init__.py b/mite_kafka/__init__.py
new file mode 100644
index 00000000..a0fc3b95
--- /dev/null
+++ b/mite_kafka/__init__.py
@@ -0,0 +1,70 @@
+import logging
+from contextlib import asynccontextmanager
+from functools import wraps
+
+from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
+from mite.exceptions import MiteError
+
+logger = logging.getLogger(__name__)
+
+class KafkaError(MiteError):
+    pass
+
+class KafkaContext:
+    pass
+
+class _KafkaWrapper:
+    """Journey-facing Kafka helper, exposed to journeys as ``ctx.kafka``."""
+
+    def install(self, context):
+        context.kafka = self
+
+    def uninstall(self, context):
+        del context.kafka
+
+    async def create_producer(self, *args, **kwargs):
+        # aiokafka >= 0.8 removed the ``loop`` argument; the client binds
+        # to the running event loop by itself, so we must not inject one.
+        return AIOKafkaProducer(*args, **kwargs)
+
+    async def create_consumer(self, *args, **kwargs):
+        return AIOKafkaConsumer(*args, **kwargs)
+
+    async def send_and_wait(self, producer, topic, key=None, value=None, **kwargs):
+        # Start/stop around a single send; producer is not reusable after this.
+        try:
+            await producer.start()
+            await producer.send_and_wait(topic, key=key, value=value, **kwargs)
+        finally:
+            await producer.stop()
+
+    async def get_message(self, consumer, *topics, **kwargs):
+        # Return the first message received on the consumer's topics.
+        try:
+            await consumer.start()
+            async for msg in consumer:
+                return msg
+        finally:
+            await consumer.stop()
+
+    async def create_message(self, value, **kwargs):
+        return value
+
+@asynccontextmanager
+async def _kafka_context_manager(context):
+    kw = _KafkaWrapper()
+    kw.install(context)
+    try:
+        yield
+    except Exception as e:
+        raise KafkaError(f"Received an error from Kafka:\n{e}") from e
+    finally:
+        kw.uninstall(context)
+
+def mite_kafka(func):
+    # functools.wraps keeps the journey's __name__ for mite logging/stats.
+    @wraps(func)
+    async def wrapper(ctx, *args, **kwargs):
+        async with _kafka_context_manager(ctx):
+            return await func(ctx, *args, **kwargs)
+
+    return wrapper
diff --git a/mite_kafka/kafka_test_scenario.py b/mite_kafka/kafka_test_scenario.py
new file mode 100644
index 00000000..a276ca4b
--- /dev/null
+++ b/mite_kafka/kafka_test_scenario.py
@@ -0,0 +1,59 @@
+import asyncio
+import logging
+from mite.scenario import StopVolumeModel
+from mite_kafka import KafkaError, mite_kafka
+
+# Set up logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Kafka broker address
+KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
+
+# Define your Kafka topic
+KAFKA_TOPIC = 'test_topic3'
+
+def volume_model_factory(n):
+    def vm(start, end):
+        if start > 60:  # stop spawning new journeys after the first 60 seconds
+            raise StopVolumeModel
+        return n
+
+    vm.__name__ = f"volume model {n}"
+    return vm
+
+# Example function to produce messages to Kafka
+@mite_kafka
+async def produce_to_kafka(ctx):
+    sent_ids = 0
+    message = "Hello Kafka!"
+    producer = await ctx.kafka.create_producer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
+    try:
+        await ctx.kafka.send_and_wait(producer, KAFKA_TOPIC, value=message.encode('utf-8'))
+        logger.info(f"Message sent to Kafka: {message} to the topic {KAFKA_TOPIC}")
+        sent_ids += 1
+        ctx.send("kafka_producer_stats", message=message, topic_name=KAFKA_TOPIC, total_sent=sent_ids)
+    except KafkaError as e:
+        logger.error(f"Error sending message to Kafka: {e}")
+
+# Example function to consume messages from Kafka
+@mite_kafka
+async def consume_from_kafka(ctx):
+    receive_ids = 0
+    consumer = await ctx.kafka.create_consumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
+    try:
+        await consumer.start()  # the consumer must be started before iterating it
+        async for message in consumer:
+            logger.info(f"Received message from Kafka: {message.value.decode('utf-8')}")
+            receive_ids += 1
+            ctx.send("kafka_consumer_stats", message=message, topic_name=KAFKA_TOPIC, total_received=receive_ids)
+    except KafkaError as e:
+        logger.error(f"Error consuming message from Kafka: {e}")
+    finally:
+        await consumer.stop()
+
+def scenario():
+    return [
+        ["mite_kafka.kafka_test_scenario:produce_to_kafka", None, volume_model_factory(2)],
+        # ["mite_kafka.kafka_test_stats:consume_from_kafka", None, volume_model_factory(2)]
+    ]
diff --git a/mite_kafka/stats.py b/mite_kafka/stats.py
new file mode 100644
index 00000000..c17831d8
--- /dev/null
+++ b/mite_kafka/stats.py
@@ -0,0 +1,16 @@
+from mite.stats import Accumulator, extractor, matcher_by_type
+
+# Kafka
+
+_KAFKA_STATS = [
+    Accumulator(
+        "mite_kafka_producer_stats",
+        matcher_by_type("kafka_producer_stats"),
+        extractor(["topic_name"], "total_sent"),
+    ),
+    Accumulator(
+        "mite_kafka_consumer_stats",
+        matcher_by_type("kafka_consumer_stats"),
+        extractor(["topic_name"], "total_received"),
+    ),
+]
diff --git a/setup.cfg b/setup.cfg
index 479e6d60..f69e9a6a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -34,6 +34,8 @@
 [options.extras_require]
 amqp =
     aio_pika
+kafka =
+    aiokafka
finagle = thrift selenium = @@ -48,6 +50,7 @@ mite_stats = mite_finagle = mite_finagle.stats:STATS mite_http = mite_http.stats:STATS mite_selenium = mite_selenium.stats:STATS + mite_kafka = mite_kafka.stats:_KAFKA_STATS [options.packages.find] exclude = diff --git a/test-requirements.txt b/test-requirements.txt index 84c98924..2e59e295 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,4 +1,5 @@ aio-pika +aiokafka altair Cython h2 diff --git a/test/test_mite_kafka.py b/test/test_mite_kafka.py new file mode 100644 index 00000000..ee7adde2 --- /dev/null +++ b/test/test_mite_kafka.py @@ -0,0 +1,65 @@ +import asyncio +from unittest.mock import AsyncMock, patch + +import pytest +from mite_kafka import _KafkaWrapper, mite_kafka +from mocks.mock_context import MockContext +from aiokafka.producer import AIOKafkaProducer +from aiokafka import AIOKafkaClient + +@pytest.mark.asyncio +async def test_mite_kafka_decorator(): + context = MockContext() + + @mite_kafka + async def dummy_journey(ctx): + assert ctx.kafka is not None + + await dummy_journey(context) + +@pytest.mark.asyncio +async def test_mite_kafka_decorator_uninstall(): + context = MockContext() + + @mite_kafka + async def dummy_journey(ctx): + pass + + await dummy_journey(context) + + assert getattr(context, "kafka", None) is None + +@pytest.mark.asyncio +async def test_create_producer(): + # Create a mock for AIOKafkaProducer + producer_mock = AsyncMock() + # Patch AIOKafkaProducer to return the mock + with patch('aiokafka.producer.AIOKafkaProducer', new_callable=producer_mock): + # Create an instance of _KafkaWrapper + kafka_wrapper = _KafkaWrapper() + # Call the create_producer method + await kafka_wrapper.create_producer(bootstrap_servers='broker_url') # Pass the broker URL as a keyword argument + # Assert that the AIOKafkaProducer class was called with the expected arguments + # AIOKafkaProducer.return_value.assert_called() + await asyncio.sleep(0) + producer_mock.assert_called_once_with() + 
+ + +@pytest.mark.asyncio +async def test_create_producer_two(): + # Create a mock for AIOKafkaProducer + producer_mock = AsyncMock() + # Patch AIOKafkaProducer to return the mock + with patch('aiokafka.producer.AIOKafkaProducer', new_callable=producer_mock): + # Create an instance of _KafkaWrapper + kafka_wrapper = _KafkaWrapper() + # Call the create_producer method + producer = await kafka_wrapper.create_producer(bootstrap_servers="localhost:9092") # Pass the broker URL as a keyword argument + # Assert that the AIOKafkaProducer class was called with the expected arguments + # AIOKafkaProducer.return_value.assert_called() + await asyncio.sleep(0) + await kafka_wrapper.send_and_wait(producer, "my_topic", b"Super message") + producer_mock.assert_called_once_with(bootstrap_servers='broker_url',loop=asyncio.get_event_loop()) + + diff --git a/tox.ini b/tox.ini index c32cf55f..c1435b50 100644 --- a/tox.ini +++ b/tox.ini @@ -45,6 +45,7 @@ source = mite_browser mite_selenium mite_amqp + mite_kafka mite_websocket mite_finagle plugins = Cython.Coverage