Skip to content

Commit

Permalink
Merge pull request #2435 from data-for-change/dev
Browse files Browse the repository at this point in the history
Feature 2321 creating telegram push notifications trigger dag on news…
  • Loading branch information
atalyaalon authored Aug 8, 2023
2 parents cf986a9 + 0a54e85 commit 8473b71
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 8 deletions.
23 changes: 21 additions & 2 deletions anyway/parsers/news_flash_db_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from anyway.parsers import timezones
from anyway.models import NewsFlash
from anyway.slack_accident_notifications import publish_notification

from anyway.utilities import trigger_airflow_dag
from anyway.widgets.widget_utils import newsflash_has_location

# fmt: off

Expand Down Expand Up @@ -75,6 +76,19 @@ def remove_duplicate_rows(self):
)
self.commit()

@staticmethod
def generate_infographics_and_send_to_telegram(newsflashid):
dag_conf = {"news_flash_id": newsflashid}
trigger_airflow_dag("generate-and-send-infographics-images", dag_conf)

@staticmethod
def publish_notifications(newsflash: NewsFlash):
publish_notification(newsflash)
if newsflash_has_location(newsflash):
DBAdapter.generate_infographics_and_send_to_telegram(newsflash.id)
else:
logging.debug("newsflash does not have location, not publishing")

def insert_new_newsflash(self, newsflash: NewsFlash) -> None:
logging.info("Adding newsflash, is accident: {}, date: {}"
.format(newsflash.accident, newsflash.date))
Expand All @@ -83,7 +97,12 @@ def insert_new_newsflash(self, newsflash: NewsFlash) -> None:
self.db.session.commit()
infographics_data_cache_updater.add_news_flash_to_cache(newsflash)
if os.environ.get("FLASK_ENV") == "production" and newsflash.accident:
publish_notification(newsflash)
try:
DBAdapter.publish_notifications(newsflash)
except Exception as e:
logging.error("publish notifications failed")
logging.error(e)


def get_newsflash_by_id(self, id):
return self.db.session.query(NewsFlash).filter(NewsFlash.id == id)
Expand Down
3 changes: 2 additions & 1 deletion anyway/telegram_accident_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

TELEGRAM_CHANNEL_CHAT_ID = -1001666083560
TELEGRAM_LINKED_GROUP_CHAT_ID = -1001954877540
TEXT_FOR_AFTER_INFOGRAPHICS_MESSAGE = 'מקור המידע בלמ"ס, הופק באמצעות ANYWAY מבית "נתון לשינוי" למידע נוסף:'
TEXT_FOR_AFTER_INFOGRAPHICS_MESSAGE = 'מקור המידע בלמ"ס. נתוני התאונה שבמבזק לא נכללים באינפוגרפיקה. ' \
'הופק באמצעות ANYWAY מבית "נתון לשינוי" למידע נוסף:'

def send_initial_message_in_channel(bot, text):
return bot.send_message(TELEGRAM_CHANNEL_CHAT_ID, text)
Expand Down
19 changes: 19 additions & 0 deletions anyway/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,22 @@ def is_a_valid_email(tmp_given_user_email: str) -> bool:

def half_rounded_up(num: int):
return math.ceil(num / 2)

def trigger_airflow_dag(dag_id, conf=None):
import airflow_client.client
from airflow_client.client.api import dag_run_api
from airflow_client.client.model.dag_run import DAGRun
from anyway import secrets

if conf is None:
conf = {}
airflow_api_url = "https://airflow.anyway.co.il/api/v1"
configuration = airflow_client.client.Configuration(
host=airflow_api_url,
username=secrets.get("AIRFLOW_USER"),
password=secrets.get("AIRFLOW_PASSWORD")
)
with airflow_client.client.ApiClient(configuration) as api_client:
dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
dag_run = DAGRun(conf=conf)
return dag_run_api_instance.post_dag_run(dag_id, dag_run)
21 changes: 18 additions & 3 deletions anyway/widgets/widget_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from anyway.models import InvolvedMarkerView
from anyway.request_params import LocationInfo
from anyway.vehicle_type import VehicleType
from anyway.models import NewsFlash


def get_query(table_obj, filters, start_time, end_time):
Expand Down Expand Up @@ -52,7 +53,10 @@ def get_accidents_stats(
end_time=None,
):
filters = filters or {}
provider_code_filters = [BE_CONST.CBS_ACCIDENT_TYPE_1_CODE, BE_CONST.CBS_ACCIDENT_TYPE_3_CODE]
provider_code_filters = [
BE_CONST.CBS_ACCIDENT_TYPE_1_CODE,
BE_CONST.CBS_ACCIDENT_TYPE_3_CODE,
]
filters["provider_code"] = filters.get("provider_code", provider_code_filters)

# get stats
Expand All @@ -74,7 +78,8 @@ def get_accidents_stats(
else:
query = query.group_by(group_by)
query = query.with_entities(
group_by, func.count(count) if not cnt_distinct else func.count(distinct(count))
group_by,
func.count(count) if not cnt_distinct else func.count(distinct(count)),
)
df = pd.read_sql_query(query.statement, query.session.bind)
df.rename(columns={"count_1": "count"}, inplace=True) # pylint: disable=no-member
Expand All @@ -98,7 +103,10 @@ def retro_dictify(indexable) -> Dict[Any, Dict[Any, Any]]:


def add_empty_keys_to_gen_two_level_dict(
d, level_1_values: List[Any], level_2_values: List[Any], default_level_3_value: int = 0
d,
level_1_values: List[Any],
level_2_values: List[Any],
default_level_3_value: int = 0,
) -> Dict[Any, Dict[Any, int]]:
for v1 in level_1_values:
if v1 not in d:
Expand Down Expand Up @@ -228,3 +236,10 @@ def join_strings(strings, sep_a=" ,", sep_b=" ו-"):
return sep_b.join(strings)
else:
return sep_a.join(strings[:-1]) + sep_b + strings[-1]


def newsflash_has_location(newsflash: NewsFlash):
resolution = newsflash.resolution
return (resolution == "suburban_road" and newsflash.road_segment_name) or (
resolution == "street" and newsflash.street1_hebrew
)
7 changes: 7 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,13 @@ def generate_images_and_send_notification(id):
publish_notification(id)


@telegram.command()
@click.option("--id", type=int)
def trigger_dag(id):
from anyway.utilities import trigger_airflow_dag
dag_conf = {"news_flash_id": id}
trigger_airflow_dag("generate-and-send-infographics-images", dag_conf)

if __name__ == "__main__":
cli(sys.argv[1:]) # pylint: disable=too-many-function-args

5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ phonenumbers==8.12.21
flask-principal==0.4.0
pydantic==1.8.2
swifter==1.3.4
telebot
selenium
telebot==0.0.5
selenium==4.11.2
apache-airflow-client==2.6.2

0 comments on commit 8473b71

Please sign in to comment.