-
Notifications
You must be signed in to change notification settings - Fork 147
/
Copy pathbase_announcement_client.py
47 lines (37 loc) · 1.66 KB
/
base_announcement_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import abc
import logging
from http import HTTPStatus
from flask import jsonify, make_response, Response
from marshmallow import ValidationError
from amundsen_application.models.announcements import Announcements, AnnouncementsSchema
class BaseAnnouncementClient(abc.ABC):
    """
    Abstract base class for announcement clients.

    Subclasses provide `get_posts`; `_get_posts` wraps that call and converts its
    result, or any failure, into a Flask JSON response.
    """

    @abc.abstractmethod
    def __init__(self) -> None:
        pass  # pragma: no cover

    @abc.abstractmethod
    def get_posts(self) -> Announcements:
        """
        Fetch the announcements.

        :return: an amundsen_application.models.announcements.Announcements instance, which
            should conform to amundsen_application.models.announcements.AnnouncementsSchema
        """
        pass  # pragma: no cover

    def _get_posts(self) -> Response:
        """
        Call `get_posts` and package the outcome as a JSON response.

        :return: on success, an HTTP 200 response whose JSON body holds the dumped
            'posts' value and 'msg' set to 'Success'; if `get_posts` raises or the
            dumped data fails schema validation, an HTTP 500 response with an empty
            'posts' list and the error description in 'msg'
        """
        def _failure(reason: str) -> Response:
            # Only ever invoked from inside an `except` block, so logging.exception
            # records the active traceback together with the message.
            logging.exception(reason)
            body = jsonify({'posts': [], 'msg': reason})
            return make_response(body, HTTPStatus.INTERNAL_SERVER_ERROR)

        try:
            posts = self.get_posts()
        except Exception as exc:
            return _failure(f'Encountered exception getting posts: {exc!s}')

        try:
            serialized = AnnouncementsSchema().dump(posts)
            # Load the dumped data back through the schema to validate it.
            AnnouncementsSchema().load(serialized)
            body = jsonify({'posts': serialized.get('posts'), 'msg': 'Success'})
            return make_response(body, HTTPStatus.OK)
        except ValidationError as invalid:
            return _failure(f'Announcement data dump returned errors: {invalid.messages!s}')