Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getContentSummary support #125

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions python/hdfs_native/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,9 @@ def set_replication(self, path: str, replication: int) -> bool:
Sets the replication for file at `path` to `replication`
"""
return self.inner.set_replication(path, replication)

def get_content_summary(self, path: str) -> "ContentSummary":
    """
    Returns the content summary that the wrapped native client reports for `path`
    """
    summary = self.inner.get_content_summary(path)
    return summary
9 changes: 9 additions & 0 deletions python/hdfs_native/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ class FileStatus:
replication: Optional[int]
blocksize: Optional[int]

# Type stub for the `ContentSummary` class exported by the native extension.
# It is the value returned by `RawClient.get_content_summary`; the fields below
# mirror the attributes of the Rust `PyContentSummary` pyclass one-to-one.
class ContentSummary:
# Combined size in bytes of the files under the summarized path
# (the tests in this change compare it against the bytes written).
length: int
# Number of files counted for the summarized path.
file_count: int
# Number of directories counted for the summarized path.
# NOTE(review): the integration tests expect the queried directory itself to be
# included in this count -- confirm against the NameNode semantics.
directory_count: int
# NOTE(review): presumably the namespace quota reported by the NameNode; the
# native side stores it as an unsigned 64-bit value -- confirm how "no quota" appears.
quota: int
# NOTE(review): presumably the raw storage consumed, including replication -- confirm.
space_consumed: int
# NOTE(review): presumably the storage-space quota; same unsigned caveat as `quota`.
space_quota: int

class WriteOptions:
block_size: Optional[int]
replication: Optional[int]
Expand Down Expand Up @@ -64,3 +72,4 @@ class RawClient:
) -> None: ...
def set_permission(self, path: str, permission: int) -> None: ...
def set_replication(self, path: str, replication: int) -> bool: ...
# Returns the content summary for `path`; `path` is annotated as `str` to match
# the other path-taking stubs on this class.
def get_content_summary(self, path: str) -> ContentSummary: ...
17 changes: 17 additions & 0 deletions python/hdfs_native/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ def touch(self, path: str, truncate=True, **kwargs):
now = int(time.time() * 1000)
self.client.set_times(path, now, now)

def du(
    self,
    path: str,
    total=True,
    maxdepth: Optional[int] = None,
    withdirs=False,
    **kwargs,
) -> Union[int, Dict[str, int]]:
    """
    Reports the space used by `path`.

    With `total=True` (the default) the size is obtained from a single
    content-summary call on the native client instead of walking the tree,
    and the summary's `length` is returned as an `int`. `maxdepth` cannot be
    honoured in that mode and raises `NotImplementedError` when given.

    With `total=False` the call is delegated unchanged to the parent class
    implementation, which returns a mapping of path to size.
    """
    if not total:
        # Per-entry sizes: defer to the generic implementation.
        return super().du(path, total, maxdepth, withdirs, **kwargs)

    if maxdepth is not None:
        raise NotImplementedError("maxdepth is not supported with total")

    # Strip the protocol first, as `mkdir` does, so that fully qualified URLs
    # are reduced to the plain path before reaching the native client.
    content_summary = self.client.get_content_summary(self._strip_protocol(path))
    return content_summary.length

def mkdir(self, path: str, create_parents=True, **kwargs):
self.client.mkdirs(
self._strip_protocol(path),
Expand Down
31 changes: 31 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ::hdfs_native::{
Client,
};
use bytes::Bytes;
use hdfs_native::client::ContentSummary;
use pyo3::{exceptions::PyRuntimeError, prelude::*};
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -72,6 +73,29 @@ impl PyFileStatusIter {
}
}

// Python-facing mirror of the Rust `ContentSummary`, exported to Python under
// the name "ContentSummary". The `get_all` option asks PyO3 to generate a
// read accessor for every field and `frozen` marks instances as immutable
// (see the PyO3 `#[pyclass]` documentation).
#[pyclass(get_all, frozen, name = "ContentSummary")]
struct PyContentSummary {
// Each field is copied verbatim from the same-named field of `ContentSummary`
// by the `From<ContentSummary>` conversion.
length: u64,
file_count: u64,
directory_count: u64,
quota: u64,
space_consumed: u64,
space_quota: u64,
}

impl From<ContentSummary> for PyContentSummary {
fn from(value: ContentSummary) -> Self {
Self {
length: value.length,
file_count: value.file_count,
directory_count: value.directory_count,
quota: value.quota,
space_consumed: value.space_consumed,
space_quota: value.space_quota,
}
}
}

#[pyclass]
struct RawFileReader {
inner: FileReader,
Expand Down Expand Up @@ -278,6 +302,13 @@ impl RawClient {
.rt
.block_on(self.inner.set_replication(path, replication))?)
}

pub fn get_content_summary(&self, path: &str) -> PyHdfsResult<PyContentSummary> {
Ok(self
.rt
.block_on(self.inner.get_content_summary(path))?
.into())
}
}

/// A Python module implemented in Rust.
Expand Down
16 changes: 16 additions & 0 deletions python/tests/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def test_listing(fs: HdfsFileSystem):
assert listing[0]["name"] == "/testdir"
assert listing[0]["type"] == "directory"

fs.rm("/testdir", True)


def test_parsing(minidfs: str):
with fsspec.open(f"{minidfs}/test", "wb") as f:
Expand All @@ -85,3 +87,17 @@ def test_parsing(minidfs: str):
assert urlpath == "/path"

assert fs.unstrip_protocol("/path") == f"{minidfs}/path"


def test_du(fs: HdfsFileSystem):
    """
    Writes two 11-byte files and checks `du` for each file, for the root
    total, and for the per-path breakdown returned with `total=False`.
    """
    payloads = {"/test": b"hello there", "/test2": b"hello again"}
    for target, data in payloads.items():
        with fs.open(target, mode="wb") as handle:
            handle.write(data)

    # Each file on its own, then the aggregate for the root directory.
    assert fs.du("/test") == 11
    assert fs.du("/test2") == 11
    assert fs.du("/") == 22

    # Without `total`, sizes come back keyed by path.
    assert fs.du("/", total=False) == {"/test": 11, "/test2": 11}
5 changes: 5 additions & 0 deletions python/tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ def test_integration(minidfs: str):
file_info = client.get_file_info("/testfile")
assert file_info.replication == 2

content_summary = client.get_content_summary("/")
assert content_summary.file_count == 1
assert content_summary.directory_count == 1
assert content_summary.length == 33 * 1024 * 1024 * 4

client.delete("/testfile", False)
37 changes: 36 additions & 1 deletion rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::hdfs::protocol::NamenodeProtocol;
use crate::hdfs::proxy::NameServiceProxy;
use crate::proto::hdfs::hdfs_file_status_proto::FileType;

use crate::proto::hdfs::HdfsFileStatusProto;
use crate::proto::hdfs::{ContentSummaryProto, HdfsFileStatusProto};

#[derive(Clone)]
pub struct WriteOptions {
Expand Down Expand Up @@ -475,6 +475,18 @@ impl Client {

Ok(result)
}

/// Gets a content summary for a file or directory rooted at `path
pub async fn get_content_summary(&self, path: &str) -> Result<ContentSummary> {
let (link, resolved_path) = self.mount_table.resolve(path);
let result = link
.protocol
.get_content_summary(&resolved_path)
.await?
.summary;

Ok(result.into())
}
}

impl Default for Client {
Expand Down Expand Up @@ -664,6 +676,29 @@ impl FileStatus {
}
}

/// Aggregate usage information for a file or directory tree, as returned by
/// `Client::get_content_summary`.
///
/// The type is plain data, so it can be cloned and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentSummary {
    /// Combined length in bytes of the files under the summarized path.
    pub length: u64,
    /// Number of files counted for the summarized path.
    pub file_count: u64,
    /// Number of directories counted for the summarized path.
    // NOTE(review): the integration test expects the queried directory itself
    // to be included in this count -- confirm against NameNode semantics.
    pub directory_count: u64,
    /// Quota value taken verbatim from the NameNode response.
    // NOTE(review): presumably the namespace quota; HDFS conventionally reports
    // "no quota" as -1, which would surface here as `u64::MAX` -- confirm.
    pub quota: u64,
    /// Space-consumed value taken verbatim from the NameNode response.
    // NOTE(review): presumably raw storage used including replication -- confirm.
    pub space_consumed: u64,
    /// Space-quota value taken verbatim from the NameNode response.
    // NOTE(review): same unsigned caveat as `quota` -- confirm.
    pub space_quota: u64,
}

impl From<ContentSummaryProto> for ContentSummary {
fn from(value: ContentSummaryProto) -> Self {
ContentSummary {
length: value.length,
file_count: value.file_count,
directory_count: value.directory_count,
quota: value.quota,
space_consumed: value.space_consumed,
space_quota: value.space_quota,
}
}
}

#[cfg(test)]
mod test {
use std::{
Expand Down
23 changes: 23 additions & 0 deletions rust/src/hdfs/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,29 @@ impl NamenodeProtocol {
debug!("setReplication response: {:?}", &decoded);
Ok(decoded)
}

/// Issues a `getContentSummary` call for `path` through the proxy and returns
/// the decoded response message. Request and response are logged at debug level.
pub(crate) async fn get_content_summary(
    &self,
    path: &str,
) -> Result<hdfs::GetContentSummaryResponseProto> {
    let request = hdfs::GetContentSummaryRequestProto {
        path: path.to_owned(),
    };
    debug!("getContentSummary request: {:?}", &request);

    let payload = request.encode_length_delimited_to_vec();
    let raw = self.proxy.call("getContentSummary", payload).await?;

    let summary = hdfs::GetContentSummaryResponseProto::decode_length_delimited(raw)?;
    debug!("getContentSummary response: {:?}", &summary);
    Ok(summary)
}
}

impl Drop for NamenodeProtocol {
Expand Down
23 changes: 23 additions & 0 deletions rust/tests/test_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ mod test {
test_set_owner(&client).await?;
test_set_permission(&client).await?;
test_set_replication(&client).await?;
test_get_content_summary(&client).await?;

Ok(())
}
Expand Down Expand Up @@ -426,4 +427,26 @@ mod test {

Ok(())
}

/// Creates a 4-byte file, a 6-byte file and a directory, then checks the
/// counts and total length reported by the content summary of the root.
async fn test_get_content_summary(client: &Client) -> Result<()> {
    let mut small_file = client.create("/test", WriteOptions::default()).await?;
    small_file.write(vec![0, 1, 2, 3].into()).await?;
    small_file.close().await?;

    let mut larger_file = client.create("/test2", WriteOptions::default()).await?;
    larger_file.write(vec![0, 1, 2, 3, 4, 5].into()).await?;
    larger_file.close().await?;

    client.mkdirs("/testdir", 0o755, true).await?;

    let summary = client.get_content_summary("/").await?;

    // Test file plus the two we made above
    assert_eq!(summary.file_count, 3);
    assert_eq!(summary.directory_count, 2);

    // Length of the test file plus the 4 and 6 bytes written above
    let expected_length = TEST_FILE_INTS as u64 * 4 + 4 + 6;
    assert_eq!(summary.length, expected_length);

    Ok(())
}
}
Loading