From e89f1be660396fff0646ba17e5166cb35d44e00d Mon Sep 17 00:00:00 2001 From: Green Sky Date: Mon, 16 Jan 2023 03:50:47 +0100 Subject: [PATCH] transfers kinda work now. request algo broken tho --- src/ft_sha1_info.hpp | 2 +- src/states/sha1.cpp | 217 ++++++++++++++++++++++++++++++++----------- src/states/sha1.hpp | 11 +++ 3 files changed, 177 insertions(+), 53 deletions(-) diff --git a/src/ft_sha1_info.hpp b/src/ft_sha1_info.hpp index 706fe76..2bb6f6b 100644 --- a/src/ft_sha1_info.hpp +++ b/src/ft_sha1_info.hpp @@ -43,7 +43,7 @@ namespace std { // inject struct FTInfoSHA1 { std::string file_name; uint64_t file_size {0}; - static constexpr size_t chunk_size {4*1024}; // 4KiB for now + static constexpr size_t chunk_size {64*1024}; // 64KiB for now std::vector chunks; std::vector toBuffer(void) const; diff --git a/src/states/sha1.cpp b/src/states/sha1.cpp index 59dd401..6b6d0ed 100644 --- a/src/states/sha1.cpp +++ b/src/states/sha1.cpp @@ -2,6 +2,8 @@ #include "../tox_client.hpp" +#include "../hash_utils.hpp" + #include #include @@ -23,16 +25,21 @@ SHA1::SHA1( _sha1_info_hash(std::move(sha1_info_hash)), _have_chunk(std::move(have_chunk)) { + assert(_have_chunk.size() == _sha1_info.chunks.size()); + _have_all = true; _have_count = 0; - for (const bool it : _have_chunk) { - if (!it) { - _have_all = false; - } else { + for (size_t i = 0; i < _have_chunk.size(); i++) { + if (_have_chunk[i]) { _have_count++; + } else { + _have_all = false; + _chunk_want_queue.push_back(i); } } + // if not sequential, shuffle _chunk_want_queue + // build lookup table for (size_t i = 0; i < _sha1_info.chunks.size(); i++) { _chunk_hash_to_index[_sha1_info.chunks[i]] = i; @@ -40,49 +47,50 @@ SHA1::SHA1( } bool SHA1::iterate(float delta) { - // do ongoing transfers, send data?, timeout - // info - for (auto it = _transfers_requested_info.begin(); it != _transfers_requested_info.end();) { - float& time_since_remove_activity = std::get(*it); - time_since_remove_activity += delta; + { // timer and timeouts + // info + for (auto it = _transfers_requested_info.begin(); it != _transfers_requested_info.end();) { + float& time_since_remove_activity = std::get(*it); + time_since_remove_activity += delta; - // if we have not heard for 10sec, timeout - if (time_since_remove_activity >= 10.f) { - std::cerr << "SHA1 info tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n"; - it = _transfers_requested_info.erase(it); - } else { - it++; + // if we have not heard for 10sec, timeout + if (time_since_remove_activity >= 10.f) { + std::cerr << "SHA1 info tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; + it = _transfers_requested_info.erase(it); + } else { + it++; + } } - } - // chunk sending - for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) { - float& time_since_remove_activity = std::get(*it); - time_since_remove_activity += delta; + // chunk sending + for (auto it = _transfers_sending_chunk.begin(); it != _transfers_sending_chunk.end();) { + float& time_since_remove_activity = std::get(*it); + time_since_remove_activity += delta; - // if we have not heard for 10sec, timeout - if (time_since_remove_activity >= 10.f) { - std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n"; - it = _transfers_sending_chunk.erase(it); - } else { - it++; + // if we have not heard for 10sec, timeout + if (time_since_remove_activity >= 10.f) { + std::cerr << "SHA1 sending chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; + it = _transfers_sending_chunk.erase(it); + } else { + it++; + } } - } - // chunk receiving - for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end();) { - float& time_since_remove_activity = std::get(*it); - time_since_remove_activity += delta; + // chunk receiving + for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end();) { + float& time_since_remove_activity = std::get(*it); + time_since_remove_activity += delta; - // if we have not heard for 10sec, timeout - if (time_since_remove_activity >= 10.f) { - std::cerr << "SHA1 receiving chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << "\n"; - it = _transfers_receiving_chunk.erase(it); - } else { - it++; + // if we have not heard for 10sec, timeout + if (time_since_remove_activity >= 10.f) { + std::cerr << "SHA1 receiving chunk tansfer timed out " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << "\n"; + it = _transfers_receiving_chunk.erase(it); + } else { + it++; + } } } // if we have not reached the total cap for transfers - if (true) { + if (_transfers_requested_info.size() + _transfers_sending_chunk.size() < _max_concurrent_out) { // for each peer? transfer cap per peer? // first check requests for info @@ -111,11 +119,7 @@ bool SHA1::iterate(float delta) { const auto [group_number, peer_number, chunk_hash] = _queue_requested_chunk.front(); size_t chunk_index = chunkIndex(chunk_hash).value(); - size_t chunk_file_size = _sha1_info.chunk_size; - if (chunk_index+1 == _sha1_info.chunks.size()) { - // last chunk - chunk_file_size = _sha1_info.file_size - chunk_index * _sha1_info.chunk_size; - } + size_t chunk_file_size = chunkSize(chunk_index); uint8_t transfer_id {0}; @@ -138,6 +142,33 @@ bool SHA1::iterate(float delta) { } } + if (!_have_all && !_chunk_want_queue.empty() && _transfers_receiving_chunk.size() < _max_concurrent_in) { + // send out request, no burst tho + std::vector> target_peers; + _tcl.forEachGroup([&target_peers, this](uint32_t group_number) { + _tcl.forEachGroupPeer(group_number, [&target_peers, group_number](uint32_t peer_number) { + target_peers.push_back({group_number, peer_number}); + }); + }); + + if (!target_peers.empty()) { + //if (_distrib.max() != target_peers.size()) { + //std::uniform_int_distribution new_dist{0, target_peers.size()-1}; + //_distrib.param(new_dist.param()); + //} + + //size_t target_index = _distrib(_rng); + size_t target_index = _rng()%target_peers.size(); + auto [group_number, peer_number] = target_peers.at(target_index); + + size_t chunk_index = _chunk_want_queue.front(); + _chunks_requested.emplace(chunk_index); + _chunk_want_queue.pop_front(); + + _tcl.sendFT1RequestPrivate(group_number, peer_number, NGC_FT1_file_kind::HASH_SHA1_CHUNK, _sha1_info.chunks[chunk_index].data.data(), 20); + } + } + // TODO: unmap and remap the file every couple of minutes to keep ram usage down? // TODO: when to stop? return false; @@ -227,10 +258,89 @@ void SHA1::onFT1ReceiveRequestSHA1Chunk(uint32_t group_number, uint32_t peer_num } bool SHA1::onFT1ReceiveInitSHA1Chunk(uint32_t group_number, uint32_t peer_number, const uint8_t* file_id, size_t file_id_size, const uint8_t transfer_id, const size_t file_size) { - return false; + if (_transfers_receiving_chunk.size() >= _max_concurrent_in) { + // reject, max tf in + return false; + } + + if (file_id_size != 20) { + std::cerr << "SHA1 got request for sha1_chunk of wrong size!!\n"; + return false; + } + + SHA1Digest incomming_hash(file_id, file_id_size); + + if (haveChunk(incomming_hash)) { + std::cout << "SHA1 ignoring init for chunk we allready have " << incomming_hash << "\n"; + return false; + } + + auto chunk_i_opt = chunkIndex(incomming_hash); + + if (!chunk_i_opt.has_value()) { + std::cout << "SHA1 ignoring init for unrelated chunk " << incomming_hash << "\n"; + return false; + } + + size_t chunk_index = chunk_i_opt.value(); + + // check transfers + for (const auto& it : _transfers_receiving_chunk) { + if (std::get<4>(it) == chunk_index) { + // allready in transition + return false; + } + } + + _transfers_receiving_chunk.push_back( + std::make_tuple( + group_number, peer_number, + transfer_id, + 0.f, + chunk_index + ) + ); + + return true; } void SHA1::onFT1ReceiveDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, const uint8_t* data, size_t data_size) { + // check transfers + for (auto it = _transfers_receiving_chunk.begin(); it != _transfers_receiving_chunk.end(); it++) { + if (std::get<0>(*it) == group_number && std::get<1>(*it) == peer_number && std::get<2>(*it) == transfer_id) { + std::get(*it) = 0.f; // time + + const size_t chunk_index = std::get<4>(*it); + + size_t file_offset = chunk_index * _sha1_info.chunk_size; + + // TODO: optimize + for (size_t i = 0; i < data_size; i++) { + _file_map[file_offset+data_offset+i] = data[i]; + } + + size_t chunk_file_size = chunkSize(chunk_index); + + // if last data block + if (data_offset + data_size == chunk_file_size) { + // hash and verify + SHA1Digest test_hash = hash_sha1(_file_map.data()+file_offset, chunk_file_size); + if (test_hash != _sha1_info.chunks[chunk_index]) { + std::cerr << "SHA1 received chunks's hash does not match!, discarding\n"; + _transfers_receiving_chunk.erase(it); + break; + } + + _have_chunk[chunk_index] = true; + _have_count++; + _have_all = _have_count == _sha1_info.chunks.size(); + + std::cout << "SHA1 chunk received " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << " " << chunk_index << " (" << 100.f * float(_have_count) / _sha1_info.chunks.size() << "%)\n"; + _transfers_receiving_chunk.erase(it); + } + break; + } + } } void SHA1::onFT1SendDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, size_t data_offset, uint8_t* data, size_t data_size) { @@ -248,15 +358,9 @@ void SHA1::onFT1SendDataSHA1Chunk(uint32_t group_number, uint32_t peer_number, u data[i] = _file_map[file_offset+data_offset+i]; } - size_t chunk_file_size = _sha1_info.chunk_size; - if (chunk_index+1 == _sha1_info.chunks.size()) { - // last chunk - chunk_file_size = _sha1_info.file_size - chunk_index * _sha1_info.chunk_size; - } - // if last data block - if (data_offset + data_size == chunk_file_size) { - std::cout << "SHA1 chunk sent " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << std::get<2>(*it) << " " << chunk_index << "\n"; + if (data_offset + data_size == chunkSize(chunk_index)) { + std::cout << "SHA1 chunk sent " << std::get<0>(*it) << ":" << std::get<1>(*it) << "." << int(std::get<2>(*it)) << " " << chunk_index << "\n"; _transfers_sending_chunk.erase(it); } @@ -308,6 +412,15 @@ std::optional SHA1::chunkIndex(const SHA1Digest& hash) const { } } +size_t SHA1::chunkSize(size_t chunk_index) const { + if (chunk_index+1 == _sha1_info.chunks.size()) { + // last chunk + return _sha1_info.file_size - chunk_index * _sha1_info.chunk_size; + } else { + return _sha1_info.chunk_size; + } +} + bool SHA1::haveChunk(const SHA1Digest& hash) const { if (_have_all) { return true; diff --git a/src/states/sha1.hpp b/src/states/sha1.hpp index 8a065e0..767cf77 100644 --- a/src/states/sha1.hpp +++ b/src/states/sha1.hpp @@ -7,8 +7,10 @@ #include #include +#include #include #include +#include namespace States { @@ -50,6 +52,7 @@ struct SHA1 final : public StateI { void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash); std::optional chunkIndex(const SHA1Digest& hash) const; + size_t chunkSize(size_t chunk_index) const; bool haveChunk(const SHA1Digest& hash) const; private: @@ -62,6 +65,14 @@ struct SHA1 final : public StateI { std::vector _have_chunk; bool _have_all {false}; size_t _have_count {0}; + std::deque _chunk_want_queue; + std::set _chunks_requested; + + const size_t _max_concurrent_out {4}; + const size_t _max_concurrent_in {4}; + + std::minstd_rand _rng {1337}; + std::uniform_int_distribution _distrib; std::unordered_map _chunk_hash_to_index;