Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle MySQL Deadlocks #52

Open
wants to merge 2 commits into
base: 0.28-affirm
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import
import threading
import logging
import time

from enum import Enum
from importlib import import_module
Expand All @@ -20,6 +21,10 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel

MYSQL_DEADLOCK_ERR = 1213
MYSQL_WRITE_RETRIES = 3
MYSQL_READ_RETRIES = 3


class ConnectionType(Enum):
RAW = 0
Expand Down Expand Up @@ -71,8 +76,8 @@ def _get_conn(self, config: RepoConfig) -> Union[Connection, ConnectionType]:

if online_store_config.session_manager_module:
return (
self._get_conn_session_manager(session_manager_module=online_store_config.session_manager_module),
ConnectionType.SESSION
self._get_conn_session_manager(session_manager_module=online_store_config.session_manager_module),
ConnectionType.SESSION
)

if not hasattr(self._tls, 'conn') or not self._tls.conn.open:
Expand All @@ -96,14 +101,37 @@ def _close_conn(self, conn: Connection, conn_type: ConnectionType) -> None:
if str(exc) != 'Already closed':
raise exc

def _execute_query_with_retry(self, cur: Cursor,
                              conn: Connection,
                              query: str,
                              values: Union[List, Tuple],
                              retries: int,
                              progress=None
                              ) -> bool:
    """Execute and commit a query, retrying on MySQL deadlocks.

    Args:
        cur: open cursor on ``conn``.
        conn: connection used for commit/rollback.
        query: SQL statement with ``%s`` placeholders.
        values: parameters bound to the placeholders.
        retries: maximum number of attempts when the server reports a
            deadlock (MySQL error 1213).
        progress: optional callback invoked with ``1`` after a
            successful commit.

    Returns:
        True if the query committed, False if it failed (non-deadlock
        error, or deadlock persisted through all retries).
    """
    for attempt in range(retries):
        try:
            cur.execute(query, values)
            conn.commit()
            if progress:
                progress(1)
            return True
        except pymysql.Error as e:
            if e.args[0] == MYSQL_DEADLOCK_ERR:
                # InnoDB rolls back the deadlocked transaction, but an
                # explicit rollback keeps the connection state clean
                # before we retry.
                conn.rollback()
                time.sleep(0.5)
            else:
                conn.rollback()
                # Lazy %-style args: formatting is skipped if the
                # message is filtered out.
                logging.error("Error %d: %s", e.args[0], e.args[1])
                return False
    # Previously this fell through silently; make exhaustion observable.
    logging.error("Query still deadlocked after %d retries; giving up",
                  retries)
    return False

