diff --git a/api/py/ai/chronon/repo/run.py b/api/py/ai/chronon/repo/run.py index 88036b095..c7789dfb5 100755 --- a/api/py/ai/chronon/repo/run.py +++ b/api/py/ai/chronon/repo/run.py @@ -184,7 +184,7 @@ def download_only_once(url, path): @retry_decorator(retries=3, backoff=50) def download_jar(version, jar_type="uber", release_tag=None, spark_version="2.4.0"): assert ( - spark_version in SUPPORTED_SPARK + spark_version in SUPPORTED_SPARK ), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}" scala_version = SCALA_VERSION_FOR_SPARK[spark_version] maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None) @@ -237,6 +237,7 @@ def set_runtime_env(args): - Environment variables derived from args (like app_name) - conf.metaData.modeToEnvMap for the mode (set on config) - team environment per context and mode set on teams.json + - production team environment per mode set on teams.json - default team environment per context and mode set on teams.json - Common Environment set in teams.json """ @@ -245,6 +246,7 @@ def set_runtime_env(args): "conf_env": {}, "default_env": {}, "team_env": {}, + "production_team_env ": {}, "cli_args": {}, } conf_type = None @@ -262,7 +264,7 @@ def set_runtime_env(args): ) if args.conf and effective_mode: try: - context, conf_type, team, _ = args.conf.split("/")[-4:] + _, conf_type, team, _ = args.conf.split("/")[-4:] except Exception as e: logging.error( "Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format( @@ -272,6 +274,9 @@ def set_runtime_env(args): raise e if not team: team = "default" + # context is the environment in which the job is running, which is provided from the args, + # default to be dev. + context = args.env logging.info( f"Context: {context} -- conf_type: {conf_type} -- team: {team}" ) @@ -298,6 +303,11 @@ def set_runtime_env(args): environment["team_env"] = ( teams_json[team].get(context, {}).get(effective_mode, {}) ) + # If the job is running in dev environment but no dev environment is defined in teams.json, + # use production environment. + environment["production_team_env"] = ( + teams_json[team].get("production", {}).get(effective_mode, {}) + ) environment["default_env"] = ( teams_json.get("default", {}) .get(context, {}) @@ -314,10 +324,10 @@ def set_runtime_env(args): [ k for k in [ - "chronon", - conf_type, - args.mode.replace("-", "_") if args.mode else None, - ] + "chronon", + conf_type, + args.mode.replace("-", "_") if args.mode else None, + ] if k is not None ] ) @@ -326,7 +336,7 @@ def set_runtime_env(args): environment["cli_args"]["CHRONON_DRIVER_JAR"] = args.chronon_jar environment["cli_args"]["CHRONON_ONLINE_JAR"] = args.online_jar environment["cli_args"]["CHRONON_ONLINE_CLASS"] = args.online_class - order = ["conf_env", "team_env", "default_env", "common_env", "cli_args"] + order = ["conf_env", "team_env", "production_team_env", "default_env", "common_env", "cli_args"] print("Setting env variables:") for key in os.environ: if any([key in environment[set_key] for set_key in order]): @@ -367,7 +377,7 @@ def __init__(self, args, jar_path): raise e possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES assert ( - args.mode in possible_modes + args.mode in possible_modes ), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format( args.mode, self.conf, self.conf_type, possible_modes ) @@ -443,7 +453,7 @@ def run(self): ) if self.mode == "streaming": assert ( - len(filtered_apps) == 1 + len(filtered_apps) == 1 ), "More than one found, please kill them all" print("All good. No need to start a new app.") return @@ -568,6 +578,12 @@ def set_defaults(parser): required=False, help="Conf param - required for every mode except fetch", ) + parser.add_argument( + "--env", + required=False, + default='dev', + help="Running environment - default to be dev" + ) parser.add_argument("--mode", choices=MODE_ARGS.keys()) parser.add_argument("--ds", help="the end partition to backfill the data") parser.add_argument( @@ -589,7 +605,7 @@ def set_defaults(parser): parser.add_argument( "--online-jar", help="Jar containing Online KvStore & Deserializer Impl. " - + "Used for streaming and metadata-upload mode.", + + "Used for streaming and metadata-upload mode.", ) parser.add_argument( "--online-class", @@ -606,7 +622,7 @@ def set_defaults(parser): parser.add_argument( "--online-jar-fetch", help="Path to script that can pull online jar. " - + "This will run only when a file doesn't exist at location specified by online_jar", + + "This will run only when a file doesn't exist at location specified by online_jar", ) parser.add_argument( "--sub-help", @@ -631,7 +647,7 @@ def set_defaults(parser): parser.add_argument( "--render-info", help="Path to script rendering additional information of the given config. " - + "Only applicable when mode is set to info", + + "Only applicable when mode is set to info", ) set_defaults(parser) pre_parse_args, _ = parser.parse_known_args() @@ -652,4 +668,4 @@ def set_defaults(parser): spark_version=os.environ.get("SPARK_VERSION", args.spark_version), ) ) - Runner(args, os.path.expanduser(jar_path)).run() + Runner(args, os.path.expanduser(jar_path)).run() \ No newline at end of file