
Commit

wip
arvidn committed Sep 9, 2023
1 parent fa0ee17 commit c7c4d45
Showing 13 changed files with 118 additions and 51 deletions.
7 changes: 7 additions & 0 deletions docs/upgrade_to_2.1.rst
@@ -160,3 +160,10 @@ to allow reporting the oversized_file condition (which was introduced in
The flags can be found in the ``lt::disk_status`` namespace.

This change affects custom storage implementations.

storage_params
--------------

The ``storage_params`` type has two new fields, ``v1`` and ``v2``, indicating
whether the torrent has v1 and/or v2 hashes. This allows disk I/O subsystems to
know in advance whether flat piece hashes (v1 torrents), block hashes (v2
torrents), or both will be required.
1 change: 1 addition & 0 deletions include/libtorrent/aux_/debug_disk_thread.hpp
@@ -24,6 +24,7 @@ see LICENSE file.
#include <string>
#include <sstream>
#include <unordered_map>
#include <thread>

#include "libtorrent/aux_/disk_job.hpp"
#include "libtorrent/disk_interface.hpp"
96 changes: 61 additions & 35 deletions include/libtorrent/aux_/disk_cache.hpp
@@ -135,6 +135,7 @@ struct cached_piece_entry
// this indicates that this piece belongs to a v2 torrent, and it has the
// block_hash member of cached_block_entry and we need to compute the block
// hashes as well
bool v1_hashes = false;
bool v2_hashes = false;

// TODO: bool v1_hashes = false;
@@ -193,7 +194,7 @@ static bool have_buffers(span<const cached_block_entry> blocks)
return true;
}

int compute_flushed_cursor(span<const cached_block_entry> blocks)
static int compute_flushed_cursor(span<const cached_block_entry> blocks)
{
int ret = 0;
for (auto const& b : blocks)
@@ -204,7 +205,7 @@ int compute_flushed_cursor(span<const cached_block_entry> blocks)
return ret;
}

int count_jobs(span<const cached_block_entry> blocks)
static int count_jobs(span<const cached_block_entry> blocks)
{
return std::count_if(blocks.begin(), blocks.end()
, [](cached_block_entry const& b) { return b.write_job; });
@@ -351,7 +352,9 @@ struct disk_cache
return false;
}

clear_piece_impl(const_cast<cached_piece_entry&>(*i), aborted);
view.modify(i, [&](cached_piece_entry& e) {
clear_piece_impl(e, aborted);
});
return true;
}

@@ -376,7 +379,9 @@ }
}

// returns true if this piece needs to have its hasher kicked
bool insert(piece_location const loc, int const block_idx, pread_disk_job* write_job)
bool insert(piece_location const loc
, int const block_idx
, pread_disk_job* write_job)
{
std::unique_lock<std::mutex> l(m_mutex);

@@ -388,15 +393,20 @@ {
{
//#error this computation is not right for v2 torrents. it will make v2 hashes computed incorrectly
//#error we don't know what the block size actually is here. If the piece size is less than 16 kiB, this computation is incorrect
int const blocks_in_piece = (write_job->storage->files().piece_size(loc.piece) + default_block_size - 1) / default_block_size;
pread_storage* storage = write_job->storage.get();
int const blocks_in_piece = (storage->files().piece_size(loc.piece) + default_block_size - 1) / default_block_size;
cached_piece_entry pe(loc, blocks_in_piece);
pe.v2_hashes = write_job->storage->files().v2();
pe.v1_hashes = storage->v1();
pe.v2_hashes = storage->v2();
i = m_pieces.insert(std::move(pe)).first;
}

cached_block_entry& blk = i->blocks[block_idx];
TORRENT_ASSERT(!blk.buf_holder);
TORRENT_ASSERT(blk.write_job == nullptr);
TORRENT_ASSERT(blk.flushed_to_disk == false);
TORRENT_ASSERT(block_idx >= i->flushed_cursor);
TORRENT_ASSERT(block_idx >= i->hasher_cursor);

TORRENT_ASSERT(write_job->get_type() == aux::job_action_t::write);
blk.write_job = write_job;
@@ -495,6 +505,7 @@ struct disk_cache

view.modify(piece_iter, [](cached_piece_entry& e) { e.hashing = true; });

bool const need_v1 = piece_iter->v1_hashes;
bool const need_v2 = piece_iter->v2_hashes;

l.unlock();
@@ -503,8 +514,8 @@ {
{
cached_block_entry& cbe = piece_iter->blocks[cursor];

// TODO: don't compute sha1 hash for v2-only torrents
ctx.update(buf);
if (need_v1)
ctx.update(buf);

if (need_v2)
cbe.block_hash = hasher256(buf).final();
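As a side note, the hunk above makes per-block hashing conditional on the hash
versions the torrent actually uses: a v1 piece hash is a single SHA-1
accumulated over all blocks of the piece in order, while v2 needs one SHA-256
hash per block. The sketch below is my illustration, not code from the commit;
piece_hash_v1 and block_hashes_v2 are made-up names, and it assumes hasher and
hasher256 behave the way they are used above.

#include "libtorrent/hasher.hpp"
#include "libtorrent/sha1_hash.hpp"
#include "libtorrent/span.hpp"

#include <vector>

// one SHA-1 over every block of the piece, in order (v1 and hybrid torrents)
lt::sha1_hash piece_hash_v1(std::vector<lt::span<char const>> const& blocks)
{
	lt::hasher ctx;
	for (auto const& b : blocks) ctx.update(b);
	return ctx.final();
}

// one SHA-256 per block (v2 and hybrid torrents); these become merkle leaves
std::vector<lt::sha256_hash> block_hashes_v2(std::vector<lt::span<char const>> const& blocks)
{
	std::vector<lt::sha256_hash> out;
	out.reserve(blocks.size());
	for (auto const& b : blocks) out.push_back(lt::hasher256(b).final());
	return out;
}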
@@ -585,10 +596,10 @@ struct disk_cache
INVARIANT_CHECK;
#endif

// We avoid flushing if other threads have already initiated sufficient
// amount of flushing
if (m_blocks - m_flushing_blocks <= target_blocks)
return;
// We want to flush all pieces that are ready to flush regardless of
// the flush target. There's not much value in keeping them in RAM
// when we've completely downloaded the piece and hashed it
// so, we don't check flush target in this loop

if (piece_iter->flushing)
{
Expand Down Expand Up @@ -621,6 +632,7 @@ struct disk_cache
l.lock();
view.modify(piece_iter, [&blocks](cached_piece_entry& e) {
e.flushing = false;
e.ready_to_flush = false;
});
TORRENT_ASSERT(m_flushing_blocks >= num_blocks);
m_flushing_blocks -= num_blocks;
@@ -658,10 +670,12 @@ struct disk_cache
if (piece_iter->clear_piece)
{
jobqueue_t aborted;
auto& cpe = const_cast<cached_piece_entry&>(*piece_iter);
clear_piece_impl(cpe, aborted);
clear_piece_fun(std::move(aborted), std::exchange(cpe.clear_piece, nullptr));
return;
pread_disk_job* clear_piece = nullptr;
view.modify(piece_iter, [&](cached_piece_entry& e) {
clear_piece_impl(e, aborted);
clear_piece = std::exchange(e.clear_piece, nullptr);
});
clear_piece_fun(std::move(aborted), clear_piece);
}
if (piece_iter->piece_hash_returned)
{
@@ -762,10 +776,12 @@ struct disk_cache
if (piece_iter->clear_piece)
{
jobqueue_t aborted;
auto& cpe = const_cast<cached_piece_entry&>(*piece_iter);
clear_piece_impl(cpe, aborted);
clear_piece_fun(std::move(aborted), std::exchange(cpe.clear_piece, nullptr));
return;
pread_disk_job* clear_piece = nullptr;
view2.modify(piece_iter, [&](cached_piece_entry& e) {
clear_piece_impl(e, aborted);
clear_piece = std::exchange(e.clear_piece, nullptr);
});
clear_piece_fun(std::move(aborted), clear_piece);
}
// if we failed to flush all blocks we wanted to, we're done
if (count < num_blocks)
@@ -847,10 +863,12 @@ struct disk_cache
if (piece_iter->clear_piece)
{
jobqueue_t aborted;
auto& cpe = const_cast<cached_piece_entry&>(*piece_iter);
clear_piece_impl(cpe, aborted);
clear_piece_fun(std::move(aborted), std::exchange(cpe.clear_piece, nullptr));
return;
pread_disk_job* clear_piece = nullptr;
view3.modify(piece_iter, [&](cached_piece_entry& e) {
clear_piece_impl(e, aborted);
clear_piece = std::exchange(e.clear_piece, nullptr);
});
clear_piece_fun(std::move(aborted), clear_piece);
}
if (count < num_blocks)
return;
@@ -953,18 +971,20 @@ struct disk_cache
if (piece_iter->clear_piece)
{
jobqueue_t aborted;
auto& cpe = const_cast<cached_piece_entry&>(*piece_iter);
clear_piece_impl(cpe, aborted);
clear_piece_fun(std::move(aborted), std::exchange(cpe.clear_piece, nullptr));
return;
pread_disk_job* clear_piece = nullptr;
piece_view.modify(piece_iter, [&](cached_piece_entry& e) {
clear_piece_impl(e, aborted);
clear_piece = std::exchange(e.clear_piece, nullptr);
});
clear_piece_fun(std::move(aborted), clear_piece);
}

if (piece_iter->piece_hash_returned)
{
TORRENT_ASSERT(!piece_iter->flushing);
TORRENT_ASSERT(!piece_iter->hashing);
// if (piece_iter->piece_hash_returned)
// {
// TORRENT_ASSERT(!piece_iter->flushing);
// TORRENT_ASSERT(!piece_iter->hashing);
piece_iter = piece_view.erase(piece_iter);
}
// }
}
}

@@ -1031,8 +1051,8 @@ struct disk_cache
// if (idx < piece_entry.hasher_cursor)
// TORRENT_ASSERT(!be.buf_holder);

// if (piece_entry.ready_to_flush)
// TORRENT_ASSERT(be.write_job != nullptr);
if (piece_entry.ready_to_flush)
TORRENT_ASSERT(be.write_job != nullptr);
++idx;
}
}
@@ -1060,10 +1080,16 @@ struct disk_cache
{
aborted.push_back(cbe.write_job);
cbe.write_job = nullptr;
cbe.flushed_to_disk = false;
--m_blocks;
}
cbe.buf_holder.reset();
}
cpe.ready_to_flush = false;
cpe.piece_hash_returned = false;
cpe.hasher_cursor = 0;
cpe.flushed_cursor = 0;
cpe.ph = hasher{};
}

mutable std::mutex m_mutex;
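A note on the recurring change in the file above: mutations of
cached_piece_entry now go through view.modify(...) instead of const_cast. The
sketch below is my illustration with a made-up piece_entry type, and it assumes
the piece table is a Boost.MultiIndex container, as the insert(...).first /
modify / erase usage suggests. Elements reached through an index are const, so
in-place mutation goes through modify(), which lets the container re-index the
element if a key field changes, instead of casting constness away.

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>

namespace mi = boost::multi_index;

struct piece_entry
{
	int piece;      // key field
	bool flushing;  // non-key state we want to mutate in place
};

using piece_table = mi::multi_index_container<
	piece_entry,
	mi::indexed_by<
		mi::ordered_unique<mi::member<piece_entry, int, &piece_entry::piece>>>>;

int main()
{
	piece_table pieces;
	pieces.insert({42, false});

	auto& view = pieces.get<0>();
	auto it = view.find(42);

	// dereferencing "it" yields a const piece_entry; mutate through the
	// container so a changed key would be re-indexed instead of corrupting
	// the index
	view.modify(it, [](piece_entry& e) { e.flushing = true; });
}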
8 changes: 8 additions & 0 deletions include/libtorrent/aux_/pread_storage.hpp
@@ -107,6 +107,9 @@ namespace libtorrent::aux {
storage_index_t storage_index() const { return m_storage_index; }
void set_storage_index(storage_index_t st) { m_storage_index = st; }

bool v1() const { return m_v1; }
bool v2() const { return m_v2; }

private:

bool m_need_tick = false;
@@ -172,6 +175,11 @@ namespace libtorrent::aux {
mutable typed_bitfield<file_index_t> m_file_created;

bool m_allocate_files;
// this is a v1 torrent
bool m_v1;
// this is a v2 torrent. If both v1 and v2 are set, it's a hybrid
// torrent
bool m_v2;
};

}
6 changes: 5 additions & 1 deletion include/libtorrent/storage_defs.hpp
@@ -84,20 +84,24 @@ namespace libtorrent {
storage_params(file_storage const& f, file_storage const* mf
, std::string const& sp, storage_mode_t const sm
, aux::vector<download_priority_t, file_index_t> const& prio
, sha1_hash const& ih)
, sha1_hash const& ih, bool v1_torrent, bool v2_torrent)
: files(f)
, mapped_files(mf)
, path(sp)
, mode(sm)
, priorities(prio)
, info_hash(ih)
, v1(v1_torrent)
, v2(v2_torrent)
{}
file_storage const& files;
file_storage const* mapped_files = nullptr; // optional
std::string const& path;
storage_mode_t mode{storage_mode_sparse};
aux::vector<download_priority_t, file_index_t> const& priorities;
sha1_hash info_hash;
bool v1;
bool v2;
};
}

5 changes: 3 additions & 2 deletions src/create_torrent.cpp
@@ -477,14 +477,15 @@ namespace {
file_storage fs = make_file_storage(t.file_list(), t.piece_length());

aux::vector<download_priority_t, file_index_t> priorities;
sha1_hash info_hash;
storage_params params{
fs,
nullptr,
path,
storage_mode_t::storage_mode_sparse,
priorities,
info_hash
sha1_hash{},
!t.is_v2_only(), // v1-hashes
!t.is_v1_only() // v2-hashes
};

storage_holder storage = disk_thread->new_torrent(params
3 changes: 2 additions & 1 deletion src/disk_buffer_pool.cpp
@@ -16,6 +16,7 @@ see LICENSE file.
#include "libtorrent/io_context.hpp"
#include "libtorrent/disk_observer.hpp"
#include "libtorrent/disk_interface.hpp" // for default_block_size
#include "libtorrent/aux_/debug_disk_thread.hpp"

#include "libtorrent/aux_/disable_warnings_push.hpp"

@@ -192,7 +193,7 @@ namespace {
std::optional<int> disk_buffer_pool::flush_request() const
{
std::unique_lock<std::mutex> l(m_pool_mutex);
if (m_in_use >= m_max_use)
if (m_in_use >= m_low_watermark)
return m_in_use - m_low_watermark;
return std::nullopt;
}
5 changes: 4 additions & 1 deletion src/pread_disk_io.cpp
@@ -1394,10 +1394,11 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
TORRENT_ASSERT(j->get_type() == aux::job_action_t::write);
auto& a = std::get<aux::job::write>(j->action);

if (count == 0) start_offset = job_offset;
iovec[count] = span<char>{ a.buf.data(), a.buffer_size};
++count;
flags = j->flags;
piece = std::get<aux::job::write>(j->action).piece;
piece = a.piece;
file_mode = file_mode_for_job(j);
end_offset = job_offset + a.buffer_size;
++idx;
@@ -1474,6 +1475,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
void pread_disk_io::flush_storage(std::shared_ptr<aux::pread_storage> const& storage)
{
storage_index_t const torrent = storage->storage_index();
DLOG("flush_storage (%d)\n", torrent);
jobqueue_t completed_jobs;
m_cache.flush_storage(
[&](bitfield& flushed, span<aux::cached_block_entry const> blocks, int const hash_cursor) {
@@ -1483,6 +1485,7 @@ TORRENT_EXPORT std::unique_ptr<disk_interface> pread_disk_io_constructor(
, [&](jobqueue_t aborted, aux::pread_disk_job* clear) {
clear_piece_jobs(std::move(aborted), clear);
});
DLOG("flush_storage - done (%d left)\n", m_cache.size());
if (!completed_jobs.empty())
add_completed_jobs(std::move(completed_jobs));
}
4 changes: 4 additions & 0 deletions src/pread_storage.cpp
@@ -72,7 +72,11 @@ namespace {
, m_part_file_name("." + to_hex(params.info_hash) + ".parts")
, m_pool(pool)
, m_allocate_files(params.mode == storage_mode_allocate)
, m_v1(params.v1)
, m_v2(params.v2)
{
// a torrent must be either v1 or v2 (or both)
TORRENT_ASSERT(m_v1 || m_v2);
if (params.mapped_files) m_mapped_files = std::make_unique<file_storage>(*params.mapped_files);

TORRENT_ASSERT(files().num_files() > 0);
4 changes: 3 additions & 1 deletion src/torrent.cpp
@@ -1744,7 +1744,9 @@ bool is_downloading_state(int const st)
m_save_path,
static_cast<storage_mode_t>(m_storage_mode),
m_file_priority,
m_info_hash.get_best()
m_info_hash.get_best(),
m_torrent_file->v1(),
m_torrent_file->v2()
};

// the shared_from_this() will create an intentional
5 changes: 3 additions & 2 deletions test/make_torrent.cpp
@@ -147,14 +147,15 @@ void generate_files(lt::torrent_info const& ti, std::string const& path
, bool alternate_data)
{
aux::vector<download_priority_t, file_index_t> priorities;
sha1_hash info_hash;
storage_params params{
ti.files(),
nullptr,
path,
storage_mode_t::storage_mode_sparse,
priorities,
info_hash
sha1_hash{},
true, // v1-hashes
true // v2-hashes
};

// default settings

0 comments on commit c7c4d45
