very experimental ledbat++ -ish cca

This commit is contained in:
2023-01-31 01:59:29 +01:00
parent a7b5c31369
commit e1b5dd2080
3 changed files with 421 additions and 2 deletions

View File

@@ -2,6 +2,9 @@
#include "ngc_ext.hpp"
#include "./ledbat.hpp"
#include <algorithm>
#include <vector>
#include <array>
#include <deque>
@@ -11,6 +14,7 @@
#include <optional>
#include <cassert>
#include <cstdio>
#include <iostream>
struct SendSequenceBuffer {
struct SSBEntry {
@@ -114,6 +118,8 @@ struct NGC_FT1 {
struct Group {
struct Peer {
LEDBAT cca;
struct RecvTransfer {
uint32_t file_kind;
std::vector<uint8_t> file_id;
@@ -240,7 +246,9 @@ void NGC_FT1_iterate(Tox *tox, NGC_FT1* ngc_ft1_ctx, float time_delta) {
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
// no ack after 5 sec -> resend
if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
// TODO: can fail
_send_pkg_FT1_DATA(tox, group_number, peer_number, idx, id, data.data(), data.size());
peer.cca.onLoss({idx, id}, false);
time_since_activity = 0.f;
}
});
@@ -249,6 +257,12 @@ void NGC_FT1_iterate(Tox *tox, NGC_FT1* ngc_ft1_ctx, float time_delta) {
// no ack after 30sec, close ft
// TODO: notify app
fprintf(stderr, "FT: warning, sending ft in progress timed out, deleting\n");
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca.onLoss({idx, id}, true);
});
tf_opt.reset();
continue; // dangerous control flow
}
@@ -256,11 +270,23 @@ void NGC_FT1_iterate(Tox *tox, NGC_FT1* ngc_ft1_ctx, float time_delta) {
assert(ngc_ft1_ctx->cb_send_data.count(tf.file_kind));
// if chunks in flight < window size (2)
while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
//while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) {
int64_t can_packet_size {peer.cca.canSend()};
//if (can_packet_size) {
//std::cerr << "FT: can_packet_size: " << can_packet_size;
//}
size_t count {0};
while (can_packet_size > 0 && tf.file_size > 0) {
std::vector<uint8_t> new_data;
// TODO: parameterize packet size? -> only if JF increases lossy packet size >:)
size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
//size_t chunk_size = std::min<size_t>(496u, tf.file_size - tf.file_size_current);
//size_t chunk_size = std::min<size_t>(can_packet_size, tf.file_size - tf.file_size_current);
size_t chunk_size = std::min<size_t>({
496u,
can_packet_size,
tf.file_size - tf.file_size_current
});
if (chunk_size == 0) {
tf.state = State::FINISHING;
break; // we done
@@ -278,13 +304,19 @@ void NGC_FT1_iterate(Tox *tox, NGC_FT1* ngc_ft1_ctx, float time_delta) {
);
uint16_t seq_id = tf.ssb.add(std::move(new_data));
_send_pkg_FT1_DATA(tox, group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size());
peer.cca.onSent({idx, seq_id}, chunk_size);
#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1
fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id);
#endif
tf.file_size_current += chunk_size;
can_packet_size -= chunk_size;
count++;
}
//if (count) {
//std::cerr << " split over " << count << "\n";
//}
}
break;
case State::FINISHING: // we still have unacked packets
@@ -292,6 +324,7 @@ void NGC_FT1_iterate(Tox *tox, NGC_FT1* ngc_ft1_ctx, float time_delta) {
// no ack after 5 sec -> resend
if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) {
_send_pkg_FT1_DATA(tox, group_number, peer_number, idx, id, data.data(), data.size());
peer.cca.onLoss({idx, id}, false);
time_since_activity = 0.f;
}
});
@@ -299,6 +332,12 @@ void NGC_FT1_iterate(Tox *tox, NGC_FT1* ngc_ft1_ctx, float time_delta) {
// no ack after 30sec, close ft
// TODO: notify app
fprintf(stderr, "FT: warning, sending ft finishing timed out, deleting\n");
// clean up cca
tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector<uint8_t>& data, float& time_since_activity) {
peer.cca.onLoss({idx, id}, true);
});
tf_opt.reset();
}
break;
@@ -828,12 +867,15 @@ static void _handle_FT1_DATA_ACK(
transfer.time_since_activity = 0.f;
std::vector<LEDBAT::SeqIDType> seqs;
while (curser < length) {
uint16_t seq_id = data[curser++];
seq_id |= data[curser++] << (1*8);
seqs.push_back({transfer_id, seq_id});
transfer.ssb.erase(seq_id);
}
peer.cca.onAck(seqs);
// delete if all packets acked
if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) {