Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions src/server/api/admin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,3 @@ def hit_gdrs():
# print("round-trip d is : \n " + str(d) )
# return "OK"

from rfm_funcs.create_scores import create_scores
@admin_api.route("/api/admin/test_create_scores", methods=["GET"])
def hit_create_scores():
    """Manually trigger RFM scoring and log how many scores were processed.

    Calls ``create_scores`` with the fixed date string '2021-07-27', logs the
    value it returns, and responds with the JSON value 200 unconditionally.
    """
    log = current_app.logger
    log.info("Hitting create_scores() ")
    processed = create_scores('2021-07-27')
    log.info(f"create_scores() processed {processed} scores")
    return jsonify(200)
8 changes: 8 additions & 0 deletions src/server/api/internal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from flask import jsonify, current_app
from datetime import datetime
from api.API_ingest import ingest_sources_from_api
from rfm_funcs.create_scores import create_scores

### Internal API endpoints can only be accessed from inside the cluster;
### they are blocked by a location rule in the NGINX config.
Expand Down Expand Up @@ -29,3 +30,10 @@ def ingest_raw_data():

return jsonify({'outcome': 'OK'}), 200


@internal_api.route("/api/test_create_scores", methods=["GET"])
def hit_create_scores():
    """Run RFM scoring on demand and log how many scores were processed.

    Calls ``create_scores`` with no arguments, logs the value it returns,
    and responds with the JSON value 200 unconditionally.
    """
    log = current_app.logger
    log.info("Hitting create_scores() ")
    processed = create_scores()
    log.info(f"create_scores() processed {processed} scores")
    return jsonify(200)
8 changes: 6 additions & 2 deletions src/server/pipeline/flow_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from config import RAW_DATA_PATH
from config import engine
from models import Base

from rfm_funcs.create_scores import create_scores

def start_flow():

