diff --git a/rust/src/file.rs b/rust/src/file.rs index 1d2f68c..6abbd41 100644 --- a/rust/src/file.rs +++ b/rust/src/file.rs @@ -1,3 +1,4 @@ +use std::io::{ErrorKind, Read}; use std::sync::Arc; use std::time::Duration; @@ -15,7 +16,7 @@ use crate::{HdfsError, Result}; const COMPLETE_RETRY_DELAY_MS: u64 = 500; const COMPLETE_RETRIES: u32 = 5; - +#[derive(Clone)] pub struct FileReader { protocol: Arc, status: hdfs::HdfsFileStatusProto, @@ -111,10 +112,22 @@ impl FileReader { /// Panics if the requested range is outside of the file pub async fn read_range_buf(&self, mut buf: &mut [u8], offset: usize) -> Result<()> { let mut stream = self.read_range_stream(offset, buf.len()).boxed(); - while let Some(bytes) = stream.next().await.transpose()? { - buf.put(bytes); + println!("read_range_buf: start"); + loop { + match stream.next().await.transpose() { + Ok(Some(bytes)) => { + println!("read_range_buf:while start"); + + buf.put(bytes); + + println!("read_range_buf:while end"); + }, + Ok(None) => break, + Err(e) => { println!("read_range_buf:err:{:?}",&e); break;}, + } + } - + println!("read_range_buf: end"); Ok(()) } @@ -127,7 +140,7 @@ impl FileReader { len: usize, ) -> impl Stream> { if offset + len > self.file_length() { - panic!("Cannot read past end of the file"); + panic!("offset={} , len={} , file_length={}, Cannot read past end of the file", offset, len, self.file_length()); } let block_streams: Vec>> = self @@ -138,6 +151,8 @@ impl FileReader { let block_file_start = block.offset as usize; let block_file_end = block_file_start + block.b.num_bytes() as usize; + println!("block_file_start={}, block_file_end={}", &block_file_start, &block_file_end); + if block_file_start < (offset + len) && block_file_end > offset { // We need to read this block let block_start = offset - usize::min(offset, block_file_start); @@ -160,6 +175,44 @@ impl FileReader { } } +impl Read for FileReader { + + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let mut err:std::io::Error= std::io::Error::new(ErrorKind::Other, "error"); + let mut success =true; + let mut read_bytes:usize=0; + futures::executor::block_on(async { + + + if self.position + buf.len() >= self.file_length() { + let buf_len = self.file_length() - self.position as usize; + match self.read_buf(&mut buf[..buf_len]).await { + Ok(r) => {read_bytes=r;}, + Err(e) =>{ + success=false; + err =std::io::Error::new(ErrorKind::Other, e); + } + } + } else { + + match self.read_buf(buf).await { + Ok(r) => {read_bytes=r;}, + Err(e) =>{ + success=false; + err =std::io::Error::new(ErrorKind::Other, e); + } + } + } + }); + if success{ + Ok(read_bytes) + }else{ + Err(err) + } + } +} + + pub struct FileWriter { src: String, protocol: Arc,