
Commit 2761a19

WweiL authored and HyukjinKwon committed
[SPARK-42951][SS][CONNECT] DataStreamReader APIs
### What changes were proposed in this pull request?

This PR adds the `orc`, `parquet`, and `text` APIs to Connect's DataStreamReader.

### Why are the changes needed?

Part of the Streaming Connect project.

### Does this PR introduce _any_ user-facing change?

Yes, the three APIs are now enabled, but everything is still largely under development.

### How was this patch tested?

Manually tested; unit tests will be added in SPARK-43031 as a follow-up PR #40691.

Closes #40689 from WweiL/SPARK-42951-reader-apis.

Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent ab90ccc · commit 2761a19
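As a rough usage sketch (not part of the commit), the newly enabled readers could be called from a Spark Connect client along these lines; the remote address and the input paths are placeholder assumptions:

```python
from pyspark.sql import SparkSession

# Connect to a Spark Connect server; the URL is a placeholder.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Each new reader method funnels its keyword options into load() with the
# matching format, returning a streaming (unbounded) DataFrame.
orc_df = spark.readStream.orc("/data/in/orc", mergeSchema=True)
parquet_df = spark.readStream.parquet("/data/in/parquet", pathGlobFilter="*.parquet")
text_df = spark.readStream.text("/data/in/text", lineSep="\n")

assert orc_df.isStreaming  # streaming sources report isStreaming == True
```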

1 file changed: +70 -4 lines changed

python/pyspark/sql/connect/streaming/readwriter.py

Lines changed: 70 additions & 4 deletions
@@ -168,9 +168,75 @@ def json(
 
     json.__doc__ = PySparkDataStreamReader.json.__doc__
 
-    # def orc() TODO
-    # def parquet() TODO
-    # def text() TODO
+    def orc(
+        self,
+        path: str,
+        mergeSchema: Optional[bool] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="orc")
+        else:
+            raise TypeError("path can be only a single string")
+
+    orc.__doc__ = PySparkDataStreamReader.orc.__doc__
+
+    def parquet(
+        self,
+        path: str,
+        mergeSchema: Optional[bool] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+        datetimeRebaseMode: Optional[Union[bool, str]] = None,
+        int96RebaseMode: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+            datetimeRebaseMode=datetimeRebaseMode,
+            int96RebaseMode=int96RebaseMode,
+        )
+        self._set_opts(
+            mergeSchema=mergeSchema,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+            datetimeRebaseMode=datetimeRebaseMode,
+            int96RebaseMode=int96RebaseMode,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="parquet")
+        else:
+            raise TypeError("path can be only a single string")
+
+    parquet.__doc__ = PySparkDataStreamReader.parquet.__doc__
+
+    def text(
+        self,
+        path: str,
+        wholetext: bool = False,
+        lineSep: Optional[str] = None,
+        pathGlobFilter: Optional[Union[bool, str]] = None,
+        recursiveFileLookup: Optional[Union[bool, str]] = None,
+    ) -> "DataFrame":
+        self._set_opts(
+            wholetext=wholetext,
+            lineSep=lineSep,
+            pathGlobFilter=pathGlobFilter,
+            recursiveFileLookup=recursiveFileLookup,
+        )
+        if isinstance(path, str):
+            return self.load(path=path, format="text")
+        else:
+            raise TypeError("path can be only a single string")
+
+    text.__doc__ = PySparkDataStreamReader.text.__doc__
 
     def csv(
         self,
@@ -245,7 +311,7 @@ def csv(
 
     csv.__doc__ = PySparkDataStreamReader.csv.__doc__
 
-    # def table() TODO. Use Read(table_name) relation.
+    # def table() TODO(SPARK-43042). Use Read(table_name) relation.
 
 
 DataStreamReader.__doc__ = PySparkDataStreamReader.__doc__
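All three added methods follow the same shape: collect the caller's keyword options through `_set_opts` (which skips options left as `None`), then delegate to `load` with a hard-coded `format`. Below is a minimal self-contained sketch of that pattern; `MiniReader` and its string-returning `load` are illustrative stand-ins, not the actual Connect plumbing:

```python
from typing import Dict, Optional


class MiniReader:
    """Toy stand-in for the option plumbing in the new reader methods."""

    def __init__(self) -> None:
        self._options: Dict[str, str] = {}

    def _set_opts(self, **options: object) -> None:
        # Like the real helper, record only options the caller actually set.
        for key, value in options.items():
            if value is not None:
                self._options[key] = str(value)

    def load(self, path: str, format: str) -> str:
        # The real method builds a streaming relation; here we just report
        # what would be read.
        return f"streaming {format} source at {path}, options={self._options}"

    def orc(self, path: str, mergeSchema: Optional[bool] = None) -> str:
        # Same shape as the diff: stash options, then delegate to load().
        self._set_opts(mergeSchema=mergeSchema)
        if isinstance(path, str):
            return self.load(path=path, format="orc")
        raise TypeError("path can be only a single string")


print(MiniReader().orc("/data/in/orc", mergeSchema=True))
# streaming orc source at /data/in/orc, options={'mergeSchema': 'True'}
```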
