Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test ci #268

Draft
wants to merge 74 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
81e5fa5
added kafka adapter
josephantonycj Feb 8, 2024
1f9f8c1
added requirements
josephantonycj Feb 8, 2024
bbd0e41
udpated kafka installation
josephantonycj Feb 9, 2024
e2aa037
updated setup.cfg
josephantonycj Feb 9, 2024
fa0bb6c
unit tests
Feb 12, 2024
c119267
tidying up the methods
josephantonycj Feb 13, 2024
c1d477c
test suite changes
Feb 13, 2024
420a88d
remove unused imports
Feb 13, 2024
8c0f1ac
added send receive message
josephantonycj Feb 13, 2024
d0d7971
test suite changes
Feb 13, 2024
3c670be
test suite changes
Feb 13, 2024
8b11d33
merged start and stop
josephantonycj Feb 13, 2024
8bdb780
test kafka wrapper
josephantonycj Feb 14, 2024
98dfa66
Updated the stats
josephantonycj Feb 14, 2024
2cc0b24
modified the context
josephantonycj Feb 20, 2024
d5f0f8d
added changes for kafka test
josephantonycj Feb 22, 2024
ae85b0d
updated filkes
josephantonycj Feb 22, 2024
627df7e
changes to unit tests
Feb 22, 2024
844208d
added setup config
josephantonycj Feb 27, 2024
12a3c7b
removed finagle and selenium
josephantonycj Feb 27, 2024
f9ef13d
changes to local testing
josephantonycj Feb 27, 2024
31feac3
added_commands
josephantonycj Feb 28, 2024
e0f990e
updated few changes
josephantonycj Feb 28, 2024
5ef5587
updated consumer scenaro
josephantonycj Feb 29, 2024
1586cd3
final one
josephantonycj Feb 29, 2024
a036d16
removed local file changes
josephantonycj Feb 29, 2024
a1c61cd
Final version
josephantonycj Feb 29, 2024
218a30e
test suite update
Feb 29, 2024
4f27575
updated unit test
josephantonycj Mar 1, 2024
571d559
fixed teh module
josephantonycj Mar 1, 2024
0c1b0ae
updated changes
josephantonycj Mar 1, 2024
45e9391
unit test fix
josephantonycj Mar 1, 2024
de397d3
changes to unit test
josephantonycj Mar 1, 2024
1de2016
few more fixes
josephantonycj Mar 1, 2024
51e180b
test suite updates
Mar 7, 2024
ed7c0fb
test suite update
Mar 7, 2024
36edeb1
update with few changes in test suite
josephantonycj Mar 12, 2024
afa0cb4
initialized kafka
josephantonycj Mar 12, 2024
6d96d3b
tweaks for unit test
josephantonycj Mar 12, 2024
95321d1
revert the changes
josephantonycj Mar 12, 2024
ce74441
new changes
josephantonycj Mar 12, 2024
684f60c
updated test suite
josephantonycj Mar 13, 2024
a1f26dc
minor fix
josephantonycj Mar 13, 2024
6f33a3c
updated test suite
josephantonycj Mar 13, 2024
778aeeb
unit test for create producer
josephantonycj Mar 13, 2024
3650ddc
fix the assert once object
josephantonycj Mar 13, 2024
0b64cf9
further fix for assert
josephantonycj Mar 13, 2024
273cc4d
resolve import
josephantonycj Mar 13, 2024
2fdd931
minor fix
josephantonycj Mar 13, 2024
bee5585
keyword argument fix
josephantonycj Mar 13, 2024
40c7bc7
remove unused import
Mar 14, 2024
198130b
test suite changes
Mar 14, 2024
eedcea1
removing test
Mar 19, 2024
431f832
producer test
Mar 19, 2024
70ed53e
patch object update
Mar 19, 2024
0547481
remove bytes
Mar 19, 2024
fce6389
remove testing send method
Mar 19, 2024
5646b65
resolving attr error
Mar 19, 2024
b8ba13c
revert changes
Mar 19, 2024
88a209e
mock object change
Mar 19, 2024
e9a0f2d
assert change
Mar 19, 2024
ea672c9
assert change
Mar 19, 2024
e94944d
assert change
Mar 19, 2024
74d0f51
test
MathinabegamAarif Mar 21, 2024
a8e35cd
test
MathinabegamAarif Mar 21, 2024
338e21a
test
MathinabegamAarif Mar 21, 2024
22885af
test
MathinabegamAarif Mar 21, 2024
042912b
test
MathinabegamAarif Mar 21, 2024
ceeb4e0
test
MathinabegamAarif Mar 21, 2024
f82f1f2
test
MathinabegamAarif Mar 21, 2024
434733e
test
MathinabegamAarif Mar 21, 2024
505158a
test
MathinabegamAarif Mar 21, 2024
d03c39b
add send
MathinabegamAarif Mar 26, 2024
c0df5d8
add send
MathinabegamAarif Mar 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion local/run_mite.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ docker-compose -f docker_compose_monitoring.yml up -d
# Run mite stack w/o controller
mite runner --controller-socket=tcp://127.0.0.1:14301 --message-socket=tcp://127.0.0.1:14302 &
mite duplicator --message-socket=tcp://0.0.0.0:14302 tcp://0.0.0.0:14303 &
mite stats --stats-in-socket=tcp://127.0.0.1:14303 --stats-out-socket=tcp://0.0.0.0:14305 --stats-include-processors=mite,mite_http &
mite stats --stats-in-socket=tcp://127.0.0.1:14303 --stats-out-socket=tcp://0.0.0.0:14305 --stats-include-processors=mite,mite_http,mite_kafka &
mite prometheus_exporter --stats-out-socket=tcp://127.0.0.1:14305 --web-address=0.0.0.0:9301 &

