Skip to content

Commit

Permalink
Move data reading to a new task (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kimahriman authored May 27, 2024
1 parent 20f23c3 commit efcad6f
Showing 1 changed file with 76 additions and 35 deletions.
111 changes: 76 additions & 35 deletions crates/hdfs-native/src/hdfs/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ use futures::{
Stream, StreamExt,
};
use log::{debug, warn};
use tokio::{
sync::mpsc::{self, Receiver, Sender},
task::JoinHandle,
};

use crate::{
ec::EcSchema,
hdfs::connection::{DatanodeConnection, Op, DATANODE_CACHE},
proto::{
common,
hdfs::{self, BlockOpResponseProto},
hdfs::{self, BlockOpResponseProto, PacketHeaderProto, ReadOpChecksumInfoProto},
},
HdfsError, Result,
};
Expand Down Expand Up @@ -99,8 +103,9 @@ struct ReplicatedBlockStream {
offset: usize,
len: usize,

connection: Option<DatanodeConnection>,
checksum_info: Option<hdfs::ReadOpChecksumInfoProto>,
listener: Option<JoinHandle<Result<DatanodeConnection>>>,
sender: Sender<Result<(PacketHeaderProto, Bytes)>>,
receiver: Receiver<Result<(PacketHeaderProto, Bytes)>>,
current_replica: usize,
}

Expand All @@ -111,29 +116,33 @@ impl ReplicatedBlockStream {
offset: usize,
len: usize,
) -> Self {
let (sender, receiver) = mpsc::channel(10);

Self {
protocol,
block,
offset,
len,
connection: None,
checksum_info: None,
listener: None,
sender,
receiver,
current_replica: 0,
}
}

async fn select_next_datanode(&mut self) -> Result<()> {
if self.connection.is_some() {
self.current_replica += 1;
if self.current_replica >= self.block.locs.len() {
return Err(HdfsError::DataTransferError(
"All DataNodes failed".to_string(),
));
}
async fn select_next_datanode(
&mut self,
) -> Result<(DatanodeConnection, Option<ReadOpChecksumInfoProto>)> {
if self.current_replica >= self.block.locs.len() {
return Err(HdfsError::DataTransferError(
"All DataNodes failed".to_string(),
));
}

let datanode = &self.block.locs[self.current_replica].id;

self.current_replica += 1;

let (connection, response) = connect_and_send(
&self.protocol,
datanode,
Expand All @@ -148,48 +157,80 @@ impl ReplicatedBlockStream {
return Err(HdfsError::DataTransferError(response.message().to_string()));
}

self.connection = Some(connection);
self.checksum_info = response.read_op_checksum_info;

Ok(())
Ok((connection, response.read_op_checksum_info))
}

async fn next_packet(&mut self) -> Result<Option<Bytes>> {
if self.connection.is_none() {
self.select_next_datanode().await?;
}

if self.len == 0 {
let mut conn = self.connection.take().unwrap();
let (header, data) = loop {
if self.listener.is_none() {
let (connection, checksum_info) = self.select_next_datanode().await?;
self.listener = Some(Self::start_packet_listener(
connection,
checksum_info,
self.sender.clone(),
));
}

// Read the final empty packet
conn.read_packet().await?;
match self.receiver.recv().await {
Some(Ok(data)) => break data,
Some(Err(e)) => {
// Some error communicating with datanode, log a warning and then retry on a different Datanode
warn!("Error occured while reading from DataNode: {:?}", e);
self.listener = None;
}
None => {
// This means there's a disconnect between the data we are getting back and what we asked for,
// so just raise an error
return Err(HdfsError::DataTransferError(
"Not enough data returned from DataNode".to_string(),
));
}
}
};

conn.send_read_success().await?;
if self.len == 0 {
let conn = self.listener.take().unwrap().await.unwrap()?;
DATANODE_CACHE.release(conn);
return Ok(None);
}

let conn = self.connection.as_mut().unwrap();

let packet = conn.read_packet().await?;

let packet_offset = if self.offset > packet.header.offset_in_block as usize {
self.offset - packet.header.offset_in_block as usize
let packet_offset = if self.offset > header.offset_in_block as usize {
self.offset - header.offset_in_block as usize
} else {
0
};
let packet_len = usize::min(packet.header.data_len as usize - packet_offset, self.len);
let packet_data = packet.get_data(&self.checksum_info)?;
let packet_len = usize::min(header.data_len as usize - packet_offset, self.len);

self.offset += packet_len;
self.len -= packet_len;

Ok(Some(
packet_data.slice(packet_offset..(packet_offset + packet_len)),
data.slice(packet_offset..(packet_offset + packet_len)),
))
}

fn start_packet_listener(
mut connection: DatanodeConnection,
checksum_info: Option<ReadOpChecksumInfoProto>,
sender: Sender<Result<(PacketHeaderProto, Bytes)>>,
) -> JoinHandle<Result<DatanodeConnection>> {
tokio::spawn(async move {
loop {
let packet = connection.read_packet().await?;
let header = packet.header.clone();
let data = packet.get_data(&checksum_info)?;
let empty_packet = data.is_empty();
sender.send(Ok((header, data))).await.unwrap();

if empty_packet {
connection.send_read_success().await?;
break;
}
}
Ok(connection)
})
}

fn into_stream(self) -> impl Stream<Item = Result<Bytes>> {
stream::unfold(self, |mut state| async move {
let next = state.next_packet().await.transpose();
Expand Down

0 comments on commit efcad6f

Please sign in to comment.