diff --git a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb index f89a05e61aa..8035f5e61e1 100644 --- a/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb +++ b/notebooks/scenarios/bigquery/000-start-and-configure-server-and-admins.ipynb @@ -26,7 +26,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "\n", "# third party\n", "from email_helpers import get_email_server\n", diff --git a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb index 6c5f07a1c19..be9579059eb 100644 --- a/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/001-scale-delete-worker-pools.ipynb @@ -26,7 +26,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "\n", "# third party\n", "from email_helpers import Timeout\n", diff --git a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb index 79fc982462b..22f6dfaa977 100644 --- a/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb +++ b/notebooks/scenarios/bigquery/010-setup-bigquery-pool.ipynb @@ -24,7 +24,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "from syft import test_settings\n", "\n", "# third party\n", diff --git a/notebooks/scenarios/bigquery/011-users-emails-passwords.ipynb b/notebooks/scenarios/bigquery/011-users-emails-passwords.ipynb index 0bad0701e14..b87fd2a7731 100644 --- a/notebooks/scenarios/bigquery/011-users-emails-passwords.ipynb +++ b/notebooks/scenarios/bigquery/011-users-emails-passwords.ipynb @@ -28,7 +28,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "\n", "# third party\n", "from email_helpers import SENDER\n", diff --git a/notebooks/scenarios/bigquery/020-configure-api.ipynb b/notebooks/scenarios/bigquery/020-configure-api.ipynb index 718bdf352ca..83abef20ff7 100644 --- a/notebooks/scenarios/bigquery/020-configure-api.ipynb +++ b/notebooks/scenarios/bigquery/020-configure-api.ipynb @@ -33,7 +33,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "from syft import test_settings\n", "\n", "# third party\n", diff --git a/notebooks/scenarios/bigquery/021-create-jobs.ipynb b/notebooks/scenarios/bigquery/021-create-jobs.ipynb index 8224177b4cf..5a14895133a 100644 --- a/notebooks/scenarios/bigquery/021-create-jobs.ipynb +++ b/notebooks/scenarios/bigquery/021-create-jobs.ipynb @@ -40,7 +40,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "from syft.service.job.job_stash import JobStatus\n", "\n", "# third party\n", diff --git a/notebooks/scenarios/bigquery/040-do-review-requests.ipynb b/notebooks/scenarios/bigquery/040-do-review-requests.ipynb index b42e79d2048..aa4a7b0c2a1 100644 --- a/notebooks/scenarios/bigquery/040-do-review-requests.ipynb +++ 
b/notebooks/scenarios/bigquery/040-do-review-requests.ipynb @@ -24,7 +24,7 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "from syft.service.job.job_stash import Job\n", "\n", "# third party\n", diff --git a/notebooks/scenarios/bigquery/050-ds-get-results.ipynb b/notebooks/scenarios/bigquery/050-ds-get-results.ipynb index d82e2efff39..9a9bc1ef588 100644 --- a/notebooks/scenarios/bigquery/050-ds-get-results.ipynb +++ b/notebooks/scenarios/bigquery/050-ds-get-results.ipynb @@ -21,7 +21,7 @@ "# isort: off\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "\n", "# third party\n", "from email_helpers import get_email_server\n", diff --git a/notebooks/scenarios/bigquery/apis/__init__.py b/notebooks/scenarios/bigquery/apis/__init__.py deleted file mode 100644 index 7231b580696..00000000000 --- a/notebooks/scenarios/bigquery/apis/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# stdlib -import os - -# syft absolute -from syft.util.util import str_to_bool - -# relative -from .submit_query import make_submit_query - -env_var = "TEST_BIGQUERY_APIS_LIVE" -use_live = str_to_bool(str(os.environ.get(env_var, "False"))) -env_name = "Live" if use_live else "Mock" -print(f"Using {env_name} API Code, this will query BigQuery. ${env_var}=={use_live}") - - -if use_live: - # relative - from .live.schema import make_schema - from .live.test_query import make_test_query -else: - # relative - from .mock.schema import make_schema - from .mock.test_query import make_test_query diff --git a/notebooks/scenarios/bigquery/apis/live/__init__.py b/notebooks/scenarios/bigquery/apis/live/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/notebooks/scenarios/bigquery/apis/live/schema.py b/notebooks/scenarios/bigquery/apis/live/schema.py deleted file mode 100644 index 5b39d9d9066..00000000000 --- a/notebooks/scenarios/bigquery/apis/live/schema.py +++ /dev/null @@ -1,108 +0,0 @@ -# stdlib -from collections.abc import Callable - -# syft absolute -import syft as sy -from syft import test_settings - -# relative -from ..rate_limiter import is_within_rate_limit - - -def make_schema(settings: dict, worker_pool: str) -> Callable: - updated_settings = { - "calls_per_min": 5, - "rate_limiter_enabled": True, - "credentials": test_settings.gce_service_account.to_dict(), - "region": test_settings.gce_region, - "project_id": test_settings.gce_project_id, - "dataset_1": test_settings.dataset_1, - "table_1": test_settings.table_1, - "table_2": test_settings.table_2, - } | settings - - @sy.api_endpoint( - path="bigquery.schema", - description="This endpoint allows for visualising the metadata of tables available in BigQuery.", - settings=updated_settings, - helper_functions=[ - is_within_rate_limit - ], # Adds ratelimit as this is also a method available to data scientists - worker_pool=worker_pool, - ) - def live_schema( - context, - ) -> str: - # stdlib - import datetime - - # third party - from google.cloud import bigquery # noqa: F811 - from google.oauth2 import service_account - import pandas as pd - - # syft absolute - from syft import SyftException - - # Auth for Bigquer based on the workload identity - credentials = service_account.Credentials.from_service_account_info( - context.settings["credentials"] - ) - scoped_credentials = credentials.with_scopes( - ["https://www.googleapis.com/auth/cloud-platform"] - ) 
- - client = bigquery.Client( - credentials=scoped_credentials, - location=context.settings["region"], - ) - - # Store a dict with the calltimes for each user, via the email. - if context.settings["rate_limiter_enabled"]: - if context.user.email not in context.state.keys(): - context.state[context.user.email] = [] - - if not context.code.is_within_rate_limit(context): - raise SyftException( - public_message="Rate limit of calls per minute has been reached." - ) - context.state[context.user.email].append(datetime.datetime.now()) - - try: - # Formats the data schema in a data frame format - # Warning: the only supported format types are primitives, np.ndarrays and pd.DataFrames - - data_schema = [] - for table_id in [ - f"{context.settings['dataset_1']}.{context.settings['table_1']}", - f"{context.settings['dataset_1']}.{context.settings['table_2']}", - ]: - table = client.get_table(table_id) - for schema in table.schema: - data_schema.append( - { - "project": str(table.project), - "dataset_id": str(table.dataset_id), - "table_id": str(table.table_id), - "schema_name": str(schema.name), - "schema_field": str(schema.field_type), - "description": str(table.description), - "num_rows": str(table.num_rows), - } - ) - return pd.DataFrame(data_schema) - - except Exception as e: - # not a bigquery exception - if not hasattr(e, "_errors"): - output = f"got exception e: {type(e)} {str(e)}" - raise SyftException( - public_message=f"An error occured executing the API call {output}" - ) - - # Should add appropriate error handling for what should be exposed to the data scientists. - raise SyftException( - public_message="An error occured executing the API call, please contact the domain owner." - ) - - return live_schema diff --git a/notebooks/scenarios/bigquery/apis/live/test_query.py b/notebooks/scenarios/bigquery/apis/live/test_query.py deleted file mode 100644 index 344879dcb62..00000000000 --- a/notebooks/scenarios/bigquery/apis/live/test_query.py +++ /dev/null @@ -1,113 +0,0 @@ -# stdlib -from collections.abc import Callable - -# syft absolute -import syft as sy -from syft import test_settings - -# relative -from ..rate_limiter import is_within_rate_limit - - -def make_test_query(settings) -> Callable: - updated_settings = { - "calls_per_min": 10, - "rate_limiter_enabled": True, - "credentials": test_settings.gce_service_account.to_dict(), - "region": test_settings.gce_region, - "project_id": test_settings.gce_project_id, - } | settings - - # these are the same if you allow the rate limiter to be turned on and off - @sy.api_endpoint_method( - settings=updated_settings, - helper_functions=[is_within_rate_limit], - ) - def live_test_query( - context, - sql_query: str, - ) -> str: - # stdlib - import datetime - - # third party - from google.cloud import bigquery # noqa: F811 - from google.oauth2 import service_account - - # syft absolute - from syft import SyftException - - # Auth for Bigquer based on the workload identity - credentials = service_account.Credentials.from_service_account_info( - context.settings["credentials"] - ) - scoped_credentials = credentials.with_scopes( - ["https://www.googleapis.com/auth/cloud-platform"] - ) - - client = bigquery.Client( - credentials=scoped_credentials, - location=context.settings["region"], - ) - - # Store a dict with the calltimes for each user, via the email. 
- if context.settings["rate_limiter_enabled"]: - if context.user.email not in context.state.keys(): - context.state[context.user.email] = [] - - if not context.code.is_within_rate_limit(context): - raise SyftException( - public_message="Rate limit of calls per minute has been reached." - ) - context.state[context.user.email].append(datetime.datetime.now()) - - try: - rows = client.query_and_wait( - sql_query, - project=context.settings["project_id"], - ) - - if rows.total_rows > 1_000_000: - raise SyftException( - public_message="Please only write queries that gather aggregate statistics" - ) - - return rows.to_dataframe() - - except Exception as e: - # not a bigquery exception - if not hasattr(e, "_errors"): - output = f"got exception e: {type(e)} {str(e)}" - raise SyftException( - public_message=f"An error occured executing the API call {output}" - ) - - # Treat all errors that we would like to be forwarded to the data scientists - # By default, any exception is only visible to the data owner. - - if e._errors[0]["reason"] in [ - "badRequest", - "blocked", - "duplicate", - "invalidQuery", - "invalid", - "jobBackendError", - "jobInternalError", - "notFound", - "notImplemented", - "rateLimitExceeded", - "resourceInUse", - "resourcesExceeded", - "tableUnavailable", - "timeout", - ]: - raise SyftException( - public_message="Error occured during the call: " - + e._errors[0]["message"] - ) - else: - raise SyftException( - public_message="An error occured executing the API call, please contact the domain owner." - ) - - return live_test_query diff --git a/notebooks/scenarios/bigquery/apis/mock/__init__.py b/notebooks/scenarios/bigquery/apis/mock/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/notebooks/scenarios/bigquery/apis/mock/data.py b/notebooks/scenarios/bigquery/apis/mock/data.py deleted file mode 100644 index 82262bf7a01..00000000000 --- a/notebooks/scenarios/bigquery/apis/mock/data.py +++ /dev/null @@ -1,268 +0,0 @@ -# stdlib -from math import nan - -schema_dict = { - "project": { - 0: "example-project", - 1: "example-project", - 2: "example-project", - 3: "example-project", - 4: "example-project", - 5: "example-project", - 6: "example-project", - 7: "example-project", - 8: "example-project", - 9: "example-project", - 10: "example-project", - 11: "example-project", - 12: "example-project", - 13: "example-project", - 14: "example-project", - 15: "example-project", - 16: "example-project", - 17: "example-project", - 18: "example-project", - 19: "example-project", - 20: "example-project", - 21: "example-project", - 22: "example-project", - }, - "dataset_id": { - 0: "test_1gb", - 1: "test_1gb", - 2: "test_1gb", - 3: "test_1gb", - 4: "test_1gb", - 5: "test_1gb", - 6: "test_1gb", - 7: "test_1gb", - 8: "test_1gb", - 9: "test_1gb", - 10: "test_1gb", - 11: "test_1gb", - 12: "test_1gb", - 13: "test_1gb", - 14: "test_1gb", - 15: "test_1gb", - 16: "test_1gb", - 17: "test_1gb", - 18: "test_1gb", - 19: "test_1gb", - 20: "test_1gb", - 21: "test_1gb", - 22: "test_1gb", - }, - "table_id": { - 0: "posts", - 1: "posts", - 2: "posts", - 3: "posts", - 4: "posts", - 5: "posts", - 6: "posts", - 7: "comments", - 8: "comments", - 9: "comments", - 10: "comments", - 11: "comments", - 12: "comments", - 13: "comments", - 14: "comments", - 15: "comments", - 16: "comments", - 17: "comments", - 18: "comments", - 19: "comments", - 20: "comments", - 21: "comments", - 22: "comments", - }, - "schema_name": { - 0: "int64_field_0", - 1: "id", - 2: "name", - 3: "subscribers_count", - 4: 
"permalink", - 5: "nsfw", - 6: "spam", - 7: "int64_field_0", - 8: "id", - 9: "body", - 10: "parent_id", - 11: "created_at", - 12: "last_modified_at", - 13: "gilded", - 14: "permalink", - 15: "score", - 16: "comment_id", - 17: "post_id", - 18: "author_id", - 19: "spam", - 20: "deleted", - 21: "upvote_raio", - 22: "collapsed_in_crowd_control", - }, - "schema_field": { - 0: "INTEGER", - 1: "STRING", - 2: "STRING", - 3: "INTEGER", - 4: "STRING", - 5: "FLOAT", - 6: "BOOLEAN", - 7: "INTEGER", - 8: "STRING", - 9: "STRING", - 10: "STRING", - 11: "INTEGER", - 12: "INTEGER", - 13: "BOOLEAN", - 14: "STRING", - 15: "INTEGER", - 16: "STRING", - 17: "STRING", - 18: "STRING", - 19: "BOOLEAN", - 20: "BOOLEAN", - 21: "FLOAT", - 22: "BOOLEAN", - }, - "description": { - 0: "None", - 1: "None", - 2: "None", - 3: "None", - 4: "None", - 5: "None", - 6: "None", - 7: "None", - 8: "None", - 9: "None", - 10: "None", - 11: "None", - 12: "None", - 13: "None", - 14: "None", - 15: "None", - 16: "None", - 17: "None", - 18: "None", - 19: "None", - 20: "None", - 21: "None", - 22: "None", - }, - "num_rows": { - 0: "2000000", - 1: "2000000", - 2: "2000000", - 3: "2000000", - 4: "2000000", - 5: "2000000", - 6: "2000000", - 7: "2000000", - 8: "2000000", - 9: "2000000", - 10: "2000000", - 11: "2000000", - 12: "2000000", - 13: "2000000", - 14: "2000000", - 15: "2000000", - 16: "2000000", - 17: "2000000", - 18: "2000000", - 19: "2000000", - 20: "2000000", - 21: "2000000", - 22: "2000000", - }, -} - - -query_dict = { - "int64_field_0": { - 0: 4, - 1: 5, - 2: 10, - 3: 16, - 4: 17, - 5: 23, - 6: 24, - 7: 25, - 8: 27, - 9: 40, - }, - "id": { - 0: "t5_via1x", - 1: "t5_cv9gn", - 2: "t5_8p2tq", - 3: "t5_8fcro", - 4: "t5_td5of", - 5: "t5_z01fv", - 6: "t5_hmqjk", - 7: "t5_1flyj", - 8: "t5_5rwej", - 9: "t5_uurcv", - }, - "name": { - 0: "/channel/mylittlepony", - 1: "/channel/polyamory", - 2: "/channel/Catholicism", - 3: "/channel/cordcutters", - 4: "/channel/stevenuniverse", - 5: "/channel/entitledbitch", - 6: "/channel/engineering", - 7: "/channel/nottheonion", - 8: "/channel/FoodPorn", - 9: "/channel/puppysmiles", - }, - "subscribers_count": { - 0: 4323081, - 1: 2425929, - 2: 4062607, - 3: 7543226, - 4: 2692168, - 5: 2709080, - 6: 8766144, - 7: 2580984, - 8: 7784809, - 9: 3715991, - }, - "permalink": { - 0: "/channel//channel/mylittlepony", - 1: "/channel//channel/polyamory", - 2: "/channel//channel/Catholicism", - 3: "/channel//channel/cordcutters", - 4: "/channel//channel/stevenuniverse", - 5: "/channel//channel/entitledbitch", - 6: "/channel//channel/engineering", - 7: "/channel//channel/nottheonion", - 8: "/channel//channel/FoodPorn", - 9: "/channel//channel/puppysmiles", - }, - "nsfw": { - 0: nan, - 1: nan, - 2: nan, - 3: nan, - 4: nan, - 5: nan, - 6: nan, - 7: nan, - 8: nan, - 9: nan, - }, - "spam": { - 0: False, - 1: False, - 2: False, - 3: False, - 4: False, - 5: False, - 6: False, - 7: False, - 8: False, - 9: False, - }, -} diff --git a/notebooks/scenarios/bigquery/apis/mock/schema.py b/notebooks/scenarios/bigquery/apis/mock/schema.py deleted file mode 100644 index a95e04f2f1d..00000000000 --- a/notebooks/scenarios/bigquery/apis/mock/schema.py +++ /dev/null @@ -1,52 +0,0 @@ -# stdlib -from collections.abc import Callable - -# syft absolute -import syft as sy - -# relative -from ..rate_limiter import is_within_rate_limit -from .data import schema_dict - - -def make_schema(settings, worker_pool) -> Callable: - updated_settings = { - "calls_per_min": 5, - "rate_limiter_enabled": True, - "schema_dict": schema_dict, - } | settings - 
- @sy.api_endpoint( - path="bigquery.schema", - description="This endpoint allows for visualising the metadata of tables available in BigQuery.", - settings=updated_settings, - helper_functions=[is_within_rate_limit], - worker_pool=worker_pool, - ) - def mock_schema( - context, - ) -> str: - # syft absolute - from syft import SyftException - - # Store a dict with the calltimes for each user, via the email. - if context.settings["rate_limiter_enabled"]: - # stdlib - import datetime - - if context.user.email not in context.state.keys(): - context.state[context.user.email] = [] - - if not context.code.is_within_rate_limit(context): - raise SyftException( - public_message="Rate limit of calls per minute has been reached." - ) - context.state[context.user.email].append(datetime.datetime.now()) - - # third party - import pandas as pd - - df = pd.DataFrame(context.settings["schema_dict"]) - return df - - return mock_schema diff --git a/notebooks/scenarios/bigquery/apis/mock/test_query.py b/notebooks/scenarios/bigquery/apis/mock/test_query.py deleted file mode 100644 index ae028a8cf36..00000000000 --- a/notebooks/scenarios/bigquery/apis/mock/test_query.py +++ /dev/null @@ -1,138 +0,0 @@ -# stdlib -from collections.abc import Callable - -# syft absolute -import syft as sy - -# relative -from ..rate_limiter import is_within_rate_limit -from .data import query_dict - - -def extract_limit_value(sql_query: str) -> int: - # stdlib - import re - - limit_pattern = re.compile(r"\bLIMIT\s+(\d+)\b", re.IGNORECASE) - match = limit_pattern.search(sql_query) - if match: - return int(match.group(1)) - return None - - -def is_valid_sql(query: str) -> bool: - # stdlib - import sqlite3 - - # Prepare an in-memory SQLite database - conn = sqlite3.connect(":memory:") - cursor = conn.cursor() - - try: - # Use the EXPLAIN QUERY PLAN command to get the query plan - cursor.execute(f"EXPLAIN QUERY PLAN {query}") - except sqlite3.Error as e: - if "no such table" in str(e).lower(): - return True - return False - finally: - conn.close() - - -def adjust_dataframe_rows(df, target_rows: int): - # third party - import pandas as pd - - current_rows = len(df) - - if target_rows > current_rows: - # Repeat rows to match target_rows - repeat_times = (target_rows + current_rows - 1) // current_rows - df_expanded = pd.concat([df] * repeat_times, ignore_index=True).head( - target_rows - ) - else: - # Truncate rows to match target_rows - df_expanded = df.head(target_rows) - - return df_expanded - - -def make_test_query(settings: dict) -> Callable: - updated_settings = { - "calls_per_min": 10, - "rate_limiter_enabled": True, - "query_dict": query_dict, - } | settings - - # these are the same if you allow the rate limiter to be turned on and off - @sy.api_endpoint_method( - settings=updated_settings, - helper_functions=[ - is_within_rate_limit, - extract_limit_value, - is_valid_sql, - adjust_dataframe_rows, - ], - ) - def mock_test_query( - context, - sql_query: str, - ) -> str: - # stdlib - import datetime - - # third party - from google.api_core.exceptions import BadRequest - - # syft absolute - from syft import SyftException - - # Store a dict with the calltimes for each user, via the email. - if context.settings["rate_limiter_enabled"]: - if context.user.email not in context.state.keys(): - context.state[context.user.email] = [] - - if not context.code.is_within_rate_limit(context): - raise SyftException( - public_message="Rate limit of calls per minute has been reached." 
- ) - context.state[context.user.email].append(datetime.datetime.now()) - - bad_table = "invalid_table" - bad_post = ( - "BadRequest: 400 POST " - "https://bigquery.googleapis.com/bigquery/v2/projects/project-id/" - "queries?prettyPrint=false: " - ) - if bad_table in sql_query: - try: - raise BadRequest( - f'{bad_post} Table "{bad_table}" must be qualified ' - "with a dataset (e.g. dataset.table)." - ) - except Exception as e: - raise SyftException( - public_message=f"*must be qualified with a dataset*. {e}" - ) - - if not context.code.is_valid_sql(sql_query): - raise BadRequest( - f'{bad_post} Syntax error: Unexpected identifier "{sql_query}" at [1:1]' - ) - - # third party - import pandas as pd - - limit = context.code.extract_limit_value(sql_query) - if limit > 1_000_000: - raise SyftException( - public_message="Please only write queries that gather aggregate statistics" - ) - - base_df = pd.DataFrame(context.settings["query_dict"]) - - df = context.code.adjust_dataframe_rows(base_df, limit) - return df - - return mock_test_query diff --git a/notebooks/scenarios/bigquery/apis/rate_limiter.py b/notebooks/scenarios/bigquery/apis/rate_limiter.py deleted file mode 100644 index 8ce319b61f4..00000000000 --- a/notebooks/scenarios/bigquery/apis/rate_limiter.py +++ /dev/null @@ -1,16 +0,0 @@ -def is_within_rate_limit(context) -> bool: - """Rate limiter for custom API calls made by users.""" - # stdlib - import datetime - - state = context.state - settings = context.settings - email = context.user.email - - current_time = datetime.datetime.now() - calls_last_min = [ - 1 if (current_time - call_time).seconds < 60 else 0 - for call_time in state[email] - ] - - return sum(calls_last_min) < settings.get("calls_per_min", 5) diff --git a/notebooks/scenarios/bigquery/apis/submit_query.py b/notebooks/scenarios/bigquery/apis/submit_query.py deleted file mode 100644 index a0125ee009b..00000000000 --- a/notebooks/scenarios/bigquery/apis/submit_query.py +++ /dev/null @@ -1,42 +0,0 @@ -# syft absolute -import syft as sy - - -def make_submit_query(settings, worker_pool): - updated_settings = {"user_code_worker": worker_pool} | settings - - @sy.api_endpoint( - path="bigquery.submit_query", - description="API endpoint that allows you to submit SQL queries to run on the private data.", - worker_pool=worker_pool, - settings=updated_settings, - ) - def submit_query( - context, - func_name: str, - query: str, - ) -> str: - # syft absolute - import syft as sy - - @sy.syft_function( - name=func_name, - input_policy=sy.MixedInputPolicy( - endpoint=sy.Constant( - val=context.admin_client.api.services.bigquery.test_query - ), - query=sy.Constant(val=query), - client=context.admin_client, - ), - worker_pool_name=context.settings["user_code_worker"], - ) - def execute_query(query: str, endpoint): - res = endpoint(sql_query=query) - return res - - request = context.user_client.code.request_code_execution(execute_query) - context.admin_client.requests.set_tags(request, ["autosync"]) - - return f"Query submitted {request}. 
Use `client.code.{func_name}()` to run your query" - - return submit_query diff --git a/notebooks/scenarios/bigquery/sync/01-setup-high-low-datasites.ipynb b/notebooks/scenarios/bigquery/sync/01-setup-high-low-datasites.ipynb index 2b9b3b68201..633a73c38e4 100644 --- a/notebooks/scenarios/bigquery/sync/01-setup-high-low-datasites.ipynb +++ b/notebooks/scenarios/bigquery/sync/01-setup-high-low-datasites.ipynb @@ -9,13 +9,18 @@ "# stdlib\n", "import os\n", "\n", - "use_k8s_dev = False\n", - "if use_k8s_dev:\n", - " os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", - " os.environ[\"DEV_MODE\"] = \"True\"\n", - " os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", - " os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", - " os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + "# Testing works over 4 possibilities\n", + "# 1. (python/in-memory workers and using tox commands)\n", + "# 2. (python/in-memory workers and manually running notebooks)\n", + "# 3. (using k8s and using tox commands)\n", + "# 4. (using k8s and manually running notebooks)\n", + "# Uncomment the lines below if in the 4th possibility\n", + "\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" ] }, { @@ -37,10 +42,15 @@ "metadata": {}, "outputs": [], "source": [ + "# isort: off\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", - "from syft import test_settings" + "from syft import test_helpers # noqa: F401\n", + "from syft import test_settings\n", + "\n", + "from worker_helpers import build_and_launch_worker_pool_from_docker_str\n", + "from worker_helpers import launch_worker_pool_from_docker_tag_and_registry\n", + "# isort: on" ] }, { @@ -118,22 +128,11 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Setup High First" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "- If you want to use the k8s registry, we submit and build an image, and we scale a worker pool with that image\n", - "- If you want to use the k8s registry, do ?????" 
- ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "- helper for launching worker pools" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Setup High First\n", + "\n", + "- If using an external registry, we want to get it from the test_settings.\n", + "- We build the Docker image on top of the base Docker image in Syft\n", + "- We give our custom pool image a tag called worker-bigquery" ] }, { @@ -143,209 +142,19 @@ "outputs": [], "source": [ "external_registry = test_settings.get(\"external_registry\", default=\"docker.io\")\n", - "external_registry" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "result = high_client.api.services.image_registry.add(external_registry)\n", - "result" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "image_registry_list = high_client.api.services.image_registry.get_all()\n", - "image_registry_list" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "local_registry = image_registry_list[0]\n", - "local_registry" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + "base_worker_image = high_client.images.get_all()[0]\n", - "base_worker_image" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + "\n", "worker_dockerfile = f\"\"\"\n", "FROM {str(base_worker_image.image_identifier)}\n", "\n", "RUN uv pip install db-dtypes google-cloud-bigquery \n", "\n", "\"\"\".strip()\n", - "worker_dockerfile" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "docker_config = sy.DockerWorkerConfig(dockerfile=worker_dockerfile)\n", - "assert docker_config.dockerfile == worker_dockerfile" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "submit_result = high_client.api.services.worker_image.submit(\n", - " worker_config=docker_config\n", - ")\n", - "submit_result" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# get non prebuilt\n", - "dockerfile_list = high_client.images.get_all()\n", - "worker_image = next(\n", - " (\n", - " image\n", - " for image in dockerfile_list\n", - " if not image.is_prebuilt and image.config.dockerfile == worker_dockerfile\n", - " ),\n", - " None,\n", - ")\n", - "worker_image" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + "\n", "docker_tag = str(base_worker_image.image_identifier).replace(\n", " \"backend\", \"worker-bigquery\"\n", - ")\n", - "docker_tag" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "if environment == \"remote\":\n", - " docker_build_result = high_client.api.services.worker_image.build(\n", - " image_uid=worker_image.id,\n", - " tag=docker_tag,\n", - " registry_uid=local_registry.id,\n", - " )\n", - " print(docker_build_result)" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "if environment == \"remote\":\n", - " push_result = high_client.api.services.worker_image.push(worker_image.id)\n", - " print(push_result)" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "docker_config = 
sy.PrebuiltWorkerConfig(tag=docker_tag)\n", - "docker_config" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "result = high_client.api.services.worker_image.submit(worker_config=docker_config)\n", - "worker_image_id = result.value.id\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Update the list\n", - "dockerfile_list = high_client.images.get_all()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# get prebuilt\n", - "# dockerfile_list = high_client.images.get_all()\n", - "# worker_image = next(\n", - "# (image for image in dockerfile_list if image.is_prebuilt),\n", - "# None,\n", - "# )\n", - "# worker_image\n", - "\n", - "# TODO: fix\n", - "# Similar issue as in non-sync notebooks. Refer to 01-setup-datasite.ipynb\n", - "\n", - "worker_image = next(\n", - " (\n", - " image\n", - " for image in dockerfile_list\n", - " if \"worker-bigquery\" in str(image.image_identifier)\n", - " ),\n", - " None,\n", - ")\n", - "worker_image" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert worker_image" + ")" ] }, { @@ -365,52 +174,17 @@ "metadata": {}, "outputs": [], "source": [ - "result = high_client.api.services.worker_pool.launch(\n", - " pool_name=worker_pool_name,\n", - " image_uid=worker_image.id,\n", - " num_workers=1,\n", - " pod_annotations=custom_pool_pod_annotations,\n", - " pod_labels=custom_pool_pod_labels,\n", - ")\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "if environment == \"remote\":\n", - " result = high_client.worker_pools.scale(number=2, pool_name=worker_pool_name)\n", - " print(result)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(high_client.worker_pools.get_all()) == 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "widget = sy.sync(from_client=high_client, to_client=low_client, hide_usercode=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "widget" + "build_and_launch_worker_pool_from_docker_str(\n", + " environment=environment,\n", + " client=high_client,\n", + " worker_pool_name=worker_pool_name,\n", + " custom_pool_pod_annotations=custom_pool_pod_annotations,\n", + " custom_pool_pod_labels=custom_pool_pod_labels,\n", + " worker_dockerfile=worker_dockerfile,\n", + " external_registry=external_registry,\n", + " docker_tag=docker_tag,\n", + " scale_to=2,\n", + ")" ] }, { @@ -444,52 +218,16 @@ "metadata": {}, "outputs": [], "source": [ - "result = low_client.api.services.image_registry.add(external_registry)\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "docker_config = sy.PrebuiltWorkerConfig(tag=docker_tag)\n", - "docker_config" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "result = low_client.api.services.worker_image.submit(worker_config=docker_config)\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# get prebuilt\n", - "dockerfile_list = 
low_client.images.get_all()\n", - "worker_image = next(\n", - " (\n", - " image\n", - " for image in dockerfile_list\n", - " if \"worker-bigquery\" in str(image.image_identifier)\n", - " ),\n", - " None,\n", - ")\n", - "worker_image\n", - "# worker_image = next(\n", - "# (image for image in dockerfile_list if image.is_prebuilt),\n", - "# None,\n", - "# )\n", - "# worker_image" + "launch_result = launch_worker_pool_from_docker_tag_and_registry(\n", + " environment=environment,\n", + " client=low_client,\n", + " worker_pool_name=worker_pool_name,\n", + " custom_pool_pod_annotations=custom_pool_pod_annotations,\n", + " custom_pool_pod_labels=custom_pool_pod_labels,\n", + " docker_tag=docker_tag,\n", + " external_registry=external_registry,\n", + " scale_to=1,\n", + ")" ] }, { @@ -498,23 +236,14 @@ "metadata": {}, "outputs": [], "source": [ - "result = low_client.api.services.worker_pool.launch(\n", - " pool_name=worker_pool_name,\n", - " image_uid=worker_image.id,\n", - " num_workers=1,\n", - " pod_annotations=custom_pool_pod_annotations,\n", - " pod_labels=custom_pool_pod_labels,\n", - ")\n", - "result" + "assert len(low_client.worker_pools.get_all()) == 2" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "assert len(low_client.worker_pools.get_all()) == 2" + "# Register a DS only on the low side" ] }, { @@ -555,12 +284,10 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# TODO: close email client" + "# Close" ] }, { @@ -579,7 +306,9 @@ "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "# TODO close email client" + ] } ], "metadata": { diff --git a/notebooks/scenarios/bigquery/sync/02-configure-api-and-sync.ipynb b/notebooks/scenarios/bigquery/sync/02-configure-api-and-sync.ipynb index 3e9a5700d72..094841ef58e 100644 --- a/notebooks/scenarios/bigquery/sync/02-configure-api-and-sync.ipynb +++ b/notebooks/scenarios/bigquery/sync/02-configure-api-and-sync.ipynb @@ -9,7 +9,6 @@ "# stdlib\n", "import os\n", "\n", - "# TODO: if\n", "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", "# os.environ[\"DEV_MODE\"] = \"True\"\n", "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", @@ -42,8 +41,9 @@ "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import get_helpers # noqa: F401\n", + "from syft import test_helpers # noqa: F401\n", "from syft import test_settings\n", + "from syft.client.syncing import compare_clients\n", "\n", "# set to use the live APIs\n", "# import os\n", @@ -52,6 +52,7 @@ "from apis import make_schema\n", "from apis import make_submit_query\n", "from apis import make_test_query\n", + "import pandas as pd\n", "# isort: on" ] }, @@ -116,13 +117,69 @@ "this_worker_pool_name = \"bigquery-pool\"" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Load database information from test_settings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", + "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", + "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", + "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", + "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", + "table_2_col_score = 
test_settings.get(\"table_2_col_score\", default=\"colname\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create and test different endpoints" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "----" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create `bigquery.schema` endpoint" + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# !pip list | grep bigquery" + "schema_function = make_schema(\n", + " settings={\n", + " \"calls_per_min\": 5,\n", + " },\n", + " worker_pool=this_worker_pool_name,\n", + ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.custom_api.add(endpoint=schema_function)" ] }, { @@ -131,14 +188,23 @@ "metadata": {}, "outputs": [], "source": [ - "# !pip install db-dtypes google-cloud-bigquery" + "result = high_client.api.services.bigquery.schema()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(result) == 23" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# load schema data" + "Note that when we do not create a job, the result is a `syft.service.action.pandas.PandasDataFrameObject` rather than a pandas DataFrame, but the `.get()` method returns the expected DataFrame" ] }, { "cell_type": "code", @@ -147,19 +213,26 @@ "metadata": {}, "outputs": [], "source": [ - "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", - "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", - "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", - "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", - "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", - "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")" + "# syft absolute\n", + "from syft.service.action.pandas import PandasDataFrameObject\n", + "\n", + "# assert isinstance(result, pd.DataFrame)\n", + "assert isinstance(result, PandasDataFrameObject)\n", + "assert isinstance(result.get(), pd.DataFrame)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Twin endpoints" + "____" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create `bigquery.test_query` endpoint" ] }, { @@ -206,6 +279,13 @@ "high_client.custom_api.add(endpoint=new_endpoint)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Some features for updating the endpoint" + ] + }, { "cell_type": "code", "execution_count": null, @@ -229,6 +309,13 @@ ")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Test the `bigquery.test_query` endpoint" + ] + }, { "cell_type": "code", "execution_count": null, @@ -248,11 +335,7 @@ "metadata": {}, "outputs": [], "source": [ - "# Test mock version\n", - "result = high_client.api.services.bigquery.test_query.mock(\n", - " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", - ")\n", - "result" + "assert len(result) == 10" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# Bug with the new Error PR: message printed multiple times. 
TODO clean up the duplicate exception messages.\n", - "\n", "# Test mock version for wrong queries\n", "with sy.raises(\n", " sy.SyftException(public_message=\"*must be qualified with a dataset*\"), show=True\n", @@ -280,66 +361,32 @@ "source": [ "# Test private version\n", "result = high_client.api.services.bigquery.test_query.private(\n", - " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", + " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 12\"\n", ")\n", "result" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Create `biquery.schema` endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "schema_function = make_schema(\n", - " settings={\n", - " \"calls_per_min\": 5,\n", - " },\n", - " worker_pool=this_worker_pool_name,\n", - ")" - ] - }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "high_client.custom_api.add(endpoint=schema_function)\n", - "# can we delete this?\n", - "# high_client.refresh()" + "assert len(result) == 12" ] }, { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.api.services.bigquery.schema()" - ] - }, - { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# todo add tests" + "____" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Create `submit_query` endpoint" + "### Create `submit_query` endpoint" ] }, { @@ -382,7 +429,7 @@ "# Testing submit query\n", "result = high_client.api.services.bigquery.submit_query(\n", " func_name=\"my_func\",\n", - " query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 1\",\n", + " query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 2\",\n", ")" ] }, @@ -420,9 +467,6 @@ "metadata": {}, "outputs": [], "source": [ - "# third party\n", - "import pandas as pd\n", - "\n", "assert isinstance(res, pd.DataFrame)" ] }, @@ -463,18 +507,6 @@ ")" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# syft absolute\n", - "from syft.client.syncing import compare_clients\n", - "from syft.service.job.job_stash import Job\n", - "from syft.service.job.job_stash import JobStatus" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -482,45 +514,6 @@ "# Syncing" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# todo: move to helper" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def is_job_to_sync(batch):\n", - " if batch.status != \"NEW\":\n", - " return False\n", - " if not isinstance(batch.root.high_obj, Job):\n", - " return False\n", - " job = batch.root.high_obj\n", - " return job.status in (JobStatus.ERRORED, JobStatus.COMPLETED)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Request\n", - "\n", - "# UserCode - UserCodeStatus\n", - "\n", - "# Job - Log - Result - ExecutionOutput\n", - "\n", - "# TwinAPIEndpoint - EndPoint" - ] - }, { "cell_type": "code", "execution_count": null, @@ -532,82 +525,14 @@ ")" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "- verify that jobs are actually filtered out\n", - "- we need to think about whether its possible for the admin to create more data here that would break sync" - ] - }, { "cell_type": "code", "execution_count": 
null, "metadata": {}, "outputs": [], "source": [ - "widget = diff.resolve()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "widget._share_all()\n", - "widget._sync_all()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# def sync_new_objects(\n", - "# from_client, to_client, dry_run: bool = True, private_data: bool = False\n", - "# ):\n", - "# sim = \"Simulating \" if dry_run else \"\"\n", - "# priv = \"WITH PRIVATE DATA\" if private_data else \"\"\n", - "# print(f\"{sim}Syncing from {from_client.name} to {to_client.name} {priv}\")\n", - "# changes = []\n", - "# diff = compare_clients(\n", - "# from_client=from_client, to_client=to_client, hide_usercode=False\n", - "# )\n", - "# if isinstance(diff, sy.SyftError):\n", - "# return diff\n", - "\n", - "# for batch in diff.batches:\n", - "# try:\n", - "# if is_job_to_sync(batch) or batch.status == \"NEW\":\n", - "# w = batch.resolve(build_state=False)\n", - "# if private_data:\n", - "# w.click_share_all_private_data()\n", - "# if not dry_run:\n", - "# w.click_sync()\n", - "# change_text = f\"Synced {batch.status} {batch.root_type.__name__}\"\n", - "# if not dry_run:\n", - "# changes.append(change_text)\n", - "# else:\n", - "# print(f\"Would have run: {change_text}\")\n", - "# except Exception as e:\n", - "# print(\"sync_new_objects\", e)\n", - "# raise e\n", - "# return changes" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# result = sync_new_objects(high_client, low_client)\n", - "# result\n", - "\n", - "# result = sync_new_objects(high_client, low_client, dry_run=False)\n", - "# result" + "# TODO verify that jobs are actually filtered out\n", + "# TODO we need to think about whether its possible for the admin to create more data here that would break sync" ] }, { @@ -616,41 +541,7 @@ "metadata": {}, "outputs": [], "source": [ - "# assert [\n", - "# \"Synced NEW TwinAPIEndpoint\",\n", - "# \"Synced NEW TwinAPIEndpoint\",\n", - "# \"Synced NEW TwinAPIEndpoint\",\n", - "# ] == result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# widget = sy.sync(from_client=high_client, to_client=low_client, hide_usercode=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# # TODO: ignore private function from high side in diff\n", - "# widget" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# widget.click_sync(0)\n", - "# widget.click_sync(1)\n", - "# widget.click_sync(2)" + "widget = diff.resolve()" ] }, { @@ -659,10 +550,9 @@ "metadata": {}, "outputs": [], "source": [ - "# Some internal helper methods\n", - "\n", - "# widget._share_all()\n", - "# widget._sync_all()" + "# TODO maybe see if non-internal method we can use or make it public\n", + "widget._share_all()\n", + "widget._sync_all()" ] }, { diff --git a/notebooks/scenarios/bigquery/sync/03-ds-submit-request.ipynb b/notebooks/scenarios/bigquery/sync/03-ds-submit-request.ipynb index fe6bf1b1166..a2759038134 100644 --- a/notebooks/scenarios/bigquery/sync/03-ds-submit-request.ipynb +++ b/notebooks/scenarios/bigquery/sync/03-ds-submit-request.ipynb @@ -89,7 +89,8 @@ "metadata": {}, "outputs": [], "source": [ - "assert len(ds_client.custom_api.api_endpoints()) == 3" + "# Find available endpoints\n", + 
"ds_client.custom_api.api_endpoints()" ] }, { @@ -98,12 +99,7 @@ "metadata": {}, "outputs": [], "source": [ - "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", - "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", - "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", - "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", - "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", - "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")" + "assert len(ds_client.custom_api.api_endpoints()) == 3" ] }, { @@ -120,14 +116,45 @@ "outputs": [], "source": [ "res = ds_client.api.services.bigquery.schema()\n", + "res.get()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ "assert isinstance(res.get(), pd.DataFrame)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Load these variables for testing but they ideally come from the schema\n", + "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", + "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", + "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", + "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", + "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", + "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "____" + ] + }, { "cell_type": "markdown", "metadata": {}, "source": [ - "# Test the mock" + "## Test the `bigquery.test_query` endpoint" ] }, { @@ -160,7 +187,6 @@ "metadata": {}, "outputs": [], "source": [ - "FUNC_NAME = \"large_sample\"\n", "LARGE_SAMPLE_QUERY = f\"SELECT * FROM {dataset_2}.{table_2} LIMIT 10000\"" ] }, @@ -173,6 +199,15 @@ "mock_res = ds_client.api.services.bigquery.test_query(sql_query=LARGE_SAMPLE_QUERY)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(mock_res) == 10000" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -180,6 +215,15 @@ "# Submit a query" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "FUNC_NAME = \"large_sample\"" + ] + }, { "cell_type": "code", "execution_count": null, @@ -191,6 +235,15 @@ ")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "submission" + ] + }, { "cell_type": "code", "execution_count": null, @@ -216,6 +269,13 @@ " ds_client.code.large_sample()" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Shutdown" + ] + }, { "cell_type": "code", "execution_count": null, @@ -259,7 +319,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.5" + "version": "3.12.3" } }, "nbformat": 4, diff --git a/notebooks/scenarios/bigquery/sync/04-do-review-requests.ipynb b/notebooks/scenarios/bigquery/sync/04-do-review-requests.ipynb index b3ca7b71d53..4eec3d6e7b1 100644 --- a/notebooks/scenarios/bigquery/sync/04-do-review-requests.ipynb +++ b/notebooks/scenarios/bigquery/sync/04-do-review-requests.ipynb @@ -9,10 +9,12 @@ "# stdlib\n", "import os\n", "\n", + "# third party\n", + "import pandas as pd\n", + "\n", "# syft absolute\n", "import syft as sy\n", - "from syft.service.code.user_code 
import UserCode\n", - "from syft.service.request.request import Request\n", + "from syft.client.syncing import compare_clients\n", "\n", "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", @@ -30,6 +32,13 @@ "low_port = os.environ.get(\"CLUSTER_HTTP_PORT_LOW\", \"auto\")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Launch server and login" + ] + }, { "cell_type": "code", "execution_count": null, @@ -66,13 +75,10 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "# syft absolute\n", - "from syft.client.syncing import compare_clients" + "# Sync UserCode and Requests to High Side" ] }, { @@ -81,9 +87,9 @@ "metadata": {}, "outputs": [], "source": [ - "# syft absolute\n", - "from syft.service.job.job_stash import Job\n", - "from syft.service.job.job_stash import JobStatus" + "diffs = compare_clients(\n", + " from_client=low_client, to_client=high_client, hide_usercode=False\n", + ")" ] }, { @@ -92,13 +98,11 @@ "metadata": {}, "outputs": [], "source": [ - "def is_job_to_sync(batch):\n", - " if batch.status != \"NEW\":\n", - " return False\n", - " if not isinstance(batch.root.high_obj, Job):\n", - " return False\n", - " job = batch.root.high_obj\n", - " return job.status in (JobStatus.ERRORED, JobStatus.COMPLETED)" + "# check that only requests and usercode are in the diff\n", + "assert {diff.root_diff.obj_type.__qualname__ for diff in diffs.batches} == {\n", + " \"Request\",\n", + " \"UserCode\",\n", + "}" ] }, { @@ -107,64 +111,17 @@ "metadata": {}, "outputs": [], "source": [ - "def sync_new_objects(\n", - " from_client, to_client, dry_run: bool = True, private_data: bool = False\n", - "):\n", - " sim = \"Simulating \" if dry_run else \"\"\n", - " priv = \"WITH PRIVATE DATA\" if private_data else \"\"\n", - " print(f\"{sim}Syncing from {from_client.name} to {to_client.name} {priv}\")\n", - " changes = []\n", - " diff = compare_clients(\n", - " from_client=from_client, to_client=to_client, hide_usercode=False\n", - " )\n", - " if isinstance(diff, sy.SyftError):\n", - " return diff\n", + "widget = diffs.resolve()\n", "\n", - " for batch in diff.batches:\n", - " try:\n", - " if is_job_to_sync(batch) or batch.status == \"NEW\":\n", - " w = batch.resolve(build_state=False)\n", - " if private_data:\n", - " w.click_share_all_private_data()\n", - " if not dry_run:\n", - " w.click_sync()\n", - " change_text = f\"Synced {batch.status} {batch.root_type.__name__}\"\n", - " if not dry_run:\n", - " changes.append(change_text)\n", - " else:\n", - " print(f\"Would have run: {change_text}\")\n", - " except Exception as e:\n", - " print(\"sync_new_objects\", e)\n", - " raise e\n", - " return changes" + "widget._share_all()\n", + "widget._sync_all()" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "sync_new_objects(low_client, high_client)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "result = sync_new_objects(low_client, high_client, dry_run=False, private_data=True)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert \"Synced NEW UserCode\" in result\n", - "assert \"Synced NEW Request\" in result" + "# Check that request synced over to high side" ] }, { @@ -192,29 +149,20 @@ "metadata": {}, "outputs": [], "source": 
[ - "user_request = None" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + "user_request = None\n", "for request in requests:\n", " if \"large_sample\" in getattr(\n", " getattr(request, \"code\", None), \"service_func_name\", None\n", " ):\n", - " user_request = request" + " user_request = request\n", + "assert user_request" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "assert user_request" + "# Execute Request" ] }, { @@ -223,21 +171,7 @@ "metadata": {}, "outputs": [], "source": [ - "def execute_request(client, request) -> dict:\n", - " if not isinstance(request, Request):\n", - " return \"This is not a request\"\n", - "\n", - " code = request.code\n", - " if not isinstance(code, UserCode):\n", - " return \"No usercode found\"\n", - "\n", - " func_name = request.code.service_func_name\n", - " api_func = getattr(client.code, func_name, None)\n", - " if api_func is None:\n", - " return \"Code name was not found on the client.\"\n", - "\n", - " job = api_func(blocking=False)\n", - " return job" + "job = high_client.code.large_sample(blocking=False)" ] }, { @@ -246,16 +180,16 @@ "metadata": {}, "outputs": [], "source": [ - "job = execute_request(high_client, user_request)" + "res = job.wait().get()\n", + "\n", + "assert isinstance(res, pd.DataFrame)" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "job" + "# Sync job result to low side" ] }, { @@ -264,7 +198,9 @@ "metadata": {}, "outputs": [], "source": [ - "job.wait()" + "diffs = compare_clients(\n", + " from_client=high_client, to_client=low_client, hide_usercode=False\n", + ")" ] }, { @@ -273,7 +209,8 @@ "metadata": {}, "outputs": [], "source": [ - "sync_new_objects(high_client, low_client)" + "assert len(diffs.batches) == 1\n", + "assert diffs.batches[0].root_diff.obj_type.__qualname__ == \"Job\"" ] }, { @@ -282,16 +219,17 @@ "metadata": {}, "outputs": [], "source": [ - "result = sync_new_objects(high_client, low_client, dry_run=False, private_data=True)" + "widget = diffs.resolve()\n", + "\n", + "widget._share_all()\n", + "widget._sync_all()" ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "assert \"Synced NEW Job\" in result" + "# Check that job was synced to low side" ] }, { @@ -310,15 +248,7 @@ "metadata": {}, "outputs": [], "source": [ - "user_request = None" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + "user_request = None\n", "for request in requests:\n", " if \"large_sample\" in getattr(\n", " getattr(request, \"code\", None), \"service_func_name\", None\n", @@ -326,15 +256,6 @@ " user_request = request" ] }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "user_request.status" - ] - }, { "cell_type": "code", "execution_count": null, @@ -379,7 +300,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.5" + "version": "3.12.3" } }, "nbformat": 4, diff --git a/notebooks/scenarios/bigquery/sync/05-ds-get-results.ipynb b/notebooks/scenarios/bigquery/sync/05-ds-get-results.ipynb index cc4e50a6306..1e61e0d8587 100644 --- a/notebooks/scenarios/bigquery/sync/05-ds-get-results.ipynb +++ b/notebooks/scenarios/bigquery/sync/05-ds-get-results.ipynb @@ -30,6 
+30,13 @@ "print(environment, low_port)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Launch server and login" + ] + }, { "cell_type": "code", "execution_count": null, @@ -58,15 +65,10 @@ ] }, { - "cell_type": "code", - "execution_count": null, + "cell_type": "markdown", "metadata": {}, - "outputs": [], "source": [ - "api_method = None\n", - "for code in ds_client.code:\n", - " if \"large_sample\" in code.service_func_name:\n", - " api_method = getattr(ds_client.code, code.service_func_name, None)" + "# Check result of job on low side" ] }, { @@ -75,7 +77,7 @@ "metadata": {}, "outputs": [], "source": [ - "job = api_method(blocking=False)" + "job = ds_client.code.large_sample(blocking=False)" ] }, { @@ -139,7 +141,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.5" + "version": "3.12.3" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index b20ce094cb0..fb0fdfa69b1 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -155,7 +155,7 @@ def _test_settings() -> Any: @module_property -def _get_helpers() -> None: +def _test_helpers() -> None: # relative from .util.util import add_helper_path_to_python_path diff --git a/packages/syft/src/syft/util/util.py b/packages/syft/src/syft/util/util.py index a2c392c09c7..83efaa196e7 100644 --- a/packages/syft/src/syft/util/util.py +++ b/packages/syft/src/syft/util/util.py @@ -1154,7 +1154,7 @@ def add_helper_path_to_python_path() -> None: current_path = import_path base_dir = find_base_dir_with_tox_ini(current_path) - notebook_helper_path = os.path.join(base_dir, "notebooks/notebook_helpers") + notebook_helper_path = os.path.join(base_dir, "test_helpers") sys.path.append(notebook_helper_path) diff --git a/notebooks/notebook_helpers/apis/__init__.py b/test_helpers/apis/__init__.py similarity index 100% rename from notebooks/notebook_helpers/apis/__init__.py rename to test_helpers/apis/__init__.py diff --git a/notebooks/notebook_helpers/apis/live/__init__.py b/test_helpers/apis/live/__init__.py similarity index 100% rename from notebooks/notebook_helpers/apis/live/__init__.py rename to test_helpers/apis/live/__init__.py diff --git a/notebooks/notebook_helpers/apis/live/schema.py b/test_helpers/apis/live/schema.py similarity index 100% rename from notebooks/notebook_helpers/apis/live/schema.py rename to test_helpers/apis/live/schema.py diff --git a/notebooks/notebook_helpers/apis/live/test_query.py b/test_helpers/apis/live/test_query.py similarity index 100% rename from notebooks/notebook_helpers/apis/live/test_query.py rename to test_helpers/apis/live/test_query.py diff --git a/notebooks/notebook_helpers/apis/mock/__init__.py b/test_helpers/apis/mock/__init__.py similarity index 100% rename from notebooks/notebook_helpers/apis/mock/__init__.py rename to test_helpers/apis/mock/__init__.py diff --git a/notebooks/notebook_helpers/apis/mock/data.py b/test_helpers/apis/mock/data.py similarity index 100% rename from notebooks/notebook_helpers/apis/mock/data.py rename to test_helpers/apis/mock/data.py diff --git a/notebooks/notebook_helpers/apis/mock/schema.py b/test_helpers/apis/mock/schema.py similarity index 100% rename from notebooks/notebook_helpers/apis/mock/schema.py rename to test_helpers/apis/mock/schema.py diff --git a/notebooks/notebook_helpers/apis/mock/test_query.py b/test_helpers/apis/mock/test_query.py similarity index 100% rename from 
notebooks/notebook_helpers/apis/mock/test_query.py rename to test_helpers/apis/mock/test_query.py diff --git a/notebooks/notebook_helpers/apis/rate_limiter.py b/test_helpers/apis/rate_limiter.py similarity index 100% rename from notebooks/notebook_helpers/apis/rate_limiter.py rename to test_helpers/apis/rate_limiter.py diff --git a/notebooks/notebook_helpers/apis/submit_query.py b/test_helpers/apis/submit_query.py similarity index 100% rename from notebooks/notebook_helpers/apis/submit_query.py rename to test_helpers/apis/submit_query.py diff --git a/notebooks/notebook_helpers/email_helpers.py b/test_helpers/email_helpers.py similarity index 100% rename from notebooks/notebook_helpers/email_helpers.py rename to test_helpers/email_helpers.py diff --git a/notebooks/notebook_helpers/job_helpers.py b/test_helpers/job_helpers.py similarity index 100% rename from notebooks/notebook_helpers/job_helpers.py rename to test_helpers/job_helpers.py diff --git a/notebooks/notebook_helpers/sync_helpers.py b/test_helpers/sync_helpers.py similarity index 100% rename from notebooks/notebook_helpers/sync_helpers.py rename to test_helpers/sync_helpers.py diff --git a/test_helpers/worker_helpers.py b/test_helpers/worker_helpers.py new file mode 100644 index 00000000000..5acef1610ca --- /dev/null +++ b/test_helpers/worker_helpers.py @@ -0,0 +1,86 @@ +# syft absolute +import syft as sy + + +def build_and_launch_worker_pool_from_docker_str( + environment: str, + client: sy.DatasiteClient, + worker_pool_name: str, + custom_pool_pod_annotations: dict, + custom_pool_pod_labels: dict, + worker_dockerfile: str, + external_registry: str, + docker_tag: str, + scale_to: int, +): + result = client.api.services.image_registry.add(external_registry) + assert "success" in result.message + + # For some reason, when using k9s, result.value is empty so can't use the below line + # local_registry = result.value + local_registry = client.api.services.image_registry[0] + + docker_config = sy.DockerWorkerConfig(dockerfile=worker_dockerfile) + assert docker_config.dockerfile == worker_dockerfile + submit_result = client.api.services.worker_image.submit(worker_config=docker_config) + print(submit_result.message) + assert "success" in submit_result.message + + worker_image = submit_result.value + + if environment == "remote": + docker_build_result = client.api.services.worker_image.build( + image_uid=worker_image.id, + tag=docker_tag, + registry_uid=local_registry.id, + ) + print(docker_build_result) + + if environment == "remote": + push_result = client.api.services.worker_image.push(worker_image.id) + print(push_result) + + result = client.api.services.worker_pool.launch( + pool_name=worker_pool_name, + image_uid=worker_image.id, + num_workers=1, + pod_annotations=custom_pool_pod_annotations, + pod_labels=custom_pool_pod_labels, + ) + print(result) + # assert 'success' in str(result.message) + + if environment == "remote": + result = client.worker_pools.scale(number=scale_to, pool_name=worker_pool_name) + print(result) + + +def launch_worker_pool_from_docker_tag_and_registry( + environment: str, + client: sy.DatasiteClient, + worker_pool_name: str, + custom_pool_pod_annotations: dict, + custom_pool_pod_labels: dict, + docker_tag: str, + external_registry: str, + scale_to: int = 1, +): + res = client.api.services.image_registry.add(external_registry) + assert "success" in res.message + docker_config = sy.PrebuiltWorkerConfig(tag=docker_tag) + image_result = client.api.services.worker_image.submit(worker_config=docker_config) + assert 
"success" in res.message + worker_image = image_result.value + + launch_result = client.api.services.worker_pool.launch( + pool_name=worker_pool_name, + image_uid=worker_image.id, + num_workers=1, + pod_annotations=custom_pool_pod_annotations, + pod_labels=custom_pool_pod_labels, + ) + if environment == "remote" and scale_to > 1: + result = client.worker_pools.scale(number=scale_to, pool_name=worker_pool_name) + print(result) + + return launch_result diff --git a/tox.ini b/tox.ini index 6e37f408ded..15353917ac2 100644 --- a/tox.ini +++ b/tox.ini @@ -425,7 +425,7 @@ setenv = DEVSPACE_PROFILE = bigquery-scenario-tests GITHUB_CI = {env:GITHUB_CI:false} SYFT_BASE_IMAGE_REGISTRY = {env:SYFT_BASE_IMAGE_REGISTRY:k3d-registry.localhost:5800} - DATASITE_CLUSTER_NAME = {env:DATASITE_CLUSTER_NAME:test-datasite-1} + DATASITE_CLUSTER_NAME = {env:DATASITE_CLUSTER_NAME:bigquery-high} SERVER_URL = {env:SERVER_URL:http://localhost} SERVER_PORT = {env:SERVER_PORT:8080} TEST_EXTERNAL_REGISTRY = {env:TEST_EXTERNAL_REGISTRY:k3d-registry.localhost:5800} @@ -452,7 +452,7 @@ commands = tox -e dev.k8s.registry - # Creating test-datasite-1 cluster on port SERVER_PORT + # Creating bigquery-high cluster on port SERVER_PORT bash -c '\ export CLUSTER_NAME=${DATASITE_CLUSTER_NAME} CLUSTER_HTTP_PORT=${SERVER_PORT} && \ tox -e dev.k8s.start && \ @@ -466,7 +466,8 @@ commands = ; sleep 30 - # wait for test-datasite-1 + + # wait for bigquery-high bash packages/grid/scripts/wait_for.sh service postgres --context k3d-{env:DATASITE_CLUSTER_NAME} --namespace syft bash packages/grid/scripts/wait_for.sh service backend --context k3d-{env:DATASITE_CLUSTER_NAME} --namespace syft bash packages/grid/scripts/wait_for.sh service proxy --context k3d-{env:DATASITE_CLUSTER_NAME} --namespace syft @@ -474,7 +475,7 @@ commands = bash packages/grid/scripts/wait_for.sh service frontend --context k3d-{env:DATASITE_CLUSTER_NAME} --namespace syft bash -c '(kubectl logs service/frontend --context k3d-${DATASITE_CLUSTER_NAME} --namespace syft -f &) | grep -q -E "Network:\s+https?://[a-zA-Z0-9.-]+:[0-9]+/" || true' - # Checking logs generated & startup of test-datasite 1 + # Checking logs generated & startup of bigquery-high bash -c '(kubectl logs service/backend --context k3d-${DATASITE_CLUSTER_NAME} --namespace syft -f &) | grep -q "Application startup complete" || true' bash -c "pytest -s -x --nbmake notebooks/scenarios/bigquery -p no:randomly --ignore=notebooks/scenarios/bigquery/sync -vvvv --nbmake-timeout=1000 --log-cli-level=DEBUG --capture=no;"