Skip to content

Commit

Permalink
fix: Revert mypy config (#3952)
Browse files Browse the repository at this point in the history
* fetch_arrow_all returns empty table

Signed-off-by: Chester Ong <chester.ong.ch@gmail.com>

* fix spark_kafka_processor typing errors

Signed-off-by: Chester Ong <chester.ong.ch@gmail.com>

* fix correct return type

Signed-off-by: Chester Ong <chester.ong.ch@gmail.com>

* revert _to_arrow_internal

Signed-off-by: Chester Ong <chester.ong.ch@gmail.com>

* revert kafkaStreamProcessor changes, change base type instead

Signed-off-by: Chester Ong <chester.ong.ch@gmail.com>

---------

Signed-off-by: Chester Ong <chester.ong.ch@gmail.com>
  • Loading branch information
bushwhackr authored Feb 20, 2024
1 parent dd79dbb commit 6b8e96c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ format-python:
cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests

lint-python:
cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast
cd ${ROOT_DIR}/sdk/python; python -m mypy feast
cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
Expand Down
8 changes: 5 additions & 3 deletions sdk/python/feast/infra/contrib/stream_processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from types import MethodType
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Any, Optional

from pyspark.sql import DataFrame
from typing_extensions import TypeAlias
Expand Down Expand Up @@ -51,7 +51,9 @@ def __init__(
self.data_source = data_source

@abstractmethod
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
def ingest_stream_feature_view(
self, to: PushMode = PushMode.ONLINE
) -> Optional[Any]:
"""
Ingests data from the stream source attached to the stream feature view; transforms the data
and then persists it to the online store and/or offline store, depending on the 'to' parameter.
Expand All @@ -75,7 +77,7 @@ def _construct_transformation_plan(self, table: StreamTable) -> StreamTable:
raise NotImplementedError

@abstractmethod
def _write_stream_data(self, table: StreamTable, to: PushMode) -> None:
def _write_stream_data(self, table: StreamTable, to: PushMode) -> Optional[Any]:
"""
Launches a job to persist stream data to the online store and/or offline store, depending
on the 'to' parameter, and returns a handle for the job.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Dict, Optional
from typing import Dict, Literal, Optional

import pandas as pd
import pytest
Expand All @@ -12,6 +12,7 @@
PostgreSQLSource,
)
from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
Expand All @@ -26,6 +27,10 @@
POSTGRES_DB = "test"


class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
type: Literal["postgres"] = "postgres"


@pytest.fixture(scope="session")
def postgres_container():
container = (
Expand Down Expand Up @@ -106,17 +111,17 @@ def create_offline_store_config(self) -> PostgreSQLOfflineStoreConfig:
def get_prefixed_table_name(self, suffix: str) -> str:
return f"{self.project_name}_{suffix}"

def create_online_store(self) -> Dict[str, str]:
def create_online_store(self) -> PostgreSQLOnlineStoreConfig:
assert self.container
return {
"type": "postgres",
"host": "localhost",
"port": self.container.get_exposed_port(5432),
"database": POSTGRES_DB,
"db_schema": "feature_store",
"user": POSTGRES_USER,
"password": POSTGRES_PASSWORD,
}
return PostgreSQLOnlineStoreConfig(
type="postgres",
host="localhost",
port=self.container.get_exposed_port(5432),
database=POSTGRES_DB,
db_schema="feature_store",
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
)

def create_saved_dataset_destination(self):
# FIXME: ...
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
pa_table = execute_snowflake_statement(
self.snowflake_conn, self.to_sql()
).fetch_arrow_all()
).fetch_arrow_all(force_return_table=False)

if pa_table:
return pa_table
Expand Down

0 comments on commit 6b8e96c

Please sign in to comment.