Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(infra): WIP data exports to grants account #698

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from data_model.models import Cache
from scorer.export_utils import (
AWSOverrideCredentials,
export_data_for_model,
get_pa_schema,
upload_to_s3,
Expand Down Expand Up @@ -108,6 +109,8 @@ def add_arguments(self, parser):
)

parser.add_argument("--filename", type=str, help="The output filename")
parser.add_argument("--s3-access-key", type=str, default="", help="The S3 access key for dedicated S3 download (like digital ocean)")
parser.add_argument("--s3-secret-access-key", type=str, default="", help="The S3 secret access key for dedicated S3 download (like digital ocean)")
parser.add_argument(
"--s3-extra-args",
type=str,
Expand All @@ -132,6 +135,8 @@ def handle(self, *args, **options):
batch_size = options["batch_size"]
s3_uri = options["s3_uri"]
filename = options["filename"]
s3_access_key = options["s3_access_key"]
s3_secret_access_key = options["s3_secret_access_key"]
format = options["format"]
data_model_names = (
[n.strip() for n in options["data_model"].split(",")]
Expand All @@ -148,6 +153,9 @@ def handle(self, *args, **options):
self.stdout.write(f"EXPORT - batch_size : '{batch_size}'")
self.stdout.write(f"EXPORT - filename : '{filename}'")


self.stdout.write(f"EXPORT - DEBUG LARISA : acc_key : '{s3_access_key}', s3_secret_access_key: '{s3_secret_access_key}'")

parsed_uri = urlparse(s3_uri)
s3_bucket_name = parsed_uri.netloc
s3_folder = parsed_uri.path.strip("/")
Expand All @@ -170,7 +178,17 @@ def handle(self, *args, **options):
self.style.SUCCESS(f"EXPORT - Data exported to '{filename}'")
)

upload_to_s3(filename, s3_folder, s3_bucket_name, extra_args)
if s3_access_key and s3_secret_access_key:
aws_override_credentials = AWSOverrideCredentials(
aws_access_key_id=s3_access_key,
aws_secret_access_key=s3_secret_access_key,
aws_endpoint_url="",
)
upload_to_s3(
filename, s3_folder, s3_bucket_name, extra_args, aws_override_credentials
)
else:
upload_to_s3(filename, s3_folder, s3_bucket_name, extra_args)

if cloudfront_distribution_id:
client = boto3.client("cloudfront")
Expand Down
110 changes: 99 additions & 11 deletions infra/aws/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,8 @@ const exportVals = createScoreExportBucketAndDomain(
);
// The following scorer dumps the Allo scorer scores to a public S3 bucket
// for the Allo team to easily pull the data

export const frequentAlloScorerDataDumpTaskDefinition = pulumi
// This will be removed after the confirmation that the new exports are working properly.
const frequentAlloScorerDataDumpTaskDefinition = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand All @@ -1456,8 +1456,6 @@ export const frequentAlloScorerDataDumpTaskDefinition = pulumi
]) +
"'",
`--s3-uri=s3://${publicDataDomain}/passport_scores/`,
// "--summary-extra-args",
// JSON.stringify({ ACL: "public-read" }),
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
Expand All @@ -1471,7 +1469,63 @@ export const frequentAlloScorerDataDumpTaskDefinition = pulumi
});
});

export const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
// Only for production
if (stack === "production") {
const frequentAlloScorerDataDumpTaskDefinitionDigitalOcean = pulumi
.all([exportVals, apiSecrets])
.apply(([_exportedVals, _apiSecrets]) => {
const digitalOceanAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_ACCESS_KEY")?.valueFrom;
const digitalOceanSecretAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_SECRET_ACCESS_KEY")?.valueFrom;
const digitalOceanS3Endpoint = op.read.parse(
`op://DevOps/passport-scorer-${stack}-env/api/GRANTS_DIGITAL_OCEAN_S3_ENDPOINT`
);
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
createScheduledTask({
name: "frequent-allo-scorer-data-dump-grants",
config: {
...baseScorerServiceConfig,
securityGroup: secgrp,
command: [
"python",
"manage.py",
"scorer_dump_data",
"--batch-size=1000",
"--database=read_replica_analytics",
"--config",
"'" +
JSON.stringify([
{
name: "registry.Score",
filter: { passport__community_id: 335 },
select_related: ["passport"],
},
]) +
"'",
`--s3-uri=s3://${digitalOceanS3Endpoint}`,
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
},
environment:apiEnvironment,

secrets: _apiSecrets.map(secret => {
if (secret.name === "S3_DATA_AWS_SECRET_ACCESS_KEY") {
return { ...secret, valueFrom: digitalOceanAccessKey}; // Replace for data dump with digital ocean credentials
}
if (secret.name === "S3_DATA_AWS_SECRET_KEY_ID") {
return { ...secret, valueFrom: digitalOceanSecretAccessKey }; // Replace for data dump with digital ocean credentials
}
return secret;
}) as secretsManager.SecretRef[],
alarmPeriodSeconds: 3600, // 1h in seconds
enableInvocationAlerts: true,
scorerSecretManagerArn: scorerSecret.arn,
});
});
});
}

