Code Migration from msr-csv #5

Open · wants to merge 5 commits into master
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
.DS_Store
+output/
116 changes: 77 additions & 39 deletions scripts/local.py
@@ -4,37 +4,45 @@
This script includes the local computations for multi-shot ridge
regression with decentralized statistic calculation
"""
+import os
import warnings

warnings.simplefilter("ignore")

import ujson as json
import numpy as np
import sys
import regression as reg
import coinstacparsers
from coinstacparsers import parsers
import pandas as pd
-from local_ancillary import gather_local_stats, add_site_covariates
import utils as ut

+from regression import sum_squared_error, y_estimate
+import local_ancillary as lc


def local_0(args):
input_list = args["input"]
lamb = input_list["lambda"]

(X, y) = parsers.fsl_parser(args)
+columns_to_normalize = lc.check_cols_to_normalize(X)

tol = input_list["tol"]
eta = input_list["eta"]
max_iter = input_list["max_iter"]

output_dict = {"computation_phase": "local_0",
"tol": tol,
"eta": eta
}
output_dict = {
"computation_phase": "local_0",
"tol": tol,
"eta": eta,
"columns_to_normalize": columns_to_normalize,
}

cache_dict = {
"covariates": X.to_json(orient='records'),
"dependents": y.to_json(orient='records'),
"covariates": X.to_json(orient="records"),
"dependents": y.to_json(orient="records"),
"lambda": lamb,
"max_iter": max_iter,
}
@@ -43,41 +51,55 @@ def local_0(args):
"output": output_dict,
"cache": cache_dict,
}

# raise Exception(computation_output_dict)
return json.dumps(computation_output_dict)
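local_0 does the one-time setup: it parses covariates and dependents, flags columns whose scale calls for normalization, caches the parsed data locally, and echoes the hyperparameters to the remote. A hypothetical phase-0 message, for illustration only (values invented; the keys mirror output_dict and cache_dict above):

phase0_message = {
    "output": {
        "computation_phase": "local_0",
        "tol": 1e-4,                      # invented hyperparameters
        "eta": 0.05,
        "columns_to_normalize": ["icv"],  # columns flagged by the scale check
    },
    "cache": {                            # stays on the site, never transmitted
        "covariates": "[...]",            # X.to_json(orient="records")
        "dependents": "[...]",            # y.to_json(orient="records")
        "lambda": 0.1,
        "max_iter": 1000,
    },
}
print(phase0_message["output"]["columns_to_normalize"])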


def local_1(args):
"""Read data from the local sites, perform local regressions and send
local statistics to the remote site"""

X = pd.read_json(args["cache"]["covariates"], orient='records')
y = pd.read_json(args["cache"]["dependents"], orient='records')
input_list = args["input"]
ut.log(input_list, args["state"])

X = pd.read_json(args["cache"]["covariates"], orient="records")

X = lc.normalize_columns(X, input_list["columns_to_normalize"])
ut.log(
f'\n\nNormalizing the following column values to their z-scores: {input_list["columns_to_normalize"]} \n ',
args["state"],
)

y = pd.read_json(args["cache"]["dependents"], orient="records")
y_labels = list(y.columns)

-meanY_vector, lenY_vector, local_stats_list = gather_local_stats(X, y)
-ut.log(f'\nlocal stats list: {str(local_stats_list)} ', args["state"])
-augmented_X = add_site_covariates(args, X)
+meanY_vector, lenY_vector, local_stats_list, beta_vector = (
+lc.gather_local_stats(X, y)
+)
+ut.log(f"\nlocal stats list: {str(local_stats_list)} ", args["state"])
+augmented_X = lc.add_site_covariates(args, X)

beta_vec_size = augmented_X.shape[1]
-X_labels = list(augmented_X.columns)
+X_labels = list(X.columns)

output_dict = {
"beta_vec_size": beta_vec_size,
+"beta_vector_local": beta_vector,
"X_labels": X_labels,
+"augmented_X_labels": list(augmented_X.columns),
"number_of_regressions": len(y_labels),
-"computation_phase": "local_1"
+"computation_phase": "local_1",
}

cache_dict = {
"beta_vec_size": beta_vec_size,
"number_of_regressions": len(y_labels),
"covariates": augmented_X.to_json(orient='records'),
"covariates": augmented_X.to_json(orient="records"),
"y_labels": y_labels,
"mean_y_local": meanY_vector,
"count_local": lenY_vector,
"local_stats_list": local_stats_list,
"max_iter": args["cache"]["max_iter"]
"max_iter": args["cache"]["max_iter"],
}

computation_output = {
@@ -89,32 +111,48 @@ def local_1(args):
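The substantive change in local_1 is that columns flagged at phase 0 are z-scored against each site's own mean and standard deviation before the regressions run, so raw values never leave the site while the shared betas operate on a comparable scale. A toy run of the transformation (data invented; lc.normalize_columns is the helper defined in local_ancillary.py below):

import pandas as pd

df = pd.DataFrame({"age": [34.0, 51.0, 47.0], "icv": [1.4e6, 1.6e6, 1.5e6]})
for col in ["icv"]:  # suppose "icv" was flagged at phase 0
    # Same in-place z-scoring that normalize_columns applies.
    df[col] = (df[col] - df[col].mean()) / df[col].std()
print(df["icv"].round(3).tolist())  # [-1.0, 1.0, 0.0]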


def local_2(args):
X = pd.read_json(args["cache"]["covariates"], orient='records')
y = pd.read_json(args["cache"]["dependents"], orient='records')
X = pd.read_json(args["cache"]["covariates"], orient="records")
y = pd.read_json(args["cache"]["dependents"], orient="records")

+X.to_csv(
+os.path.join(
+args["state"]["outputDirectory"], f'{args["state"]["clientId"]}_X.csv'
+)
+)
+y.to_csv(
+os.path.join(
+args["state"]["outputDirectory"], f'{args["state"]["clientId"]}_y.csv'
+)
+)

beta_vec_size = args["cache"]["beta_vec_size"]
number_of_regressions = args["cache"]["number_of_regressions"]

mask_flag = args["input"].get("mask_flag",
np.zeros(number_of_regressions, dtype=bool))
mask_flag = args["input"].get(
"mask_flag", np.zeros(number_of_regressions, dtype=bool)
)

biased_X = np.array(X)
y = pd.DataFrame(y.values)

w = args["input"]["remote_beta"]

gradient = np.zeros((number_of_regressions, beta_vec_size))
+cost = np.zeros(number_of_regressions)

for i in range(number_of_regressions):
y_ = y[i]
w_ = w[i]
if not mask_flag[i]:
-gradient[i, :] = (
-1 / len(X)) * np.dot(biased_X.T, np.dot(biased_X, w_) - y_)
+gradient[i, :] = (1 / len(X)) * np.dot(
+biased_X.T, np.dot(biased_X, w_) - y_
+)
+cost[i] = lc.get_cost(y_actual=y[i], y_predicted=np.dot(biased_X, w_))

output_dict = {
"local_grad": gradient.tolist(),
"computation_phase": "local_2"
"local_cost": cost.tolist(),
"computation_phase": "local_2",
}

cache_dict = {"max_iter": args["cache"]["max_iter"]}
@@ -134,7 +172,7 @@ def local_3(args):
"count_local": args["cache"]["count_local"],
"local_stats_list": args["cache"]["local_stats_list"],
"y_labels": args["cache"]["y_labels"],
"computation_phase": 'local_3'
"computation_phase": "local_3",
}

cache_dict = {}
@@ -181,8 +219,8 @@ def local_4(args):
cache_list = args["cache"]
input_list = args["input"]

X = pd.read_json(cache_list["covariates"], orient='records')
y = pd.read_json(cache_list["dependents"], orient='records')
X = pd.read_json(cache_list["covariates"], orient="records")
y = pd.read_json(cache_list["dependents"], orient="records")
biased_X = np.array(X)

avg_beta_vector = input_list["avg_beta_vector"]
@@ -192,19 +230,19 @@ def local_4(args):
for index, column in enumerate(y.columns):
curr_y = y[column].values
SSE_local.append(
-reg.sum_squared_error(biased_X, curr_y, avg_beta_vector))
+sum_squared_error(curr_y, y_estimate(biased_X, avg_beta_vector)[index])
+)
SST_local.append(
-np.sum(
-np.square(np.subtract(curr_y, mean_y_global[index])),
-dtype=float))
+np.sum(np.square(np.subtract(curr_y, mean_y_global[index])), dtype=float)
+)

varX_matrix_local = np.dot(biased_X.T, biased_X)

output_dict = {
"SSE_local": SSE_local,
"SST_local": SST_local,
"varX_matrix_local": varX_matrix_local.tolist(),
"computation_phase": "local_4"
"computation_phase": "local_4",
}

cache_dict = {}
Expand All @@ -217,27 +255,27 @@ def local_4(args):
return json.dumps(computation_output)
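Because SSE and SST are plain sums over local samples, the remote can add the per-site values and form a global R^2 without ever seeing raw data. A sketch of that remote-side arithmetic (not part of this PR's diff; numbers invented):

sse_by_site = [[12.5], [9.0]]   # SSE_local from two sites, one regression
sst_by_site = [[40.0], [35.0]]  # SST_local from the same sites

sse_global = sum(site[0] for site in sse_by_site)
sst_global = sum(site[0] for site in sst_by_site)
print(round(1 - sse_global / sst_global, 4))  # global R^2: 0.7133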


-if __name__ == '__main__':
+if __name__ == "__main__":

parsed_args = json.loads(sys.stdin.read())
-phase_key = list(reg.listRecursive(parsed_args, 'computation_phase'))
+phase_key = list(reg.listRecursive(parsed_args, "computation_phase"))

if not phase_key:
computation_output = local_0(parsed_args)
sys.stdout.write(computation_output)
-elif 'remote_0' in phase_key:
+elif "remote_0" in phase_key:
computation_output = local_1(parsed_args)
sys.stdout.write(computation_output)
-elif 'remote_1' in phase_key:
+elif "remote_1" in phase_key:
computation_output = local_2(parsed_args)
sys.stdout.write(computation_output)
-elif 'remote_2a' in phase_key:
+elif "remote_2a" in phase_key:
computation_output = local_2(parsed_args)
sys.stdout.write(computation_output)
-elif 'remote_2b' in phase_key:
+elif "remote_2b" in phase_key:
computation_output = local_3(parsed_args)
sys.stdout.write(computation_output)
-elif 'remote_3' in phase_key:
+elif "remote_3" in phase_key:
computation_output = local_4(parsed_args)
sys.stdout.write(computation_output)
else:
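The entry point wires the phases to COINSTAC's transport: a JSON message arrives on stdin, the script dispatches on the most recent computation_phase, and the reply goes out on stdout. A hypothetical smoke test of the no-phase (local_0) path (paths and inputs invented; phase 0 additionally needs data files that parsers.fsl_parser can read):

import json
import subprocess

message = {"input": {"lambda": 0.1, "tol": 1e-4, "eta": 0.05, "max_iter": 100},
           "state": {"clientId": "site0"}}
proc = subprocess.run(["python", "scripts/local.py"],
                      input=json.dumps(message), capture_output=True, text=True)
print(proc.stdout)  # JSON carrying "computation_phase": "local_0" on success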
23 changes: 22 additions & 1 deletion scripts/local_ancillary.py
@@ -50,8 +50,10 @@ def gather_local_stats(X, y):
]
local_stats_dict = {key: value for key, value in zip(keys, values)}
local_stats_list.append(local_stats_dict)

+beta_vector = [l.tolist() for l in local_params]

-return meanY_vector, lenY_vector, local_stats_list
+return meanY_vector, lenY_vector, local_stats_list, beta_vector


def add_site_covariates(args, X):
@@ -74,3 +76,22 @@ def add_site_covariates(args, X):
augmented_X = pd.concat([biased_X, site_df], axis=1)

return augmented_X

+def get_cost(y_actual, y_predicted):
+return np.average((y_actual-y_predicted)**2)

+def check_cols_to_normalize(X):
+columns_to_normalize=[]
+max_vals = X.max(axis=0).to_numpy()
+minval = np.min(max_vals[np.nonzero(max_vals)])
+X_headers = list(X.columns)
+ranges = max_vals / minval
+temp_cols_indxs = np.where(ranges > 10000)[0]
+for col_indx in temp_cols_indxs:
+columns_to_normalize.append(X_headers[col_indx])
+return columns_to_normalize

+def normalize_columns(data_df, cols):
+for col in cols:
+data_df[col] = (data_df[col] - data_df[col].mean())/(data_df[col].std())
+return data_df
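check_cols_to_normalize flags any column whose maximum exceeds the smallest nonzero column maximum by more than four orders of magnitude, which catches covariates like intracranial volume sitting next to binary or small-valued ones. A toy run of the heuristic (data invented):

import numpy as np
import pandas as pd

X = pd.DataFrame({"isControl": [0, 1, 1], "age": [34, 51, 47],
                  "icv": [1.4e6, 1.6e6, 1.5e6]})
max_vals = X.max(axis=0).to_numpy()              # [1, 51, 1600000]
minval = np.min(max_vals[np.nonzero(max_vals)])  # 1
flagged = [c for c, r in zip(X.columns, max_vals / minval) if r > 10000]
print(flagged)  # ['icv']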
4 changes: 2 additions & 2 deletions scripts/regression.py
@@ -72,7 +72,7 @@ def y_estimate(biased_X, beta_vector):
return np.dot(beta_vector, np.matrix.transpose(biased_X))


-def sum_squared_error(biased_X, y, beta_vector):
+def sum_squared_error(y, y_pred):
"""Calculates the sum of squared errors (SSE)

Args:
@@ -87,7 +87,7 @@ def sum_squared_error(biased_X, y, beta_vector):
Comments:
SSE = ||y - y_estimate||^2 where ||.|| --> l2-norm
"""
-return np.linalg.norm(y - y_estimate(biased_X, beta_vector))**2
+return np.sum((y_pred - y) ** 2)
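The refactored signature moves prediction out of the function: callers now pass y_pred explicitly, which is what lets local_4 compute y_estimate once per beta vector and index into it per regression. The two formulations agree, as a quick check on invented numbers shows:

import numpy as np

y = np.array([1.0, 2.0, 3.0])
y_pred = np.array([1.1, 1.9, 3.2])
old_style = np.linalg.norm(y - y_pred) ** 2   # previous implementation
new_style = np.sum((y_pred - y) ** 2)         # this PR
print(np.isclose(old_style, new_style))       # True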


def sum_squared_total(y):