Add tests #3

Merged
merged 33 commits on Feb 3, 2022
Changes from all commits
33 commits
0332ef2
add tests
quaxsze Jan 14, 2022
36cfb93
filters
quaxsze Jan 18, 2022
dcb6dc5
filters and tests
quaxsze Jan 18, 2022
f796db4
Merge branch 'main' into addTests
maudetes Jan 19, 2022
c33f75d
remove dockefiles
quaxsze Jan 20, 2022
0230cb2
add missing requirements
quaxsze Jan 20, 2022
bf4503c
fix service
quaxsze Jan 20, 2022
a8b5e52
Merge branch 'main' into addTests
quaxsze Jan 20, 2022
cb29915
Merge branch 'main' of github.com:opendatateam/udata-search-service i…
quaxsze Jan 24, 2022
586462f
fix tests
quaxsze Jan 24, 2022
2c5c51f
Merge branch 'addTests' of github.com:opendatateam/udata-search-servi…
quaxsze Jan 24, 2022
06d605a
add github action
quaxsze Jan 24, 2022
970145d
add github action2
quaxsze Jan 24, 2022
7d85fd4
linting fixes
quaxsze Jan 24, 2022
7eb73bb
Improve temporal and spatial support (#8)
maudetes Jan 25, 2022
ddcaa8e
add more tests
quaxsze Jan 26, 2022
8911c98
fix tests
quaxsze Jan 26, 2022
b6da8b8
fix tests
quaxsze Jan 26, 2022
239d20b
Add tests for the kafka consumer
maudetes Jan 26, 2022
4c26673
Deal with organization deserialization better
maudetes Jan 26, 2022
03e1e76
Merge pull request #9 from opendatateam/add-kafka-consumer-tests
maudetes Jan 27, 2022
35e0da8
fix empty filters
quaxsze Jan 27, 2022
a032f7c
Merge branch 'addTests' of github.com:opendatateam/udata-search-servi…
quaxsze Jan 27, 2022
bd0c881
Add normalization before indexation in consumer
maudetes Jan 27, 2022
7e94483
Use `and` operator and a most fields query to dataset search query
maudetes Jan 27, 2022
5ce9022
Update tests following normalization
maudetes Jan 28, 2022
3e8d4c7
add api tests
quaxsze Jan 28, 2022
b5b9113
Merge branch 'addTests' of github.com:opendatateam/udata-search-servi…
quaxsze Jan 28, 2022
556e326
fix requirements
quaxsze Jan 28, 2022
25700f4
fix filters
quaxsze Feb 2, 2022
93848d6
Change normalized values to Float type
maudetes Feb 3, 2022
602dc8d
Update app/infrastructure/search_clients.py
quaxsze Feb 3, 2022
fc7582e
fix
quaxsze Feb 3, 2022
41 changes: 41 additions & 0 deletions .github/workflows/github-actions.yml
@@ -0,0 +1,41 @@
name: Pytest package

on: [push]

jobs:
  build:

    runs-on: ubuntu-latest
    services:
      es-1:
        image: udata/elasticsearch:7.16.2
        env:
          node.name: es01
          cluster.name: es-docker-cluster
          cluster.initial_master_nodes: es01
        ports:
          - 9201:9200
    strategy:
      matrix:
        python-version: [3.9]

    steps:
    - uses: actions/checkout@v2
    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v2
      with:
        python-version: ${{ matrix.python-version }}
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install flake8 pytest
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
    - name: Lint with flake8
      run: |
        # stop the build if there are Python syntax errors or undefined names
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
    - name: Test with pytest
      run: |
        pytest
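
The CI test step can be reproduced locally with pytest (a minimal sketch; it assumes the Elasticsearch service is already running and reachable on localhost:9201, matching the Testing configuration below):

# run_tests.py -- hypothetical helper mirroring the "Test with pytest" CI step
import sys
import pytest

sys.exit(pytest.main(['-q']))
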
1 change: 0 additions & 1 deletion app/__init__.py
@@ -1,7 +1,6 @@
from flask import Flask
from app.config import Config
from app.container import Container
from app.infrastructure import kafka_consumer
from app.presentation import api, commands


1 change: 1 addition & 0 deletions app/config.py
@@ -21,3 +21,4 @@ class Config:

class Testing(Config):
    TESTING = True
    ELASTICSEARCH_URL = 'localhost:9201'
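
The Testing configuration points at port 9201, which the CI workflow above maps to the Elasticsearch container's port 9200. A minimal sketch of a test fixture built on this configuration follows; the create_app factory and the fixture shape are assumptions, not necessarily the project's actual test setup:

# conftest.py -- hypothetical fixture; `create_app` and its signature are assumed
import pytest
from app import create_app


@pytest.fixture
def client():
    app = create_app()
    app.config.from_object('app.config.Testing')
    with app.test_client() as client:
        yield client
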
46 changes: 32 additions & 14 deletions app/domain/entities.py
@@ -1,5 +1,7 @@
import dataclasses
from typing import List
from datetime import datetime
from dateutil.parser import isoparse


@dataclasses.dataclass
@@ -22,24 +24,26 @@ class Organization(EntityBase):
    description: str
    url: str
    orga_sp: int
    created_at: str
    created_at: datetime.date
    followers: int
    datasets: int

    badges: List[str] = None
    acronym: str = None

    def __post_init__(self):
        if isinstance(self.created_at, datetime):
            self.created_at = self.created_at.strftime('%Y-%m-%d')
        if isinstance(self.created_at, str):
            self.created_at = isoparse(self.created_at)


@dataclasses.dataclass
class Dataset(EntityBase):
    id: str
    title: str
    acronym: str
    url: str
    created_at: str
    created_at: datetime.date
    frequency: str
    format: List[str]
    views: int
    followers: int
    reuses: int
@@ -48,37 +52,51 @@ class Dataset(EntityBase):
    concat_title_org: str
    description: str

    temporal_coverage_start: str = None
    temporal_coverage_end: str = None
    acronym: str = None
    badges: List[str] = None
    tags: List[str] = None
    license: str = None
    temporal_coverage_start: datetime.date = None
    temporal_coverage_end: datetime.date = None
    granularity: str = None
    geozones: str = None

    orga_sp: int = None
    orga_followers: int = None
    organization_id: str = None
    organization: str = None
    organization_name: str = None
    owner: str = None

    def __post_init__(self):
        if isinstance(self.created_at, datetime):
            self.created_at = self.created_at.strftime('%Y-%m-%d')
        if isinstance(self.created_at, str):
            self.created_at = isoparse(self.created_at)
        if isinstance(self.temporal_coverage_start, str):
            self.temporal_coverage_start = isoparse(self.temporal_coverage_start)
        if isinstance(self.temporal_coverage_end, str):
            self.temporal_coverage_end = isoparse(self.temporal_coverage_end)


@dataclasses.dataclass
class Reuse(EntityBase):
    id: str
    title: str
    url: str
    created_at: str
    created_at: datetime.date
    views: int
    followers: int
    datasets: int
    featured: int
    description: str
    type: str
    topic: str

    tags: List[str] = None
    badges: List[str] = None
    orga_followers: int = None
    organization_id: str = None
    organization: str = None
    organization_name: str = None
    owner: str = None

    def __post_init__(self):
        if isinstance(self.created_at, datetime):
            self.created_at = self.created_at.strftime('%Y-%m-%d')
        if isinstance(self.created_at, str):
            self.created_at = isoparse(self.created_at)
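
The __post_init__ hooks above accept either a datetime or an ISO 8601 string for date fields and normalize to a datetime via dateutil's isoparse. A minimal standalone sketch of the same pattern (not the project's classes):

# iso_coercion_sketch.py -- illustrates the created_at coercion used by the entities
import dataclasses
from datetime import datetime
from dateutil.parser import isoparse


@dataclasses.dataclass
class Example:
    created_at: datetime

    def __post_init__(self):
        if isinstance(self.created_at, str):
            self.created_at = isoparse(self.created_at)


print(Example(created_at='2022-01-28T09:30:00+00:00').created_at)
# -> 2022-01-28 09:30:00+00:00 (a datetime, whichever form was passed in)
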
77 changes: 53 additions & 24 deletions app/infrastructure/kafka_consumer.py
@@ -7,6 +7,7 @@
from kafka import KafkaConsumer

from app.domain.entities import Dataset, Organization, Reuse
from app.infrastructure.utils import get_concat_title_org, log2p

ELASTIC_HOST = os.environ.get('ELASTIC_HOST', 'localhost')
ELASTIC_PORT = os.environ.get('ELASTIC_PORT', '9200')
@@ -37,8 +38,8 @@ def create_kafka_consumer():
    consumer = KafkaConsumer(
        bootstrap_servers=f'{KAFKA_HOST}:{KAFKA_PORT}',
        group_id='elastic',
        reconnect_backoff_max_ms=100000, # TODO: what value to set here?
        reconnect_backoff_max_ms=100000,  # TODO: what value to set here?

        # API Version is needed in order to prevent api version guessing leading to an error
        # on startup if Kafka Broker isn't ready yet
        api_version=tuple([int(value) for value in KAFKA_API_VERSION.split('.')])
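
As a quick illustration of the api_version handling above, the KAFKA_API_VERSION string is split on dots and converted to the integer tuple that kafka-python expects (the value shown is only an example):

KAFKA_API_VERSION = '2.5.0'
api_version = tuple([int(value) for value in KAFKA_API_VERSION.split('.')])
print(api_version)  # (2, 5, 0)
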
@@ -51,53 +52,81 @@
class DatasetConsumer(Dataset):
    @classmethod
    def load_from_dict(cls, data):
        data["organization_id"] = data["organization"].get('id') if data["organization"] else None
        data["orga_followers"] = data["organization"].get('followers') if data["organization"] else None
        data["orga_sp"] = data["organization"].get('public_service') if data["organization"] else None
        data["organization"] = data["organization"].get('name') if data["organization"] else None
        organization = data["organization"]
        data["organization"] = organization.get('id') if organization else None
        data["orga_followers"] = organization.get('followers') if organization else None
        data["orga_sp"] = organization.get('public_service') if organization else None
        data["organization_name"] = organization.get('name') if organization else None

        data["concat_title_org"] = get_concat_title_org(data["title"], data['acronym'], data['organization_name'])
        data["geozones"] = [zone.get("id") for zone in data.get("geozones", [])]

        # Normalize values
        data["views"] = log2p(data.get("views", 0))
        data["followers"] = log2p(data.get("followers", 0))
        data["reuses"] = log2p(data.get("reuses", 0))
        data["orga_followers"] = log2p(data.get("orga_followers", 0))
        data["orga_sp"] = 4 if data.get("orga_sp", 0) else 1
        data["featured"] = 4 if data.get("featured", 0) else 1

        data["concat_title_org"] = data["title"] + (' ' + data["organization"] if data["organization"] else '')
        data["geozones"] = '' # TODO
        return super().load_from_dict(data)


class ReuseConsumer(Reuse):
    @classmethod
    def load_from_dict(cls, data):
        data["organization_id"] = data["organization"].get('id') if data["organization"] else None
        data["orga_followers"] = data["organization"].get('followers') if data["organization"] else None
        data["organization"] = data["organization"].get('name') if data["organization"] else None
        organization = data["organization"]
        data["organization"] = organization.get('id') if organization else None
        data["orga_followers"] = organization.get('followers') if organization else None
        data["organization_name"] = organization.get('name') if organization else None

        # Normalize values
        data["views"] = log2p(data.get("views", 0))
        data["followers"] = log2p(data.get("followers", 0))
        data["orga_followers"] = log2p(data.get("orga_followers", 0))
        return super().load_from_dict(data)


class OrganizationConsumer(Organization):
    pass
    @classmethod
    def load_from_dict(cls, data):
        data["followers"] = log2p(data.get("followers", 0))
        return super().load_from_dict(data)


def parse_message(index, val_utf8):
    if index == 'dataset':
        dataclass_consumer = DatasetConsumer
    elif index == 'reuse':
        dataclass_consumer = ReuseConsumer
    elif index == 'organization':
        dataclass_consumer = OrganizationConsumer
    else:
        raise ValueError(f'Model Deserializer not implemented for index: {index}')
    try:
        data = dataclass_consumer.load_from_dict(json.loads(val_utf8)).to_dict()
        return data
    except Exception as e:
        raise ValueError(f'Failed to deserialize message: {val_utf8}. Exception raised: {e}')


def consume_messages(consumer, es):
    logging.info('Ready to consume message')
    for message in consumer:
        value = message.value
        val_utf8 = value.decode('utf-8').replace('NaN', 'null')

        key = message.key
        index = message.topic

        logging.warning(f'Message recieved with key: {key} and value: {value}')
        logging.info(f'Message recieved with key: {key} and value: {value}')

        if val_utf8 != 'null':
            if index == 'dataset':
                dataclass_consumer = DatasetConsumer
            elif index == 'reuse':
                dataclass_consumer = ReuseConsumer
            elif index == 'organization':
                dataclass_consumer = OrganizationConsumer
            else:
                logging.error(f'Model Deserializer not implemented for index: {index}')
                continue
            data = dataclass_consumer.load_from_dict(json.loads(val_utf8)).to_dict()
            try:
                data = parse_message(index, val_utf8)
                es.index(index=index, id=key.decode('utf-8'), document=data)
            except ValueError as e:
                logging.error(f'ValueError when parsing message: {e}')
            except ConnectionError as e:
                logging.error(f'ConnectionError with Elastic Client: {e}')
                # TODO: add a retry mechanism?
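
The refactor extracts deserialization into parse_message, which makes it unit-testable without a running Kafka broker. The normalization relies on log2p from app.infrastructure.utils, whose implementation is not part of this diff (presumably a log2(1 + x) style dampening of raw counts). A minimal sketch of a test against parse_message (a hypothetical test, not necessarily one of those added in this PR):

# test_parse_message_sketch.py -- hypothetical unit test for the error path
import pytest
from app.infrastructure.kafka_consumer import parse_message


def test_parse_message_rejects_unknown_index():
    # Topics without a registered consumer class raise ValueError
    with pytest.raises(ValueError):
        parse_message('unknown-topic', '{}')
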