const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand All @@ -1498,8 +1552,6 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
]) +
"'",
`--s3-uri=s3://${publicDataDomain}/passport_scores/335/`,
// "--summary-extra-args",
// JSON.stringify({ ACL: "public-read" }),
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
Expand All @@ -1513,7 +1565,7 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_335 = pulumi
});
});

export const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand All @@ -1540,8 +1592,6 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
]) +
"'",
`--s3-uri=s3://${publicDataDomain}/passport_scores/6608/`,
// "--summary-extra-args",
// JSON.stringify({ ACL: "public-read" }),
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
Expand All @@ -1558,7 +1608,8 @@ export const frequentScorerDataDumpTaskDefinitionForScorer_6608 = pulumi
/*
* Dump data for the eth-model V2
*/
export const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = pulumi
// this for sure
const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = pulumi
.all([exportVals])
.apply(([_exportedVals]) => {
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
Expand Down Expand Up @@ -1588,6 +1639,43 @@ export const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorer = pulumi
});
});

if (stack === "production") {
const frequentEthModelV2ScoreDataDumpTaskDefinitionForScorerDigitalOcean = pulumi
.all([exportVals, apiSecrets])
.apply(([_exportedVals, _apiSecrets]) => {
// const digitalOceanAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_ACCESS_KEY")?.valueFrom;
// const digitalOceanSecretAccessKey = _apiSecrets.find(secret => secret.name === "GRANTS_DIGITAL_OCEAN_SECRET_ACCESS_KEY")?.valueFrom;
const digitalOceanS3Endpoint = op.read.parse(
`op://DevOps/passport-scorer-${stack}-env/api/GRANTS_DIGITAL_OCEAN_S3_ENDPOINT`
);
return pulumi.all([_exportedVals.cloudFront.id]).apply(([cloudFrontId]) => {
createScheduledTask({
name: "frequent-eth-model-v2-dump-grants",
config: {
...baseScorerServiceConfig,
securityGroup: secgrp,
command: [
"python",
"manage.py",
"scorer_dump_data_model_score",
`--s3-uri=s3://${digitalOceanS3Endpoint}`,
`--s3-access-key=$GRANTS_DIGITAL_OCEAN_ACCESS_KEY`,
`--s3-secret-access-key=$GRANTS_DIGITAL_OCEAN_SECRET_ACCESS_KEY`,
"--filename=model_scores.parquet",
"--format=parquet",
].join(" "),
scheduleExpression: "cron(*/30 * ? * * *)", // Run the task every 30 min
alertTopic: pagerdutyTopic,
},
environment: apiEnvironment,
secrets: _apiSecrets,
alarmPeriodSeconds: 3600, // 1h in seconds
enableInvocationAlerts: true,
scorerSecretManagerArn: scorerSecret.arn,
});
});
});
}
export const coinbaseRevocationCheck = createScheduledTask({
name: "coinbase-revocation-check",
config: {
Expand Down
4 changes: 2 additions & 2 deletions infra/lib/scorer/scheduledTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export function createTask({
name: string;
config: ScheduledTaskConfig;
environment: secretsManager.EnvironmentVar[];
secrets: pulumi.Output<secretsManager.SecretRef[]>;
secrets: pulumi.Output<secretsManager.SecretRef[]> | secretsManager.SecretRef[];
scorerSecretManagerArn: Input<string>;
}) {
const {
Expand Down Expand Up @@ -225,7 +225,7 @@ export function createScheduledTask({
name: string;
config: ScheduledTaskConfig;
environment: secretsManager.EnvironmentVar[];
secrets: pulumi.Output<secretsManager.SecretRef[]>;
secrets: pulumi.Output<secretsManager.SecretRef[]> | secretsManager.SecretRef[];
alarmPeriodSeconds?: number;
enableInvocationAlerts?: boolean;
scorerSecretManagerArn: Input<string>;
Expand Down
Loading