diff --git a/src/server/api/admin_api.py b/src/server/api/admin_api.py index 76905705..58b49d5c 100644 --- a/src/server/api/admin_api.py +++ b/src/server/api/admin_api.py @@ -407,10 +407,3 @@ def hit_gdrs(): # print("round-trip d is : \n " + str(d) ) # return "OK" -from rfm_funcs.create_scores import create_scores -@admin_api.route("/api/admin/test_create_scores", methods=["GET"]) -def hit_create_scores(): - current_app.logger.info("Hitting create_scores() ") - tuple_count = create_scores('2021-07-27') - current_app.logger.info("create_scores() processed " + str(tuple_count) + " scores") - return jsonify(200) diff --git a/src/server/api/internal_api.py b/src/server/api/internal_api.py index 6c26da3c..2d157a9a 100644 --- a/src/server/api/internal_api.py +++ b/src/server/api/internal_api.py @@ -2,6 +2,7 @@ from flask import jsonify, current_app from datetime import datetime from api.API_ingest import ingest_sources_from_api +from rfm_funcs.create_scores import create_scores ### Internal API endpoints can only be accessed from inside the cluster; ### they are blocked by location rule in NGINX config @@ -29,3 +30,10 @@ def ingest_raw_data(): return jsonify({'outcome': 'OK'}), 200 + +@internal_api.route("/api/test_create_scores", methods=["GET"]) +def hit_create_scores(): + current_app.logger.info("Hitting create_scores() ") + tuple_count = create_scores() + current_app.logger.info("create_scores() processed " + str(tuple_count) + " scores") + return jsonify(200) diff --git a/src/server/pipeline/flow_script.py b/src/server/pipeline/flow_script.py index 9c83a020..ae78332a 100644 --- a/src/server/pipeline/flow_script.py +++ b/src/server/pipeline/flow_script.py @@ -7,7 +7,7 @@ from config import RAW_DATA_PATH from config import engine from models import Base - +from rfm_funcs.create_scores import create_scores def start_flow(): @@ -78,7 +78,11 @@ def start_flow(): pdp.archived_date IS NULL ''') - current_app.logger.info('Finished flow script run') + current_app.logger.info('Finished flow script run, running RFM scoring') + + score_result = create_scores() # Run RFM scoring on newly-processed donations + current_app.logger.info('Scored ' + str(score_result) + ' tuples') + job_outcome = 'completed' log_db.log_exec_status(job_id, 'flow', 'complete', '' ) diff --git a/src/server/rfm_funcs/create_scores.py b/src/server/rfm_funcs/create_scores.py index 44e5518d..54b89040 100644 --- a/src/server/rfm_funcs/create_scores.py +++ b/src/server/rfm_funcs/create_scores.py @@ -1,4 +1,6 @@ from config import engine +from flask import current_app +import traceback import pandas as pd import numpy as np @@ -17,15 +19,15 @@ def date_difference(my_date, max_date): return diff -def create_scores(query_date): +def create_scores(): ''' - requires query date as input-- must be string in the following format "%Y-%m-%d" + (used to) require query date as input-- must be string in the following format "%Y-%m-%d" returns a list of matching_ids and scores as tuples will also insert rfm scores into rfm_scores table----see src/server/api/admin_api.py ''' with engine.connect() as connection: - + current_app.logger.debug("running create_scores()") # read in data from database via pull_donations_for_rfm() func (reads in as a list of tuples) df = pd.read_sql( """ @@ -40,92 +42,113 @@ def create_scores(query_date): from api.admin_api import read_rfm_edges, insert_rfm_scores # Avoid circular import issues rfm_dict = read_rfm_edges() - recency_labels = [5,4,3,2,1] - recency_bins = list(rfm_dict['r'].values()) #imported from table - frequency_labels = [1,2,3,4,5] - frequency_bins = list(rfm_dict['f'].values()) #imported from table + if len(rfm_dict) == 3: # r,f,m + + try: + + recency_labels = [5,4,3,2,1] + recency_bins = list(rfm_dict['r'].values()) #imported from table + + frequency_labels = [1,2,3,4,5] + frequency_bins = list(rfm_dict['f'].values()) #imported from table + + monetary_labels = [1,2,3,4,5] + monetary_bins = list(rfm_dict['m'].values()) #imported from table - monetary_labels = [1,2,3,4,5] - monetary_bins = list(rfm_dict['m'].values()) #imported from table + ########################## recency ######################################### - ########################## recency ######################################### + donations_past_year = df + donations_past_year['close_date'] =pd.to_datetime(donations_past_year['close_date']).dt.date - donations_past_year = df - donations_past_year['close_date'] =pd.to_datetime(donations_past_year['close_date']).dt.date + # calculate date difference between input date and individual row close date - # calculate date difference between input date and individual row close date + days = [] + max_close_date = donations_past_year['close_date'].max() + for ii in donations_past_year['close_date']: + days.append(date_difference(ii, max_close_date)) + donations_past_year['days_since'] = days - days = [] - max_close_date = donations_past_year['close_date'].max() - for ii in donations_past_year['close_date']: - days.append(date_difference(ii, max_close_date)) - donations_past_year['days_since'] = days + grouped_past_year = donations_past_year.groupby('matching_id').agg({'days_since': ['min']}).reset_index() + print(grouped_past_year.head()) + + grouped_past_year[('days_since', 'min')]= grouped_past_year[('days_since', 'min')].dt.days - grouped_past_year = donations_past_year.groupby('matching_id').agg({'days_since': ['min']}).reset_index() - print(grouped_past_year.head()) - - grouped_past_year[('days_since', 'min')]= grouped_past_year[('days_since', 'min')].dt.days + max_maybe = grouped_past_year[('days_since', 'min')].max() - max_maybe = grouped_past_year[('days_since', 'min')].max() + real_max = max(max_maybe, max(recency_bins)+1 ) - real_max = max(max_maybe, max(recency_bins)+1 ) + recency_bins.append(real_max) - recency_bins.append(real_max) + grouped_past_year['recency_score'] = pd.cut(grouped_past_year[('days_since','min')], bins= recency_bins, labels=recency_labels, include_lowest = True) + grouped_past_year.rename(columns={('recency_score', ''): 'recency_score'}) + ################################## frequency ############################### - grouped_past_year['recency_score'] = pd.cut(grouped_past_year[('days_since','min')], bins= recency_bins, labels=recency_labels, include_lowest = True) - grouped_past_year.rename(columns={('recency_score', ''): 'recency_score'}) + df['close_date'] = pd.DatetimeIndex(df['close_date']) - ################################## frequency ############################### + df_grouped = df.groupby(['matching_id', pd.Grouper(key = 'close_date', freq = 'Q')]).count().max(level=0) - df['close_date'] = pd.DatetimeIndex(df['close_date']) + df_grouped = df_grouped.reset_index() - df_grouped = df.groupby(['matching_id', pd.Grouper(key = 'close_date', freq = 'Q')]).count().max(level=0) + frequency_bins.append(np.inf) - df_grouped = df_grouped.reset_index() + df_frequency = df_grouped[['matching_id' , 'amount']] # amount is a placeholder as the groupby step just gives a frequency count, the value doesn't correspond to donation monetary amount. - frequency_bins.append(np.inf) + df_frequency = df_frequency.rename(columns = {'amount':'frequency'}) #renaming amount to frequency - df_frequency = df_grouped[['matching_id' , 'amount']] # amount is a placeholder as the groupby step just gives a frequency count, the value doesn't correspond to donation monetary amount. + df_frequency['frequency_score'] = pd.cut(df_frequency['frequency'], + bins = frequency_bins, labels=frequency_labels, include_lowest=True) - df_frequency = df_frequency.rename(columns = {'amount':'frequency'}) #renaming amount to frequency + ################################## amount ################################## - df_frequency['frequency_score'] = pd.cut(df_frequency['frequency'], - bins = frequency_bins, labels=frequency_labels, include_lowest=True) + monetary_bins.append(np.inf) - ################################## amount ################################## + df_amount = df.groupby(df['matching_id'], as_index=False).amount.max() - monetary_bins.append(np.inf) + df_amount['amount_score'] = pd.cut(df_amount['amount'], bins= monetary_bins, include_lowest=True, labels = monetary_labels) - df_amount = df.groupby(df['matching_id'], as_index=False).amount.max() + # raise ValueError # Just to test exception handling - df_amount['amount_score'] = pd.cut(df_amount['amount'], bins= monetary_bins, include_lowest=True, labels = monetary_labels) + # Concatenate rfm scores + # merge monetary df and frequency df + df_semi = df_amount.merge(df_frequency, left_on='matching_id', right_on= 'matching_id') + print(grouped_past_year.head()) + print(df_semi.head()) + df_final = df_semi.merge(grouped_past_year, left_on='matching_id', right_on= 'matching_id') # merge monetary/frequency dfs to recency df + ### get avg fm score and merge with df_final + # df_final['f_m_AVG_score'] = df_final[['frequency_score', 'amount_score']].mean(axis=1) - # Concatenate rfm scores - # merge monetary df and frequency df - df_semi = df_amount.merge(df_frequency, left_on='matching_id', right_on= 'matching_id') - print(grouped_past_year.head()) - print(df_semi.head()) - df_final = df_semi.merge(grouped_past_year, left_on='matching_id', right_on= 'matching_id') # merge monetary/frequency dfs to recency df - ### get avg fm score and merge with df_final - # df_final['f_m_AVG_score'] = df_final[['frequency_score', 'amount_score']].mean(axis=1) + # import function: rfm_concat, which will catenate integers as a string and then convert back to a single integer + from rfm_funcs.rfm_functions import rfm_concat + rfm_score = rfm_concat(df_final[('recency_score'), ''], df_final['frequency_score'], df_final['amount_score']) + # Append rfm score to final df + df_final['rfm_score'] = rfm_score - # import function: rfm_concat, which will catenate integers as a string and then convert back to a single integer - from rfm_funcs.rfm_functions import rfm_concat - rfm_score = rfm_concat(df_final[('recency_score'), ''], df_final['frequency_score'], df_final['amount_score']) + from rfm_funcs.rfm_functions import merge_series + score_tuples = merge_series((df_final['matching_id']), df_final['rfm_score']) - # Append rfm score to final df - df_final['rfm_score'] = rfm_score + except Exception as e: + current_app.logger.error(e) + trace_back_string = traceback.format_exc() + current_app.logger.error(trace_back_string) + return 0 - from rfm_funcs.rfm_functions import merge_series - score_tuples = merge_series((df_final['matching_id']), df_final['rfm_score']) + try: + insert_rfm_scores(score_tuples) + except Exception as e: + current_app.logger.error(e) + trace_back_string = traceback.format_exc() + current_app.logger.error(trace_back_string) + return 0 - insert_rfm_scores(score_tuples) + return len(score_tuples) # Not sure there's anything to do with them at this point - return len(score_tuples) # Not sure there's anything to do with them at this point + else: # Didn't get len == 3 + current_app.logger.error("rfm_edges missing from DB or malformed. Could not perform rfm scoring") + return 0