diff --git a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py index 708d3beefc..81e01d3ab3 100644 --- a/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py +++ b/sdk/python/feast/infra/online_stores/contrib/mysql_online_store/mysql.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import threading import logging +import time from enum import Enum from importlib import import_module @@ -20,6 +21,10 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel +MYSQL_DEADLOCK_ERR = 1213 +MYSQL_WRITE_RETRIES = 3 +MYSQL_READ_RETRIES = 3 + class ConnectionType(Enum): RAW = 0 @@ -71,8 +76,8 @@ def _get_conn(self, config: RepoConfig) -> Union[Connection, ConnectionType]: if online_store_config.session_manager_module: return ( - self._get_conn_session_manager(session_manager_module=online_store_config.session_manager_module), - ConnectionType.SESSION + self._get_conn_session_manager(session_manager_module=online_store_config.session_manager_module), + ConnectionType.SESSION ) if not hasattr(self._tls, 'conn') or not self._tls.conn.open: @@ -96,14 +101,37 @@ def _close_conn(self, conn: Connection, conn_type: ConnectionType) -> None: if str(exc) != 'Already closed': raise exc + def _execute_query_with_retry(self, cur: Cursor, + conn: Connection, + query: str, + values: Union[List, Tuple], + retries: int, + progress=None + ) -> bool: + for _ in range(retries): + try: + cur.execute(query, values) + conn.commit() + if progress: + progress(1) + return True + except pymysql.Error as e: + if e.args[0] == MYSQL_DEADLOCK_ERR: + time.sleep(0.5) + else: + conn.rollback() + logging.error("Error %d: %s" % (e.args[0], e.args[1])) + return False + return False + def online_write_batch( - self, - config: RepoConfig, - table: FeatureView, - data: List[ - Tuple[EntityKeyProto, Dict[str, ValueProto], 
datetime, Optional[datetime]] - ], - progress: Optional[Callable[[int], Any]], + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], ) -> None: raw_conn, conn_type = self._get_conn(config) conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn @@ -120,10 +148,9 @@ def online_write_batch( created_ts = _to_naive_utc(created_ts) rows_to_insert = [(entity_key_bin, feature_name, val.SerializeToString(), timestamp, created_ts) - for feature_name, val in values.items()] + for feature_name, val in values.items()] value_formatters = ', '.join(['(%s, %s, %s, %s, %s)'] * len(rows_to_insert)) - cur.execute( - f""" + query = f""" INSERT INTO {_table_id(project, table)} (entity_key, feature_name, value, event_ts, created_ts) VALUES {value_formatters} @@ -131,24 +158,21 @@ def online_write_batch( value = VALUES(value), event_ts = VALUES(event_ts), created_ts = VALUES(created_ts) - """, - [item for row in rows_to_insert for item in row] - ) - try: - conn.commit() - if progress: - progress(1) - except pymysql.Error as e: - conn.rollback() - logging.error("Error %d: %s" % (e.args[0], e.args[1])) + """ + query_values = [item for row in rows_to_insert for item in row] + self._execute_query_with_retry(cur=cur, + conn=conn, + query=query, + values=query_values, + retries=MYSQL_WRITE_RETRIES, progress=progress) self._close_conn(raw_conn, conn_type) def online_read( - self, - config: RepoConfig, - table: FeatureView, - entity_keys: List[EntityKeyProto], - _: Optional[List[str]] = None, + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + _: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: raw_conn, conn_type = self._get_conn(config) conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn @@ -156,18 +180,16 @@ def online_read( result: 
List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = [] project = config.project for entity_key in entity_keys: - try: - entity_key_bin = serialize_entity_key( - entity_key, - entity_key_serialization_version=2, - ).hex() - - cur.execute( - f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s", - (entity_key_bin,), - ) - conn.commit() - + entity_key_bin = serialize_entity_key( + entity_key, + entity_key_serialization_version=2, + ).hex() + query = f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s" + if self._execute_query_with_retry(cur=cur, + conn=conn, + query=query, + values=(entity_key_bin,), + retries=MYSQL_READ_RETRIES): res = {} res_ts: Optional[datetime] = None records = cur.fetchall() @@ -182,20 +204,19 @@ def online_read( result.append((None, None)) else: result.append((res_ts, res)) - except pymysql.Error as e: - conn.rollback() - logging.error("Error %d: %s" % (e.args[0], e.args[1])) + else: + logging.error(f'Skipping read for (entity, table): ({entity_key}, {_table_id(project, table)})') self._close_conn(raw_conn, conn_type) return result def update( - self, - config: RepoConfig, - tables_to_delete: Sequence[FeatureView], - tables_to_keep: Sequence[FeatureView], - entities_to_delete: Sequence[Entity], - entities_to_keep: Sequence[Entity], - partial: bool, + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, ) -> None: raw_conn, conn_type = self._get_conn(config) conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn @@ -241,10 +262,10 @@ def update( self._close_conn(raw_conn, conn_type) def teardown( - self, - config: RepoConfig, - tables: Sequence[FeatureView], - entities: Sequence[Entity], + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], 
) -> None: raw_conn, conn_type = self._get_conn(config) conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn @@ -260,6 +281,7 @@ def teardown( "" % (e.args[0], e.args[1])) self._close_conn(raw_conn, conn_type) + def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None: table_name = _table_id(project, table) cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};") diff --git a/sdk/python/feast/ui/package.json b/sdk/python/feast/ui/package.json index 30ef44e415..75eebe275b 100644 --- a/sdk/python/feast/ui/package.json +++ b/sdk/python/feast/ui/package.json @@ -6,7 +6,7 @@ "@elastic/datemath": "^5.0.3", "@elastic/eui": "^55.0.1", "@emotion/react": "^11.9.0", - "@feast-dev/feast-ui": "0.29.0", + "@feast-dev/feast-ui": "0.30.0", "@testing-library/jest-dom": "^5.16.4", "@testing-library/react": "^13.2.0", "@testing-library/user-event": "^13.5.0", diff --git a/sdk/python/feast/ui/yarn.lock b/sdk/python/feast/ui/yarn.lock index 61c75e653c..6c7ffa0c01 100644 --- a/sdk/python/feast/ui/yarn.lock +++ b/sdk/python/feast/ui/yarn.lock @@ -1300,10 +1300,10 @@ minimatch "^3.1.2" strip-json-comments "^3.1.1" -"@feast-dev/feast-ui@0.29.0": - version "0.29.0" - resolved "https://registry.yarnpkg.com/@feast-dev/feast-ui/-/feast-ui-0.29.0.tgz#b78070b51c3f83b2b823946b64fea4f223820429" - integrity sha512-XF/C3CcLmQTAUV9vHbW37BEACoNXXbUaMUoWPIJMrZvW6IStoVUlBuA4bx995XSE4gUcZ7j/5SmrOUAAlanL9Q== +"@feast-dev/feast-ui@0.30.0": + version "0.30.0" + resolved "https://registry.affirm-stage.com/artifactory/api/npm/npm/@feast-dev/feast-ui/-/feast-ui-0.30.0.tgz#6c68b243d65f8a3a1df029a39f4c382d17a4b272" + integrity sha1-bGiyQ9Zfijod8Cmjn0w4LReksnI= dependencies: "@elastic/datemath" "^5.0.3" "@elastic/eui" "^55.0.1"