Skip to content

Commit

Permalink
Piece writing
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanjermakov committed Oct 29, 2023
1 parent 311a162 commit 9df384d
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 118 deletions.
12 changes: 0 additions & 12 deletions src/bencode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,6 @@ impl From<&str> for BencodeValue {
}
}

/// Bencode a byte string as `<length>:<contents>` (e.g. `4:spam`).
///
/// Builds the result in a single buffer instead of collecting and
/// flattening three intermediate `Vec`s.
#[allow(dead_code)]
pub fn bencode_string(value: ByteString) -> ByteString {
    // "<len>" prefix, then the ':' separator, then the raw payload.
    let mut out = value.len().to_string().into_bytes();
    out.push(b':');
    out.extend(value);
    out
}

pub fn parse_bencoded(bencoded: ByteString) -> (Option<BencodeValue>, ByteString) {
let next = match bencoded.first() {
Some(f) => f,
Expand Down
15 changes: 11 additions & 4 deletions src/metainfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,24 @@ pub enum FileInfo {
}

impl FileInfo {
pub fn total_length(&self) -> i64 {
pub fn total_length(&self) -> u32 {
match self {
FileInfo::Single(path) => path.length,
FileInfo::Multi(files) => files.iter().map(|f| f.length).sum(),
}
}

pub fn files(&self) -> Vec<&PathInfo> {
match self {
FileInfo::Single(path) => vec![path],
FileInfo::Multi(files) => files.iter().collect(),
}
}
}

/// Size and location of a single file described by the torrent metainfo.
#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
pub struct PathInfo {
    // File size in bytes. The diff residue carried both an `i64` and a
    // `u32` declaration; the post-commit `u32` version is kept.
    pub length: u32,
    // Path of the file relative to the download root.
    pub path: PathBuf,
    // Optional MD5 checksum from the metainfo, as a hex string.
    pub md5_sum: Option<String>,
}
Expand Down Expand Up @@ -81,7 +88,7 @@ impl TryFrom<BencodeValue> for Metainfo {
None => FileInfo::Single(PathInfo {
path: PathBuf::from(&name),
length: match info_dict.get("length") {
Some(BencodeValue::Int(v)) => *v,
Some(BencodeValue::Int(v)) => *v as u32,
_ => return Err("'length' missing".into()),
},
md5_sum: match info_dict.get("md5_sum") {
Expand Down Expand Up @@ -165,7 +172,7 @@ fn parse_files_info(value: &BencodeValue) -> Result<Vec<PathInfo>, String> {
};
Ok(PathInfo {
length: match d.get("length") {
Some(BencodeValue::Int(v)) => *v,
Some(BencodeValue::Int(v)) => *v as u32,
_ => return Err("'length' missing".into()),
},
path,
Expand Down
97 changes: 60 additions & 37 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
hex::hex,
sha1,
state::{Block, Peer, PeerInfo, PeerStatus, State, TorrentStatus, BLOCK_SIZE},
torrent::write_piece,
types::ByteString,
};

Expand Down Expand Up @@ -459,47 +460,69 @@ async fn read_loop(
continue;
}
let block_index = begin / BLOCK_SIZE;
let mut state = state.lock().await;
let piece = match state.pieces.get_mut(&piece_index) {
Some(p) => p,
_ => {
debug!("no piece with index {:?}", piece_index);

{
let mut state = state.lock().await;
let piece = match state.pieces.get_mut(&piece_index) {
Some(p) => p,
_ => {
debug!("no piece with index {:?}", piece_index);
continue;
}
};
if piece.status != TorrentStatus::Started {
debug!("downloaded block of already completed piece, loss");
continue;
}
};
if piece.completed {
debug!("downloaded block of already completed piece, loss");
continue;
}
let total_blocks = piece.total_blocks();
if block_index != total_blocks - 1 && block.0.len() != BLOCK_SIZE as usize {
debug!("block of unexpected size: {}", block.0.len());
continue;
}
if piece.blocks.insert(block_index, block).is_some() {
debug!("repeaded block download, loss");
};
trace!("got block {}/{}", piece.blocks.len(), total_blocks);
if piece.blocks.len() as u32 == total_blocks {
let piece_data: Vec<u8> = piece
.blocks
.values()
.flat_map(|b| b.0.as_slice())
.copied()
.collect();
let piece_hash = sha1::encode(piece_data);
if piece_hash != piece.hash.0 {
warn!("piece hash does not match: {:?}", piece);
trace!("{}", hex(&piece_hash));
trace!("{}", hex(&piece.hash.0));
let total_blocks = piece.total_blocks();
if block_index != total_blocks - 1 && block.0.len() != BLOCK_SIZE as usize {
debug!("block of unexpected size: {}", block.0.len());
continue;
}
piece.completed = true;
info!(
"piece completed {}/{}",
state.pieces.values().filter(|p| p.completed).count(),
state.pieces.len(),
);
if piece.blocks.insert(block_index, block).is_some() {
debug!("repeaded block download, loss");
};
trace!("got block {}/{}", piece.blocks.len(), total_blocks);
if piece.blocks.len() as u32 == total_blocks {
let piece_data: Vec<u8> = piece
.blocks
.values()
.flat_map(|b| b.0.as_slice())
.copied()
.collect();
let piece_hash = sha1::encode(piece_data);
if piece_hash != piece.hash.0 {
warn!("piece hash does not match: {:?}", piece);
trace!("{}", hex(&piece_hash));
trace!("{}", hex(&piece.hash.0));
continue;
}
piece.status = TorrentStatus::Downloaded;
info!(
"piece {}/{}/{}",
state
.pieces
.values()
.filter(|p| p.status != TorrentStatus::Started)
.count(),
state
.pieces
.values()
.filter(|p| p.status > TorrentStatus::Started)
.count(),
state.pieces.len(),
);
}
}

if state.lock().await.pieces.get(&piece_index).unwrap().status
== TorrentStatus::Downloaded
{
// TODO: async
match spawn(write_piece(piece_index, state.clone())).await {
Ok(_) => debug!("piece saved"),
Err(e) => error!("error writing piece: {:#}", e),
};
}
}
Ok(Message::Port { port }) => match state.lock().await.peers.get_mut(&peer) {
Expand Down
78 changes: 65 additions & 13 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{

pub const BLOCK_SIZE: u32 = 1 << 14;

#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
#[derive(Clone, Debug, PartialEq)]
pub struct State {
pub config: Config,
pub metainfo: Metainfo,
Expand All @@ -32,7 +32,7 @@ impl State {
let piece = self
.pieces
.values()
.filter(|p| !p.completed)
.filter(|p| p.status == TorrentStatus::Started)
.choose(&mut thread_rng())
.cloned();
if piece.is_none() {
Expand All @@ -50,14 +50,15 @@ pub enum TorrentStatus {
Saved,
}

#[derive(Clone, Debug, PartialEq, PartialOrd, Hash)]
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct Piece {
pub hash: PieceHash,
pub index: u32,
pub length: u32,
/// Map of blocks <block index> -> <block>
pub blocks: BTreeMap<u32, Block>,
pub completed: bool,
pub status: TorrentStatus,
pub file_locations: Vec<FileLocation>,
}

impl Piece {
Expand Down Expand Up @@ -147,29 +148,80 @@ impl TryFrom<&[u8]> for PeerInfo {
}

/// Build the initial piece table for a torrent from its metainfo.
///
/// Each entry records the piece's expected SHA-1 hash, its length (the last
/// piece may be shorter), and the file region(s) its bytes map onto.
///
/// NOTE(review): the diff residue interleaved pre- and post-commit lines;
/// this is the reconstructed post-commit version.
pub fn init_pieces(info: &Info) -> BTreeMap<u32, Piece> {
    // Starting byte offset of each file within the concatenated torrent data.
    let files_start = info
        .file_info
        .files()
        .iter()
        .scan(0, |acc, f| {
            let res = *acc;
            *acc += f.length;
            Some(res)
        })
        .collect::<Vec<_>>();
    let total_len = info.file_info.total_length();
    if info.pieces.len() != (total_len as f64 / info.piece_length as f64).ceil() as usize {
        warn!("total length/piece size/piece count inconsistent");
    }
    info.pieces
        .iter()
        .cloned()
        .enumerate()
        .map(|(i, p)| (i as u32, p))
        .map(|(i, p)| {
            // The last piece is only shorter when the total length does not
            // divide evenly; when it does, the remainder is 0 and the last
            // piece must still be a full piece_length (fixes a zero-length
            // last piece on exact multiples).
            let rem = total_len % info.piece_length;
            let length = if i == info.pieces.len() as u32 - 1 && rem != 0 {
                rem
            } else {
                info.piece_length
            };
            let file_locations: Vec<_> = files_start
                .iter()
                .copied()
                .enumerate()
                .flat_map(|(f_i, f_start)| {
                    let f_len = info.file_info.files()[f_i].length;
                    let f_end = f_start + f_len;
                    let piece_start = i * info.piece_length;
                    let piece_end = piece_start + length;
                    // Clamp the file's byte span to this piece's span; a
                    // non-empty intersection means part of the piece lives
                    // in this file.
                    let p_start = (f_start as i64).clamp(piece_start as i64, piece_end as i64);
                    let p_end = (f_end as i64).clamp(piece_start as i64, piece_end as i64);
                    let p_len = p_end - p_start;
                    (p_len != 0).then(|| FileLocation {
                        file_index: f_i,
                        // offset of the span within the file
                        offset: (p_start - f_start as i64) as usize,
                        // offset of the span within the piece
                        piece_offset: (p_start - piece_start as i64) as usize,
                        length: p_len as usize,
                    })
                })
                .collect();
            // TODO: verify files' location integrity
            if file_locations.iter().map(|f| f.length as u32).sum::<u32>() != length {
                panic!("incorrect file location length");
            }
            (
                i,
                Piece {
                    hash: p,
                    index: i,
                    length,
                    blocks: BTreeMap::new(),
                    status: TorrentStatus::Started,
                    file_locations,
                },
            )
        })
        .collect()
}

/// Maps a contiguous byte span of a piece onto a region of one output file.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct FileLocation {
    /// Index into the torrent's file list (see `FileInfo::files`).
    pub file_index: usize,
    /// Byte offset within the target file where this span begins.
    pub offset: usize,
    /// Byte offset within the piece where this span begins.
    pub piece_offset: usize,
    /// Number of bytes in this span.
    pub length: usize,
}
Loading

0 comments on commit 9df384d

Please sign in to comment.