diff --git a/solanaceae/ngc_ft1/cca.hpp b/solanaceae/ngc_ft1/cca.hpp index d89acf8..cee1064 100644 --- a/solanaceae/ngc_ft1/cca.hpp +++ b/solanaceae/ngc_ft1/cca.hpp @@ -46,10 +46,13 @@ struct CCAI { // return the current believed window in bytes of how much data can be inflight, //virtual float getCWnD(void) const = 0; + // returns current rtt/delay + virtual float getCurrentDelay(void) const = 0; + // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - virtual size_t canSend(void) = 0; + virtual int64_t canSend(void) = 0; // get the list of timed out seq_ids virtual std::vector getTimeouts(void) const = 0; diff --git a/solanaceae/ngc_ft1/cubic.cpp b/solanaceae/ngc_ft1/cubic.cpp index b9548dd..8c0a9a6 100644 --- a/solanaceae/ngc_ft1/cubic.cpp +++ b/solanaceae/ngc_ft1/cubic.cpp @@ -35,16 +35,23 @@ float CUBIC::getCWnD(void) const { void CUBIC::onCongestion(void) { if (getTimeNow() - _time_point_reduction >= getCurrentDelay()*4.f) { const auto current_cwnd = getCWnD(); + const auto tmp_old_tp = _time_point_reduction; _time_point_reduction = getTimeNow(); _window_max = current_cwnd * BETA; _window_max = std::max(_window_max, 2.*MAXIMUM_SEGMENT_SIZE); - //std::cout << "CONGESTION! cwnd:" << current_cwnd << "\n"; - std::cout << "CONGESTION! cwnd_max:" << _window_max << "\n"; +#if 1 + std::cout << "CONGESTION!" + << " cwnd_max:" << _window_max + << " pts:" << tmp_old_tp + << " rtt:" << getCurrentDelay() + << "\n" + ; +#endif } } -size_t CUBIC::canSend(void) { +int64_t CUBIC::canSend(void) { const auto fspace_pkgs = FlowOnly::canSend(); if (fspace_pkgs == 0u) { @@ -57,7 +64,7 @@ size_t CUBIC::canSend(void) { } // limit to whole packets - size_t cspace_pkgs = std::floor(cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; + int64_t cspace_pkgs = (cspace_bytes / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; return std::min(cspace_pkgs, fspace_pkgs); } diff --git a/solanaceae/ngc_ft1/cubic.hpp b/solanaceae/ngc_ft1/cubic.hpp index e146d72..377143c 100644 --- a/solanaceae/ngc_ft1/cubic.hpp +++ b/solanaceae/ngc_ft1/cubic.hpp @@ -35,7 +35,7 @@ struct CUBIC : public FlowOnly { // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - size_t canSend(void) override; + int64_t canSend(void) override; // get the list of timed out seq_ids //std::vector getTimeouts(void) const override; diff --git a/solanaceae/ngc_ft1/flow_only.cpp b/solanaceae/ngc_ft1/flow_only.cpp index a33e40a..e984cec 100644 --- a/solanaceae/ngc_ft1/flow_only.cpp +++ b/solanaceae/ngc_ft1/flow_only.cpp @@ -28,7 +28,7 @@ void FlowOnly::updateWindow(void) { _fwnd = std::max(_fwnd, 2.f * MAXIMUM_SEGMENT_DATA_SIZE); } -size_t FlowOnly::canSend(void) { +int64_t FlowOnly::canSend(void) { if (_in_flight.empty()) { assert(_in_flight_bytes == 0); return MAXIMUM_SEGMENT_DATA_SIZE; @@ -42,10 +42,7 @@ size_t FlowOnly::canSend(void) { } // limit to whole packets - size_t space = std::floor(fspace / MAXIMUM_SEGMENT_DATA_SIZE) - * MAXIMUM_SEGMENT_DATA_SIZE; - - return space; + return (fspace / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; } std::vector FlowOnly::getTimeouts(void) const { @@ -147,16 +144,10 @@ void FlowOnly::onLoss(SeqIDType seq, bool discard) { assert(_in_flight_bytes >= 0); _in_flight.erase(it); } - // TODO: reset timestamp? -#if 0 // temporarily disable ce for timeout - // at most once per rtt? - // TODO: use delay at event instead - if (getTimeNow() >= _last_congestion_event + _last_congestion_rtt) { - _recently_lost_data = true; - _last_congestion_event = getTimeNow(); - _last_congestion_rtt = getCurrentDelay(); - } -#endif + // TODO: reset timestamp? + // and not take into rtt + + // no ce, since this is usually after data arrived out-of-order/duplicate } diff --git a/solanaceae/ngc_ft1/flow_only.hpp b/solanaceae/ngc_ft1/flow_only.hpp index 4ef7c83..770e7e8 100644 --- a/solanaceae/ngc_ft1/flow_only.hpp +++ b/solanaceae/ngc_ft1/flow_only.hpp @@ -45,7 +45,7 @@ struct FlowOnly : public CCAI { // moving avg over the last few delay samples // VERY sensitive to bundling acks - float getCurrentDelay(void) const; + float getCurrentDelay(void) const override; void addRTT(float new_delay); @@ -59,7 +59,7 @@ struct FlowOnly : public CCAI { // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - size_t canSend(void) override; + int64_t canSend(void) override; // get the list of timed out seq_ids std::vector getTimeouts(void) const override; diff --git a/solanaceae/ngc_ft1/ledbat.cpp b/solanaceae/ngc_ft1/ledbat.cpp index 2cdbe42..45d6e99 100644 --- a/solanaceae/ngc_ft1/ledbat.cpp +++ b/solanaceae/ngc_ft1/ledbat.cpp @@ -19,7 +19,7 @@ LEDBAT::LEDBAT(size_t maximum_segment_data_size) : CCAI(maximum_segment_data_siz _time_start_offset = clock::now(); } -size_t LEDBAT::canSend(void) { +int64_t LEDBAT::canSend(void) { if (_in_flight.empty()) { return MAXIMUM_SEGMENT_DATA_SIZE; } @@ -34,9 +34,7 @@ size_t LEDBAT::canSend(void) { return 0u; } - size_t space = std::ceil(std::min(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; - - return space; + return std::ceil(std::min(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; } std::vector LEDBAT::getTimeouts(void) const { diff --git a/solanaceae/ngc_ft1/ledbat.hpp b/solanaceae/ngc_ft1/ledbat.hpp index c34a86b..02a474c 100644 --- a/solanaceae/ngc_ft1/ledbat.hpp +++ b/solanaceae/ngc_ft1/ledbat.hpp @@ -61,7 +61,7 @@ struct LEDBAT : public CCAI{ // TODO: api for how much data we should send // take time since last sent into account // respect max_byterate_allowed - size_t canSend(void) override; + int64_t canSend(void) override; // get the list of timed out seq_ids std::vector getTimeouts(void) const override; @@ -86,7 +86,7 @@ struct LEDBAT : public CCAI{ // moving avg over the last few delay samples // VERY sensitive to bundling acks - float getCurrentDelay(void) const; + float getCurrentDelay(void) const override; void addRTT(float new_delay); diff --git a/solanaceae/ngc_ft1/ngcft1.cpp b/solanaceae/ngc_ft1/ngcft1.cpp index 5bff1b3..101a5e0 100644 --- a/solanaceae/ngc_ft1/ngcft1.cpp +++ b/solanaceae/ngc_ft1/ngcft1.cpp @@ -143,7 +143,7 @@ bool NGCFT1::sendPKG_FT1_MESSAGE( return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK; } -void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set) { +void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set, int64_t& can_packet_size) { auto& tf_opt = peer.send_transfers.at(idx); assert(tf_opt.has_value()); auto& tf = tf_opt.value(); @@ -177,14 +177,13 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ return; case State::SENDING: { tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - // no ack after 5 sec -> resend - //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { - if (timeouts_set.count({idx, id})) { + if (can_packet_size >= data.size() && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) { // TODO: can fail sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); peer.cca->onLoss({idx, id}, false); time_since_activity = 0.f; timeouts_set.erase({idx, id}); + can_packet_size -= data.size(); } }); @@ -211,21 +210,10 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ } // if chunks in flight < window size (2) - //while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) { - int64_t can_packet_size {static_cast(peer.cca->canSend())}; - //if (can_packet_size) { - //std::cerr << "FT: can_packet_size: " << can_packet_size; - //} - size_t count {0}; while (can_packet_size > 0 && tf.file_size > 0) { std::vector new_data; - // TODO: parameterize packet size? -> only if JF increases lossy packet size >:) - //size_t chunk_size = std::min(496u, tf.file_size - tf.file_size_current); - //size_t chunk_size = std::min(can_packet_size, tf.file_size - tf.file_size_current); size_t chunk_size = std::min({ - //496u, - //996u, peer.cca->MAXIMUM_SEGMENT_DATA_SIZE, static_cast(can_packet_size), tf.file_size - tf.file_size_current @@ -237,14 +225,6 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ new_data.resize(chunk_size); - //ngc_ft1_ctx->cb_send_data[tf.file_kind]( - //tox, - //group_number, peer_number, - //idx, - //tf.file_size_current, - //new_data.data(), new_data.size(), - //ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr - //); assert(idx <= 0xffu); // TODO: check return value dispatch( @@ -267,22 +247,17 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_ tf.file_size_current += chunk_size; can_packet_size -= chunk_size; - count++; } - //if (count) { - //std::cerr << " split over " << count << "\n"; - //} } break; case State::FINISHING: // we still have unacked packets tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { - // no ack after 5 sec -> resend - //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { - if (timeouts_set.count({idx, id})) { + if (can_packet_size >= data.size() && timeouts_set.count({idx, id})) { sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); peer.cca->onLoss({idx, id}, false); time_since_activity = 0.f; timeouts_set.erase({idx, id}); + can_packet_size -= data.size(); } }); if (tf.time_since_activity >= sending_give_up_after) { @@ -311,9 +286,11 @@ void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_ auto timeouts = peer.cca->getTimeouts(); std::set timeouts_set{timeouts.cbegin(), timeouts.cend()}; + for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) { if (peer.send_transfers.at(idx).has_value()) { - updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set); + int64_t can_packet_size {peer.cca->canSend()}; // might get more space while iterating (time) + updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size); } } diff --git a/solanaceae/ngc_ft1/ngcft1.hpp b/solanaceae/ngc_ft1/ngcft1.hpp index 7515ff5..b07119d 100644 --- a/solanaceae/ngc_ft1/ngcft1.hpp +++ b/solanaceae/ngc_ft1/ngcft1.hpp @@ -201,7 +201,7 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size); bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size); - void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set); + void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set, int64_t& can_packet_size); void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); public: