Skip to content

Commit

Permalink
Batur/enable multiple connections (modin-project#71) [not to be upstr…
Browse files Browse the repository at this point in the history
…eamed]
  • Loading branch information
batur-ponder authored and vnlitvinov committed Mar 16, 2023
1 parent b8f534c commit 1db7fcb
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions modin/core/execution/client/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ClientIO(BaseIO):
"""Factory providing methods for performing I/O operations using a given Client as the execution engine."""

_server_conn = None
_data_conn = None
_data_conn = []
query_compiler_cls = ClientQueryCompiler

@classmethod
Expand All @@ -35,10 +35,14 @@ def set_server_connection(cls, conn):
conn : Any
Connection object that implements various methods.
"""
cls._server_conn = conn
if cls._server_conn is not None:
if cls._server_conn != conn:
raise PonderReinitError("Ponder Connection already initialized.")
else:
cls._server_conn = conn

@classmethod
def set_data_connection(cls, conn):
def add_data_connection(cls, conn):
"""
Set the data connection for the I/O object.
Expand All @@ -47,7 +51,7 @@ def set_data_connection(cls, conn):
conn : Any
Connection object that is implementation specific.
"""
cls._data_conn = conn
cls._data_conn.append(conn)

@classmethod
def read_csv(cls, filepath_or_buffer, **kwargs):
Expand Down Expand Up @@ -78,8 +82,10 @@ def read_csv(cls, filepath_or_buffer, **kwargs):
raise ConnectionError(
"Missing server connection, did you initialize the connection?"
)
# with pushdown - the csv file has to move to some database. Assume for now
# that it is the first database the user connected to.
return cls.query_compiler_cls(
cls._server_conn.read_csv(cls._data_conn, filepath_or_buffer, **kwargs)
cls._server_conn.read_csv(cls._data_conn[0], filepath_or_buffer, **kwargs)
)

@classmethod
Expand Down Expand Up @@ -109,15 +115,12 @@ def read_sql(cls, sql, con, **kwargs):
raise ConnectionError(
"Cannot connect with parameter 'auto' because connection is not set. Did you initialize it?"
)
if cls._data_conn is None:
cls._data_conn = con
if cls._server_conn is None:
raise ConnectionError(
"Missing server connection, did you initialize the connection?"
)
return cls.query_compiler_cls(
cls._server_conn.read_sql(sql, cls._data_conn, **kwargs)
)
cls._data_conn.append(con)
return cls.query_compiler_cls(cls._server_conn.read_sql(sql, con, **kwargs))

@classmethod
def to_sql(cls, qc, **kwargs):
Expand Down

0 comments on commit 1db7fcb

Please sign in to comment.