def online_write_batch(
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
self,
config: RepoConfig,
table: FeatureView,
data: List[
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
) -> None:
raw_conn, conn_type = self._get_conn(config)
conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn
Expand All @@ -120,54 +148,48 @@ def online_write_batch(
created_ts = _to_naive_utc(created_ts)

rows_to_insert = [(entity_key_bin, feature_name, val.SerializeToString(), timestamp, created_ts)
for feature_name, val in values.items()]
for feature_name, val in values.items()]
value_formatters = ', '.join(['(%s, %s, %s, %s, %s)'] * len(rows_to_insert))
cur.execute(
f"""
query = f"""
INSERT INTO {_table_id(project, table)}
(entity_key, feature_name, value, event_ts, created_ts)
VALUES {value_formatters}
ON DUPLICATE KEY UPDATE
value = VALUES(value),
event_ts = VALUES(event_ts),
created_ts = VALUES(created_ts)
""",
[item for row in rows_to_insert for item in row]
)
try:
conn.commit()
if progress:
progress(1)
except pymysql.Error as e:
conn.rollback()
logging.error("Error %d: %s" % (e.args[0], e.args[1]))
"""
query_values = [item for row in rows_to_insert for item in row]
self._execute_query_with_retry(cur=cur,
conn=conn,
query=query,
values=query_values,
retries=MYSQL_WRITE_RETRIES)
self._close_conn(raw_conn, conn_type)

def online_read(
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
_: Optional[List[str]] = None,
self,
config: RepoConfig,
table: FeatureView,
entity_keys: List[EntityKeyProto],
_: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
raw_conn, conn_type = self._get_conn(config)
conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn
with conn.cursor() as cur:
result: List[Tuple[Optional[datetime], Optional[Dict[str, Any]]]] = []
project = config.project
for entity_key in entity_keys:
try:
entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
).hex()

cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s",
(entity_key_bin,),
)
conn.commit()

entity_key_bin = serialize_entity_key(
entity_key,
entity_key_serialization_version=2,
).hex()
query = f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = %s"
if self._execute_query_with_retry(cur=cur,
conn=conn,
query=query,
values=(entity_key_bin,),
retries=MYSQL_READ_RETRIES):
res = {}
res_ts: Optional[datetime] = None
records = cur.fetchall()
Expand All @@ -182,20 +204,19 @@ def online_read(
result.append((None, None))
else:
result.append((res_ts, res))
except pymysql.Error as e:
conn.rollback()
logging.error("Error %d: %s" % (e.args[0], e.args[1]))
else:
logging.error(f'Skipping read for (entity, table)): ({entity_key}, {_table_id(project, table)})')
self._close_conn(raw_conn, conn_type)
return result

def update(
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
self,
config: RepoConfig,
tables_to_delete: Sequence[FeatureView],
tables_to_keep: Sequence[FeatureView],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
partial: bool,
) -> None:
raw_conn, conn_type = self._get_conn(config)
conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn
Expand Down Expand Up @@ -241,10 +262,10 @@ def update(
self._close_conn(raw_conn, conn_type)

def teardown(
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
self,
config: RepoConfig,
tables: Sequence[FeatureView],
entities: Sequence[Entity],
) -> None:
raw_conn, conn_type = self._get_conn(config)
conn = raw_conn.connection if conn_type == ConnectionType.SESSION else raw_conn
Expand All @@ -260,6 +281,7 @@ def teardown(
"" % (e.args[0], e.args[1]))
self._close_conn(raw_conn, conn_type)


def _drop_table_and_index(cur: Cursor, project: str, table: FeatureView) -> None:
table_name = _table_id(project, table)
cur.execute(f"DROP INDEX {table_name}_ek ON {table_name};")
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/ui/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"@elastic/datemath": "^5.0.3",
"@elastic/eui": "^55.0.1",
"@emotion/react": "^11.9.0",
"@feast-dev/feast-ui": "0.29.0",
"@feast-dev/feast-ui": "0.30.0",
"@testing-library/jest-dom": "^5.16.4",
"@testing-library/react": "^13.2.0",
"@testing-library/user-event": "^13.5.0",
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/ui/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1300,10 +1300,10 @@
minimatch "^3.1.2"
strip-json-comments "^3.1.1"

"@feast-dev/feast-ui@0.29.0":
version "0.29.0"
resolved "https://registry.yarnpkg.com/@feast-dev/feast-ui/-/feast-ui-0.29.0.tgz#b78070b51c3f83b2b823946b64fea4f223820429"
integrity sha512-XF/C3CcLmQTAUV9vHbW37BEACoNXXbUaMUoWPIJMrZvW6IStoVUlBuA4bx995XSE4gUcZ7j/5SmrOUAAlanL9Q==
"@feast-dev/feast-ui@0.30.0":
version "0.30.0"
resolved "https://registry.affirm-stage.com/artifactory/api/npm/npm/@feast-dev/feast-ui/-/feast-ui-0.30.0.tgz#6c68b243d65f8a3a1df029a39f4c382d17a4b272"
integrity sha1-bGiyQ9Zfijod8Cmjn0w4LReksnI=
dependencies:
"@elastic/datemath" "^5.0.3"
"@elastic/eui" "^55.0.1"
Expand Down