From 7f361cbc4b5d1c1c41d9827d29358f7cd6be29dd Mon Sep 17 00:00:00 2001 From: Green Sky Date: Tue, 8 Aug 2023 01:03:01 +0200 Subject: [PATCH] partly import/port code --- CMakeLists.txt | 2 +- src/CMakeLists.txt | 31 ++++++ src/ledbat.cpp | 250 ++++++++++++++++++++++++++++++++++++++++++ src/ledbat.hpp | 122 +++++++++++++++++++++ src/ngcext.cpp | 241 ++++++++++++++++++++++++++++++++++++++++ src/ngcext.hpp | 248 +++++++++++++++++++++++++++++++++++++++++ src/ngcft1.cpp | 54 +++++++++ src/ngcft1.hpp | 230 ++++++++++++++++++++++++++++++++++++++ src/plugin_ngcft1.cpp | 73 ++++++++++++ src/rcv_buf.cpp | 44 ++++++++ src/rcv_buf.hpp | 35 ++++++ src/snd_buf.cpp | 16 +++ src/snd_buf.hpp | 33 ++++++ 13 files changed, 1378 insertions(+), 1 deletion(-) create mode 100644 src/ledbat.cpp create mode 100644 src/ledbat.hpp create mode 100644 src/ngcext.cpp create mode 100644 src/ngcext.hpp create mode 100644 src/ngcft1.cpp create mode 100644 src/ngcft1.hpp create mode 100644 src/plugin_ngcft1.cpp create mode 100644 src/rcv_buf.cpp create mode 100644 src/rcv_buf.hpp create mode 100644 src/snd_buf.cpp create mode 100644 src/snd_buf.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index be0ea18..23b6b9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,5 +48,5 @@ endif() # cmake setup end -#add_subdirectory(./src) +add_subdirectory(./src) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9403a2a..7adb1dd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,11 +1,42 @@ cmake_minimum_required(VERSION 3.9 FATAL_ERROR) +add_library(ngcext STATIC + ./ngcext.hpp + ./ngcext.cpp +) +target_link_libraries(ngcext PUBLIC + solanaceae_toxcore + solanaceae_util +) + +######################################## + +add_library(ngcft1 STATIC + ./ngcft1.hpp + ./ngcft1.cpp + + ./ledbat.hpp + ./ledbat.cpp + + ./rcv_buf.hpp + ./rcv_buf.cpp + ./snd_buf.hpp + ./snd_buf.cpp +) +target_link_libraries(ngcft1 PUBLIC + ngcext +) + +######################################## + add_library(plugin_ngcft1 SHARED ./plugin_ngcft1.cpp ) target_link_libraries(plugin_ngcft1 PUBLIC solanaceae_plugin + ngcext + ngcft1 ) ######################################## diff --git a/src/ledbat.cpp b/src/ledbat.cpp new file mode 100644 index 0000000..5f7c5d5 --- /dev/null +++ b/src/ledbat.cpp @@ -0,0 +1,250 @@ +#include "./ledbat.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +// https://youtu.be/0HRwNSA-JYM + +inline constexpr bool PLOTTING = false; + +LEDBAT::LEDBAT(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) { + _time_start_offset = clock::now(); +} + +size_t LEDBAT::canSend(void) const { + if (_in_flight.empty()) { + return MAXIMUM_SEGMENT_DATA_SIZE; + } + + const int64_t cspace = _cwnd - _in_flight_bytes; + if (cspace < MAXIMUM_SEGMENT_DATA_SIZE) { + return 0u; + } + + const int64_t fspace = _fwnd - _in_flight_bytes; + if (fspace < MAXIMUM_SEGMENT_DATA_SIZE) { + return 0u; + } + + size_t space = std::ceil(std::min(cspace, fspace) / MAXIMUM_SEGMENT_DATA_SIZE) * MAXIMUM_SEGMENT_DATA_SIZE; + + return space; +} + +std::vector LEDBAT::getTimeouts(void) const { + std::vector list; + + // after 2 delays we trigger timeout + const auto now_adjusted = getTimeNow() - getCurrentDelay()*2.f; + + for (const auto& [seq, time_stamp, size] : _in_flight) { + if (now_adjusted > time_stamp) { + list.push_back(seq); + } + } + + return list; +} + + +void LEDBAT::onSent(SeqIDType seq, size_t data_size) { + if (true) { + for (const auto& it : _in_flight) { + assert(std::get<0>(it) != seq); + } + } + + _in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD}); + _in_flight_bytes += data_size + SEGMENT_OVERHEAD; + _recently_sent_bytes += data_size + SEGMENT_OVERHEAD; +} + +void LEDBAT::onAck(std::vector seqs) { + // only take the smallest value + float most_recent {-std::numeric_limits::infinity()}; + + int64_t acked_data {0}; + + const auto now {getTimeNow()}; + + for (const auto& seq : seqs) { + auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { + return std::get<0>(v) == seq; + }); + + if (it == _in_flight.end()) { + continue; // not found, ignore + } else { + addRTT(now - std::get<1>(*it)); + + // TODO: remove + most_recent = std::max(most_recent, std::get<1>(*it)); + _in_flight_bytes -= std::get<2>(*it); + _recently_acked_data += std::get<2>(*it); + assert(_in_flight_bytes >= 0); // TODO: this triggers + _in_flight.erase(it); + } + } + + if (most_recent == -std::numeric_limits::infinity()) { + return; // not found, ignore + } + + updateWindows(); +} + +void LEDBAT::onLoss(SeqIDType seq, bool discard) { + auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool { + assert(!std::isnan(std::get<1>(v))); + return std::get<0>(v) == seq; + }); + + if (it == _in_flight.end()) { + // error + return; // not found, ignore ?? + } + + _recently_lost_data = true; + + // at most once per rtt? + + if (PLOTTING) { + std::cerr << "CCA: onLoss: TIME: " << getTimeNow() << "\n"; + } + + // TODO: "if data lost is not to be retransmitted" + if (discard) { + _in_flight_bytes -= std::get<2>(*it); + assert(_in_flight_bytes >= 0); + _in_flight.erase(it); + } + // TODO: reset timestamp? + + updateWindows(); +} + +float LEDBAT::getCurrentDelay(void) const { + float sum {0.f}; + size_t count {0}; + for (size_t i = 0; i < _tmp_rtt_buffer.size(); i++) { + //sum += _tmp_rtt_buffer.at(_tmp_rtt_buffer.size()-(1+i)); + sum += _tmp_rtt_buffer.at(i); + count++; + } + + if (count) { + return sum / count; + } else { + return std::numeric_limits::infinity(); + } +} + +void LEDBAT::addRTT(float new_delay) { + auto now = getTimeNow(); + + _base_delay = std::min(_base_delay, new_delay); + // TODO: use fixed size instead? allocations can ruin perf + _rtt_buffer.push_back({now, new_delay}); + + _tmp_rtt_buffer.push_front(new_delay); + // HACKY + if (_tmp_rtt_buffer.size() > current_delay_filter_window) { + _tmp_rtt_buffer.resize(current_delay_filter_window); + } + + // is it 1 minute yet + if (now - _rtt_buffer.front().first >= 30.f) { + + float new_section_minimum = new_delay; + for (const auto it : _rtt_buffer) { + new_section_minimum = std::min(it.second, new_section_minimum); + } + + _rtt_buffer_minutes.push_back(new_section_minimum); + + _rtt_buffer.clear(); + + if (_rtt_buffer_minutes.size() > 20) { + _rtt_buffer_minutes.pop_front(); + } + + _base_delay = std::numeric_limits::infinity(); + for (const float it : _rtt_buffer_minutes) { + _base_delay = std::min(_base_delay, it); + } + } +} + +void LEDBAT::updateWindows(void) { + const auto now {getTimeNow()}; + + const float current_delay {getCurrentDelay()}; + + if (now - _last_cwnd >= current_delay) { + const float queuing_delay {current_delay - _base_delay}; + + _fwnd = max_byterate_allowed * current_delay; + _fwnd *= 1.3f; // try do balance conservative algo a bit, current_delay + + float gain {1.f / std::min(16.f, std::ceil(2.f*target_delay/_base_delay))}; + //gain *= 400.f; // from packets to bytes ~ + gain *= _recently_acked_data/5.f; // from packets to bytes ~ + //gain *= 0.1f; + + if (_recently_lost_data) { + _cwnd = std::clamp( + _cwnd / 2.f, + //_cwnd / 1.6f, + 2.f * MAXIMUM_SEGMENT_SIZE, + _cwnd + ); + } else { + // LEDBAT++ (the Rethinking the LEDBAT Protocol paper) + // "Multiplicative decrease" + const float constant {2.f}; // spec recs 1 + if (queuing_delay < target_delay) { + _cwnd = std::min( + _cwnd + gain, + _fwnd + ); + } else if (queuing_delay > target_delay) { + _cwnd = std::clamp( + _cwnd + std::max( + gain - constant * _cwnd * (queuing_delay / target_delay - 1.f), + -_cwnd/2.f // at most halve + ), + + // never drop below 2 "packets" in flight + 2.f * MAXIMUM_SEGMENT_SIZE, + + // cap rate + _fwnd + ); + } // no else, we on point. very unlikely with float + } + + if (PLOTTING) { // plotting + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " cwnd: " << _cwnd << "\n"; + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " fwnd: " << _fwnd << "\n"; + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " current_delay: " << current_delay << "\n"; + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " base_delay: " << _base_delay << "\n"; + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " gain: " << gain << "\n"; + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " speed: " << (_recently_sent_bytes / (now - _last_cwnd)) / (1024*1024) << "\n"; + std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " in_flight_bytes: " << _in_flight_bytes << "\n"; + } + + _last_cwnd = now; + _recently_acked_data = 0; + _recently_lost_data = false; + _recently_sent_bytes = 0; + } +} + diff --git a/src/ledbat.hpp b/src/ledbat.hpp new file mode 100644 index 0000000..d39e2b4 --- /dev/null +++ b/src/ledbat.hpp @@ -0,0 +1,122 @@ +#pragma once + +#include +#include +#include +#include + +// LEDBAT: https://www.rfc-editor.org/rfc/rfc6817 +// LEDBAT++: https://www.ietf.org/archive/id/draft-irtf-iccrg-ledbat-plus-plus-01.txt + +// LEDBAT++ implementation +struct LEDBAT { + public: // config + using SeqIDType = std::pair; // tf_id, seq_id + + static constexpr size_t IPV4_HEADER_SIZE {20}; + static constexpr size_t IPV6_HEADER_SIZE {40}; // bru + static constexpr size_t UDP_HEADER_SIZE {8}; + + // TODO: tcp AND IPv6 will be different + static constexpr size_t SEGMENT_OVERHEAD { + 4+ // ft overhead + 46+ // tox? + UDP_HEADER_SIZE+ + IPV4_HEADER_SIZE + }; + + // TODO: make configurable, set with tox ngc lossy packet size + //const size_t MAXIMUM_SEGMENT_DATA_SIZE {1000-4}; + const size_t MAXIMUM_SEGMENT_DATA_SIZE {500-4}; + + //static constexpr size_t maximum_segment_size {496 + segment_overhead}; // tox 500 - 4 from ft + const size_t MAXIMUM_SEGMENT_SIZE {MAXIMUM_SEGMENT_DATA_SIZE + SEGMENT_OVERHEAD}; // tox 500 - 4 from ft + //static_assert(maximum_segment_size == 574); // mesured in wireshark + + // ledbat++ says 60ms, we might need other values if relayed + //const float target_delay {0.060f}; + const float target_delay {0.030f}; + //const float target_delay {0.120f}; // 2x if relayed? + + // TODO: use a factor for multiple of rtt + static constexpr size_t current_delay_filter_window {16*4}; + + //static constexpr size_t rtt_buffer_size_max {2000}; + + float max_byterate_allowed {10*1024*1024}; // 10MiB/s + + public: + LEDBAT(size_t maximum_segment_data_size); + + // return the current believed window in bytes of how much data can be inflight, + // without overstepping the delay requirement + float getCWnD(void) const { + return _cwnd; + } + + // TODO: api for how much data we should send + // take time since last sent into account + // respect max_byterate_allowed + size_t canSend(void) const; + + // get the list of timed out seq_ids + std::vector getTimeouts(void) const; + + public: // callbacks + // data size is without overhead + void onSent(SeqIDType seq, size_t data_size); + + void onAck(std::vector seqs); + + // if discard, not resent, not inflight + void onLoss(SeqIDType seq, bool discard); + + private: + using clock = std::chrono::steady_clock; + + // make values relative to algo start for readability (and precision) + // get timestamp in seconds + float getTimeNow(void) const { + return std::chrono::duration{clock::now() - _time_start_offset}.count(); + } + + // moving avg over the last few delay samples + // VERY sensitive to bundling acks + float getCurrentDelay(void) const; + + void addRTT(float new_delay); + + void updateWindows(void); + + private: // state + //float _cto {2.f}; // congestion timeout value in seconds + + float _cwnd {2.f * MAXIMUM_SEGMENT_SIZE}; // in bytes + float _base_delay {2.f}; // lowest mesured delay in _rtt_buffer in seconds + + float _last_cwnd {0.f}; // timepoint of last cwnd correction + int64_t _recently_acked_data {0}; // reset on _last_cwnd + bool _recently_lost_data {false}; + int64_t _recently_sent_bytes {0}; + + // initialize to low value, will get corrected very fast + float _fwnd {0.01f * max_byterate_allowed}; // in bytes + + + // ssthresh + + // spec recomends 10min + // TODO: optimize and devide into spans of 1min (spec recom) + std::deque _tmp_rtt_buffer; + std::deque> _rtt_buffer; // timepoint, delay + std::deque _rtt_buffer_minutes; + + // list of sequence ids and timestamps of when they where sent + std::deque> _in_flight; + + int64_t _in_flight_bytes {0}; + + private: // helper + clock::time_point _time_start_offset; +}; + diff --git a/src/ngcext.cpp b/src/ngcext.cpp new file mode 100644 index 0000000..e443f44 --- /dev/null +++ b/src/ngcext.cpp @@ -0,0 +1,241 @@ +#include "./ngcext.hpp" + +#include + +NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) { + _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PACKET); + _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET); +} + +#define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; } + +bool NGCEXTEventProvider::parse_hs1_request_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + return false; +} + +bool NGCEXTEventProvider::parse_hs1_response_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + return false; +} + +bool NGCEXTEventProvider::parse_ft1_request( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool // dont care private +) { + Events::NGCEXT_ft1_request e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 4 byte (file_kind) + e.file_kind = 0u; + _DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false) + for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) { + e.file_kind |= uint32_t(data[curser]) << (i*8); + } + + // - X bytes (file_kind dependent id, differnt sizes) + e.file_id = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_REQUEST, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_init( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_init cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_init e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 4 byte (file_kind) + e.file_kind = 0u; + _DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false) + for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) { + e.file_kind |= uint32_t(data[curser]) << (i*8); + } + + // - 8 bytes (data size) + e.file_size = 0u; + _DATA_HAVE(sizeof(e.file_size), std::cerr << "NGCEXT: packet too small, missing file_size\n"; return false) + for (size_t i = 0; i < sizeof(e.file_size); i++, curser++) { + e.file_size |= size_t(data[curser]) << (i*8); + } + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + // - X bytes (file_kind dependent id, differnt sizes) + e.file_id = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_INIT, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_init_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_init_ack cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_init_ack e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + return dispatch( + NGCEXT_Event::FT1_INIT_ACK, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_data( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_data cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_data e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + // - 2 bytes (sequence_id) + e.sequence_id = 0u; + _DATA_HAVE(sizeof(e.sequence_id), std::cerr << "NGCEXT: packet too small, missing sequence_id\n"; return false) + for (size_t i = 0; i < sizeof(e.sequence_id); i++, curser++) { + e.sequence_id |= uint32_t(data[curser]) << (i*8); + } + + // - X bytes (the data fragment) + // (size is implicit) + e.data = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_DATA, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_data_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_data_ack cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_data_ack e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + while (curser < data_size) { + _DATA_HAVE(sizeof(uint16_t), std::cerr << "NGCEXT: packet too small, missing seq_id\n"; return false) + uint16_t seq_id = data[curser++]; + seq_id |= data[curser++] << (1*8); + e.sequence_ids.push_back(seq_id); + } + + return dispatch( + NGCEXT_Event::FT1_DATA_ACK, + e + ); +} + +bool NGCEXTEventProvider::handlePacket( + const uint32_t group_number, + const uint32_t peer_number, + const uint8_t* data, + const size_t data_size, + const bool _private +) { + if (data_size < 1) { + return false; // waht + } + + NGCEXT_Event pkg_type = static_cast(data[0]); + + switch (pkg_type) { + case NGCEXT_Event::HS1_REQUEST_LAST_IDS: + return false; + case NGCEXT_Event::HS1_RESPONSE_LAST_IDS: + return false; + case NGCEXT_Event::FT1_REQUEST: + return parse_ft1_request(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_INIT: + return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_INIT_ACK: + return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_DATA: + return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_DATA_ACK: + return parse_ft1_data_ack(group_number, peer_number, data+1, data_size-1, _private); + default: + return false; + } + + return false; +} + +bool NGCEXTEventProvider::onToxEvent(const Tox_Event_Group_Custom_Packet* e) { + const auto group_number = tox_event_group_custom_packet_get_group_number(e); + const auto peer_number = tox_event_group_custom_packet_get_peer_id(e); + const uint8_t* data = tox_event_group_custom_packet_get_data(e); + const auto data_length = tox_event_group_custom_packet_get_data_length(e); + + return handlePacket(group_number, peer_number, data, data_length, false); +} + +bool NGCEXTEventProvider::onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) { + const auto group_number = tox_event_group_custom_private_packet_get_group_number(e); + const auto peer_number = tox_event_group_custom_private_packet_get_peer_id(e); + const uint8_t* data = tox_event_group_custom_private_packet_get_data(e); + const auto data_length = tox_event_group_custom_private_packet_get_data_length(e); + + return handlePacket(group_number, peer_number, data, data_length, true); +} + diff --git a/src/ngcext.hpp b/src/ngcext.hpp new file mode 100644 index 0000000..aa160ff --- /dev/null +++ b/src/ngcext.hpp @@ -0,0 +1,248 @@ +#pragma once + +// solanaceae port of tox_ngc_ext + +#include +#include + +#include + +#include +#include + +namespace Events { + + // TODO: implement events as non-owning + + struct NGCEXT_hs1_request_last_ids { + uint32_t group_number; + uint32_t peer_number; + + // - peer_key bytes (peer key we want to know ids for) + ToxKey peer_key; + + // - 1 byte (uint8_t count ids, atleast 1) + uint8_t count_ids; + }; + + struct NGCEXT_hs1_response_last_ids { + uint32_t group_number; + uint32_t peer_number; + + // respond to a request with 0 or more message ids, sorted by newest first + // - peer_key bytes (the msg_ids are from) + ToxKey peer_key; + + // - 1 byte (uint8_t count ids, can be 0) + uint8_t count_ids; + + // - array [ + // - msg_id bytes (the message id) + // - ] + std::vector msg_ids; + }; + + struct NGCEXT_ft1_request { + uint32_t group_number; + uint32_t peer_number; + + // request the other side to initiate a FT + // - 4 byte (file_kind) + uint32_t file_kind; + + // - X bytes (file_kind dependent id, differnt sizes) + std::vector file_id; + }; + + struct NGCEXT_ft1_init { + uint32_t group_number; + uint32_t peer_number; + + // tell the other side you want to start a FT + // - 4 byte (file_kind) + uint32_t file_kind; + + // - 8 bytes (data size) + uint64_t file_size; + + // - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts) + uint8_t transfer_id; + + // - X bytes (file_kind dependent id, differnt sizes) + std::vector file_id; + + // TODO: max supported lossy packet size + }; + + struct NGCEXT_ft1_init_ack { + uint32_t group_number; + uint32_t peer_number; + + // - 1 byte (transfer_id) + uint8_t transfer_id; + + // TODO: max supported lossy packet size + }; + + struct NGCEXT_ft1_data { + uint32_t group_number; + uint32_t peer_number; + + // data fragment + // - 1 byte (temporary_file_tf_id) + uint8_t transfer_id; + + // - 2 bytes (sequece id) + uint16_t sequence_id; + + // - X bytes (the data fragment) + // (size is implicit) + std::vector data; + }; + + struct NGCEXT_ft1_data_ack { + uint32_t group_number; + uint32_t peer_number; + + // - 1 byte (temporary_file_tf_id) + uint8_t transfer_id; + + // - array [ (of sequece ids) + // - 2 bytes (sequece id) + // - ] + std::vector sequence_ids; + }; + +} // Events + +enum class NGCEXT_Event : uint8_t { + //TODO: make it possible to go further back + // request last (few) message_ids for a peer + // - peer_key bytes (peer key we want to know ids for) + // - 1 byte (uint8_t count ids, atleast 1) + HS1_REQUEST_LAST_IDS = 0x80 | 1u, + + // respond to a request with 0 or more message ids, sorted by newest first + // - peer_key bytes (the msg_ids are from) + // - 1 byte (uint8_t count ids, can be 0) + // - array [ + // - msg_id bytes (the message id) + // - ] + HS1_RESPONSE_LAST_IDS, + + // request the other side to initiate a FT + // - 1 byte (file_kind) + // - X bytes (file_kind dependent id, differnt sizes) + FT1_REQUEST = 0x80 | 8u, + + // TODO: request result negative, speed up not found + + // tell the other side you want to start a FT + // TODO: might use id layer instead. with it, it would look similar to friends_ft + // - 1 byte (file_kind) + // - 8 bytes (data size, can be 0 if unknown, BUT files have to be atleast 1 byte) + // - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts) + // - X bytes (file_kind dependent id, differnt sizes) + FT1_INIT, + + // acknowlage init (like an accept) + // like tox ft control continue + // - 1 byte (transfer_id) + FT1_INIT_ACK, + + // TODO: init deny, speed up non acceptance + + // data fragment + // - 1 byte (temporary_file_tf_id) + // - 2 bytes (sequece id) + // - X bytes (the data fragment) + // (size is implicit) + FT1_DATA, + + // acknowlage data fragments + // TODO: last 3 should be sufficient, 5 should be generous + // - 1 byte (temporary_file_tf_id) + // // this is implicit (pkg size)- 1 byte (number of sequence ids to ack, this kind of depends on window size) + // - array [ (of sequece ids) + // - 2 bytes (sequece id) + // - ] + FT1_DATA_ACK, + + MAX +}; + +struct NGCEXTEventI { + using enumType = NGCEXT_Event; + virtual bool onEvent(const Events::NGCEXT_hs1_request_last_ids&) { return false; } + virtual bool onEvent(const Events::NGCEXT_hs1_response_last_ids&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_request&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_init&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_init_ack&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_data&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_data_ack&) { return false; } +}; + +using NGCEXTEventProviderI = EventProviderI; + +class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI { + ToxEventProviderI& _tep; + + public: + NGCEXTEventProvider(ToxEventProviderI& tep/*, ToxI& t*/); + + protected: + bool parse_hs1_request_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_hs1_response_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_request( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_init( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_init_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_data( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_data_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool handlePacket( + const uint32_t group_number, + const uint32_t peer_number, + const uint8_t* data, + const size_t data_size, + const bool _private + ); + + protected: + bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override; + bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override; +}; + diff --git a/src/ngcft1.cpp b/src/ngcft1.cpp new file mode 100644 index 0000000..e18511e --- /dev/null +++ b/src/ngcft1.cpp @@ -0,0 +1,54 @@ +#include "./ngcft1.hpp" + +NGCFT1::NGCFT1( + ToxEventProviderI& tep, + NGCEXTEventProviderI& neep +) : _tep(tep), _neep(neep) +{ + _neep.subscribe(this, NGCEXT_Event::FT1_REQUEST); + _neep.subscribe(this, NGCEXT_Event::FT1_INIT); + _neep.subscribe(this, NGCEXT_Event::FT1_INIT_ACK); + _neep.subscribe(this, NGCEXT_Event::FT1_DATA); + _neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK); +} + +void NGCFT1::iterate(float delta) { +} + +void NGCFT1::NGC_FT1_send_request_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size +) { +} + +bool NGCFT1::NGC_FT1_send_init_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size, + size_t file_size, + uint8_t* transfer_id +) { + return false; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request&) { + return false; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init&) { + return false; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack&) { + return false; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data&) { + return false; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack&) { + return false; +} + diff --git a/src/ngcft1.hpp b/src/ngcft1.hpp new file mode 100644 index 0000000..278f898 --- /dev/null +++ b/src/ngcft1.hpp @@ -0,0 +1,230 @@ +#pragma once + +// solanaceae port of tox_ngc_ft1 + +#include + +#include "./ngcext.hpp" +#include "./ledbat.hpp" + +#include "./rcv_buf.hpp" +#include "./snd_buf.hpp" + +#include + +// uint32_t - same as tox friend ft +// TODO: fill in toxfriend file kinds +enum class NGCFT1_file_kind : uint32_t { + //INVALID = 0u, // DATA? + + // id: + // group (implicit) + // peer pub key + msg_id + NGC_HS1_MESSAGE_BY_ID = 1u, // history sync PoC 1 + // TODO: oops, 1 should be avatar v1 + + // id: TOX_FILE_ID_LENGTH (32) bytes + // this is basically and id and probably not a hash, like the tox friend api + // this id can be unique between 2 peers + ID = 8u, + + // id: hash of the info, like a torrent infohash (using the same hash as the data) + // TODO: determain internal format + // draft: (for single file) + // - 256 bytes | filename + // - 8bytes | file size + // - 4bytes | chunk size + // - array of chunk hashes (ids) [ + // - SHA1 bytes (20) + // - ] + HASH_SHA1_INFO, + // draft: (for single file) v2 + // - c-string | filename + // - 8bytes | file size + // - 4bytes | chunk size + // - array of chunk hashes (ids) [ + // - SHA1 bytes (20) + // - ] + HASH_SHA1_INFO2, + // draft: multiple files + // - 4bytes | number of filenames + // - array of filenames (variable length c-strings) [ + // - c-string | filename (including path and '/' as dir seperator) + // - ] + // - 256 bytes | filename + // - 8bytes | file size + // - fixed chunk size of 4kb + // - array of chunk hashes (ids) [ + // - SHAX bytes + // - ] + HASH_SHA1_INFO3, + HASH_SHA2_INFO, // hm? + + // id: hash of the content + // TODO: fixed chunk size or variable (defined in info) + // if "variable" sized, it can be aliased with TORRENT_V1_CHUNK in the implementation + HASH_SHA1_CHUNK, + HASH_SHA2_CHUNK, + + // TODO: design the same thing again for tox? (msg_pack instead of bencode?) + // id: infohash + TORRENT_V1_METAINFO, + // id: sha1 + TORRENT_V1_PIECE, // alias with SHA1_CHUNK? + + // TODO: fix all the v2 stuff here + // id: infohash + // in v2, metainfo contains only the root hashes of the merkletree(s) + TORRENT_V2_METAINFO, + // id: root hash + // contains all the leaf hashes for a file root hash + TORRENT_V2_FILE_HASHES, + // id: sha256 + // always of size 16KiB, except if last piece in file + TORRENT_V2_PIECE, +}; + +// TODO: events +//typedef void NGC_FT1_recv_request_cb( + //Tox *tox, + //uint32_t group_number, uint32_t peer_number, + //const uint8_t* file_id, size_t file_id_size, + //void* user_data +//); + +// return true to accept, false to deny +//typedef bool NGC_FT1_recv_init_cb( + //Tox *tox, + //uint32_t group_number, uint32_t peer_number, + //const uint8_t* file_id, size_t file_id_size, + //const uint8_t transfer_id, + //const size_t file_size, + //void* user_data +//); + +//typedef void NGC_FT1_recv_data_cb( + //Tox *tox, + + //uint32_t group_number, + //uint32_t peer_number, + //uint8_t transfer_id, + + //size_t data_offset, const uint8_t* data, size_t data_size, + //void* user_data +//); + +// request to fill data_size bytes into data +//typedef void NGC_FT1_send_data_cb( + //Tox *tox, + + //uint32_t group_number, + //uint32_t peer_number, + //uint8_t transfer_id, + + //size_t data_offset, uint8_t* data, size_t data_size, + //void* user_data +//); + + +class NGCFT1 : public ToxEventI, public NGCEXTEventI { + ToxEventProviderI& _tep; + NGCEXTEventProviderI& _neep; + + // TODO: config + size_t acks_per_packet {3u}; // 3 + float init_retry_timeout_after {5.f}; // 10sec + float sending_give_up_after {30.f}; // 30sec + + + struct Group { + struct Peer { + LEDBAT cca{500-4}; // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 + + struct RecvTransfer { + uint32_t file_kind; + std::vector file_id; + + enum class State { + INITED, //init acked, but no data received yet (might be dropped) + RECV, // receiving data + } state; + + // float time_since_last_activity ? + size_t file_size {0}; + size_t file_size_current {0}; + + // sequence id based reassembly + RecvSequenceBuffer rsb; + }; + std::array, 256> recv_transfers; + size_t next_recv_transfer_idx {0}; // next id will be 0 + + struct SendTransfer { + uint32_t file_kind; + std::vector file_id; + + enum class State { + INIT_SENT, // keep this state until ack or deny or giveup + + SENDING, // we got the ack and are now sending data + + FINISHING, // we sent all data but acks still outstanding???? + + // delete + } state; + + size_t inits_sent {1}; // is sent when creating + + float time_since_activity {0.f}; + size_t file_size {0}; + size_t file_size_current {0}; + + // sequence array + // list of sent but not acked seq_ids + SendSequenceBuffer ssb; + }; + std::array, 256> send_transfers; + size_t next_send_transfer_idx {0}; // next id will be 0 + }; + std::map peers; + }; + std::map groups; + + + public: + NGCFT1( + ToxEventProviderI& tep, + NGCEXTEventProviderI& neep + ); + + void iterate(float delta); + + public: // ft1 api + // TODO: public variant? + void NGC_FT1_send_request_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size + ); + + // public does not make sense here + bool NGC_FT1_send_init_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size, + size_t file_size, + uint8_t* transfer_id + ); + + protected: + bool onEvent(const Events::NGCEXT_ft1_request&) override; + bool onEvent(const Events::NGCEXT_ft1_init&) override; + bool onEvent(const Events::NGCEXT_ft1_init_ack&) override; + bool onEvent(const Events::NGCEXT_ft1_data&) override; + bool onEvent(const Events::NGCEXT_ft1_data_ack&) override; + + protected: + //bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override; + //bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override; +}; + diff --git a/src/plugin_ngcft1.cpp b/src/plugin_ngcft1.cpp new file mode 100644 index 0000000..fd9c732 --- /dev/null +++ b/src/plugin_ngcft1.cpp @@ -0,0 +1,73 @@ +#include + +#include "./ngcext.hpp" +#include "./ngcft1.hpp" + +#include +#include + +// fwd +//class RegMessageModel; + +#define RESOLVE_INSTANCE(x) static_cast(solana_api->resolveInstance(#x)) +#define PROVIDE_INSTANCE(x, p, v) solana_api->provideInstance(#x, p, static_cast(v)) + +static std::unique_ptr g_ngcextep = nullptr; +// TODO: make sep plug +static std::unique_ptr g_ngcft1 = nullptr; + +extern "C" { + +SOLANA_PLUGIN_EXPORT const char* solana_plugin_get_name(void) { + return "NGCEXT"; +} + +SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_get_version(void) { + return SOLANA_PLUGIN_VERSION; +} + +SOLANA_PLUGIN_EXPORT uint32_t solana_plugin_start(struct SolanaAPI* solana_api) { + std::cout << "PLUGIN NGCEXT START()\n"; + + if (solana_api == nullptr) { + return 1; + } + + ToxEventProviderI* tox_event_provider_i = nullptr; + + { // make sure required types are loaded + tox_event_provider_i = RESOLVE_INSTANCE(ToxEventProviderI); + + if (tox_event_provider_i == nullptr) { + std::cerr << "PLUGIN NGCEXT missing ToxEventProviderI\n"; + return 2; + } + } + + // static store, could be anywhere tho + // construct with fetched dependencies + g_ngcextep = std::make_unique(*tox_event_provider_i); + g_ngcft1 = std::make_unique(*tox_event_provider_i, *g_ngcextep.get()); + + // register types + PROVIDE_INSTANCE(NGCEXTEventProviderI, "NGCEXT", g_ngcextep.get()); + PROVIDE_INSTANCE(NGCFT1, "NGCEXT", g_ngcft1.get()); + + return 0; +} + +SOLANA_PLUGIN_EXPORT void solana_plugin_stop(void) { + std::cout << "PLUGIN NGCEXT STOP()\n"; + + g_ngcft1.reset(); + g_ngcextep.reset(); +} + +SOLANA_PLUGIN_EXPORT void solana_plugin_tick(float delta) { + //std::cout << "PLUGIN NGCEXT TICK()\n"; + + g_ngcft1->iterate(delta); +} + +} // extern C + diff --git a/src/rcv_buf.cpp b/src/rcv_buf.cpp new file mode 100644 index 0000000..21fc336 --- /dev/null +++ b/src/rcv_buf.cpp @@ -0,0 +1,44 @@ +#include "./rcv_buf.hpp" + +#include + +void RecvSequenceBuffer::erase(uint16_t seq) { + entries.erase(seq); +} + +// inflight chunks +size_t RecvSequenceBuffer::size(void) const { + return entries.size(); +} + +void RecvSequenceBuffer::add(uint16_t seq_id, std::vector&& data) { + entries[seq_id] = {data}; + ack_seq_ids.push_back(seq_id); + if (ack_seq_ids.size() > 3) { // TODO: magic + ack_seq_ids.pop_front(); + } +} + +bool RecvSequenceBuffer::canPop(void) const { + return entries.count(next_seq_id); +} + +std::vector RecvSequenceBuffer::pop(void) { + assert(canPop()); + auto tmp_data = entries.at(next_seq_id).data; + erase(next_seq_id); + next_seq_id++; + return tmp_data; +} + +// for acking, might be bad since its front +std::vector RecvSequenceBuffer::frontSeqIDs(size_t count) const { + std::vector seq_ids; + auto it = entries.cbegin(); + for (size_t i = 0; i < count && it != entries.cend(); i++, it++) { + seq_ids.push_back(it->first); + } + + return seq_ids; +} + diff --git a/src/rcv_buf.hpp b/src/rcv_buf.hpp new file mode 100644 index 0000000..1ac7187 --- /dev/null +++ b/src/rcv_buf.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include +#include + +struct RecvSequenceBuffer { + struct RSBEntry { + std::vector data; + }; + + // sequence_id -> entry + std::map entries; + + uint16_t next_seq_id {0}; + + // list of seq_ids to ack, this is seperate bc rsbentries are deleted once processed + std::deque ack_seq_ids; + + void erase(uint16_t seq); + + // inflight chunks + size_t size(void) const; + + void add(uint16_t seq_id, std::vector&& data); + + bool canPop(void) const; + + std::vector pop(void); + + // for acking, might be bad since its front + std::vector frontSeqIDs(size_t count = 5) const; +}; + diff --git a/src/snd_buf.cpp b/src/snd_buf.cpp new file mode 100644 index 0000000..810cd2c --- /dev/null +++ b/src/snd_buf.cpp @@ -0,0 +1,16 @@ +#include "./snd_buf.hpp" + +void SendSequenceBuffer::erase(uint16_t seq) { + entries.erase(seq); +} + +// inflight chunks +size_t SendSequenceBuffer::size(void) const { + return entries.size(); +} + +uint16_t SendSequenceBuffer::add(std::vector&& data) { + entries[next_seq_id] = {data, 0.f}; + return next_seq_id++; +} + diff --git a/src/snd_buf.hpp b/src/snd_buf.hpp new file mode 100644 index 0000000..416f051 --- /dev/null +++ b/src/snd_buf.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +struct SendSequenceBuffer { + struct SSBEntry { + std::vector data; // the data (variable size, but smaller than 500) + float time_since_activity {0.f}; + }; + + // sequence_id -> entry + std::map entries; + + uint16_t next_seq_id {0}; + + void erase(uint16_t seq); + + // inflight chunks + size_t size(void) const; + + uint16_t add(std::vector&& data); + + template + void for_each(float time_delta, FN&& fn) { + for (auto& [id, entry] : entries) { + entry.time_since_activity += time_delta; + fn(id, entry.data, entry.time_since_activity); + } + } +}; +