Skip to content

Commit

Permalink
BUG: Use live live query for all join queries in tables (#161)
Browse files Browse the repository at this point in the history
* Use live live query for all join queries with timestamp in tables interface

* bug fixes for tablemanager with v2 and edge cases
  • Loading branch information
ceesem authored Mar 15, 2024
1 parent 9476f04 commit 8f198f1
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 20 deletions.
16 changes: 14 additions & 2 deletions caveclient/emannotationschemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from .base import ClientBase, _api_endpoints, handle_response
from .endpoints import schema_api_versions, schema_endpoints_common
from .auth import AuthClient
from requests import HTTPError
import logging
logger = logging.getLogger(__name__)

server_key = "emas_server_address"

Expand Down Expand Up @@ -115,7 +118,11 @@ def schema_definition_multi(self, schema_types: list[str]) -> dict:
url = self._endpoints["schema_definition_multi"].format_map(endpoint_mapping)
data={'schema_names': ','.join(schema_types)}
response = self.session.post(url, params=data)
return handle_response(response)
try:
return handle_response(response)
except HTTPError:
logger.warning('Client requested an schema service endpoint (see "schema_definition_multi") not yet available on your deployment. Please talk to your admin about updating your deployment')
return None

def schema_definition_all(self) -> dict[str]:
"""Get the definition of all schema_types
Expand All @@ -128,7 +135,12 @@ def schema_definition_all(self) -> dict[str]:
endpoint_mapping = self.default_url_mapping
url = self._endpoints["schema_definition_all"].format_map(endpoint_mapping)
response = self.session.get(url)
return handle_response(response)
try:
return handle_response(response)
except HTTPError:
logger.warning('Client requested an schema service endpoint (see "schema_definition_all") not yet available on your deployment. Please talk to your admin about updating your deployment')
return None



client_mapping = {
Expand Down
40 changes: 29 additions & 11 deletions caveclient/materializationengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import warnings
from datetime import datetime, timezone
from typing import Iterable, Optional, Union
from urllib.error import HTTPError
from requests import HTTPError

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -252,6 +252,8 @@ def __init__(
self._cg_client = cg_client
self.synapse_table = synapse_table
self.desired_resolution = desired_resolution
self._tables = None
self._views = None

@property
def datastack_name(self):
Expand All @@ -267,13 +269,13 @@ def cg_client(self):
return self._cg_client

@property
def version(self):
def version(self) -> int:
if self._version is None:
self._version = self.most_recent_version()
return self._version

@property
def homepage(self):
def homepage(self) -> HTML:
url = (
f"{self._server_address}/materialize/views/datastack/{self._datastack_name}"
)
Expand All @@ -286,6 +288,24 @@ def version(self, x):
else:
raise ValueError("Version not in materialized database")

@property
def tables(self) -> TableManager:
    """Table-query manager for this client, created on first access.

    The manager is built from the full client held in ``self.fc`` the first
    time this property is read and is cached on ``self._tables`` afterwards.

    Raises
    ------
    ValueError
        If ``self.fc`` is None or its ``_materialize`` attribute is None.
    """
    cached = self._tables
    if cached is not None:
        return cached

    full_client = self.fc
    if full_client is None or full_client._materialize is None:
        raise ValueError("No full CAVEclient specified")

    self._tables = TableManager(full_client)
    return self._tables

@property
def views(self) -> ViewManager:
    """View-query manager for this client, created on first access.

    The manager is built from the full client held in ``self.fc`` the first
    time this property is read and is cached on ``self._views`` afterwards.

    Raises
    ------
    ValueError
        If ``self.fc`` is None or its ``_materialize`` attribute is None.
    """
    cached = self._views
    if cached is not None:
        return cached

    full_client = self.fc
    if full_client is None or full_client._materialize is None:
        raise ValueError("No full CAVEclient specified")

    self._views = ViewManager(full_client)
    return self._views

def most_recent_version(self, datastack_name=None) -> int:
"""
Get the most recent version of materialization for this datastack name
Expand Down Expand Up @@ -356,7 +376,6 @@ def get_tables(self, datastack_name=None, version=None):
endpoint_mapping = self.default_url_mapping
endpoint_mapping["datastack_name"] = datastack_name
endpoint_mapping["version"] = version
# TODO fix up latest version
url = self._endpoints["tables"].format_map(endpoint_mapping)

response = self.session.get(url)
Expand Down Expand Up @@ -1890,17 +1909,16 @@ def __init__(self, *args, **kwargs):
self.get_view_schemas
)
)
tables = None
if self.fc is not None:
tables = TableManager(self.fc, metadata[0].result(), metadata[1].result())
else:
tables = None
self.tables = tables

if metadata[0].result() is not None and metadata[1].result() is not None:
tables = TableManager(self.fc, metadata[0].result(), metadata[1].result())
self._tables = tables
if self.fc is not None:
views = ViewManager(self.fc, metadata[2].result(), metadata[3].result())
else:
views = None
self.views = views
self._views = views

@cached(cache=TTLCache(maxsize=100, ttl=60 * 60 * 12))
def get_tables_metadata(
Expand Down Expand Up @@ -2510,7 +2528,7 @@ def get_unique_string_values(
client_mapping = {
2: MaterializationClientV2,
3: MaterializationClientV3,
"latest": MaterializationClientV2,
"latest": MaterializationClientV3,
}

MaterializationClientType = Union[MaterializationClientV2, MaterializationClientV3]
26 changes: 19 additions & 7 deletions caveclient/tools/table_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ def _schema_key(schema_name, client, **kwargs):

def populate_schema_cache(client, schema_definitions=None):
if schema_definitions is None:
try:
schema_definitions = client.schema.schema_definition_all()
except:
schema_definitions = client.schema.schema_definition_all()
if schema_definitions is None:
schema_definitions = {sn:None for sn in client.schema.get_schemas()}
for schema_name, schema_definition in schema_definitions.items():
get_col_info(schema_name, client, schema_definition=schema_definition)
Expand Down Expand Up @@ -251,6 +250,9 @@ def get_table_info(
Dict mapping columns to table names
"""
ref_table = meta.get("reference_table")
if ref_table is not None:
if len(ref_table) == 0:
ref_table = None
if ref_table is None or merge_schema is False:
schema = meta["schema"]
ref_pts = []
Expand Down Expand Up @@ -463,9 +465,6 @@ def query(
desired_resolution=None,
get_counts=False,
):
logger.warning(
"The `client.materialize.tables` interface is experimental and might experience breaking changes before the feature is stabilized."
)
if self._reference_table is None:
qry_table = self._base_table
return client.materialize.query_table(
Expand All @@ -481,7 +480,10 @@ def query(
metadata=metadata,
**self.filter_kwargs_mat,
)
else:
elif timestamp is None:
logger.warning(
"The `client.materialize.tables` interface is experimental and might experience breaking changes before the feature is stabilized."
)
qry_table = self._reference_table
return client.materialize.join_query(
tables=self.basic_join,
Expand All @@ -495,6 +497,16 @@ def query(
metadata=metadata,
**self.filter_kwargs_mat,
)
else:
return self.live_query(
timestamp=timestamp,
offset=offset,
limit=limit,
split_positions=split_positions,
metadata=metadata,
desired_resolution=desired_resolution,
allow_missing_lookups=False,
)

def live_query(
self,
Expand Down

0 comments on commit 8f198f1

Please sign in to comment.