refactor sending transfers
always do timeouts and resending first, then round-robin over sending new data
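In outline, the commit splits the per-transfer update into two passes. The sketch below is a condensed, hypothetical stand-in for the real NGCFT1 types (only the control flow mirrors the iteratePeer change further down): phase 1 runs for every transfer and handles timeouts plus resends, phase 2 only runs while budget remains, so resends always win over new data.

// Sketch of the per-peer update order after this commit (simplified stand-ins,
// not the real NGCFT1 interfaces).
#include <cstdint>
#include <optional>
#include <vector>

struct Transfer { /* transfer state elided */ };

void phase1TimeoutsAndResends(Transfer&, int64_t& can_packet_size) { /* resend timed-out chunks */ }
void phase2NewData(Transfer&, int64_t& can_packet_size) { /* queue and send new chunks */ }

void tickPeer(std::vector<std::optional<Transfer>>& send_transfers, int64_t can_packet_size) {
	// phase 1: timeouts and resending, unconditionally, for every transfer
	for (auto& tf : send_transfers) {
		if (tf.has_value()) {
			phase1TimeoutsAndResends(*tf, can_packet_size);
		}
	}

	// phase 2: only spend what is left on new data (round-robin start elided)
	if (can_packet_size > 0) {
		for (auto& tf : send_transfers) {
			if (tf.has_value()) {
				phase2NewData(*tf, can_packet_size);
			}
		}
	}
}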
@@ -15,56 +15,19 @@
 #include <cassert>
 #include <vector>
 
-void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
+void NGCFT1::updateSendTransferPhase1(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size) {
+	using State = Group::Peer::SendTransfer::State;
 	auto& tf_opt = peer.send_transfers.at(idx);
 	assert(tf_opt.has_value());
 	auto& tf = tf_opt.value();
 
 	tf.time_since_activity += time_delta;
 
-	switch (tf.state) {
-		using State = Group::Peer::SendTransfer::State;
-		case State::INIT_SENT:
-			if (tf.time_since_activity >= init_retry_timeout_after) {
-				if (tf.inits_sent >= 3) {
-					// delete, timed out 3 times
-					std::cerr << "NGCFT1 warning: ft init timed out, deleting\n";
-					dispatch(
-						NGCFT1_Event::send_done,
-						Events::NGCFT1_send_done{
-							group_number, peer_number,
-							static_cast<uint8_t>(idx),
-						}
-					);
-					tf_opt.reset();
-				} else {
-					// timed out, resend
-					std::cerr << "NGCFT1 warning: ft init timed out, resending\n";
-					_neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
-					tf.inits_sent++;
-					tf.time_since_activity = 0.f;
-				}
-			}
-			break;
-		case State::FINISHING: // we still have unacked packets
-			tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-				if (timeouts_set.count({idx, id})) {
-					if (can_packet_size >= int64_t(data.size())) {
-						_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
-						peer.cca->onLoss({idx, id}, false);
-						time_since_activity = 0.f;
-						timeouts_set.erase({idx, id});
-						can_packet_size -= data.size();
-					} else {
-#if 0 // too spammy
-						std::cerr << "NGCFT1 warning: no space to resend timedout\n";
-#endif
-					}
-				}
-			});
-			if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
-				// no ack after 30sec, close ft
-				std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
+	if (tf.state == State::INIT_SENT) {
+		if (tf.time_since_activity >= init_retry_timeout_after) {
+			if (tf.inits_sent >= 3) {
+				// delete, timed out 3 times
+				std::cerr << "NGCFT1 warning: sending ft init timed out, deleting\n";
 				dispatch(
 					NGCFT1_Event::send_done,
 					Events::NGCFT1_send_done{
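The INIT retry policy above is small enough to isolate. A hedged, standalone sketch (hypothetical helper struct, not the real transfer state): resend the init packet whenever init_retry_timeout_after elapses without a response, and give up for good after the third attempt.

#include <cstdio>

struct InitRetry {
	float time_since_activity {0.f};
	int inits_sent {1}; // the first init went out when the transfer was created
};

// returns false once the transfer should be deleted
bool tickInit(InitRetry& r, float time_delta, float init_retry_timeout_after) {
	r.time_since_activity += time_delta;
	if (r.time_since_activity < init_retry_timeout_after) {
		return true; // still waiting for a response
	}
	if (r.inits_sent >= 3) {
		return false; // timed out 3 times -> delete
	}
	// resend the init packet here (elided), then reset the timer
	r.inits_sent++;
	r.time_since_activity = 0.f;
	return true;
}

int main() {
	InitRetry r;
	for (int tick = 0; tick < 100; tick++) {
		if (!tickInit(r, 1.f, 10.f)) { // 1s ticks, assumed 10s retry timeout
			std::printf("gave up at tick %d after %d inits\n", tick, r.inits_sent);
			break;
		}
	}
}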
@@ -72,101 +35,131 @@ void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_
 					static_cast<uint8_t>(idx),
 				}
 			);
-
-			// clean up cca
-			tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-				peer.cca->onLoss({idx, id}, true);
-				timeouts_set.erase({idx, id});
-			});
-
 				tf_opt.reset();
+			} else {
+				// timed out, resend
+				std::cerr << "NGCFT1 warning: sending ft init timed out, resending\n";
+				_neep.send_ft1_init(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size());
+				tf.inits_sent++;
+				tf.time_since_activity = 0.f;
+			}
 		}
-		break;
-		case State::SENDING: {
-			// first handle overall timeout (could otherwise do resends directly before, which is useless)
-			// timeout increases with active transfers (otherwise we could starve them)
-			if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
-				// no ack after 30sec, close ft
-				std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
-				dispatch(
-					NGCFT1_Event::send_done,
-					Events::NGCFT1_send_done{
-						group_number, peer_number,
-						static_cast<uint8_t>(idx),
-					}
-				);
-
-				// clean up cca
-				tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-					peer.cca->onLoss({idx, id}, true);
-					timeouts_set.erase({idx, id});
-				});
-
-				tf_opt.reset();
-				//continue; // dangerous control flow
-				return;
-			}
-
-			// do resends
-			tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
-				if (can_packet_size >= int64_t(data.size()) && time_since_activity >= peer.cca->getCurrentDelay() && timeouts_set.count({idx, id})) {
-					// TODO: can fail
-					_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size());
-					peer.cca->onLoss({idx, id}, false);
-					time_since_activity = 0.f;
-					timeouts_set.erase({idx, id});
-					can_packet_size -= data.size();
-				}
-			});
-
-			// if chunks in flight < window size (2)
-			while (can_packet_size > 0 && tf.file_size > 0) {
-				std::vector<uint8_t> new_data;
-
-				size_t chunk_size = std::min<size_t>({
-					peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
-					static_cast<size_t>(can_packet_size),
-					static_cast<size_t>(tf.file_size - tf.file_size_current),
-				});
-				if (chunk_size == 0) {
-					tf.state = State::FINISHING;
-					break; // we done
-				}
-
-				new_data.resize(chunk_size);
-
-				assert(idx <= 0xffu);
-				// TODO: check return value
-				dispatch(
-					NGCFT1_Event::send_data,
-					Events::NGCFT1_send_data{
-						group_number, peer_number,
-						static_cast<uint8_t>(idx),
-						tf.file_size_current,
-						new_data.data(), static_cast<uint32_t>(new_data.size()),
-					}
-				);
-
-				uint16_t seq_id = tf.ssb.add(std::move(new_data));
-				const bool sent = _neep.send_ft1_data(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
-				if (sent) {
-					peer.cca->onSent({idx, seq_id}, chunk_size);
-				} else {
-					std::cerr << "NGCFT1: failed to send packet (queue full?) --------------\n";
-					peer.cca->onCongestion();
-					can_packet_size = 0;
-				}
-
-				tf.file_size_current += chunk_size;
-				can_packet_size -= chunk_size;
-			}
-		}
-		break;
-		default: // invalid state, delete
-			std::cerr << "NGCFT1 error: ft in invalid state, deleting\n";
-			assert(false && "ft in invalid state");
-			tf_opt.reset();
-			return;
-	}
-}
+		return;
+	} else if (tf.state == State::FINISHING || tf.state == State::SENDING) {
+		// timeout increases with active transfers (otherwise we could starve them)
+		if (tf.time_since_activity >= (sending_give_up_after * peer.active_send_transfers)) {
+			// no ack after Xsec, close ft
+			if (tf.state == State::FINISHING) {
+				std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n";
+			} else {
+				std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting (ifc:" << peer.cca->inFlightCount() << ")\n";
+			}
+			dispatch(
+				NGCFT1_Event::send_done,
+				Events::NGCFT1_send_done{
+					group_number, peer_number,
+					static_cast<uint8_t>(idx),
+				}
+			);
+
+			// clean up cca
+			tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
+				peer.cca->onLoss({idx, id}, true);
+			});
+
+			tf_opt.reset();
+			return;
+		}
+	}
+
+	// do send buffer and resending
+	tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
+		time_since_activity += time_delta;
+
+		if (tf.state != State::FINISHING && tf.state != State::SENDING) {
+			return;
+		}
+
+		if (
+			time_since_activity >= peer.cca->getCurrentDelay() && // TODO: use OR instead?
+			timeouts_set.count({idx, id})
+		) {
+			if (can_packet_size >= int64_t(data.size() /*+ peer.cca->SEGMENT_OVERHEAD*/)) {
+				if (_neep.send_ft1_data(group_number, peer_number, idx, id, data.data(), data.size())) {
+					if (!peer.cca->onLoss({idx, id}, false)) { // might not be in cca
+						peer.cca->onSent({idx, id}, data.size());
+					}
+
+					time_since_activity = 0.f;
+					can_packet_size -= data.size();
+				} else {
+					std::cerr << "NGCFT1 warning: failed to re-send packet (send queue full?)\n";
+					// signal ce (we did not call onLoss())
+					peer.cca->onCongestion();
+					can_packet_size = 0;
+				}
+#if 0
+			} else {
+				std::cerr << "NGCFT1 warning: no space to resend timed-out\n";
+#endif
+			}
+
+			timeouts_set.erase({idx, id});
+		}
+	});
+}
+
+void NGCFT1::updateSendTransferPhase2(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, int64_t& can_packet_size) {
+	using State = Group::Peer::SendTransfer::State;
+	auto& tf_opt = peer.send_transfers.at(idx);
+	assert(tf_opt.has_value());
+	auto& tf = tf_opt.value();
+
+	if (tf.state != State::SENDING) {
+		return;
+	}
+
+	// if chunks in flight < window size (2)
+	while (can_packet_size > 0 && tf.file_size > 0) {
+		std::vector<uint8_t> new_data;
+
+		size_t chunk_size = std::min<size_t>({
+			peer.cca->MAXIMUM_SEGMENT_DATA_SIZE,
+			static_cast<size_t>(can_packet_size),
+			static_cast<size_t>(tf.file_size - tf.file_size_current),
+		});
+		if (chunk_size == 0) {
+			tf.state = State::FINISHING;
+			break; // we done
+		}
+
+		new_data.resize(chunk_size);
+
+		assert(idx <= 0xffu);
+		// TODO: check return value
+		dispatch(
+			NGCFT1_Event::send_data,
+			Events::NGCFT1_send_data{
+				group_number, peer_number,
+				static_cast<uint8_t>(idx),
+				tf.file_size_current,
+				new_data.data(), static_cast<uint32_t>(new_data.size()),
+			}
+		);
+
+		uint16_t seq_id = tf.ssb.add(std::move(new_data));
+		const bool sent = _neep.send_ft1_data(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
+		if (sent) {
+			peer.cca->onSent({idx, seq_id}, chunk_size);
+		} else {
+			std::cerr << "NGCFT1 warn: failed to send packet (send queue full?)\n";
+			peer.cca->onCongestion();
+			can_packet_size = 0;
+		}
+
+		tf.file_size_current += chunk_size;
+		can_packet_size -= chunk_size;
+	}
+}
 
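Both phase-1 timeout paths scale the give-up threshold by the number of active transfers: with the budget split N ways, each transfer makes slower progress, so acks legitimately take longer. A toy illustration of the formula sending_give_up_after * peer.active_send_transfers (the constant itself is not shown in this diff; 30s only echoes the old "no ack after 30sec" comment):

#include <cstdio>

int main() {
	const float sending_give_up_after = 30.f; // assumed value, not from the diff
	for (int active = 1; active <= 4; active++) {
		std::printf("%d active transfer(s): give up after %.0f s without an ack\n",
		            active, sending_give_up_after * active);
	}
}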
@@ -201,26 +194,30 @@ bool NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_
 
 	int64_t can_packet_size {peer.cca->canSend(time_delta)}; // might get more space while iterating (time)
 
-	// get number current running transfers TODO: improve
+	// resend and get number current running transfers
 	peer.active_send_transfers = 0;
-	for (const auto& it : peer.send_transfers) {
-		if (it.has_value()) {
-			peer.active_send_transfers++;
+	for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) {
+		if (!peer.send_transfers.at(idx).has_value()) {
+			continue;
 		}
+
+		peer.active_send_transfers++;
+
+		updateSendTransferPhase1(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
 	}
 
-	// change iterate start position to not starve transfers in the back
-	size_t iterated_count = 0;
-	bool last_send_found = false;
-	for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
-		idx = idx % peer.send_transfers.size();
+	if (can_packet_size > 0) {
+		// change iterate start position to not starve transfers in the back
+		size_t iterated_count = 0;
+		bool last_send_found = false;
+		for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
+			idx = idx % peer.send_transfers.size();
 
-		if (peer.send_transfers.at(idx).has_value()) {
-			if (!last_send_found && can_packet_size <= 0) {
-				peer.next_send_transfer_send_idx = idx;
-				last_send_found = true; // only set once
+			if (peer.send_transfers.at(idx).has_value()) {
+				if (!last_send_found && can_packet_size <= 0) {
+					peer.next_send_transfer_send_idx = idx;
+					last_send_found = true; // only set once
+				}
+
+				updateSendTransferPhase2(time_delta, group_number, peer_number, peer, idx, can_packet_size);
 			}
-			updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set, can_packet_size);
 		}
 	}
 }
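The starvation fix rests on next_send_transfer_send_idx: when the budget runs out mid-scan, the index of the first transfer that could not be served is remembered, and the next tick's phase-2 scan starts there instead of at slot 0. A self-contained sketch of just that mechanism (simplified hypothetical types, not the real NGCFT1 interfaces):

#include <algorithm>
#include <array>
#include <cstdint>
#include <cstdio>
#include <optional>

struct Transfer { int64_t queued_bytes {0}; };

struct Peer {
	std::array<std::optional<Transfer>, 8> send_transfers;
	size_t next_send_transfer_send_idx {0};
};

// phase-2 stand-in: spend the byte budget on new data, round-robin over transfers
static void sendNewData(Peer& peer, int64_t& can_packet_size) {
	size_t iterated_count = 0;
	bool last_send_found = false;
	for (size_t idx = peer.next_send_transfer_send_idx; iterated_count < peer.send_transfers.size(); idx++, iterated_count++) {
		idx = idx % peer.send_transfers.size();

		if (!peer.send_transfers.at(idx).has_value()) {
			continue;
		}

		// remember the first transfer that sees an exhausted budget;
		// the next tick starts here instead of always at index 0
		if (!last_send_found && can_packet_size <= 0) {
			peer.next_send_transfer_send_idx = idx;
			last_send_found = true; // only set once
		}

		auto& tf = peer.send_transfers.at(idx).value();
		const int64_t sent = std::min<int64_t>(std::max<int64_t>(can_packet_size, 0), tf.queued_bytes);
		tf.queued_bytes -= sent;
		can_packet_size -= sent;
	}
}

int main() {
	Peer peer;
	for (size_t i = 0; i < 4; i++) {
		peer.send_transfers.at(i) = Transfer{4000};
	}

	for (int tick = 0; tick < 4; tick++) {
		int64_t budget = 1500; // what the congestion controller allows this tick
		sendNewData(peer, budget);
		std::printf("tick %d: next start idx = %zu\n", tick, peer.next_send_transfer_send_idx);
	}
}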
@@ -209,7 +209,11 @@ class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProvider
 		std::map<uint32_t, Group> groups;
 
 	protected:
-		void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
+		// general update with timeouts and resending
+		void updateSendTransferPhase1(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set<CCAI::SeqIDType>& timeouts_set, int64_t& can_packet_size);
+		// does sending new data
+		void updateSendTransferPhase2(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, int64_t& can_packet_size);
 
 		bool iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer);
 
 		const CCAI* getPeerCCA(uint32_t group_number, uint32_t peer_number) const;
@@ -26,7 +26,6 @@ struct SendSequenceBuffer {
 	template<typename FN>
 	void for_each(float time_delta, FN&& fn) {
 		for (auto& [id, entry] : entries) {
-			entry.time_since_activity += time_delta;
 			fn(id, entry.data, entry.time_since_activity);
 		}
 	}
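This last hunk is what makes the multiple for_each passes per tick safe: iteration no longer advances time_since_activity as a side effect, so a pass that only inspects or cleans up entries (like the cca cleanup above) leaves the timers untouched, and only the phase-1 resend pass ages them explicitly in its callback. A minimal sketch of the resulting contract (simplified stand-in, not the real SendSequenceBuffer):

#include <cstdint>
#include <map>
#include <vector>

struct SendSequenceBuffer {
	struct Entry {
		std::vector<uint8_t> data;
		float time_since_activity {0.f};
	};
	std::map<uint16_t, Entry> entries;

	template<typename FN>
	void for_each(float time_delta, FN&& fn) {
		(void)time_delta; // signature kept; aging is the caller's job now
		for (auto& [id, entry] : entries) {
			fn(id, entry.data, entry.time_since_activity);
		}
	}
};

// usage: the resend pass ages entries itself, a cleanup pass does not
void example(SendSequenceBuffer& ssb, float time_delta) {
	ssb.for_each(time_delta, [&](uint16_t, const std::vector<uint8_t>&, float& tsa) {
		tsa += time_delta; // explicit aging, as in updateSendTransferPhase1
	});
	ssb.for_each(time_delta, [&](uint16_t, const std::vector<uint8_t>&, float&) {
		// inspect or clean up only; timers unchanged
	});
}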