Expand Down Expand Up @@ -78,7 +78,11 @@ def start_flow():
pdp.archived_date IS NULL
''')

current_app.logger.info('Finished flow script run')
current_app.logger.info('Finished flow script run, running RFM scoring')

score_result = create_scores() # Run RFM scoring on newly-processed donations
current_app.logger.info('Scored ' + str(score_result) + ' tuples')

job_outcome = 'completed'
log_db.log_exec_status(job_id, 'flow', 'complete', '' )

Expand Down
137 changes: 80 additions & 57 deletions src/server/rfm_funcs/create_scores.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from config import engine
from flask import current_app
import traceback

import pandas as pd
import numpy as np
Expand All @@ -17,15 +19,15 @@ def date_difference(my_date, max_date):
return diff


def create_scores(query_date):
def create_scores():
'''
requires query date as input-- must be string in the following format "%Y-%m-%d"
(used to) require query date as input-- must be string in the following format "%Y-%m-%d"
returns a list of matching_ids and scores as tuples
will also insert rfm scores into rfm_scores table----see src/server/api/admin_api.py
'''

with engine.connect() as connection:

current_app.logger.debug("running create_scores()")
# read in data from database via pull_donations_for_rfm() func (reads in as a list of tuples)
df = pd.read_sql(
"""
Expand All @@ -40,92 +42,113 @@ def create_scores(query_date):
from api.admin_api import read_rfm_edges, insert_rfm_scores # Avoid circular import issues

rfm_dict = read_rfm_edges()
recency_labels = [5,4,3,2,1]
recency_bins = list(rfm_dict['r'].values()) #imported from table

frequency_labels = [1,2,3,4,5]
frequency_bins = list(rfm_dict['f'].values()) #imported from table
if len(rfm_dict) == 3: # r,f,m

try:

recency_labels = [5,4,3,2,1]
recency_bins = list(rfm_dict['r'].values()) #imported from table

frequency_labels = [1,2,3,4,5]
frequency_bins = list(rfm_dict['f'].values()) #imported from table

monetary_labels = [1,2,3,4,5]
monetary_bins = list(rfm_dict['m'].values()) #imported from table

monetary_labels = [1,2,3,4,5]
monetary_bins = list(rfm_dict['m'].values()) #imported from table

########################## recency #########################################

########################## recency #########################################
donations_past_year = df
donations_past_year['close_date'] =pd.to_datetime(donations_past_year['close_date']).dt.date

donations_past_year = df
donations_past_year['close_date'] =pd.to_datetime(donations_past_year['close_date']).dt.date
# calculate date difference between input date and individual row close date

# calculate date difference between input date and individual row close date
days = []
max_close_date = donations_past_year['close_date'].max()
for ii in donations_past_year['close_date']:
days.append(date_difference(ii, max_close_date))
donations_past_year['days_since'] = days

days = []
max_close_date = donations_past_year['close_date'].max()
for ii in donations_past_year['close_date']:
days.append(date_difference(ii, max_close_date))
donations_past_year['days_since'] = days
grouped_past_year = donations_past_year.groupby('matching_id').agg({'days_since': ['min']}).reset_index()
print(grouped_past_year.head())

grouped_past_year[('days_since', 'min')]= grouped_past_year[('days_since', 'min')].dt.days

grouped_past_year = donations_past_year.groupby('matching_id').agg({'days_since': ['min']}).reset_index()
print(grouped_past_year.head())

grouped_past_year[('days_since', 'min')]= grouped_past_year[('days_since', 'min')].dt.days
max_maybe = grouped_past_year[('days_since', 'min')].max()

max_maybe = grouped_past_year[('days_since', 'min')].max()
real_max = max(max_maybe, max(recency_bins)+1 )

real_max = max(max_maybe, max(recency_bins)+1 )
recency_bins.append(real_max)

recency_bins.append(real_max)

grouped_past_year['recency_score'] = pd.cut(grouped_past_year[('days_since','min')], bins= recency_bins, labels=recency_labels, include_lowest = True)
grouped_past_year.rename(columns={('recency_score', ''): 'recency_score'})

################################## frequency ###############################

grouped_past_year['recency_score'] = pd.cut(grouped_past_year[('days_since','min')], bins= recency_bins, labels=recency_labels, include_lowest = True)
grouped_past_year.rename(columns={('recency_score', ''): 'recency_score'})
df['close_date'] = pd.DatetimeIndex(df['close_date'])

################################## frequency ###############################
df_grouped = df.groupby(['matching_id', pd.Grouper(key = 'close_date', freq = 'Q')]).count().max(level=0)

df['close_date'] = pd.DatetimeIndex(df['close_date'])
df_grouped = df_grouped.reset_index()

df_grouped = df.groupby(['matching_id', pd.Grouper(key = 'close_date', freq = 'Q')]).count().max(level=0)
frequency_bins.append(np.inf)

df_grouped = df_grouped.reset_index()
df_frequency = df_grouped[['matching_id' , 'amount']] # amount is a placeholder as the groupby step just gives a frequency count, the value doesn't correspond to donation monetary amount.

frequency_bins.append(np.inf)
df_frequency = df_frequency.rename(columns = {'amount':'frequency'}) #renaming amount to frequency

df_frequency = df_grouped[['matching_id' , 'amount']] # amount is a placeholder as the groupby step just gives a frequency count, the value doesn't correspond to donation monetary amount.
df_frequency['frequency_score'] = pd.cut(df_frequency['frequency'],
bins = frequency_bins, labels=frequency_labels, include_lowest=True)

df_frequency = df_frequency.rename(columns = {'amount':'frequency'}) #renaming amount to frequency
################################## amount ##################################

df_frequency['frequency_score'] = pd.cut(df_frequency['frequency'],
bins = frequency_bins, labels=frequency_labels, include_lowest=True)
monetary_bins.append(np.inf)

################################## amount ##################################
df_amount = df.groupby(df['matching_id'], as_index=False).amount.max()

monetary_bins.append(np.inf)
df_amount['amount_score'] = pd.cut(df_amount['amount'], bins= monetary_bins, include_lowest=True, labels = monetary_labels)

df_amount = df.groupby(df['matching_id'], as_index=False).amount.max()
# raise ValueError # Just to test exception handling

df_amount['amount_score'] = pd.cut(df_amount['amount'], bins= monetary_bins, include_lowest=True, labels = monetary_labels)
# Concatenate rfm scores
# merge monetary df and frequency df
df_semi = df_amount.merge(df_frequency, left_on='matching_id', right_on= 'matching_id')
print(grouped_past_year.head())
print(df_semi.head())
df_final = df_semi.merge(grouped_past_year, left_on='matching_id', right_on= 'matching_id') # merge monetary/frequency dfs to recency df

### get avg fm score and merge with df_final
# df_final['f_m_AVG_score'] = df_final[['frequency_score', 'amount_score']].mean(axis=1)

# Concatenate rfm scores
# merge monetary df and frequency df
df_semi = df_amount.merge(df_frequency, left_on='matching_id', right_on= 'matching_id')
print(grouped_past_year.head())
print(df_semi.head())
df_final = df_semi.merge(grouped_past_year, left_on='matching_id', right_on= 'matching_id') # merge monetary/frequency dfs to recency df

### get avg fm score and merge with df_final
# df_final['f_m_AVG_score'] = df_final[['frequency_score', 'amount_score']].mean(axis=1)
# import function: rfm_concat, which will catenate integers as a string and then convert back to a single integer
from rfm_funcs.rfm_functions import rfm_concat
rfm_score = rfm_concat(df_final[('recency_score'), ''], df_final['frequency_score'], df_final['amount_score'])

# Append rfm score to final df
df_final['rfm_score'] = rfm_score

# import function: rfm_concat, which will catenate integers as a string and then convert back to a single integer
from rfm_funcs.rfm_functions import rfm_concat
rfm_score = rfm_concat(df_final[('recency_score'), ''], df_final['frequency_score'], df_final['amount_score'])
from rfm_funcs.rfm_functions import merge_series
score_tuples = merge_series((df_final['matching_id']), df_final['rfm_score'])

# Append rfm score to final df
df_final['rfm_score'] = rfm_score
except Exception as e:
current_app.logger.error(e)
trace_back_string = traceback.format_exc()
current_app.logger.error(trace_back_string)
return 0

from rfm_funcs.rfm_functions import merge_series
score_tuples = merge_series((df_final['matching_id']), df_final['rfm_score'])
try:
insert_rfm_scores(score_tuples)
except Exception as e:
current_app.logger.error(e)
trace_back_string = traceback.format_exc()
current_app.logger.error(trace_back_string)
return 0

insert_rfm_scores(score_tuples)
return len(score_tuples) # Not sure there's anything to do with them at this point

return len(score_tuples) # Not sure there's anything to do with them at this point
else: # Didn't get len == 3
current_app.logger.error("rfm_edges missing from DB or malformed. Could not perform rfm scoring")
return 0