Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileReader implement Read Trait for The case like reading orc file #189

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions rust/src/file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::{ErrorKind, Read};
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -15,7 +16,7 @@ use crate::{HdfsError, Result};

const COMPLETE_RETRY_DELAY_MS: u64 = 500;
const COMPLETE_RETRIES: u32 = 5;

#[derive(Clone)]
pub struct FileReader {
protocol: Arc<NamenodeProtocol>,
status: hdfs::HdfsFileStatusProto,
Expand Down Expand Up @@ -111,10 +112,22 @@ impl FileReader {
/// Panics if the requested range is outside of the file
pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> {
let mut stream = self.read_range_stream(offset, buf.len()).boxed();
while let Some(bytes) = stream.next().await.transpose()? {
buf.put(bytes);
println!("read_range_buf: start");
loop {
match stream.next().await.transpose() {
Ok(Some(bytes)) => {
println!("read_range_buf:while start");

buf.put(bytes);

println!("read_range_buf:while end");
},
Ok(None) => break,
Err(e) => { println!("read_range_buf:err:{:?}",&e); break;},
}

}

println!("read_range_buf: end");
Ok(())
}

Expand All @@ -127,7 +140,7 @@ impl FileReader {
len: usize,
) -> impl Stream<Item = Result<Bytes>> {
if offset + len > self.file_length() {
panic!("Cannot read past end of the file");
panic!("offset={} , len={} , file_length={}, Cannot read past end of the file", offset, len, self.file_length());
}

let block_streams: Vec<BoxStream<Result<Bytes>>> = self
Expand All @@ -138,6 +151,8 @@ impl FileReader {
let block_file_start = block.offset as usize;
let block_file_end = block_file_start + block.b.num_bytes() as usize;

println!("block_file_start={}, block_file_end={}", &block_file_start, &block_file_end);

if block_file_start < (offset + len) && block_file_end > offset {
// We need to read this block
let block_start = offset - usize::min(offset, block_file_start);
Expand All @@ -160,6 +175,44 @@ impl FileReader {
}
}

impl Read for FileReader {

fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut err:std::io::Error= std::io::Error::new(ErrorKind::Other, "error");
let mut success =true;
let mut read_bytes:usize=0;
futures::executor::block_on(async {


if self.position + buf.len() >= self.file_length() {
let buf_len = self.file_length() - self.position as usize;
match self.read_buf(&mut buf[..buf_len]).await {
Ok(r) => {read_bytes=r;},
Err(e) =>{
success=false;
err =std::io::Error::new(ErrorKind::Other, e);
}
}
} else {

match self.read_buf(buf).await {
Ok(r) => {read_bytes=r;},
Err(e) =>{
success=false;
err =std::io::Error::new(ErrorKind::Other, e);
}
}
}
});
if success{
Ok(read_bytes)
}else{
Err(err)
}
}
}


pub struct FileWriter {
src: String,
protocol: Arc<NamenodeProtocol>,
Expand Down