Skip to content

Commit

Permalink
thread executor
Browse files (browse the repository at this point in the history)
  • Loading branch information
ohadmata committed Jun 2, 2024
1 parent d54372d commit 5c87446
Showing 1 changed file with 5 additions and 19 deletions.
24 changes: 5 additions & 19 deletions src/shmessy/__init__.py
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@
from pandas import DataFrame

from .exceptions import exception_router
from .schema import Field, ShmessySchema
from .schema import ShmessySchema
from .types_handler import TypesHandler
from .utils import (
_check_number_of_columns,
@@ -60,31 +60,17 @@ def get_inferred_schema(self) -> ShmessySchema:
return self.__inferred_schema

def infer_schema(self, df: DataFrame) -> ShmessySchema:
futures: list[Future] = []
_check_number_of_columns(df=df, max_columns_num=self.__max_columns_num)
start_time = time.time()
df = _get_sampled_df(
df=df,
sample_size=self.__sample_size,
random_sample=self.__use_random_sample,
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.__max_number_of_workers
) as executor:
for column in df:
futures.append(
executor.submit(
self.__types_handler.infer_field,
field_name=column,
data=df[column].values,
)
)
columns: list[Field] = []
for future in concurrent.futures.as_completed(futures):
if e := future.exception():
raise e
columns.append(future.result())

columns = [
self.__types_handler.infer_field(field_name=column, data=df[column].values)
for column in df
]
infer_duration_ms = int((time.time() - start_time) * 1000)
inferred_schema = ShmessySchema(
columns=columns, infer_duration_ms=infer_duration_ms
Expand Down

0 comments on commit 5c87446

Please sign in to comment.