# Start a mock web server
Expand Down
68 changes: 68 additions & 0 deletions mite_kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import logging
from contextlib import asynccontextmanager

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from mite.exceptions import MiteError

logger = logging.getLogger(__name__)

class KafkaError(MiteError):
pass

class KafkaContext:
pass

class _KafkaWrapper:
def __init__(self):
self._loop = asyncio.get_event_loop()

def install(self, context):
context.kafka = self

def uninstall(self, context):
del context.kafka

async def create_producer(self, *args, **kwargs):
kwargs.setdefault("loop", self._loop)
return AIOKafkaProducer(*args, **kwargs)

async def create_consumer(self, *args, **kwargs):
kwargs.setdefault("loop", self._loop)
return AIOKafkaConsumer(*args, **kwargs)

async def send_and_wait(self, producer, topic, key=None, value=None, **kwargs):
try:
await producer.start()
await producer.send_and_wait(topic, key=key, value=value, **kwargs)
finally:
await producer.stop()

async def get_message(self, consumer, *topics, **kwargs):
try:
await consumer.start()
async for msg in consumer:
return msg
finally:
await consumer.stop()

async def create_message(self, value, **kwargs):
return value

@asynccontextmanager
async def _kafka_context_manager(context):
kw = _KafkaWrapper()
kw.install(context)
try:
yield
except Exception as e:
raise KafkaError(f"Received an error from Kafka:\n{e}") from e
finally:
kw.uninstall(context)

def mite_kafka(func):
async def wrapper(ctx, *args, **kwargs):
async with _kafka_context_manager(ctx):
return await func(ctx, *args, **kwargs)

return wrapper
59 changes: 59 additions & 0 deletions mite_kafka/kafka_test_scenario.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import logging
from mite.scenario import StopVolumeModel
from mite_kafka import KafkaError, mite_kafka

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka broker address
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# Define your Kafka topic
KAFKA_TOPIC = 'test_topic3'

def volume_model_factory(n):
def vm(start, end):
if start > 60 : # Will run for 15 mins
raise StopVolumeModel
return n

vm.__name__ = f"volume model {n}"
return vm

# Example function to produce messages to Kafka
@mite_kafka
async def produce_to_kafka(ctx):
sent_ids = 0
message = "Hello Kafka!"
producer = await ctx.kafka.create_producer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
try:
await ctx.kafka.send_and_wait(producer, KAFKA_TOPIC, value=message.encode('utf-8'))
logger.info(f"Message sent to Kafka: {message} to the topic {KAFKA_TOPIC}")
sent_ids +=1
ctx.send("kafka_producer_stats", message=message, topic_name=KAFKA_TOPIC,total_sent=sent_ids)
except KafkaError as e:
logger.error(f"Error sending message to Kafka: {e}")

