diff --git a/registry/sql-registry/.dockerignore b/registry/sql-registry/.dockerignore new file mode 100644 index 000000000..bc0ed1f7a --- /dev/null +++ b/registry/sql-registry/.dockerignore @@ -0,0 +1,3 @@ +__pycache__ +.env +.vscode diff --git a/registry/sql-registry/.gitignore b/registry/sql-registry/.gitignore new file mode 100644 index 000000000..ed2a6faed --- /dev/null +++ b/registry/sql-registry/.gitignore @@ -0,0 +1,4 @@ +__pycache__ +.env +.vscode +.idea diff --git a/registry/sql-registry/Dockerfile b/registry/sql-registry/Dockerfile new file mode 100644 index 000000000..d2647021d --- /dev/null +++ b/registry/sql-registry/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.9 + +COPY ./ /usr/src + +WORKDIR /usr/src +RUN pip install -r requirements.txt + +# Start web server +CMD [ "uvicorn","main:app","--host", "0.0.0.0", "--port", "80" ] diff --git a/registry/sql-registry/README.md b/registry/sql-registry/README.md new file mode 100644 index 000000000..f06ca7def --- /dev/null +++ b/registry/sql-registry/README.md @@ -0,0 +1,5 @@ +# SQL-Based Registry for Feathr + +This is the reference implementation of [the Feathr API spec](./api-spec.md), base on SQL databases instead of PurView. + +Please note that this implementation uses iterations of `select` to retrieve graph lineages, this approach is very inefficient and should **not** be considered as production-ready. We only suggest to use this implementation for testing/researching purposes. \ No newline at end of file diff --git a/registry/sql-registry/api-spec.md b/registry/sql-registry/api-spec.md new file mode 100644 index 000000000..1b14cae8b --- /dev/null +++ b/registry/sql-registry/api-spec.md @@ -0,0 +1,366 @@ +# Feathr Registry API Specifications + +## Data Models + +### EntityType +Type: Enum + +| Value | +|-----------------------------| +| `feathr_workspace_v1` | +| `feathr_source_v1` | +| `feathr_anchor_v1` | +| `feathr_anchor_feature_v1` | +| `feathr_derived_feature_v1` | + +### ValueType +Type: Enum + +| Value | +|---------------| +| `UNSPECIFIED` | +| `BOOL` | +| `INT32` | +| `INT64` | +| `FLOAT` | +| `DOUBLE` | +| `STRING` | +| `BYTES` | + +### VectorType +Type: Enum + +| Value | +|----------| +| `TENSOR` | + +### TensorCategory +Type: Enum + +| Value | +|----------| +| `DENSE` | +| `SPARSE` | + +### FeatureType +Type: Object + +| Field | Type | +|----------------|-------------------------------------| +| type | [`VectorType`](#valuetype) | +| tensorCategory | [`TensorCategory`](#tensorcategory) | +| dimensionType | [`array`](#valuetype) | +| valType | [`ValueType`](#valuetype) | + +### TypedKey +Type: Object + +| Field | Type | +|------------------|-----------------------------| +| key_column | `string` | +| key_column_type | [`ValueType`](#valuetype) | +| full_name | `string`, optional | +| description | `string`, optional | +| key_column_alias | `string`, optional | + +### ExpressionTransformation +Type: Object + +| Field | Type | +|----------------|----------| +| transform_expr | `string` | + +### WindowAggregationTransformation +Type: Object + +| Field | Type | +|----------|--------------------| +| def_expr | `string` | +| agg_func | `string`, optional | +| window | `string`, optional | +| group_by | `string`, optional | +| filter | `string`, optional | +| limit | `number`, optional | + +### UdfTransformation +Type: Object + +| Field | Type | +|-------|----------| +| name | `string` | + +### EntityReference +Type: Object + +| Field | Type | Comments | 
+|------------------|-----------------------------|--------------------------------------| +| guid | `Guid` | | +| typeName | [`EntityType`](#entitytype) | | +| uniqueAttributes | `map` | Contains `qualifiedName` only so far | + +### ProjectAttributes +Type: Object + +| Field | Type | +|------------------|----------------------------------------------| +| qualifiedName | `string` | +| name | `string` | +| anchors | [`array`](#entityreference) | +| sources | [`array`](#entityreference) | +| anchor_features | [`array`](#entityreference) | +| derived_features | [`array`](#entityreference) | +| tags | `map` | + +### SourceAttributes +Type: Object + +| Field | Type | +|----------------------|-----------------------| +| qualifiedName | `string` | +| name | `string` | +| path | `string` | +| preprocessing | `string`, optional | +| eventTimestampColumn | `string`, optional | +| timestampFormat | `string`, optional | +| type | `string` | +| tags | `map` | + +### AnchorAttributes +Type: Object + +| Field | Type | +|---------------|----------------------------------------------| +| qualifiedName | `string` | +| name | `string` | +| features | [`array`](#entityreference) | +| source | [`EntityReference`](#entityreference) | +| tags | `map` | + +### AnchorFeatureAttributes +Type: Object + +| Field | Type | +|----------------|--------------------------------| +| qualifiedName | `string` | +| name | `string` | +| type | [`FeatureType`](#featuretype) | +| transformation | [`ExpressionTransformation`](#expressiontransformation)
`or` [`WindowAggregationTransformation`](#windowaggregationtransformation)
`or` [`UdfTransformation`](#udftransformation) | +| key | [`array`](#typedkey) | +| tags | `map` | + +### DerivedFeatureAttributes +Type: Object + +| Field | Type | +|------------------------|--------------------------------| +| qualifiedName | `string` | +| name | `string` | +| type | [`FeatureType`](#featuretype) | +| transformation | [`ExpressionTransformation`](#expressiontransformation)
`or` [`WindowAggregationTransformation`](#windowaggregationtransformation)
`or` [`UdfTransformation`](#udftransformation) | +| key | [`array`](#typedkey) | +| input_anchor_features | [`array`](#entityreference) | +| input_derived_features | [`array`](#entityreference) | +| tags | `map` | + +### EntityStatus +Type: Enum + +| Value | +|----------| +| `ACTIVE` | + +### Entity +Type: Object + +| Field | Type | +|----------------|---------------------------------| +| guid | `Guid` | +| lastModifiedTS | `string` | +| status | [`EntityStatus`](#entitystatus) | +| displayText | `string` | +| typeName | [`EntityType`](#entitytype) | +| attributes | [`ProjectAttributes`](#projectattributes)
`or` [`SourceAttributes`](#sourceattributes)
`or` [`AnchorAttributes`](#anchorattributes)
`or` [`AnchorFeatureAttributes`](#anchorfeatureattributes)
`or` [`DerivedFeatureAttributes`](#derivedfeatureattributes) | + +### RelationshipType +Type: Enum + +| Value | +|-------------| +| `BelongsTo` | +| `Contains` | +| `Produces` | +| `Consumes` | + +### Relationship +Type: Object + +| Field | Type | +|------------------|-----------------------------------------| +| relationshipId | `Guid` | +| relationshipType | [`RelationshipType`](#relationshiptype) | +| fromEntityId | `Guid` | +| toEntityId | `Guid` | + +### ProjectDefinition +Type: Object + +| Field | Type | +|----------------------|-----------------------| +| qualifiedName | `string` | +| tags | `map` | + + +### SourceDefinition +Type: Object + +| Field | Type | +|----------------------|-----------------------| +| qualifiedName | `string` | +| name | `string` | +| path | `string` | +| preprocessing | `string`, optional | +| eventTimestampColumn | `string`, optional | +| timestampFormat | `string`, optional | +| type | `string` | +| tags | `map` | + +### AnchorDefinition +Type: Object + +| Field | Type | +|----------------------|-----------------------| +| qualifiedName | `string` | +| name | `string` | +| source_id | `Guid` | +| tags | `map` | + +### AnchorFeatureDefinition +Type: Object + +| Field | Type | +|----------------|--------------------------------| +| qualifiedName | `string` | +| name | `string` | +| featureType | [`FeatureType`](#featuretype) | +| transformation | [`ExpressionTransformation`](#expressiontransformation)
`or` [`WindowAggregationTransformation`](#windowaggregationtransformation)
`or` [`UdfTransformation`](#udftransformation) | +| key | [`array`](#typedkey) | +| tags | `map` | + +### DerivedFeatureDefinition +Type: Object + +| Field | Type | +|------------------------|--------------------------------| +| qualifiedName | `string` | +| name | `string` | +| featureType | [`FeatureType`](#featuretype) | +| transformation | [`ExpressionTransformation`](#expressiontransformation)
`or` [`WindowAggregationTransformation`](#windowaggregationtransformation)
`or` [`UdfTransformation`](#udftransformation) | +| key | [`array`](#typedkey) | +| input_anchor_features | `array` | +| input_derived_features | `array` | +| tags | `map` | + + +### EntitiesAndRelationships +Type: Object + +| Field | Type | +|---------------|----------------------------------------| +| guidEntityMap | [`map`](#entity) | +| relations | [`array`](#relationship) | + + +## Feathr Registry API + +### `GET /projects` +List **names** of all projects. + +Response Type: `array` + +### `GET /projects/{project}` +Get everything defined in the project + +Response Type: [`EntitiesAndRelationships`](#entitiesandrelationships) + +### `GET /projects/{project}/datasources` +Get all sources defined in the project. + +Response Type: [`array`](#entity) + +### `GET /projects/{project}/features` +Get all anchor features and derived features in the project, or only features meet the search criteria in the project. + +Query Parameters: + +| Field | Type | +|---------|--------| +| keyword | string | +| size | number | +| offset | number | + + +Response Type: Object + +| Field | Type | +|----------|----------------------------| +| features | [`array`](#entity) | + +### `GET /features/:feature` +Get feature details. + +Response Type: Object + +| Field | Type | Comments | +|-----------------|-----------------------|-----------------------------| +| entity | [`Entity`](#entity) | | +| referredEntities| `map` | For compatibility, not used | + +### `POST /projects` +Create new project + ++ Request Type: [`ProjectDefinition`](#projectdefinition) ++ Response Type: Object + +| Field | Type | +|-------|------| +| guid | Guid | + +### `POST /projects/{project}/datasources` +Create new source in the project + ++ Request Type: [`SourceDefinition`](#sourcedefinition) ++ Response Type: Object + +| Field | Type | +|-------|------| +| guid | Guid | + +### `POST /projects/{project}/anchors` +Create new anchor in the project + ++ Request Type: [`AnchorDefinition`](#anchordefinition) ++ Response Type: Object + +| Field | Type | +|-------|------| +| guid | Guid | + +### `POST /projects/{project}/anchors/{anchor}/features` +Create new anchor feature in the project under specified anchor + ++ Request Type: [`AnchorFeatureDefinition`](#anchorfeaturedefinition) ++ Response Type: Object + +| Field | Type | +|-------|------| +| guid | Guid | + +### `POST /projects/{project}/derivedfeatures` +Create new derived feature in the project + ++ Request Type: [`DerivedFeatureDefinition`](#derivedfeaturedefinition) ++ Response Type: Object + +| Field | Type | +|-------|------| +| guid | Guid | diff --git a/registry/sql-registry/main.py b/registry/sql-registry/main.py new file mode 100644 index 000000000..a40fae89c --- /dev/null +++ b/registry/sql-registry/main.py @@ -0,0 +1,77 @@ +import os +from typing import Optional +from fastapi import APIRouter, FastAPI, HTTPException +from starlette.middleware.cors import CORSMiddleware +from registry import * +from registry.db_registry import DbRegistry +from registry.models import EntityType + +rp = "/" +try: + rp = os.environ["API_BASE"] + if rp[0] != '/': + rp = '/' + rp +except: + pass +print("Using API BASE: ", rp) + +registry = DbRegistry() +app = FastAPI() +router = APIRouter() + +# Enables CORS +app.add_middleware(CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +@router.get("/projects") +def get_projects() -> list[str]: + return registry.get_projects() + + +@router.get("/projects/{project}") +def 
get_project(project: str) -> dict:
+    return registry.get_project(project).to_dict()
+
+
+@router.get("/projects/{project}/datasources")
+def get_project_datasources(project: str) -> list:
+    p = registry.get_entity(project)
+    source_ids = [s.id for s in p.attributes.sources]
+    sources = registry.get_entities(source_ids)
+    return list([e.to_dict() for e in sources])
+
+
+@router.get("/projects/{project}/features")
+def get_project_features(project: str, keyword: Optional[str] = None) -> list:
+    if keyword is None:
+        p = registry.get_entity(project)
+        feature_ids = [s.id for s in p.attributes.anchor_features] + \
+            [s.id for s in p.attributes.derived_features]
+        features = registry.get_entities(feature_ids)
+        return list([e.to_dict() for e in features])
+    else:
+        efs = registry.search_entity(keyword, [EntityType.AnchorFeature, EntityType.DerivedFeature])
+        feature_ids = [ef.id for ef in efs]
+        features = registry.get_entities(feature_ids)
+        return list([e.to_dict() for e in features])
+
+
+@router.get("/features/{feature}")
+def get_feature(feature: str) -> dict:
+    e = registry.get_entity(feature)
+    if e.entity_type not in [EntityType.DerivedFeature, EntityType.AnchorFeature]:
+        raise HTTPException(status_code=404, detail=f"Feature {feature} not found")
+    return e.to_dict()
+
+
+@router.get("/features/{feature}/lineage")
+def get_feature_lineage(feature: str) -> dict:
+    lineage = registry.get_lineage(feature)
+    return lineage.to_dict()
+
+
+app.include_router(prefix = rp, router=router)
diff --git a/registry/sql-registry/registry/__init__.py b/registry/sql-registry/registry/__init__.py
new file mode 100644
index 000000000..5ce157408
--- /dev/null
+++ b/registry/sql-registry/registry/__init__.py
@@ -0,0 +1,6 @@
+__all__ = ["interface", "models", "database", "db_registry"]
+
+from registry.models import *
+from registry.interface import Registry
+from registry.database import DbConnection, connect
+from registry.db_registry import DbRegistry
\ No newline at end of file
diff --git a/registry/sql-registry/registry/database.py b/registry/sql-registry/registry/database.py
new file mode 100644
index 000000000..d82568972
--- /dev/null
+++ b/registry/sql-registry/registry/database.py
@@ -0,0 +1,85 @@
+from abc import ABC, abstractmethod
+import threading
+from distutils.log import debug, warn
+import os
+import pymssql
+
+
+providers = []
+
+class DbConnection(ABC):
+    @abstractmethod
+    def execute(self, sql: str, *args, **kwargs) -> list[dict]:
+        pass
+
+def quote(id):
+    if isinstance(id, str):
+        return f"'{id}'"
+    else:
+        return ",".join([f"'{i}'" for i in id])
+
+
+def parse_conn_str(s: str) -> dict:
+    """
+    TODO: Not a sound and safe implementation, but useful enough in this case
+    as the connection string is provided by users themselves.
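+
+    A hypothetical example of the ADO-style format this parser accepts:
+        "Server=tcp:example.database.windows.net,1433;Initial Catalog=feathr;User ID=feathr;Password=..."
+    which would be parsed into:
+        {"host": "example.database.windows.net", "database": "feathr", "user": "feathr", "password": "..."}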
+ """ + parts = dict([p.strip().split("=", 1) + for p in s.split(";") if len(p.strip()) > 0]) + server = parts["Server"].split(":")[1].split(",")[0] + return { + "host": server, + "database": parts["Initial Catalog"], + "user": parts["User ID"], + "password": parts["Password"], + # "charset": "utf-8", ## For unknown reason this causes connection failure + } + + +class MssqlConnection(DbConnection): + @staticmethod + def connect(*args, **kwargs): + conn_str = os.environ["CONNECTION_STR"] + if "Server=" not in conn_str: + debug("`CONNECTION_STR` is not in ADO connection string format") + return None + return MssqlConnection(parse_conn_str(conn_str)) + + def __init__(self, params): + self.params = params + self.make_connection() + self.mutex = threading.Lock() + + def make_connection(self): + self.conn = pymssql.connect(**self.params) + + def execute(self, sql: str, *args, **kwargs) -> list[dict]: + debug(f"SQL: `{sql}`") + # NOTE: Only one cursor is allowed at the same time + retry = 0 + while True: + try: + with self.mutex: + c = self.conn.cursor(as_dict=True) + c.execute(sql, *args, **kwargs) + return c.fetchall() + except pymssql.OperationalError: + warn("Database error, retrying...") + # Reconnect + self.make_connection() + retry += 1 + if retry >= 3: + # Stop retrying + raise + pass + + +providers.append(MssqlConnection) + + +def connect(): + for p in providers: + ret = p.connect() + if ret is not None: + return ret + raise RuntimeError("Cannot connect to database") \ No newline at end of file diff --git a/registry/sql-registry/registry/db_registry.py b/registry/sql-registry/registry/db_registry.py new file mode 100644 index 000000000..f5456c5e5 --- /dev/null +++ b/registry/sql-registry/registry/db_registry.py @@ -0,0 +1,194 @@ +from typing import Optional, Tuple, Union +from uuid import UUID +from registry import Registry +from registry import connect +from registry.models import Edge, EntitiesAndRelations, Entity, EntityRef, EntityType, RelationshipType, _to_type, _to_uuid +import json + + +def quote(id): + if isinstance(id, str): + return f"'{id}'" + else: + return ",".join([f"'{i}'" for i in id]) + + +class DbRegistry(Registry): + def __init__(self): + self.conn = connect() + + def get_projects(self) -> list[str]: + ret = self.conn.execute( + f"select qualified_name from entities where entity_type='{EntityType.Project}'") + return list([r["qualified_name"] for r in ret]) + + def get_entity(self, id_or_name: Union[str, UUID]) -> Entity: + return self._fill_entity(self._get_entity(id_or_name)) + + def get_entities(self, ids: list[UUID]) -> list[Entity]: + return list([self._fill_entity(e) for e in self._get_entities(ids)]) + + def get_entity_id(self, id_or_name: Union[str, UUID]) -> UUID: + try: + id = _to_uuid(id_or_name) + return id + except ValueError: + pass + # It is a name + ret = self.conn.execute( + f"select entity_id from entities where qualified_name='{id_or_name}'") + return ret[0]["entity_id"] + + def get_neighbors(self, id_or_name: Union[str, UUID], relationship: RelationshipType) -> list[Edge]: + rows = self.conn.execute(fr''' + select edge_id, from_id, to_id, conn_type + from edges + where from_id = '{self.get_entity_id(id_or_name)}' + and conn_type = '{relationship.name}' + ''') + return list([Edge(**row) for row in rows]) + + def get_lineage(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: + """ + Get feature lineage on both upstream and downstream + Returns [entity_id:entity] map and list of edges have been traversed. 
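+
+        Implementation note: this runs two breadth-first traversals from the entity,
+        following `Consumes` edges for the upstream part and `Produces` edges for the
+        downstream part, then merges both results. There is no depth limit.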
+ """ + id = self.get_entity_id(id_or_name) + upstream_entities, upstream_edges = self._bfs( + id, RelationshipType.Consumes) + downstream_entities, downstream_edges = self._bfs( + id, RelationshipType.Produces) + return EntitiesAndRelations( + upstream_entities + downstream_entities, + upstream_edges + downstream_edges) + + def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: + """ + This function returns not only the project itself, but also everything in the project + """ + project = self._get_entity(id_or_name) + edges = set(self.get_neighbors(id_or_name, RelationshipType.Contains)) + ids = list([e.to_id for e in edges]) + children = self._get_entities(ids) + child_map = dict([(e.id, e) for e in children]) + project.attributes.children = children + for anchor in project.attributes.anchors: + conn = self.get_neighbors(anchor.id, RelationshipType.Contains) + feature_ids = [e.to_id for e in conn] + edges = edges.union(conn) + features = list([child_map[id] for id in feature_ids]) + anchor.attributes.features = features + source_id = self.get_neighbors(anchor.id, RelationshipType.Consumes)[0].to_id + anchor.attributes.source = child_map[source_id] + for df in project.attributes.derived_features: + conn = self.get_neighbors(anchor.id, RelationshipType.Consumes) + input_ids = [e.to_id for e in conn] + edges = edges.union(conn) + features = list([child_map[id] for id in input_ids]) + df.attributes.input_features = features + all_edges = self._get_edges(ids) + return EntitiesAndRelations([project] + children, list(edges.union(all_edges))) + + def _fill_entity(self, e: Entity) -> Entity: + """ + Entities in the DB contains only attributes belong to itself, but the returned + data model contains connections/contents, so we need to fill this gap + """ + if e.entity_type == EntityType.Project: + edges = self.get_neighbors(e.id, RelationshipType.Contains) + ids = list([e.to_id for e in edges]) + children = self._get_entities(ids) + e.attributes.children = children + return e + if e.entity_type == EntityType.Anchor: + conn = self.get_neighbors(e.id, RelationshipType.Contains) + feature_ids = [e.to_id for e in conn] + features = self._get_entities(feature_ids) + e.attributes.features = features + source_id = self.get_neighbors(e.id, RelationshipType.Consumes)[0].to_id + source = self.get_entity(source_id) + e.attributes.source = source + return e + if e.entity_type == EntityType.DerivedFeature: + conn = self.get_neighbors(e.id, RelationshipType.Consumes) + feature_ids = [e.to_id for e in conn] + features = self._get_entities(feature_ids) + e.attributes.input_features = features + return e + return e + + def _get_edges(self, ids: list[UUID], types: list[RelationshipType] = []) -> list[Edge]: + sql = fr"""select edge_id, from_id, to_id, conn_type from edges + where from_id in ({quote(ids)}) + and to_id in ({quote(ids)})""" + if len(types)>0: + sql = fr"""select edge_id, from_id, to_id, conn_type from edges + where conn_type in ({quote(types)}) + and from_id in ({quote(ids)}) + and to_id in ({quote(ids)})""" + rows = self.conn.execute(sql) + return list([_to_type(row, Edge) for row in rows]) + + def _get_entity(self, id_or_name: Union[str, UUID]) -> Entity: + row = self.conn.execute(fr''' + select entity_id, qualified_name, entity_type, attributes + from entities + where entity_id = '{self.get_entity_id(id_or_name)}' + ''')[0] + row["attributes"] = json.loads(row["attributes"]) + return _to_type(row, Entity) + + def _get_entities(self, ids: list[UUID]) -> list[Entity]: + rows = 
self.conn.execute(fr''' + select entity_id, qualified_name, entity_type, attributes + from entities + where entity_id in ({quote(ids)}) + ''') + ret = [] + for row in rows: + row["attributes"] = json.loads(row["attributes"]) + ret.append(Entity(**row)) + return ret + + def _bfs(self, id: UUID, conn_type: RelationshipType) -> Tuple[list[Entity], list[Edge]]: + """ + Breadth first traversal + Starts from `id`, follow edges with `conn_type` only. + + WARN: There is no depth limit. + """ + connections = [] + to_ids = [{ + "to_id": id, + }] + # BFS over SQL + while len(to_ids) != 0: + to_ids = self._bfs_step(to_ids, conn_type) + connections.extend(to_ids) + ids = set([id]) + for r in connections: + ids.add(r["from_id"]) + ids.add(r["to_id"]) + entities = self.get_entities(ids) + edges = list([Edge(**c) for c in connections]) + return (entities, edges) + + def _bfs_step(self, ids: list[UUID], conn_type: RelationshipType) -> set[dict]: + """ + One step of the BFS process + Returns all edges that connect to node ids the next step + """ + ids = list([id["to_id"] for id in ids]) + sql = fr"""select edge_id, from_id, to_id, conn_type from edges where conn_type = '{conn_type.name}' and from_id in ({quote(ids)})""" + return self.conn.execute(sql) + + def search_entity(self, + keyword: str, + type: list[EntityType]) -> list[EntityRef]: + """ + WARN: This search function is implemented via `like` operator, which could be extremely slow. + """ + types = ",".join([quote(str(t)) for t in type]) + sql = fr'''select entity_id as id, qualified_name, entity_type as type from entities where qualified_name like %s and entity_type in ({types})''' + rows = self.conn.execute(sql, ('%' + keyword + '%', )) + return list([EntityRef(**row) for row in rows]) diff --git a/registry/sql-registry/registry/interface.py b/registry/sql-registry/registry/interface.py new file mode 100644 index 000000000..406c52ace --- /dev/null +++ b/registry/sql-registry/registry/interface.py @@ -0,0 +1,69 @@ +from abc import ABC, abstractmethod +from typing import Union +from uuid import UUID +from registry.database import DbConnection + +from registry.models import * + +class Registry(ABC): + @abstractmethod + def get_projects(self) -> list[str]: + """ + Returns the names of all projects + """ + pass + + @abstractmethod + def get_entity(self, id_or_name: Union[str, UUID]) -> Entity: + """ + Get one entity by its id or qualified name + """ + pass + + @abstractmethod + def get_entities(self, ids: list[UUID]) -> list[Entity]: + """ + Get list of entities by their ids + """ + pass + + @abstractmethod + def get_entity_id(self, id_or_name: Union[str, UUID]) -> UUID: + """ + Get entity id by its name + """ + pass + + @abstractmethod + def get_neighbors(self, id_or_name: Union[str, UUID], relationship: RelationshipType) -> list[Edge]: + """ + Get list of edges with specified type that connect to this entity. + The edge contains fromId and toId so we can follow to the entity it connects to + """ + pass + + @abstractmethod + def get_lineage(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: + """ + Get all the upstream and downstream entities of an entity, along with all edges connect them. + Only meaningful to features and data sources. 
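+
+        The returned `EntitiesAndRelations` holds a guid-to-entity map plus the traversed
+        edges, which serialize to the `guidEntityMap` and `relations` fields in the API spec.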
+ """ + pass + + @abstractmethod + def get_project(self, id_or_name: Union[str, UUID]) -> EntitiesAndRelations: + """ + Get a project and everything inside of it, both entities and edges + """ + pass + + @abstractmethod + def search_entity(self, + keyword: str, + type: list[EntityType], + project: Optional[Union[str, UUID]] = None) -> list[EntityRef]: + """ + Search entities with specified type that also match the keyword in a project + """ + pass + diff --git a/registry/sql-registry/registry/models.py b/registry/sql-registry/registry/models.py new file mode 100644 index 000000000..3c08d2692 --- /dev/null +++ b/registry/sql-registry/registry/models.py @@ -0,0 +1,728 @@ +from abc import ABC, abstractmethod +from enum import Enum +from typing import Optional, Union +from uuid import UUID +import json +import re + + +def _to_snake(d, level: int = 0): + """ + Convert `string`, `list[string]`, or all keys in a `dict` into snake case + The maximum length of input string or list is 100, or it will be truncated before being processed, for dict, the exception will be thrown if it has more than 100 keys. + the maximum nested level is 10, otherwise the exception will be thrown + """ + if level >= 10: + raise ValueError("Too many nested levels") + if isinstance(d, str): + d = d[:100] + return re.sub(r'([A-Z]\w+$)', r'_\1', d).lower() + if isinstance(d, list): + d = d[:100] + return [_to_snake(i, level + 1) if isinstance(i, (dict, list)) else i for i in d] + if len(d) > 100: + raise ValueError("Dict has too many keys") + return {_to_snake(a, level + 1): _to_snake(b, level + 1) if isinstance(b, (dict, list)) else b for a, b in d.items()} + + +def _to_type(value, type): + """ + Convert `value` into `type`, + or `list[type]` if `value` is a list + NOTE: This is **not** a generic implementation, only for objects in this module + """ + if isinstance(value, type): + return value + if isinstance(value, list): + return list([_to_type(v, type) for v in value]) + if isinstance(value, dict): + if hasattr(type, "new"): + try: + # The convention is to use `new` method to create the object from a dict + return type.new(**_to_snake(value)) + except TypeError: + pass + return type(**_to_snake(value)) + if issubclass(type, Enum): + try: + n = int(value) + return type(n) + except ValueError: + pass + if hasattr(type, "new"): + try: + # As well as Enum types, some of them have alias that cannot be handled by default Enum constructor + return type.new(value) + except KeyError: + pass + return type[value] + return type(value) + + +def _to_uuid(value): + return _to_type(value, UUID) + + +class ValueType(Enum): + UNSPECIFIED = 0 + BOOLEAN = 1 + INT = 2 + LONG = 3 + FLOAT = 4 + DOUBLE = 5 + STRING = 6 + BYTES = 7 + + +class VectorType(Enum): + TENSOR = 0 + + +class TensorCategory(Enum): + DENSE = 0 + SPARSE = 1 + + +class EntityType(Enum): + Project = 1 + Source = 2 + Anchor = 3 + AnchorFeature = 4 + DerivedFeature = 5 + + @staticmethod + def new(v): + return { + "feathr_workspace_v1": EntityType.Project, + "feathr_source_v1": EntityType.Source, + "feathr_anchor_v1": EntityType.Anchor, + "feathr_anchor_feature_v1": EntityType.AnchorFeature, + "feathr_derived_feature_v1": EntityType.DerivedFeature, + }[v] + + def __str__(self): + return { + EntityType.Project: "feathr_workspace_v1", + EntityType.Source: "feathr_source_v1", + EntityType.Anchor: "feathr_anchor_v1", + EntityType.AnchorFeature: "feathr_anchor_feature_v1", + EntityType.DerivedFeature: "feathr_derived_feature_v1", + }[self] + + +class RelationshipType(Enum): + 
Contains = 1 + BelongsTo = 2 + Consumes = 3 + Produces = 4 + + +class ToDict(ABC): + """ + This ABC is used to convert object to dict, then JSON. + """ + @abstractmethod + def to_dict(self) -> dict: + pass + + def to_json(self, indent=None) -> str: + return json.dumps(self.to_dict(), indent=indent) + + +class FeatureType(ToDict): + def __init__(self, + type: Union[str, VectorType], + tensor_category: Union[str, TensorCategory], + dimension_type: list[Union[str, ValueType]], + val_type: Union[str, ValueType]): + self.type = _to_type(type, VectorType) + self.tensor_category = _to_type(tensor_category, TensorCategory) + self.dimension_type = _to_type(dimension_type, ValueType) + self.val_type = _to_type(val_type, ValueType) + + def to_dict(self) -> dict: + return { + "type": self.type.name, + "tensorCategory": self.tensor_category.name, + "dimensionType": [t.name for t in self.dimension_type], + "valType": self.val_type.name, + } + + +class TypedKey(ToDict): + def __init__(self, + key_column: str, + key_column_type: ValueType, + full_name: Optional[str] = None, + description: Optional[str] = None, + key_column_alias: Optional[str] = None): + self.key_column = key_column + self.key_column_type = _to_type(key_column_type, ValueType) + self.full_name = full_name + self.description = description + self.key_column_alias = key_column_alias + + def to_dict(self) -> dict: + ret = { + "key_column": self.key_column, + "key_column_type": self.key_column_type.name, + } + if self.full_name is not None: + ret["full_name"] = self.full_name + if self.description is not None: + ret["description"] = self.full_name + if self.key_column_alias is not None: + ret["key_column_alias"] = self.key_column_alias + return ret + + +class Transformation(ToDict): + @staticmethod + def new(**kwargs): + if "transform_expr" in kwargs: + return ExpressionTransformation(**kwargs) + elif "def_expr" in kwargs: + return WindowAggregationTransformation(**kwargs) + elif "name" in kwargs: + return UdfTransformation(**kwargs) + else: + raise ValueError(kwargs) + + +class ExpressionTransformation(Transformation): + def __init__(self, transform_expr: str): + self.transform_expr = transform_expr + + def to_dict(self) -> dict: + return { + "transform_expr": self.transform_expr + } + + +class WindowAggregationTransformation(Transformation): + def __init__(self, + def_expr: str, + agg_func: Optional[str] = None, + window: Optional[str] = None, + group_by: Optional[str] = None, + filter: Optional[str] = None, + limit: Optional[int] = None): + self.def_expr = def_expr + self.agg_func = agg_func + self.window = window + self.group_by = group_by + self.filter = filter + self.limit = limit + + def to_dict(self) -> dict: + ret = { + "def_expr": self.def_expr, + } + if self.agg_func is not None: + ret["agg_func"] = self.agg_func + if self.window is not None: + ret["window"] = self.window + if self.group_by is not None: + ret["group_by"] = self.group_by + if self.filter is not None: + ret["filter"] = self.filter + if self.limit is not None: + ret["limit"] = self.limit + return ret + + +class UdfTransformation(Transformation): + def __init__(self, name: str): + self.name = name + + def to_dict(self) -> dict: + return { + "name": self.name + } + + +class EntityRef(ToDict): + def __init__(self, + id: UUID, + type: Union[str, EntityType], + qualified_name: Optional[str] = None, + uniq_attr: dict = {}): + self.id = id + self.type = _to_type(type, EntityType) + if qualified_name is not None: + self.uniq_attr = {"qualifiedName": qualified_name} + else: + 
self.uniq_attr = uniq_attr + + @property + def entity_type(self) -> EntityType: + return self.type + + @property + def qualified_name(self) -> EntityType: + return self.uniq_attr['qualifiedName'] + + def get_ref(self): + return self + + def to_dict(self) -> dict: + return { + "guid": str(self.id), + "typeName": str(self.type), + "uniqueAttributes": self.uniq_attr, + } + + +class Attributes(ToDict): + @staticmethod + def new(entity_type: Union[str, EntityType], **kwargs): + return { + EntityType.Project: ProjectAttributes, + EntityType.Source: SourceAttributes, + EntityType.Anchor: AnchorAttributes, + EntityType.AnchorFeature: AnchorFeatureAttributes, + EntityType.DerivedFeature: DerivedFeatureAttributes, + }[_to_type(entity_type, EntityType)](**kwargs) + + +class Entity(ToDict): + def __init__(self, + entity_id: Union[str, UUID], + qualified_name: str, + entity_type: Union[str, EntityType], + attributes: Union[dict, Attributes], + **kwargs): + self.id = _to_uuid(entity_id) + self.qualified_name = qualified_name + self.entity_type = _to_type(entity_type, EntityType) + if isinstance(attributes, Attributes): + self.attributes = attributes + else: + self.attributes = Attributes.new( + entity_type, **_to_snake(attributes)) + + def get_ref(self) -> EntityRef: + return EntityRef(self.id, + self.attributes.entity_type, + self.qualified_name) + + def to_dict(self) -> dict: + return { + "guid": str(self.id), + "lastModifiedTS": "1", + "status": "ACTIVE", + "displayText": self.attributes.name, + "typeName": str(self.attributes.entity_type), + "attributes": self.attributes.to_dict(), + } + + +class ProjectAttributes(Attributes): + def __init__(self, + name: str, + children: list[Union[dict, Entity]] = [], + tags: dict = {}, + **kwargs): + self.name = name + self.tags = tags + self._children = [] + if len(children) > 0: + self.children = children + + @property + def entity_type(self) -> EntityType: + return EntityType.Project + + @property + def children(self): + return self._children + + @children.setter + def children(self, v: list[Union[dict, Entity]]): + for f in v: + if isinstance(f, Entity): + self._children.append(f) + elif isinstance(f, dict): + self._children.append(_to_type(f, Entity)) + else: + raise TypeError(f) + + @property + def sources(self): + return [ + e for e in self.children if e.entity_type == EntityType.Source] + + @property + def anchors(self): + return [ + e for e in self.children if e.entity_type == EntityType.Anchor] + + @property + def anchor_features(self): + return [ + e for e in self.children if e.entity_type == EntityType.AnchorFeature] + + @property + def derived_features(self): + return [ + e for e in self.children if e.entity_type == EntityType.DerivedFeature] + + def to_dict(self) -> dict: + return { + "qualifiedName": self.name, + "name": self.name, + "sources": list([e.get_ref().to_dict() for e in self.sources]), + "anchors": list([e.get_ref().to_dict() for e in self.anchors]), + "anchor_features": list([e.get_ref().to_dict() for e in self.anchor_features]), + "derived_features": list([e.get_ref().to_dict() for e in self.derived_features]), + "tags": self.tags, + } + + +class SourceAttributes(Attributes): + def __init__(self, + qualified_name: str, + name: str, + type: str, + path: str, + preprocessing: Optional[str] = None, + event_timestamp_column: Optional[str] = None, + timestamp_format: Optional[str] = None, + tags: dict = {}): + self.qualified_name = qualified_name + self.name = name + self.type = type + self.path = path + self.preprocessing = preprocessing + 
self.event_timestamp_column = event_timestamp_column + self.timestamp_format = timestamp_format + self.tags = tags + + @property + def entity_type(self) -> EntityType: + return EntityType.Source + + def to_dict(self) -> dict: + ret = { + "qualifiedName": self.qualified_name, + "name": self.name, + "type": self.type, + "path": self.path, + "tags": self.tags, + } + if self.preprocessing is not None: + ret["preprocessing"] = self.preprocessing + if self.event_timestamp_column is not None: + ret["eventTimestampColumn"] = self.event_timestamp_column + if self.timestamp_format is not None: + ret["timestampFormat"] = self.timestamp_format + return ret + + +class AnchorAttributes(Attributes): + def __init__(self, + qualified_name: str, + name: str, + # source: Optional[Union[dict, EntityRef, Entity]] = None, + # features: list[Union[dict, EntityRef, Entity]] = [], + tags: dict = {}, + **kwargs): + self.qualified_name = qualified_name + self.name = name + self._source = None + self._features = [] + # if source is not None: + # self._source = source.get_ref() + # if len(features)>0: + # self._set_feature(features) + self.tags = tags + + @property + def entity_type(self) -> EntityType: + return EntityType.Anchor + + @property + def source(self) -> EntityRef: + return self._source + + @source.setter + def source(self, s): + if isinstance(s, Entity): + self._source = s.get_ref() + elif isinstance(s, EntityRef): + self._source = s + elif isinstance(s, dict): + self._source = _to_type(s, Entity).get_ref() + else: + raise TypeError(s) + + @property + def features(self): + return self._features + + @features.setter + def features(self, features): + self._features = [] + for f in features: + if isinstance(f, Entity): + self._features.append(f.get_ref()) + elif isinstance(f, EntityRef): + self._features.append(f) + elif isinstance(f, dict): + self._features.append(_to_type(f, Entity).get_ref()) + else: + raise TypeError(f) + + def to_dict(self) -> dict: + ret = { + "qualifiedName": self.qualified_name, + "name": self.name, + "features": list([e.get_ref().to_dict() for e in self.features]), + "tags": self.tags, + } + if self.source is not None: + ret["source"] = self.source.get_ref().to_dict() + return ret + + +class AnchorFeatureAttributes(Attributes): + def __init__(self, + qualified_name: str, + name: str, + type: Union[dict, FeatureType], + transformation: Union[dict, Transformation], + key: list[Union[dict, TypedKey]], + tags: dict = {}): + self.qualified_name = qualified_name + self.name = name + self.type = _to_type(type, FeatureType) + self.transformation = _to_type(transformation, Transformation) + self.key = _to_type(key, TypedKey) + self.tags = tags + + @property + def entity_type(self) -> EntityType: + return EntityType.AnchorFeature + + def to_dict(self) -> dict: + return { + "qualifiedName": self.qualified_name, + "name": self.name, + "type": self.type.to_dict(), + "transformation": self.transformation.to_dict(), + "key": list([k.to_dict() for k in self.key]), + "tags": self.tags, + } + + +class DerivedFeatureAttributes(Attributes): + def __init__(self, + qualified_name: str, + name: str, + type: Union[dict, FeatureType], + transformation: Union[dict, Transformation], + key: list[Union[dict, TypedKey]], + # input_anchor_features: list[Union[dict, EntityRef, Entity]] = [], + # input_derived_features: list[Union[dict, EntityRef, Entity]] = [], + tags: dict = {}, + **kwargs): + self.qualified_name = qualified_name + self.name = name + self.type = _to_type(type, FeatureType) + self.transformation = 
_to_type(transformation, Transformation) + self.key = _to_type(key, TypedKey) + self._input_anchor_features = [] + self._input_derived_features = [] + self.tags = tags + # self._set_input_anchor_features(input_anchor_features) + # self._set_input_derived_features(input_derived_features) + + @property + def entity_type(self) -> EntityType: + return EntityType.DerivedFeature + + @property + def input_features(self): + return self._input_anchor_features + self._input_derived_features + + @input_features.setter + def input_features(self, v: Union[dict, Entity]): + self._input_anchor_features = [] + self._input_derived_features = [] + for f in v: + e = None + if isinstance(f, Entity): + e = f + elif isinstance(f, dict): + e = _to_type(f, Entity) + else: + raise TypeError(f) + + if e.entity_type == EntityType.AnchorFeature: + self._input_anchor_features.append(e) + elif e.entity_type == EntityType.DerivedFeature: + self._input_derived_features.append(e) + else: + pass + + @property + def input_anchor_features(self): + return self._input_anchor_features + + # @input_anchor_features.setter + # def input_anchor_features(self, v): + # self._input_anchor_features = [] + # for f in v: + # if isinstance(f, Entity): + # self._input_anchor_features.append(f.get_ref()) + # elif isinstance(f, EntityRef): + # self._input_anchor_features.append(f) + # elif isinstance(f, dict): + # self._input_anchor_features.append( + # to_type(f, Entity).get_ref()) + # else: + # raise TypeError(f) + + @property + def input_derived_features(self): + return self._input_derived_features + + # @input_derived_features.setter + # def input_derived_features(self, v): + # self._input_derived_features = [] + # for f in v: + # if isinstance(f, Entity): + # self._input_derived_features.append(f.get_ref()) + # elif isinstance(f, EntityRef): + # self._input_derived_features.append(f) + # elif isinstance(f, dict): + # self._input_derived_features.append( + # to_type(f, Entity).get_ref()) + # else: + # raise TypeError(f) + + def to_dict(self) -> dict: + return { + "qualifiedName": self.qualified_name, + "name": self.name, + "type": self.type.to_dict(), + "transformation": self.transformation.to_dict(), + "key": list([k.to_dict() for k in self.key]), + "input_anchor_features": [e.get_ref().to_dict() for e in self.input_anchor_features], + "input_derived_features": [e.get_ref().to_dict() for e in self.input_derived_features], + "tags": self.tags, + } + + +class Edge(ToDict): + def __init__(self, + edge_id: Union[str, UUID], + from_id: Union[str, UUID], + to_id: Union[str, UUID], + conn_type: Union[str, RelationshipType]): + self.id = _to_uuid(edge_id) + self.from_id = _to_uuid(from_id) + self.to_id = _to_uuid(to_id) + self.conn_type = _to_type(conn_type, RelationshipType) + + def __eq__(self, o: object) -> bool: + # Edge ID is kinda useless + return self.from_id == o.from_id and self.to_id == o.to_id and self.conn_type == o.conn_type + + def __hash__(self) -> int: + return hash((self.from_id, self.to_id, self.conn_type)) + + def to_dict(self) -> dict: + return { + "relationshipId": str(self.id), + "fromEntityId": str(self.from_id), + "toEntityId": str(self.to_id), + "relationshipType": self.conn_type.name, + } + + +class EntitiesAndRelations(ToDict): + def __init__(self, entities: list[Entity], edges: list[Edge]): + self.entities = dict([(e.id, e) for e in entities]) + self.edges = set(edges) + + def to_dict(self) -> dict: + return { + "guidEntityMap": dict([(str(id), self.entities[id].to_dict()) for id in self.entities]), + "relations": 
list([e.to_dict() for e in self.edges]), + } + + +class ProjectDef: + def __init__(self, qualified_name: str, tags: dict = {}): + self.qualified_name = qualified_name + self.name = qualified_name + self.tags = tags + + +class SourceDef: + def __init__(self, + qualified_name: str, + name: str, + path: str, + type: str, + preprocessing: Optional[str] = None, + event_timestamp_column: Optional[str] = None, + timestamp_format: Optional[str] = None, + tags: dict = {}): + self.qualified_name = qualified_name + self.name = name + self.path = path + self.type = type + self.preprocessing = preprocessing + self.event_timestamp_column = event_timestamp_column + self.timestamp_format = timestamp_format + self.tags = tags + + +class AnchorDef: + def __init__(self, + qualified_name: str, + name: str, + source_id: Union[str, UUID], + tags: dict = {}): + self.qualified_name = qualified_name + self.name = name + self.source_id = _to_uuid(source_id) + self.tags = tags + + +class AnchorFeatureDef: + def __init__(self, + qualified_name: str, + name: str, + feature_type: Union[dict, FeatureType], + transformation: Union[dict, Transformation], + key: list[Union[dict, TypedKey]], + tags: dict = {}): + self.qualified_name = qualified_name + self.name = name + self.feature_type = _to_type(feature_type, FeatureType) + self.transformation = _to_type(transformation, Transformation) + self.key = _to_type(key, TypedKey) + self.tags = tags + + +class DerivedFeatureDef: + def __init__(self, + qualified_name: str, + name: str, + feature_type: Union[dict, FeatureType], + transformation: Union[dict, Transformation], + key: list[Union[dict, TypedKey]], + input_anchor_features: list[Union[str, UUID]], + input_derived_features: list[Union[str, UUID]], + tags: dict = {}): + self.qualified_name = qualified_name + self.name = name + self.feature_type = _to_type(feature_type, FeatureType) + self.transformation = _to_type(transformation, Transformation) + self.key = _to_type(key, TypedKey) + self.input_anchor_features = _to_uuid(input_anchor_features) + self.input_derived_features = _to_uuid(input_derived_features) + self.tags = tags diff --git a/registry/sql-registry/requirements.txt b/registry/sql-registry/requirements.txt new file mode 100644 index 000000000..c6d61de98 --- /dev/null +++ b/registry/sql-registry/requirements.txt @@ -0,0 +1,3 @@ +pymssql +fastapi +uvicorn \ No newline at end of file diff --git a/registry/sql-registry/scripts/schema.sql b/registry/sql-registry/scripts/schema.sql new file mode 100644 index 000000000..d7258d577 --- /dev/null +++ b/registry/sql-registry/scripts/schema.sql @@ -0,0 +1,15 @@ +create table entities +( + entity_id varchar(50) not null primary key, + qualified_name varchar(200) not null, + entity_type varchar(100) not null, + attributes NVARCHAR(MAX) not null, +) + +create table edges +( + edge_id varchar(50) not null primary key, + from_id varchar(50) not null, + to_id varchar(50) not null, + conn_type varchar(20) not null, +) \ No newline at end of file diff --git a/registry/sql-registry/scripts/test_data.sql b/registry/sql-registry/scripts/test_data.sql new file mode 100644 index 000000000..a248d56fe --- /dev/null +++ b/registry/sql-registry/scripts/test_data.sql @@ -0,0 +1,92 @@ +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'feathr_ci_registry_12_33_182947__PASSTHROUGH', 'feathr_source_v1', '{"path": "PASSTHROUGH", "qualifiedName": "feathr_ci_registry_12_33_182947__PASSTHROUGH", "name": "PASSTHROUGH", "type": 
"PASSTHROUGH"}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('dc24b1d5-206d-40db-b10a-606dd16a0297', 'feathr_ci_registry_12_33_182947__request_features__f_is_long_trip_distance', 'feathr_anchor_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_is_long_trip_distance", "name": "f_is_long_trip_distance", "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "BOOLEAN"}, "transformation": {"transform_expr": "cast_float(trip_distance)>30"}, "key": [{"full_name": "feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('c626c41c-d6c2-4b16-a267-6cdeea497c52', 'feathr_ci_registry_12_33_182947__f_trip_time_rounded', 'feathr_derived_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_rounded", "name": "f_trip_time_rounded", "input_derived_features": [], "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "INT"}, "transformation": {"transform_expr": "f_trip_time_duration % 10"}, "input_anchor_features": [{"guid": "103baca1-377a-4ddf-8429-5da91026c269", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_time_duration"}}], "key": [{"full_name": "feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('537bc481-aa15-4a3b-be4e-2042da6f5a09', 'feathr_ci_registry_12_33_182947__aggregationFeatures__f_location_max_fare', 'feathr_anchor_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__aggregationFeatures__f_location_max_fare", "name": "f_location_max_fare", "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "FLOAT"}, "transformation": {"filter": null, "agg_func": "MAX", "limit": null, "group_by": null, "window": "90d", "def_expr": "cast_float(fare_amount)"}, "key": [{"full_name": "nyc_taxi.location_id", "key_column": "DOLocationID", "description": "location id in NYC", "key_column_alias": "DOLocationID", "key_column_type": "2"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('479c6306-5fdb-4e06-9008-c18f68db52a4', 'feathr_ci_registry_12_33_182947__f_trip_time_rounded_plus', 'feathr_derived_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_rounded_plus", "name": "f_trip_time_rounded_plus", "input_derived_features": [{"guid": "c626c41c-d6c2-4b16-a267-6cdeea497c52", "typeName": "feathr_derived_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_rounded"}}], "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "INT"}, "transformation": {"transform_expr": "f_trip_time_rounded + 100"}, "input_anchor_features": [], "key": [{"full_name": "feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) 
+values('c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'feathr_ci_registry_12_33_182947__nycTaxiBatchSource', 'feathr_source_v1', '{"timestamp_format": "yyyy-MM-dd HH:mm:ss", "path": "wasbs://public@azurefeathrstorage.blob.core.windows.net/sample_data/green_tripdata_2020-04.csv", "event_timestamp_column": "lpep_dropoff_datetime", "preprocessing": " def add_new_dropoff_and_fare_amount_column(df: DataFrame):\n df = df.withColumn(\"new_lpep_dropoff_datetime\", col(\"lpep_dropoff_datetime\"))\n df = df.withColumn(\"new_fare_amount\", col(\"fare_amount\") + 1000000)\n return df\n", "qualifiedName": "feathr_ci_registry_12_33_182947__nycTaxiBatchSource", "name": "nycTaxiBatchSource", "type": "wasbs", "tags": {"for_test_purpose": "true"}}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'feathr_ci_registry_12_33_182947__aggregationFeatures__f_location_avg_fare', 'feathr_anchor_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__aggregationFeatures__f_location_avg_fare", "name": "f_location_avg_fare", "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "FLOAT"}, "transformation": {"filter": null, "agg_func": "AVG", "limit": null, "group_by": null, "window": "90d", "def_expr": "cast_float(fare_amount)"}, "key": [{"full_name": "nyc_taxi.location_id", "key_column": "DOLocationID", "description": "location id in NYC", "key_column_alias": "DOLocationID", "key_column_type": "2"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('cd7306a7-c458-45e8-a00b-44a2f2117135', 'feathr_ci_registry_12_33_182947', 'feathr_workspace_v1', '{"anchor_features": [{"guid": "a5c47bd8-3729-45fa-8701-b8b76ada150a", "typeName": "feathr_anchor_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__aggregationFeatures"}}, {"guid": "260325a5-27f9-40d1-8697-c727feb1dbdc", "typeName": "feathr_anchor_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features"}}], "derived_features": [{"guid": "226b42ee-0c34-4329-b935-744aecc63fb4", "typeName": "feathr_derived_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_distance"}}, {"guid": "c626c41c-d6c2-4b16-a267-6cdeea497c52", "typeName": "feathr_derived_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_rounded"}}, {"guid": "479c6306-5fdb-4e06-9008-c18f68db52a4", "typeName": "feathr_derived_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_rounded_plus"}}], "qualifiedName": "feathr_ci_registry_12_33_182947", "name": "feathr_ci_registry_12_33_182947", "tags": {"for_test_purpose": "true"}}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'feathr_ci_registry_12_33_182947__request_features__f_trip_distance', 'feathr_anchor_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_distance", "name": "f_trip_distance", "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "FLOAT"}, "transformation": {"transform_expr": "trip_distance"}, "key": [{"full_name": "feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}], "tags": {"for_test_purpose": "true"}}'); +insert into entities 
(entity_id, qualified_name, entity_type, attributes) +values('103baca1-377a-4ddf-8429-5da91026c269', 'feathr_ci_registry_12_33_182947__request_features__f_trip_time_duration', 'feathr_anchor_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_time_duration", "name": "f_trip_time_duration", "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "INT"}, "transformation": {"transform_expr": "(to_unix_timestamp(lpep_dropoff_datetime) - to_unix_timestamp(lpep_pickup_datetime))/60"}, "key": [{"full_name": "feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('a5c47bd8-3729-45fa-8701-b8b76ada150a', 'feathr_ci_registry_12_33_182947__aggregationFeatures', 'feathr_anchor_v1', '{"features": [{"guid": "2a052ccd-3e31-46a7-bffb-2ab1302b1b00", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__aggregationFeatures__f_location_avg_fare"}}, {"guid": "537bc481-aa15-4a3b-be4e-2042da6f5a09", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__aggregationFeatures__f_location_max_fare"}}], "qualifiedName": "feathr_ci_registry_12_33_182947__aggregationFeatures", "name": "aggregationFeatures", "source": {"guid": "c4a0ae0f-09cc-43bf-94e9-21ff178fbda6", "typeName": "feathr_source_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__nycTaxiBatchSource"}}}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('260325a5-27f9-40d1-8697-c727feb1dbdc', 'feathr_ci_registry_12_33_182947__request_features', 'feathr_anchor_v1', '{"features": [{"guid": "5316c516-77f9-4be4-a7ec-8bf6e893e2aa", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_distance"}}, {"guid": "103baca1-377a-4ddf-8429-5da91026c269", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_time_duration"}}, {"guid": "dc24b1d5-206d-40db-b10a-606dd16a0297", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_is_long_trip_distance"}}, {"guid": "2380fe5b-ce2a-401e-98bf-af8b98460f67", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_day_of_week"}}], "qualifiedName": "feathr_ci_registry_12_33_182947__request_features", "name": "request_features", "source": {"guid": "a4cfbc03-c65d-4f32-be3d-1d11247c9cdd", "typeName": "feathr_source_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__PASSTHROUGH"}}, "tags": {"for_test_purpose": "true"}}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('2380fe5b-ce2a-401e-98bf-af8b98460f67', 'feathr_ci_registry_12_33_182947__request_features__f_day_of_week', 'feathr_anchor_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_day_of_week", "name": "f_day_of_week", "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "INT"}, "transformation": {"transform_expr": "dayofweek(lpep_dropoff_datetime)"}, "key": [{"full_name": 
"feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}]}'); +insert into entities (entity_id, qualified_name, entity_type, attributes) +values('226b42ee-0c34-4329-b935-744aecc63fb4', 'feathr_ci_registry_12_33_182947__f_trip_time_distance', 'feathr_derived_feature_v1', '{"qualifiedName": "feathr_ci_registry_12_33_182947__f_trip_time_distance", "name": "f_trip_time_distance", "input_derived_features": [], "type": {"type": "TENSOR", "tensorCategory": "DENSE", "dimensionType": [], "valType": "FLOAT"}, "transformation": {"transform_expr": "f_trip_distance * f_trip_time_duration"}, "input_anchor_features": [{"guid": "5316c516-77f9-4be4-a7ec-8bf6e893e2aa", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_distance"}}, {"guid": "103baca1-377a-4ddf-8429-5da91026c269", "typeName": "feathr_anchor_feature_v1", "uniqueAttributes": {"qualifiedName": "feathr_ci_registry_12_33_182947__request_features__f_trip_time_duration"}}], "key": [{"full_name": "feathr.dummy_typedkey", "key_column": "NOT_NEEDED", "description": "A dummy typed key for passthrough/request feature.", "key_column_alias": "NOT_NEEDED", "key_column_type": "UNSPECIFIED"}]}'); + +insert into edges (edge_id, from_id, to_id, conn_type) values ('455f7195-8463-4c60-9cf0-65bd9db0ae0a', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('a2777fd2-1136-40d0-8686-47b5b5fed1ef', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('ca88290d-03d1-4641-bf36-cbf3280b3e9d', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('1da26ec9-6608-4971-ac04-8ed170543325', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('9ae3a7c0-0163-4170-b0cf-81705e5b6aca', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('50068cf8-7f5e-482a-a018-68bf27f89f6d', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('898a10fd-0315-4fcb-9803-0144047033c7', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('08540367-3d1f-4d57-8af1-36b3a11762ed', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('d05ca3c6-2610-4352-9be7-c4d8dd6ab6b6', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'dc24b1d5-206d-40db-b10a-606dd16a0297', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('ed6ca745-ee85-403c-b1bf-a3f1a0463132', 'dc24b1d5-206d-40db-b10a-606dd16a0297', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('55d85311-8e42-4273-a538-ef4dc33b1570', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '537bc481-aa15-4a3b-be4e-2042da6f5a09', 'Contains'); +insert into 
edges (edge_id, from_id, to_id, conn_type) values ('215cdc37-678a-4c56-a390-76d6471fa629', '537bc481-aa15-4a3b-be4e-2042da6f5a09', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('4166aca1-e0f4-4883-b2ff-0051ee80a830', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('e1174ff1-0b1a-4b03-a1f5-b8b923a131ba', '2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('b314c95b-e4ea-4a39-9584-9b7eb0a6b8d2', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('7a37d438-8c33-4cc8-a0bb-9db3963f073a', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('31265953-4820-470f-8cfc-38efefec9fa7', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '2380fe5b-ce2a-401e-98bf-af8b98460f67', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('58c448cf-87bd-4f36-92d5-e6f6de48569d', '2380fe5b-ce2a-401e-98bf-af8b98460f67', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('c86e4e49-88c9-4ac8-a1b8-ff473e1dc588', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '103baca1-377a-4ddf-8429-5da91026c269', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('3fddedcf-c590-43b9-aaf4-4c4ce6600f2e', '103baca1-377a-4ddf-8429-5da91026c269', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('be429eb2-758d-4783-b166-cfcc7d2fb4f2', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'c626c41c-d6c2-4b16-a267-6cdeea497c52', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('bd66f19f-3508-4b13-ba40-598dd4abbd0d', 'c626c41c-d6c2-4b16-a267-6cdeea497c52', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('178c7c56-0f25-4048-be18-435ab5a169f4', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '479c6306-5fdb-4e06-9008-c18f68db52a4', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('c8dc4c34-7950-4a04-a4bd-829a4c20ab4e', '479c6306-5fdb-4e06-9008-c18f68db52a4', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('92f143c0-59a8-461b-b9cc-179f3403dd38', 'cd7306a7-c458-45e8-a00b-44a2f2117135', '226b42ee-0c34-4329-b935-744aecc63fb4', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('e9a6f066-f4cc-4de4-bb26-2bc9522760a4', '226b42ee-0c34-4329-b935-744aecc63fb4', 'cd7306a7-c458-45e8-a00b-44a2f2117135', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('805b2c95-fc22-48b1-81cc-f86e3f2c2956', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', '2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('e54835e6-b399-4d49-8ab8-6b452cbc00ca', '2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('02a774bb-0a1c-4874-b9e2-bbd4806c2f3e', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', '537bc481-aa15-4a3b-be4e-2042da6f5a09', 'Contains'); +insert into 
edges (edge_id, from_id, to_id, conn_type) values ('17c74f58-18bc-4c4a-974b-2732a84576c3', '537bc481-aa15-4a3b-be4e-2042da6f5a09', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('c8dded57-22f8-46d0-a29c-3e7c072ebee4', '260325a5-27f9-40d1-8697-c727feb1dbdc', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('7e2995b8-2a4f-49c1-b72c-80c8c6d1e060', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('3ba9b460-5878-43c1-bb00-c88916bf5e6b', '260325a5-27f9-40d1-8697-c727feb1dbdc', '103baca1-377a-4ddf-8429-5da91026c269', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('a4bea335-611d-49cc-9e94-b18896103d40', '103baca1-377a-4ddf-8429-5da91026c269', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('f0d65e5b-9762-4c65-9333-83aa6c1beb75', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'dc24b1d5-206d-40db-b10a-606dd16a0297', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('cd72ee26-b867-4321-8566-7daa488f7a61', 'dc24b1d5-206d-40db-b10a-606dd16a0297', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('bc211906-6d48-4649-8e49-518cd89c61f8', '260325a5-27f9-40d1-8697-c727feb1dbdc', '2380fe5b-ce2a-401e-98bf-af8b98460f67', 'Contains'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('aeafba5d-b352-4c36-8fd5-d135cea70605', '2380fe5b-ce2a-401e-98bf-af8b98460f67', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'BelongsTo'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('2b6126be-63fd-4140-8891-e0b5db880573', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('89a348be-ae8d-40ad-a1cd-12d00b981b2a', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', '260325a5-27f9-40d1-8697-c727feb1dbdc', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('7e6219a9-e433-4706-9145-f791a58ef7c3', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('a447062c-c75e-48b2-bbe3-5d9eab770c26', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('faf70a4b-2358-4881-952b-aae3cec55053', '103baca1-377a-4ddf-8429-5da91026c269', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('b6647b84-5043-4ac2-8b31-3db7c2d6cf32', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', '103baca1-377a-4ddf-8429-5da91026c269', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('b63cb71d-71c1-49a1-93bd-ea63cb4ab4e7', 'dc24b1d5-206d-40db-b10a-606dd16a0297', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('c582313e-9840-4ae6-827c-d2a01363ee6b', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'dc24b1d5-206d-40db-b10a-606dd16a0297', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('a2d2ca10-11eb-4e6f-9a5c-b8196a56d10a', '2380fe5b-ce2a-401e-98bf-af8b98460f67', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', 'Consumes'); +insert into 
edges (edge_id, from_id, to_id, conn_type) values ('50701b58-58ef-4b6e-ac00-5f69d8ceebf6', 'a4cfbc03-c65d-4f32-be3d-1d11247c9cdd', '2380fe5b-ce2a-401e-98bf-af8b98460f67', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('6d010032-02fb-4e94-aef7-85e56d5cf99c', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('421b8c1f-01b9-49c3-8820-ee05313d103a', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'a5c47bd8-3729-45fa-8701-b8b76ada150a', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('b6b8af3a-2531-46bc-9b1e-acf3a4c51396', '2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('323d044f-bbbd-45fc-a93c-40ee0f17ab87', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', '2a052ccd-3e31-46a7-bffb-2ab1302b1b00', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('cbe5c0d4-621b-4e6b-a6c1-76c8ca6105f2', '537bc481-aa15-4a3b-be4e-2042da6f5a09', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('bedbe18b-a7c3-40d3-b230-22f9ac5c6c76', 'c4a0ae0f-09cc-43bf-94e9-21ff178fbda6', '537bc481-aa15-4a3b-be4e-2042da6f5a09', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('84902105-164c-4fc3-9690-e638f16c3075', 'c626c41c-d6c2-4b16-a267-6cdeea497c52', '103baca1-377a-4ddf-8429-5da91026c269', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('44668672-1520-4371-98d2-3ed48bddf9ea', '103baca1-377a-4ddf-8429-5da91026c269', 'c626c41c-d6c2-4b16-a267-6cdeea497c52', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('e8fe5609-53e7-4793-a39d-2ce75c630cd9', '479c6306-5fdb-4e06-9008-c18f68db52a4', 'c626c41c-d6c2-4b16-a267-6cdeea497c52', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('591285ff-53dd-4e27-a30d-754fe97ea3be', 'c626c41c-d6c2-4b16-a267-6cdeea497c52', '479c6306-5fdb-4e06-9008-c18f68db52a4', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('df901204-59c1-4f3e-87f0-8ac07e90bc39', '226b42ee-0c34-4329-b935-744aecc63fb4', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('23a05c64-b204-4c48-b906-9fabb6ea298b', '5316c516-77f9-4be4-a7ec-8bf6e893e2aa', '226b42ee-0c34-4329-b935-744aecc63fb4', 'Produces'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('3721c600-6423-4a38-a9b2-d85b426b6eaa', '226b42ee-0c34-4329-b935-744aecc63fb4', '103baca1-377a-4ddf-8429-5da91026c269', 'Consumes'); +insert into edges (edge_id, from_id, to_id, conn_type) values ('5f9b86fe-bdc9-4a76-b07c-876b9d7c1ee1', '103baca1-377a-4ddf-8429-5da91026c269', '226b42ee-0c34-4329-b935-744aecc63fb4', 'Produces'); + diff --git a/registry/sql-registry/test/test_basic.py b/registry/sql-registry/test/test_basic.py new file mode 100644 index 000000000..22e343aba --- /dev/null +++ b/registry/sql-registry/test/test_basic.py @@ -0,0 +1,17 @@ +import registry +r=registry.DbRegistry() + +l=r.get_lineage('226b42ee-0c34-4329-b935-744aecc63fb4').to_dict() +assert(len(l["guidEntityMap"]) == 4) + +af1=r.get_entity('2380fe5b-ce2a-401e-98bf-af8b98460f67') +af2=r.get_entity('feathr_ci_registry_12_33_182947__request_features__f_day_of_week') +assert(af1.to_dict()==af2.to_dict()) 
+df1=r.get_entity('226b42ee-0c34-4329-b935-744aecc63fb4')  # lookup by GUID +df2=r.get_entity('feathr_ci_registry_12_33_182947__f_trip_time_distance')  # lookup by qualified name +assert(df1.to_dict()==df2.to_dict())  # both lookups resolve to the same derived feature + +p=r.get_project('feathr_ci_registry_12_33_182947')
+assert(len(p.to_dict()['guidEntityMap'])==14)  # project graph holds all 14 entities defined above + +es=r.search_entity("time", [registry.EntityType.DerivedFeature])  # keyword search scoped to derived features; no assertion, smoke test only
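For readers tracing why `get_lineage('226b42ee-0c34-4329-b935-744aecc63fb4')` is expected to yield exactly four entities: per the `edges` rows above, the derived feature `f_trip_time_distance` Consumes the two request features, which in turn Consume the `PASSTHROUGH` source. The sketch below is a minimal, hypothetical illustration of that iterative graph walk over the `edges` table using plain SQLite; it is not the `DbRegistry` implementation, and the `upstream_entities` helper and the `test_data.sql` file name are assumptions made only for this example.

```python
import sqlite3

def upstream_entities(conn, start_id):
    """Iteratively follow 'Consumes' edges from start_id, returning every reachable entity id."""
    seen = {start_id}
    frontier = [start_id]
    while frontier:
        # One query per frontier: fetch everything the current frontier consumes.
        placeholders = ",".join("?" * len(frontier))
        rows = conn.execute(
            f"select to_id from edges where conn_type = 'Consumes' and from_id in ({placeholders})",
            frontier,
        ).fetchall()
        new_ids = {to_id for (to_id,) in rows} - seen
        seen |= new_ids
        frontier = list(new_ids)
    return seen

# Usage, assuming the insert statements above were loaded into an in-memory database:
# conn = sqlite3.connect(":memory:")
# conn.executescript(open("test_data.sql").read())   # hypothetical path to the fixture data
# ids = upstream_entities(conn, '226b42ee-0c34-4329-b935-744aecc63fb4')
# assert len(ids) == 4   # derived feature + 2 request features + PASSTHROUGH source
```

Each iteration issues one `select` over the current frontier, so the number of round trips grows with the depth of the lineage graph; the sketch is meant only to make the expected count in the test above easy to verify by hand.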