test: fix instrumentation
JoanFM committed Nov 6, 2024
1 parent df81dbd commit 49b4e8e
Showing 4 changed files with 155 additions and 144 deletions.
268 changes: 136 additions & 132 deletions tests/integration/docarray_v2/test_v2.py
@@ -24,6 +24,141 @@
from jina.helper import random_port


@pytest.mark.parametrize(
'protocols', [['grpc', 'http', 'websocket']]
)
@pytest.mark.parametrize('reduce', [False, True])
@pytest.mark.parametrize('sleep_time', [5])
def test_flow_with_shards_all_shards_return(protocols, reduce, sleep_time):
from typing import List

from docarray import BaseDoc, DocList
from docarray.documents import TextDoc

class TextDocWithId(TextDoc):
id: str
l: List[int] = []

class ResultTestDoc(BaseDoc):
price: int = '2'
l: List[int] = [3]
matches: DocList[TextDocWithId]

class SimilarityTestIndexer(Executor):
"""Simulates an indexer where no shard would fail, they all pass results"""

def __init__(self, sleep_time=0.1, *args, **kwargs):
super().__init__(*args, **kwargs)
self._docs = DocList[TextDocWithId]()
time.sleep(sleep_time)

@requests(on=['/index'])
def index(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[TextDocWithId]:
for doc in docs:
self._docs.append(doc)

@requests(on=['/search'])
def search(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[ResultTestDoc]:
resp = DocList[ResultTestDoc]()
for q in docs:
res = ResultTestDoc(id=q.id, matches=self._docs[0:3])
resp.append(res)
return resp

ports = [random_port() for _ in protocols]
with Flow(protocol=protocols, port=ports).add(
uses=SimilarityTestIndexer,
uses_with={'sleep_time': sleep_time},
shards=2,
reduce=reduce,
):
time.sleep(5)
for port, protocol in zip(ports, protocols):
c = Client(port=port, protocol=protocol)
index_da = DocList[TextDocWithId](
[TextDocWithId(id=f'{i}', text=f'ID {i}') for i in range(10)]
)
c.index(inputs=index_da, request_size=1, return_type=DocList[TextDocWithId])

responses = c.search(
inputs=index_da[0:1], request_size=1, return_type=DocList[ResultTestDoc]
)
assert len(responses) == 1 if reduce else 2
for r in responses:
assert r.l[0] == 3
assert len(r.matches) == 6
for match in r.matches:
assert 'ID' in match.text


@pytest.mark.parametrize('reduce', [True, False])
@pytest.mark.parametrize('sleep_time', [5])
def test_deployments_with_shards_all_shards_return(reduce, sleep_time):
from typing import List

from docarray import BaseDoc, DocList
from docarray.documents import TextDoc

class TextDocWithId(TextDoc):
id: str
l: List[int] = []

class ResultTestDoc(BaseDoc):
price: int = '2'
l: List[int] = [3]
matches: DocList[TextDocWithId]

class SimilarityTestIndexer(Executor):
"""Simulates an indexer where no shard would fail, they all pass results"""

def __init__(self, sleep_time=0.1, *args, **kwargs):
super().__init__(*args, **kwargs)
self._docs = DocList[TextDocWithId]()
time.sleep(sleep_time)

@requests(on=['/index'])
def index(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[TextDocWithId]:
for doc in docs:
self._docs.append(doc)

@requests(on=['/search'])
def search(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[ResultTestDoc]:
resp = DocList[ResultTestDoc]()
for q in docs:
res = ResultTestDoc(id=q.id, matches=self._docs[0:3])
resp.append(res)
return resp

with Deployment(
uses=SimilarityTestIndexer,
uses_with={'sleep_time': sleep_time},
shards=2,
reduce=reduce,
) as dep:
time.sleep(5)
index_da = DocList[TextDocWithId](
[TextDocWithId(id=f'{i}', text=f'ID {i}') for i in range(10)]
)
dep.index(inputs=index_da, request_size=1, return_type=DocList[TextDocWithId])
responses = dep.search(
inputs=index_da[0:1], request_size=1, return_type=DocList[ResultTestDoc]
)
assert len(responses) == 1 if reduce else 2
for r in responses:
assert r.l[0] == 3
assert len(r.matches) == 6
for match in r.matches:
assert 'ID' in match.text


@pytest.mark.parametrize(
'protocols', [['grpc'], ['http'], ['websocket'], ['grpc', 'http', 'websocket']]
)
@@ -46,6 +181,7 @@ def foo(self, docs: DocList[Image], **kwargs) -> DocList[Image]:

ports = [random_port() for _ in protocols]
with Flow(port=ports, protocol=protocols, replicas=replicas).add(uses=MyExecDifSchema) as f:
time.sleep(5)
for port, protocol in zip(ports, protocols):
c = Client(port=port, protocol=protocol)
docs = c.post(
@@ -1335,138 +1471,6 @@ def search(
assert q.text == r.text


@pytest.mark.parametrize('reduce', [True, False])
@pytest.mark.parametrize('sleep_time', [0.1, 5])
def test_deployments_with_shards_all_shards_return(reduce, sleep_time):
from typing import List

from docarray import BaseDoc, DocList
from docarray.documents import TextDoc

class TextDocWithId(TextDoc):
id: str
l: List[int] = []

class ResultTestDoc(BaseDoc):
price: int = '2'
l: List[int] = [3]
matches: DocList[TextDocWithId]

class SimilarityTestIndexer(Executor):
"""Simulates an indexer where no shard would fail, they all pass results"""

def __init__(self, sleep_time=0.1, *args, **kwargs):
super().__init__(*args, **kwargs)
self._docs = DocList[TextDocWithId]()
time.sleep(sleep_time)

@requests(on=['/index'])
def index(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[TextDocWithId]:
for doc in docs:
self._docs.append(doc)

@requests(on=['/search'])
def search(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[ResultTestDoc]:
resp = DocList[ResultTestDoc]()
for q in docs:
res = ResultTestDoc(id=q.id, matches=self._docs[0:3])
resp.append(res)
return resp

with Deployment(
uses=SimilarityTestIndexer,
uses_with={'sleep_time': sleep_time},
shards=2,
reduce=reduce,
) as dep:
index_da = DocList[TextDocWithId](
[TextDocWithId(id=f'{i}', text=f'ID {i}') for i in range(10)]
)
dep.index(inputs=index_da, request_size=1, return_type=DocList[TextDocWithId])
responses = dep.search(
inputs=index_da[0:1], request_size=1, return_type=DocList[ResultTestDoc]
)
assert len(responses) == 1 if reduce else 2
for r in responses:
assert r.l[0] == 3
assert len(r.matches) == 6
for match in r.matches:
assert 'ID' in match.text


@pytest.mark.parametrize(
'protocols', [['grpc'], ['http'], ['websocket']]
)
@pytest.mark.parametrize('reduce', [True, False])
@pytest.mark.parametrize('sleep_time', [5])
def test_flow_with_shards_all_shards_return(protocols, reduce, sleep_time):
from typing import List

from docarray import BaseDoc, DocList
from docarray.documents import TextDoc

class TextDocWithId(TextDoc):
id: str
l: List[int] = []

class ResultTestDoc(BaseDoc):
price: int = '2'
l: List[int] = [3]
matches: DocList[TextDocWithId]

class SimilarityTestIndexer(Executor):
"""Simulates an indexer where no shard would fail, they all pass results"""

def __init__(self, sleep_time=0.1, *args, **kwargs):
super().__init__(*args, **kwargs)
self._docs = DocList[TextDocWithId]()
time.sleep(sleep_time)

@requests(on=['/index'])
def index(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[TextDocWithId]:
for doc in docs:
self._docs.append(doc)

@requests(on=['/search'])
def search(
self, docs: DocList[TextDocWithId], **kwargs
) -> DocList[ResultTestDoc]:
resp = DocList[ResultTestDoc]()
for q in docs:
res = ResultTestDoc(id=q.id, matches=self._docs[0:3])
resp.append(res)
return resp

ports = [random_port() for _ in protocols]
with Flow(protocol=protocols, port=ports).add(
uses=SimilarityTestIndexer,
uses_with={'sleep_time': sleep_time},
shards=2,
reduce=reduce,
):
for port, protocol in zip(ports, protocols):
c = Client(port=port, protocol=protocol)
index_da = DocList[TextDocWithId](
[TextDocWithId(id=f'{i}', text=f'ID {i}') for i in range(10)]
)
c.index(inputs=index_da, request_size=1, return_type=DocList[TextDocWithId])
responses = c.search(
inputs=index_da[0:1], request_size=1, return_type=DocList[ResultTestDoc]
)
assert len(responses) == 1 if reduce else 2
for r in responses:
assert r.l[0] == 3
assert len(r.matches) == 6
for match in r.matches:
assert 'ID' in match.text


def test_issue_shards_missmatch_endpoint_and_shard_with_lists():
class MyDoc(BaseDoc):
text: str
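A note on the assertions in the shard tests above: Python's conditional expression binds more loosely than the comparison, so assert len(responses) == 1 if reduce else 2 parses as (len(responses) == 1) if reduce else 2, and when reduce is False the statement asserts the constant 2, which is always truthy. The sketch below only illustrates that parsing; it is not part of the committed test.

    # Illustrative sketch of how the assertion parses (hypothetical values, not the test fixtures)
    responses = [object(), object()]  # pretend two un-reduced shard responses came back
    reduce = False
    assert (len(responses) == 1) if reduce else 2  # reduce is False, so this asserts 2 and always passes
    assert len(responses) == (1 if reduce else 2)  # explicit grouping checks the intended count in both branches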
22 changes: 15 additions & 7 deletions tests/unit/serve/instrumentation/conftest.py
@@ -27,6 +27,7 @@ def __init__(
type, "opentelemetry.sdk.metrics.view.Aggregation"
] = None,
):
print(f'JOAN IS HERE DIRMETRIC')
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
@@ -40,6 +41,7 @@ def export(
timeout_millis: float = 10_000,
**kwargs,
) -> MetricExportResult:
print(f'export to {self.metric_filename} => {metrics_data.to_json()[0:3]}')
self.f.write(metrics_data.to_json())
self.f.write('\n')
self.f.flush()
@@ -75,6 +77,7 @@ def monkeypatch_metric_exporter(
f.write('0')

def collect_metrics():
print(f'tick_counter_filename {tick_counter_filename}')
with open(tick_counter_filename, 'r', encoding='utf-8') as ft:
tick_counter = int(ft.read())
with open(tick_counter_filename, 'w', encoding='utf-8') as ft2:
@@ -88,15 +91,20 @@ def _get_service_name(otel_measurement):

def read_metrics():
def read_metric_file(filename):
print(f'filename {filename}')
with open(filename, 'r', encoding='utf-8') as fr:
r = fr.read()
print(f'READ {r}')
return json.loads(r)

return {
_get_service_name(i): i
for i in map(read_metric_file, metrics_path.glob('*'))
}
print(f'READ {r[0:3]}')
try:
return json.loads(r)
except:
return None

ret = {}
for i in map(read_metric_file, metrics_path.glob('*')):
if i is not None:
ret[_get_service_name(i)] = i
return ret

class PatchedTextReader(PeriodicExportingMetricReader):
def __init__(self, *args, **kwargs) -> None:
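The conftest change above makes read_metrics tolerant of exporter files that are empty or only partially written: a file that fails to parse now yields None and is skipped when the per-service map is built. Below is a minimal sketch of the same skip-on-parse-failure pattern, assuming one JSON document per file; the helper name and the narrower json.JSONDecodeError catch are editorial choices, not part of the commit.

    import json
    from pathlib import Path

    def load_complete_metric_files(metrics_path: Path) -> list:
        # Collect every metrics file that already contains valid JSON, skipping the rest.
        parsed = []
        for metric_file in metrics_path.glob('*'):
            text = metric_file.read_text(encoding='utf-8')
            try:
                parsed.append(json.loads(text))
            except json.JSONDecodeError:
                continue  # the exporter may still be flushing this file
        return parsed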
@@ -26,7 +26,6 @@ def meow(self, docs, **kwargs):
f.post('/')
collect_metrics()
metrics = read_metrics()
print(f' metrics {metrics.keys()}')
gateway_metrics = metrics['gateway/rep-0']['resource_metrics'][0][
'scope_metrics'
][0]['metrics']
8 changes: 4 additions & 4 deletions tests/unit/serve/instrumentation/test_instrumentation.py
@@ -88,10 +88,10 @@ def _sleep():
}

@MetricsTimer(summary, histogram, labels)
def _sleep():
def _sleep_2():
time.sleep(0.1)

_sleep()
_sleep_2()

# Prometheus samples
summary_count_sample = [
@@ -107,5 +107,5 @@ def _sleep():
.to_json()
)
assert 'time_taken_decorator' == histogram_metric['name']
assert 1 == histogram_metric['data']['data_points'][0]['count']
assert labels == histogram_metric['data']['data_points'][0]['attributes']
assert 1 == histogram_metric['data']['data_points'][1]['count']
assert labels == histogram_metric['data']['data_points'][1]['attributes']
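The last hunk renames the second decorated helper and moves the assertions to the second data point. A plausible reading, assuming the earlier _sleep in this test records into the same histogram without labels: the OpenTelemetry SDK keeps one histogram aggregation per distinct attribute set, so the export carries an unlabeled data point followed by the labeled one. The sketch below reproduces that behaviour with the stock SDK; the reader and meter names are illustrative, not the test's fixtures.

    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import InMemoryMetricReader

    reader = InMemoryMetricReader()
    meter = MeterProvider(metric_readers=[reader]).get_meter('sketch')
    histogram = meter.create_histogram('time_taken_decorator')

    histogram.record(0.1)                             # unlabeled recording
    histogram.record(0.1, attributes={'foo': 'bar'})  # labeled recording

    metric = reader.get_metrics_data().resource_metrics[0].scope_metrics[0].metrics[0]
    points = metric.data.data_points
    assert len(points) == 2  # one data point per distinct attribute set
    # In practice data points keep recording order, so the labeled set is the second entry.
    assert dict(points[1].attributes) == {'foo': 'bar'}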