# Example function to consume messages from Kafka
@mite_kafka
async def consume_from_kafka(ctx):
receive_ids = 0
consumer = await ctx.kafka.create_consumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)
try:
async for message in consumer:
logger.info(f"Received message from Kafka: {message.value.decode('utf-8')}")
receive_ids += 1
ctx.send("kafka_consumer_stats", message=message, topic_name=KAFKA_TOPIC, total_received=receive_ids)
except KafkaError as e:
logger.error(f"Error consuming message from Kafka: {e}")
finally:
await consumer.stop()


def scenario():
return [
["mite_kafka.kafka_test_scenario:produce_to_kafka", None, volume_model_factory(2)],
# ["mite_kafka.kafka_test_stats:consume_from_kafka", None, volume_model_factory(2)]
]
16 changes: 16 additions & 0 deletions mite_kafka/stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from mite.stats import Accumulator,extractor,matcher_by_type

# Kafka

_KAFKA_STATS = [
Accumulator(
"mite_kafka_producer_stats",
matcher_by_type("kafka_producer_stats"),
extractor(["topic_name"], "total_sent"),
),
Accumulator(
"mite_kafka_consumer_stats",
matcher_by_type("kafka_consumer_stats"),
extractor(["topic_name"], "total_received"),
),
]
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ packages = find:
[options.extras_require]
amqp =
aio_pika
kafka =
aiokafka
finagle =
thrift
selenium =
Expand All @@ -48,6 +50,7 @@ mite_stats =
mite_finagle = mite_finagle.stats:STATS
mite_http = mite_http.stats:STATS
mite_selenium = mite_selenium.stats:STATS
mite_kafka = mite_kafka.stats:_KAFKA_STATS

[options.packages.find]
exclude =
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
aio-pika
aiokafka
altair
Cython
h2
Expand Down
65 changes: 65 additions & 0 deletions test/test_mite_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import asyncio
from unittest.mock import AsyncMock, patch

import pytest
from mite_kafka import _KafkaWrapper, mite_kafka
from mocks.mock_context import MockContext
from aiokafka.producer import AIOKafkaProducer
from aiokafka import AIOKafkaClient

@pytest.mark.asyncio
async def test_mite_kafka_decorator():
context = MockContext()

@mite_kafka
async def dummy_journey(ctx):
assert ctx.kafka is not None

await dummy_journey(context)

@pytest.mark.asyncio
async def test_mite_kafka_decorator_uninstall():
context = MockContext()

@mite_kafka
async def dummy_journey(ctx):
pass

await dummy_journey(context)

assert getattr(context, "kafka", None) is None

@pytest.mark.asyncio
async def test_create_producer():
# Create a mock for AIOKafkaProducer
producer_mock = AsyncMock()
# Patch AIOKafkaProducer to return the mock
with patch('aiokafka.producer.AIOKafkaProducer', new_callable=producer_mock):
# Create an instance of _KafkaWrapper
kafka_wrapper = _KafkaWrapper()
# Call the create_producer method
await kafka_wrapper.create_producer(bootstrap_servers='broker_url') # Pass the broker URL as a keyword argument
# Assert that the AIOKafkaProducer class was called with the expected arguments
# AIOKafkaProducer.return_value.assert_called()
await asyncio.sleep(0)
producer_mock.assert_called_once_with()



@pytest.mark.asyncio
async def test_create_producer_two():
# Create a mock for AIOKafkaProducer
producer_mock = AsyncMock()
# Patch AIOKafkaProducer to return the mock
with patch('aiokafka.producer.AIOKafkaProducer', new_callable=producer_mock):
# Create an instance of _KafkaWrapper
kafka_wrapper = _KafkaWrapper()
# Call the create_producer method
producer = await kafka_wrapper.create_producer(bootstrap_servers="localhost:9092") # Pass the broker URL as a keyword argument
# Assert that the AIOKafkaProducer class was called with the expected arguments
# AIOKafkaProducer.return_value.assert_called()
await asyncio.sleep(0)
await kafka_wrapper.send_and_wait(producer, "my_topic", b"Super message")
producer_mock.assert_called_once_with(bootstrap_servers='broker_url',loop=asyncio.get_event_loop())


1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ source =
mite_browser
mite_selenium
mite_amqp
mite_kafka
mite_websocket
mite_finagle
plugins = Cython.Coverage
Expand Down