Skip to content

Commit

Permalink
Add native hdfs support for gz (#128)
Browse files Browse the repository at this point in the history
* use native support for gz

* add readline for hdfs
  • Loading branch information
yupbank authored and menshikh-iv committed Aug 22, 2017
1 parent 3f90172 commit 6a24f99
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
12 changes: 11 additions & 1 deletion smart_open/smart_open_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,14 +492,24 @@ def __init__(self, parsed_uri):
if parsed_uri.scheme not in ("hdfs"):
raise TypeError("can only process HDFS files")
self.parsed_uri = parsed_uri
self._readline_iter = None

def __iter__(self):
hdfs = subprocess.Popen(["hdfs", "dfs", "-cat", self.parsed_uri.uri_path], stdout=subprocess.PIPE)
hdfs = subprocess.Popen(["hdfs", "dfs", '-text', self.parsed_uri.uri_path], stdout=subprocess.PIPE)
return hdfs.stdout

def read(self, size=None):
raise NotImplementedError("read() not implemented yet")

def readline(self):
if self._readline_iter is None:
self._readline_iter = self.__iter__()
try:
return next(self._readline_iter)
except StopIteration:
# When readline runs out of data, it just returns an empty string
return ''

def seek(self, offset, whence=None):
raise NotImplementedError("seek() not implemented yet")

Expand Down
4 changes: 2 additions & 2 deletions smart_open/tests/test_smart_open.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ def test_hdfs(self, mock_subprocess):
smart_open_object = smart_open.HdfsOpenRead(smart_open.ParseUri("hdfs:///tmp/test.txt"))
smart_open_object.__iter__()
# called with the correct params?
mock_subprocess.Popen.assert_called_with(["hdfs", "dfs", "-cat", "/tmp/test.txt"], stdout=mock_subprocess.PIPE)
mock_subprocess.Popen.assert_called_with(["hdfs", "dfs", "-text", "/tmp/test.txt"], stdout=mock_subprocess.PIPE)

# second possibility of schema
smart_open_object = smart_open.HdfsOpenRead(smart_open.ParseUri("hdfs://tmp/test.txt"))
smart_open_object.__iter__()
mock_subprocess.Popen.assert_called_with(["hdfs", "dfs", "-cat", "/tmp/test.txt"], stdout=mock_subprocess.PIPE)
mock_subprocess.Popen.assert_called_with(["hdfs", "dfs", "-text", "/tmp/test.txt"], stdout=mock_subprocess.PIPE)

@responses.activate
def test_webhdfs(self):
Expand Down

0 comments on commit 6a24f99

Please sign in to comment.