
feat: Bump psycopg2 to psycopg3 for all Postgres components #1

Closed
wants to merge 20 commits

Conversation

@job-almekinders (Owner) commented Jun 21, 2024

What this PR does / why we need it:

This PR upgrades the psycopg2 dependency to the newer psycopg3. See here for more information on the differences between the two versions.

This is the first of two PRs required to enable async feature retrieval for the Postgres Online Store.

While here:

Additional remarks

The changes in this commit are related to the linter. In psycopg3, stricter type hints on the Cursor object require handling cases where cursor.description might be None. Although psycopg2 could also return None for this, it wasn't previously accounted for.
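As a hedged illustration of the kind of guard this forces (the DSN, query, and the ValueError stand-in are illustrative; the PR itself adds a dedicated ZeroColumnQueryResult exception, per the commit list below):

```python
import psycopg

# Minimal sketch, assuming a psycopg3 connection; DSN and query are made up.
with psycopg.connect("postgresql://user:pass@localhost:5432/feast") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT entity_key, value FROM feature_table")
        # In psycopg3, Cursor.description is typed as Optional: it is None for
        # statements that return no result set, so the linter forces a check.
        if cur.description is None:
            raise ValueError("query returned zero columns")  # stand-in exception
        columns = [desc[0] for desc in cur.description]
```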

Which issue(s) this PR fixes:

First of two PRs required to fix feast-dev#4260

Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
Set connection read only

Addition

Use new ConnectionPool

Pass kwargs as named argument

Use executemany over execute_values

Remove not-required open argument in psycopg.connect

Improve

Use SpooledTemporaryFile

Use max_size and add docstring

Properly write with StringIO

Utils: Use SpooledTemporaryFile over StringIO object

Add replace

Fix df_to_postgres_table

Remove import

Utils

@job-almekinders (Owner, Author) left a comment


Some clarifications from my side!

@@ -64,57 +75,56 @@ def online_write_batch(
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
],
progress: Optional[Callable[[int], Any]],
batch_size: int = 5000,

Make the batch size configurable, addressing feast-dev#4036
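For context, a sketch of how a configurable batch_size is typically consumed in this method (it reuses insert_values, sql_query, cur, and progress from the surrounding function; not guaranteed to be the PR's exact loop):

```python
# Chunk the formatted rows so the progress callback fires once per batch.
for i in range(0, len(insert_values), batch_size):
    cur_batch = insert_values[i : i + batch_size]
    cur.executemany(sql_query, cur_batch)
    if progress:
        progress(len(cur_batch))
```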

Comment on lines +80 to +119
# Format insert values
insert_values = []
for entity_key, values, timestamp, created_ts in data:
    entity_key_bin = serialize_entity_key(
        entity_key,
        entity_key_serialization_version=config.entity_key_serialization_version,
    )
    timestamp = _to_naive_utc(timestamp)
    if created_ts is not None:
        created_ts = _to_naive_utc(created_ts)

    for feature_name, val in values.items():
        vector_val = None
        if config.online_store.pgvector_enabled:
            vector_val = get_list_val_str(val)
        insert_values.append(
            (
                entity_key_bin,
                feature_name,
                val.SerializeToString(),
                vector_val,
                timestamp,
                created_ts,
            )
        )

# Create insert query
sql_query = sql.SQL(
    """
    INSERT INTO {}
    (entity_key, feature_name, value, vector_value, event_ts, created_ts)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (entity_key, feature_name) DO
    UPDATE SET
        value = EXCLUDED.value,
        vector_value = EXCLUDED.vector_value,
        event_ts = EXCLUDED.event_ts,
        created_ts = EXCLUDED.created_ts;
    """
).format(sql.Identifier(_table_id(config.project, table)))

No changes here, only moving code further up in the function to make it more readable.

"""
INSERT INTO {}
(entity_key, feature_name, value, vector_value, event_ts, created_ts)
VALUES (%s, %s, %s, %s, %s, %s)

1 out of 2 actual changes to the function:

We need to explicitly write one placeholder per column, because executemany does not expand a single VALUES %s template the way psycopg2's execute_values did.

cur.executemany(sql_query, cur_batch)

2 out of 2 actual changes to the function:

The psycopg2.extras.execute_values functionality is removed in psycopg3. The maintainer of psycopg3 advises using executemany instead; see psycopg/psycopg#576 and psycopg/psycopg#114.
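For reference, a minimal before/after sketch of this migration (the table t, DSN, and rows are illustrative):

```python
import psycopg

rows = [(b"key1", "feat_a", b"\x01"), (b"key2", "feat_b", b"\x02")]

with psycopg.connect("postgresql://user:pass@localhost:5432/feast") as conn:
    with conn.cursor() as cur:
        # psycopg2 style (removed in psycopg3):
        #   psycopg2.extras.execute_values(
        #       cur, "INSERT INTO t VALUES %s", rows, page_size=5000)
        # psycopg3 style: one explicit placeholder per column; executemany
        # batches the parameter sets itself.
        cur.executemany("INSERT INTO t VALUES (%s, %s, %s)", rows)
```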

@@ -172,7 +182,9 @@ def online_read(
# when we iterate through the keys since they are in the correct order
values_dict = defaultdict(list)
for row in rows if rows is not None else []:
values_dict[
    row[0] if isinstance(row[0], bytes) else row[0].tobytes()
].append(row[1:])

Only call tobytes() when row[0] is not already of type bytes. psycopg3 returns bytea columns as bytes, which has no tobytes() method, so calling it unconditionally raises an AttributeError.
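A tiny self-contained illustration of the difference (values made up):

```python
# psycopg2 returned bytea columns as memoryview; psycopg3 returns bytes.
raw_pg2 = memoryview(b"entity-key")  # old driver's representation
raw_pg3 = b"entity-key"              # new driver's representation

def as_bytes(raw):
    # bytes has no tobytes() method, so only convert when it's a memoryview.
    return raw if isinstance(raw, bytes) else raw.tobytes()

assert as_bytes(raw_pg2) == as_bytes(raw_pg3) == b"entity-key"
```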

Comment on lines +35 to +56
def _get_conninfo(config: PostgreSQLConfig) -> str:
"""Get the `conninfo` argument required for connection objects."""
return (
f"postgresql://{config.user}"
f":{config.password}"
f"@{config.host}"
f":{int(config.port)}"
f"/{config.database}"
)


def _get_conn_kwargs(config: PostgreSQLConfig) -> Dict[str, Any]:
"""Get the additional `kwargs` required for connection objects."""
return {
"sslmode": config.sslmode,
"sslkey": config.sslkey_path,
"sslcert": config.sslcert_path,
"sslrootcert": config.sslrootcert_path,
"options": "-c search_path={}".format(config.db_schema or config.user),
}



Helper functions to prevent code duplication in the above methods.
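Roughly how these helpers plug into psycopg3's connection APIs; a sketch under the assumption that the callers look like this (the function names and the max_conn config field are hypothetical):

```python
import psycopg
from psycopg_pool import ConnectionPool

def _get_conn(config):
    # Singleton connection: conninfo string plus the extra keyword arguments.
    return psycopg.connect(_get_conninfo(config), **_get_conn_kwargs(config))

def _get_conn_pool(config):
    # psycopg_pool accepts the extra connection kwargs as a named `kwargs`
    # dict, which is what the "Pass kwargs as named argument" commit refers to.
    return ConnectionPool(
        conninfo=_get_conninfo(config),
        max_size=config.max_conn,  # hypothetical config field
        kwargs=_get_conn_kwargs(config),
    )
```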

Comment on lines +75 to +82
nr_columns = df.shape[1]
placeholders = ", ".join(["%s"] * nr_columns)
query = f"INSERT INTO {table_name} VALUES ({placeholders})"
values = df.replace({np.NaN: None}).to_numpy().tolist()

with _get_conn(config) as conn, conn.cursor() as cur:
    cur.execute(_df_to_create_table_sql(df, table_name))
    cur.executemany(query, values)

Moved the parsing of variables further to the top of the function.

  1. Again, we need to replace execute_values with executemany.
  2. Again, we need to explicitly set the number of placeholders. Since this function has to handle a dynamic number of columns, we build the placeholders variable from the DataFrame's column count; see the sketch below.
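Concretely, for a hypothetical three-column DataFrame the construction above yields:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3.0, np.nan], "c": ["x", "y"]})
placeholders = ", ".join(["%s"] * df.shape[1])
query = f"INSERT INTO my_table VALUES ({placeholders})"
values = df.replace({np.nan: None}).to_numpy().tolist()

print(query)   # INSERT INTO my_table VALUES (%s, %s, %s)
print(values)  # [[1, 3.0, 'x'], [2, None, 'y']]
```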

Comment on lines +42 to +46
@pytest.mark.parametrize(
"conn_type",
[ConnectionType.singleton, ConnectionType.pool],
ids=lambda v: f"conn_type:{v}",
)

Test both ConnectionTypes

@@ -149,7 +149,7 @@ def pg_registry():

registry_config = RegistryConfig(
registry_type="sql",
path=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}",
path=f"postgresql+psycopg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}",

Required to tell SQLAlchemy that we are using psycopg3. If we don't add the +psycopg suffix, it will still try to use psycopg2.
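A minimal sketch of why the suffix matters (the URL is illustrative):

```python
from sqlalchemy import create_engine

# "postgresql://" resolves to SQLAlchemy's default PostgreSQL dialect, which
# imports psycopg2; "postgresql+psycopg://" explicitly selects the psycopg3
# ("psycopg") dialect.
engine = create_engine("postgresql+psycopg://user:pass@localhost:5432/feast")
```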

@TomSteenbergen TomSteenbergen Jun 21, 2024

Can you elaborate why? In picnic-feature-store we do not have this issue and we don't need the psycopg part. We shouldn't still have psycopg2 in the venv anymore, right?

@TomSteenbergen TomSteenbergen Jun 21, 2024

Or is it some weird behaviour by SQLAlchemy's create_engine, which gets called in the init of SQLRegistry?

@job-almekinders (Owner, Author)

> Or is it some weird behaviour by SQLAlchemy's create_engine, which gets called in the init of SQLRegistry?

Yes, it is because of the create_engine function. I'll add a comment in the code 👍

@TomSteenbergen TomSteenbergen left a comment

Nice, LGTM! Just one question

@TomSteenbergen

I notice one mention of psycopg2 btw in docs/tutorials/using-scalable-registry.md. Shall we get rid of the 2 there as well?

Add log statement

Lint: Fix _to_arrow_internal

Lint: Fix _get_entity_df_event_timestamp_range

Update exception

Use ZeroColumnQueryResult

@job-almekinders (Owner, Author)

> I notice one mention of psycopg2 btw in docs/tutorials/using-scalable-registry.md. Shall we get rid of the 2 there as well?

Good catch! I've removed it :)

@job-almekinders (Owner, Author)

PR on feast repo: feast-dev#4303
