From 3d4e286a7c45e6177bf1ed885da3895d3136f245 Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 13 Dec 2025 16:54:37 +0100 Subject: [PATCH] refactor sending transfers always do timeouts and resending first then roundrobin over sending new data --- solanaceae/ngc_ft1/ngcft1.cpp | 293 ++++++++++++++++----------------- solanaceae/ngc_ft1/ngcft1.hpp | 6 +- solanaceae/ngc_ft1/snd_buf.hpp | 1 - 3 files changed, 150 insertions(+), 150 deletions(-) diff --git a/solanaceae/ngc_ft1/ngcft1.cpp b/solanaceae/ngc_ft1/ngcft1.cpp index a6e32ea..fea54aa 100644 --- a/solanaceae/ngc_ft1/ngcft1.cpp +++ b/solanaceae/ngc_ft1/ngcft1.cpp @@ -15,56 +15,19 @@ #include #include -void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set, int64_t& can_packet_size) { +void NGCFT1::updateSendTransferPhase1(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set, int64_t& can_packet_size) { + using State = Group::Peer::SendTransfer::State; auto& tf_opt = peer.send_transfers.at(idx); assert(tf_opt.has_value()); auto& tf = tf_opt.value(); tf.time_since_activity += time_delta; - switch (tf.state) { - using State = Group::Peer::SendTransfer::State; - case State::INIT_SENT: - if (tf.time_since_activity >= init_retry_timeout_after) { - if (tf.inits_sent >= 3) { - // delete, timed out 3 times - std::cerr << "NGCFT1 warning: ft init timed out, deleting\n"; - dispatch( - NGCFT1_Event::send_done, - Events::NGCFT1_send_done{ - group_number, peer_number, - static_cast(idx), - } - ); - tf_opt.reset(); - } else { - // timed out, resend - std::cerr << "NGCFT1 warning: ft init timed out, resending\n"; - _neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); - tf.inits_sent++; - tf.time_since_activity = 0.f; - } - } - break; - case State::FINISHING: // we still have unacked packets - tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - if (timeouts_set.count({idx, id})) { - if (can_packet_size >= int64_t(data.size())) { - _neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size()); - peer.cca->onLoss({idx, id}, false); - time_since_activity = 0.f; - timeouts_set.erase({idx, id}); - can_packet_size -= data.size(); - } else { -#if 0 // too spammy - std::cerr << "NGCFT1 warning: no space to resend timedout\n"; -#endif - } - } - }); - if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) { - // no ack after 30sec, close ft - std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n"; + if (tf.state == State::INIT_SENT) { + if (tf.time_since_activity >= init_retry_timeout_after) { + if (tf.inits_sent >= 3) { + // delete, timed out 3 times + std::cerr << "NGCFT1 warning: sending ft init timed out, deleting\n"; dispatch( NGCFT1_Event::send_done, Events::NGCFT1_send_done{ @@ -72,101 +35,131 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ static_cast(idx), } ); - - // clean up cca - tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - peer.cca->onLoss({idx, id}, true); - timeouts_set.erase({idx, id}); - }); - tf_opt.reset(); + } else { + // timed out, resend + std::cerr << "NGCFT1 warning: sending ft init timed out, resending\n"; + _neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); + tf.inits_sent++; + tf.time_since_activity = 0.f; } - break; - case State::SENDING: { - // first handle overall timeout (could otherwise do resends directly before, which is useless) - // timeout increases with active transfers (otherwise we could starve them) - if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) { - // no ack after 30sec, close ft - std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n"; - dispatch( - NGCFT1_Event::send_done, - Events::NGCFT1_send_done{ - group_number, peer_number, - static_cast(idx), - } - ); - - // clean up cca - tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - peer.cca->onLoss({idx, id}, true); - timeouts_set.erase({idx, id}); - }); - - tf_opt.reset(); - //continue; // dangerous control flow - return; - } - - // do resends - tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - if (can_packet_size >= int64_t(data.size()) && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) { - // TODO: can fail - _neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size()); - peer.cca->onLoss({idx, id}, false); - time_since_activity = 0.f; - timeouts_set.erase({idx, id}); - can_packet_size -= data.size(); - } - }); - - // if chunks in flight < window size (2) - while (can_packet_size > 0 && tf.file_size > 0) { - std::vector new_data; - - size_t chunk_size = std::min({ - peer.cca->MAXIMUM_SEGMENT_DATA_SIZE, - static_cast(can_packet_size), - static_cast(tf.file_size - tf.file_size_current), - }); - if (chunk_size == 0) { - tf.state = State::FINISHING; - break; // we done - } - - new_data.resize(chunk_size); - - assert(idx <= 0xffu); - // TODO: check return value - dispatch( - NGCFT1_Event::send_data, - Events::NGCFT1_send_data{ - group_number, peer_number, - static_cast(idx), - tf.file_size_current, - new_data.data(), static_cast(new_data.size()), - } - ); - - uint16_t seq_id = tf.ssb.add(std::move(new_data)); - const bool sent = _neep.send_ft1_data(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); - if (sent) { - peer.cca->onSent({idx, seq_id}, chunk_size); - } else { - std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n"; - peer.cca->onCongestion(); - can_packet_size = 0; - } - - tf.file_size_current += chunk_size; - can_packet_size -= chunk_size; - } + } + return; + } else if (tf.state == State::FINISHING || tf.state == State::SENDING) { + // timeout increases with active transfers (otherwise we could starve them) + if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) { + // no ack after Xsec, close ft + if (tf.state == State::FINISHING) { + std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n"; + } else { + std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n"; } - break; - default: // invalid state, delete - std::cerr << "NGCFT1 error: ft in invalid state, deleting\n"; - assert(false && "ft in invalid state"); + dispatch( + NGCFT1_Event::send_done, + Events::NGCFT1_send_done{ + group_number, peer_number, + static_cast(idx), + } + ); + + // clean up cca + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + peer.cca->onLoss({idx, id}, true); + }); + tf_opt.reset(); + } + return; + } + + // do send buffer and resending + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + time_since_activity += time_delta; + + if (tf.state != State::FINISHING && tf.state != State::SENDING) { return; + } + + if ( + time_since_activity >= peer.cca->getCurrentDelay() && // TODO: use OR instead? + timeouts_set.count({idx, id}) + ) { + if (can_packet_size >= int64_t(data.size() /*+ peer.cca->SEGMENT_OVERHEAD*/)) { + if (_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size())) { + if (!peer.cca->onLoss({idx, id}, false)) { // might not be in cca + peer.cca->onSent({idx, id}, data.size()); + } + + time_since_activity = 0.f; + can_packet_size -= data.size(); + } else { + std::cerr << "NGCFT1 warning: failed to re-send packet (send queue full?)\n"; + + // signal ce (we did not call onLoss() + peer.cca->onCongestion(); + can_packet_size = 0; + } +#if 0 + } else { + std::cerr << "NGCFT1 warning: no space to resend timed-out\n"; +#endif + } + + timeouts_set.erase({idx, id}); + } + }); +} + +void NGCFT1::updateSendTransferPhase2(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, int64_t& can_packet_size) { + using State = Group::Peer::SendTransfer::State; + auto& tf_opt = peer.send_transfers.at(idx); + assert(tf_opt.has_value()); + auto& tf = tf_opt.value(); + + if (tf.state != State::SENDING) { + return; + } + + // if chunks in flight < window size (2) + while (can_packet_size > 0 && tf.file_size > 0) { + std::vector new_data; + + size_t chunk_size = std::min({ + peer.cca->MAXIMUM_SEGMENT_DATA_SIZE, + static_cast(can_packet_size), + static_cast(tf.file_size - tf.file_size_current), + }); + if (chunk_size == 0) { + tf.state = State::FINISHING; + break; // we done + } + + new_data.resize(chunk_size); + + assert(idx <= 0xffu); + // TODO: check return value + dispatch( + NGCFT1_Event::send_data, + Events::NGCFT1_send_data{ + group_number, peer_number, + static_cast(idx), + tf.file_size_current, + new_data.data(), static_cast(new_data.size()), + } + ); + + uint16_t seq_id = tf.ssb.add(std::move(new_data)); + const bool sent = _neep.send_ft1_data(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); + if (sent) { + peer.cca->onSent({idx, seq_id}, chunk_size); + } else { + std::cerr << "NGCFT1 warn: failed to send packet (send queue full?)\n"; + peer.cca->onCongestion(); + can_packet_size = 0; + } + + tf.file_size_current += chunk_size; + can_packet_size -= chunk_size; } } @@ -201,26 +194,30 @@ bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_ int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time) - // get number current running transfers TODO: improve + // resend and get number current running transfers peer.active_send_transfers = 0; - for (const auto& it : peer.send_transfers) { - if (it.has_value()) { - peer.active_send_transfers++; + for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) { + if (!peer.send_transfers.at(idx).has_value()) { + continue; } + peer.active_send_transfers++; + updateSendTransferPhase1(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size); } - // change iterate start position to not starve transfers in the back - size_t iterated_count = 0; - bool last_send_found = false; - for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) { - idx = idx % peer.send_transfers.size(); + if (can_packet_size > 0) { + // change iterate start position to not starve transfers in the back + size_t iterated_count = 0; + bool last_send_found = false; + for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) { + idx = idx % peer.send_transfers.size(); - if (peer.send_transfers.at(idx).has_value()) { - if (!last_send_found && can_packet_size <= 0) { - peer.next_send_transfer_send_idx = idx; - last_send_found = true; // only set once + if (peer.send_transfers.at(idx).has_value()) { + if (!last_send_found && can_packet_size <= 0) { + peer.next_send_transfer_send_idx = idx; + last_send_found = true; // only set once + } + updateSendTransferPhase2(time_delta, group_number, peer_number, peer, idx, can_packet_size); } - updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size); } } } diff --git a/solanaceae/ngc_ft1/ngcft1.hpp b/solanaceae/ngc_ft1/ngcft1.hpp index 529c830..7899fd4 100644 --- a/solanaceae/ngc_ft1/ngcft1.hpp +++ b/solanaceae/ngc_ft1/ngcft1.hpp @@ -209,7 +209,11 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider std::map groups; protected: - void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set, int64_t& can_packet_size); + // general update with timeouts and resending + void updateSendTransferPhase1(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set, int64_t& can_packet_size); + // does sending new data + void updateSendTransferPhase2(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, int64_t& can_packet_size); + bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const; diff --git a/solanaceae/ngc_ft1/snd_buf.hpp b/solanaceae/ngc_ft1/snd_buf.hpp index ca67753..c0a4f6b 100644 --- a/solanaceae/ngc_ft1/snd_buf.hpp +++ b/solanaceae/ngc_ft1/snd_buf.hpp @@ -26,7 +26,6 @@ struct SendSequenceBuffer { template void for_each(float time_delta, FN&& fn) { for (auto& [id, entry] : entries) { - entry.time_since_activity += time_delta; fn(id, entry.data, entry.time_since_activity); } }