diff --git a/.gitignore b/.gitignore index 8bec5f8a..f90d8b1a 100644 --- a/.gitignore +++ b/.gitignore @@ -70,3 +70,4 @@ fabric.properties misc.xml .idea/* +/.vs diff --git a/CMakePresets.json b/CMakePresets.json new file mode 100644 index 00000000..ee26b52e --- /dev/null +++ b/CMakePresets.json @@ -0,0 +1,25 @@ +{ + "version": 3, + "configurePresets": [ + { + "name": "linux-debug", + "displayName": "Linux Debug", + "generator": "Ninja", + "binaryDir": "${sourceDir}/out/build/${presetName}", + "installDir": "${sourceDir}/out/install/${presetName}", + "cacheVariables": { + "CMAKE_BUILD_TYPE": "Debug" + }, + "condition": { + "type": "equals", + "lhs": "${hostSystemName}", + "rhs": "Linux" + }, + "vendor": { + "microsoft.com/VisualStudioRemoteSettings/CMake/1.0": { + "sourceDir": "$env{HOME}/.vs/$ms{projectDirName}" + } + } + } + ] +} diff --git a/WBLib.cmake b/WBLib.cmake index ed8eb61d..4c7fde36 100644 --- a/WBLib.cmake +++ b/WBLib.cmake @@ -25,7 +25,20 @@ target_sources(wifibroadcast PRIVATE # radiotap and fec ${CMAKE_CURRENT_LIST_DIR}/src/external/radiotap/radiotap.c ${CMAKE_CURRENT_LIST_DIR}/src/external/fec/fec_base.cpp - ${CMAKE_CURRENT_LIST_DIR}/src/FECStream.cpp + + ${CMAKE_CURRENT_LIST_DIR}/src/encryption/Key.hpp + ${CMAKE_CURRENT_LIST_DIR}/src/encryption/KeyPairTxRx.hpp + ${CMAKE_CURRENT_LIST_DIR}/src/encryption/Encryptor.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/encryption/Encryption.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/encryption/EncryptionFsUtils.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/encryption/Decryptor.cpp + + ${CMAKE_CURRENT_LIST_DIR}/src/fec/FEC.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/fec/FECConstants.hpp + ${CMAKE_CURRENT_LIST_DIR}/src/fec/FECDecoder.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/fec/FECEncoder.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/fec/RxBlock.cpp + ${CMAKE_CURRENT_LIST_DIR}/src/WBStreamRx.cpp ${CMAKE_CURRENT_LIST_DIR}/src/WBStreamTx.cpp ${CMAKE_CURRENT_LIST_DIR}/src/WBTxRx.cpp @@ -34,7 +47,6 @@ target_sources(wifibroadcast PRIVATE ${CMAKE_CURRENT_LIST_DIR}/src/radiotap/RadiotapHeaderTxHolder.hpp ${CMAKE_CURRENT_LIST_DIR}/src/radiotap/RSSIAccumulator.hpp ${CMAKE_CURRENT_LIST_DIR}/src/wifibroadcast_spdlog.cpp - ${CMAKE_CURRENT_LIST_DIR}/src/Encryption.cpp ${CMAKE_CURRENT_LIST_DIR}/src/radiotap/RadiotapRxRfAggregator.cpp ) diff --git a/executables/benchmark.cpp b/executables/benchmark.cpp index 54f961ae..d02215e3 100644 --- a/executables/benchmark.cpp +++ b/executables/benchmark.cpp @@ -30,11 +30,15 @@ #include #include -#include "../src/Encryption.h" -#include "../src/FECStream.h" +#include "../src/encryption/Encryption.h" +#include "../src/fec/FEC.h" #include "../src/HelperSources/RandomBufferPot.hpp" #include "../src/HelperSources/SchedulingHelper.hpp" #include "../src/external/fec/fec_base.h" +#include "../src/fec/FECEncoder.h" +#include "../src/fec/FECConstants.hpp" +#include "../src/encryption/Encryptor.h" +#include "../src/encryption/Decryptor.h" // Test the FEC encoding / decoding and Encryption / Decryption performance // (throughput) of this system diff --git a/executables/unit_test.cpp b/executables/unit_test.cpp index 85c31678..8a73cba9 100644 --- a/executables/unit_test.cpp +++ b/executables/unit_test.cpp @@ -24,18 +24,87 @@ #include #include -#include "../src/FECStream.h" -#include "../src/FEC.hpp" +#include "../src/fec/FEC.h" -#include "../src/Encryption.h" +#include "../src/encryption/Encryption.h" +#include "../src/encryption/EncryptionFsUtils.h" #include "../src/HelperSources/Helper.hpp" #include "../src/Ieee80211Header.hpp" #include "../src/wifibroadcast_spdlog.h" +#include "../src/fec/FECDecoder.h" +#include "../src/fec/FECEncoder.h" +#include "../src/encryption/Encryptor.h" +#include "../src/encryption/Decryptor.h" // Simple unit testing for the FEC lib that doesn't require wifi cards namespace TestFEC { +// randomly select a possible combination of received indices (either primary or +// secondary). +static void testFecCPlusPlusWrapperY(const int nPrimaryFragments, + const int nSecondaryFragments) { + srand(time(NULL)); + constexpr auto FRAGMENT_SIZE = 1446; + + auto txBlockBuffer = GenericHelper::createRandomDataBuffers( + nPrimaryFragments + nSecondaryFragments); + std::cout << "XSelected nPrimaryFragments:" << nPrimaryFragments + << " nSecondaryFragments:" << nSecondaryFragments << "\n"; + + fecEncode(FRAGMENT_SIZE, txBlockBuffer, nPrimaryFragments, + nSecondaryFragments); + std::cout << "Encode done\n"; + + for (int test = 0; test < 100; test++) { + // takes nPrimaryFragments random (possible) indices without duplicates + // NOTE: Perhaps you could calculate all possible permutations, but these + // would be quite a lot. Therefore, I just use n random selections of + // received indices + auto receivedFragmentIndices = GenericHelper::takeNRandomElements( + GenericHelper::createIndices(nPrimaryFragments + nSecondaryFragments), + nPrimaryFragments); + assert(receivedFragmentIndices.size() == nPrimaryFragments); + std::cout << "(Emulated) receivedFragmentIndices" + << StringHelper::vectorAsString(receivedFragmentIndices) << "\n"; + + auto rxBlockBuffer = std::vector>( + nPrimaryFragments + nSecondaryFragments); + std::vector fragmentMap(nPrimaryFragments + nSecondaryFragments, + FRAGMENT_STATUS_UNAVAILABLE); + for (const auto idx : receivedFragmentIndices) { + rxBlockBuffer[idx] = txBlockBuffer[idx]; + fragmentMap[idx] = FRAGMENT_STATUS_AVAILABLE; + } + + fecDecode(FRAGMENT_SIZE, rxBlockBuffer, nPrimaryFragments, fragmentMap); + + for (unsigned int i = 0; i < nPrimaryFragments; i++) { + // std::cout<<"Comparing fragment:"< #include -#include "../src/Encryption.h" +#include "../src/encryption/Encryption.h" +#include "../src/encryption/EncryptionFsUtils.h" +#include "../src/encryption/KeyPairTxRx.hpp" /** * Generates a new tx rx keypair and saves it to file for later use. diff --git a/src/Encryption.cpp b/src/Encryption.cpp deleted file mode 100644 index 9dba874a..00000000 --- a/src/Encryption.cpp +++ /dev/null @@ -1,200 +0,0 @@ -// -// Created by consti10 on 13.08.23. -// - -#include "Encryption.h" - -#include -#include -#include - -#include -#include "wifibroadcast_spdlog.h" - -wb::KeyPairTxRx wb::generate_keypair_random() { - KeyPairTxRx ret{}; - crypto_box_keypair(ret.key_1.public_key.data(), ret.key_1.secret_key.data()); - crypto_box_keypair(ret.key_2.public_key.data(), ret.key_2.secret_key.data()); - return ret; -} - -// Salts generated once using https://www.random.org/cgi-bin/randbyte?nbytes=16&format=d -// We want deterministic seed from a pw, and are only interested in making it impossible to reverse the process (even though the salt is plain text) -static constexpr std::array OHD_SALT_AIR{192,189,216,102,56,153,154,92,228,26,49,209,157,7,128,207}; -static constexpr std::array OHD_SALT_GND{179,30,150,20,17,200,225,82,48,64,18,130,89,62,83,234}; - -std::array -wb::create_seed_from_password_openhd_salt(const std::string& pw, - bool use_salt_air) { - const auto salt = use_salt_air ? OHD_SALT_AIR : OHD_SALT_GND; - std::array key{}; - if (crypto_pwhash(key.data(), key.size(), pw.c_str(), pw.length(), salt.data(), - crypto_pwhash_OPSLIMIT_INTERACTIVE, crypto_pwhash_MEMLIMIT_INTERACTIVE, - crypto_pwhash_ALG_DEFAULT) != 0) { - std::cerr<<"ERROR: cannot create_seed_from_password_openhd_salt"< wb::create_onetimeauth_subkey( - const uint64_t& nonce, const std::array& session_key) { - // sub-key for this packet - std::array subkey{}; - // We only have an 8 byte nonce, this should be enough entropy - std::array nonce_buf{0}; - memcpy(nonce_buf.data(),(uint8_t*)&nonce,8); - crypto_core_hchacha20(subkey.data(),nonce_buf.data(),session_key.data(), nullptr); - return subkey; -} - -void wb::Encryptor::makeNewSessionKey( - std::array& sessionKeyNonce, - std::array& sessionKeyData) { - randombytes_buf(session_key.data(), sizeof(session_key)); - randombytes_buf(sessionKeyNonce.data(), sizeof(sessionKeyNonce)); - if (crypto_box_easy(sessionKeyData.data(), session_key.data(), sizeof(session_key), - sessionKeyNonce.data(), rx_publickey.data(), tx_secretkey.data()) != 0) { - throw std::runtime_error("Unable to make session key!"); - } -} - -int wb::Encryptor::authenticate_and_encrypt(const uint64_t& nonce, - const uint8_t* src, int src_len, - uint8_t* dest) { - if(!m_encrypt_data){ // Only sign message - memcpy(dest,src, src_len); - uint8_t* sign=dest+src_len; - const auto sub_key=wb::create_onetimeauth_subkey(nonce,session_key); - crypto_onetimeauth(sign,src,src_len,sub_key.data()); - return src_len+crypto_onetimeauth_BYTES; - } - // sign and encrypt all together - long long unsigned int ciphertext_len; - crypto_aead_chacha20poly1305_encrypt(dest, &ciphertext_len, - src, src_len, - (uint8_t *)nullptr, 0, - nullptr, - (uint8_t *) &nonce, session_key.data()); - return (int)ciphertext_len; -} - -std::shared_ptr> -wb::Encryptor::authenticate_and_encrypt_buff(const uint64_t& nonce, - const uint8_t* src, - std::size_t src_len) { - auto ret=std::make_shared>(src_len + ENCRYPTION_ADDITIONAL_VALIDATION_DATA); - const auto size=authenticate_and_encrypt(nonce, src, src_len, ret->data()); - assert(size==ret->size()); - return ret; -} - -wb::Decryptor::Decryptor(wb::Key key1) - :rx_secretkey(key1.secret_key),tx_publickey(key1.public_key){ - memset(session_key.data(), 0, sizeof(session_key)); -} - - -int wb::Decryptor::onNewPacketSessionKeyData( - const std::array& sessionKeyNonce, - const std::array& sessionKeyData) { - std::array new_session_key{}; - if (crypto_box_open_easy(new_session_key.data(), - sessionKeyData.data(), sessionKeyData.size(), - sessionKeyNonce.data(), - tx_publickey.data(), rx_secretkey.data()) != 0) { - // this basically should just never happen, and is an error - wifibroadcast::log::get_default()->warn("unable to decrypt session key"); - return SESSION_NOT_VALID; - } - if (memcmp(session_key.data(), new_session_key.data(), sizeof(session_key)) != 0) { - wifibroadcast::log::get_default()->info("Decryptor-New session detected"); - session_key = new_session_key; - m_has_valid_session= true; - return SESSION_VALID_NEW; - } - // this is NOT an error, the same session key is sent multiple times ! - return SESSION_VALID_NOT_NEW; -} - -bool wb::Decryptor::authenticate_and_decrypt(const uint64_t& nonce, - const uint8_t* encrypted, - int encrypted_size, - uint8_t* dest) { - if(!m_encrypt_data){ - const auto payload_size=encrypted_size-crypto_onetimeauth_BYTES; - assert(payload_size>0); - const uint8_t* sign=encrypted+payload_size; - //const int res=crypto_auth_hmacsha256_verify(sign,msg,payload_size,session_key.data()); - const auto sub_key=wb::create_onetimeauth_subkey(nonce,session_key); - const int res=crypto_onetimeauth_verify(sign,encrypted,payload_size,sub_key.data()); - if(res!=-1){ - memcpy(dest,encrypted,payload_size); - return true; - } - return false; - } - unsigned long long mlen; - int res=crypto_aead_chacha20poly1305_decrypt(dest, &mlen, - nullptr, - encrypted, encrypted_size, - nullptr,0, - (uint8_t *) (&nonce), session_key.data()); - return res!=-1; -} - -std::shared_ptr> -wb::Decryptor::authenticate_and_decrypt_buff(const uint64_t& nonce, - const uint8_t* encrypted, - int encrypted_size) { - auto ret=std::make_shared>(encrypted_size - crypto_aead_chacha20poly1305_ABYTES); - const auto res=authenticate_and_decrypt(nonce, encrypted, encrypted_size, ret->data()); - if(res){ - return ret; - } - return nullptr; -} diff --git a/src/Encryption.h b/src/Encryption.h deleted file mode 100644 index 8de24107..00000000 --- a/src/Encryption.h +++ /dev/null @@ -1,184 +0,0 @@ - -#ifndef ENCRYPTION_HPP -#define ENCRYPTION_HPP - -#include -#include -#include -#include - -#include - -// Namespace that can be used to add encryption+packet validation -// (Or packet validation only to save CPU resources) -// to a lossy unidirectional link -// Packet validation is quite important, to make sure only openhd packets (and not standard wifi packets) are used in OpenHD -// The Encryption / Decryption name(s) are legacy - -// The more difficult part is dealing with the session key stuff, and this class makes it a bit easier to use - -// one time authentication and encryption nicely are really similar -static_assert(crypto_onetimeauth_BYTES==crypto_aead_chacha20poly1305_ABYTES); -// Encryption (or packet validation) adds this many bytes to the end of the message -static constexpr auto ENCRYPTION_ADDITIONAL_VALIDATION_DATA=crypto_aead_chacha20poly1305_ABYTES; - -namespace wb{ - -// A wb key consists of a public and secret key -struct Key { - std::array public_key; - std::array secret_key; -}; - -// A wb keypair are 2 keys, one for transmitting, one for receiving -// (Since both ground and air unit talk bidirectional) -// We use a different key for the down-link / uplink, respective -struct KeyPairTxRx { - Key key_1; - Key key_2; - Key get_tx_key(bool is_air){ - return is_air ? key_1 : key_2; - } - Key get_rx_key(bool is_air){ - return is_air ? key_2 : key_1; - } -}; - -/** - * Generates a new keypair. Non-deterministic, 100% secure. - */ -KeyPairTxRx generate_keypair_random(); - -/** - * See https://libsodium.gitbook.io/doc/password_hashing - * Deterministic seed from password, but hides password itself (non-reversible) - * Uses a pre-defined salt to be deterministic - */ -std::array -create_seed_from_password_openhd_salt(const std::string& pw,bool use_salt_air); - -// We always use the same bind phrase by default -static constexpr auto DEFAULT_BIND_PHRASE="openhd"; -/** - * Generates 2 new (deterministic) tx rx keys, using the seed created from the pw. - * @param bind_phrase the password / bind phrase - */ -KeyPairTxRx generate_keypair_from_bind_phrase(const std::string& bind_phrase=DEFAULT_BIND_PHRASE); - -/** - * Saves the KeyPairTxRx as a raw file - */ -int write_keypair_to_file(const KeyPairTxRx& keypair_txrx,const std::string& filename); - -/** - * Reads a raw KeyPairTxRx from a raw file previusly generated. - */ -KeyPairTxRx read_keypair_from_file(const std::string& filename); - - -/** - * https://libsodium.gitbook.io/doc/key_derivation - * UINT16SeqNrHelper since we both support encryption and one time validation to save cpu performance - */ -std::array create_onetimeauth_subkey(const uint64_t& nonce,const std::array& session_key); - -class Encryptor { - public: - /** - * - * @param key1 encryption key, otherwise enable a default deterministic encryption key by using std::nullopt - * @param DISABLE_ENCRYPTION_FOR_PERFORMANCE only validate, do not encrypt (less CPU usage) - */ - explicit Encryptor(wb::Key key1) - : tx_secretkey(key1.secret_key), - rx_publickey(key1.public_key){ - } - /** - * Creates a new session key, simply put, the data we can send publicly - * @param sessionKeyNonce filled with public nonce - * @param sessionKeyData filled with public data - */ - void makeNewSessionKey(std::array &sessionKeyNonce, - std::array &sessionKeyData); - /** - * Encrypt the given message of size @param src_len - * (Or if encryption is disabled, only calculate the message sign) - * and write the (encrypted) data appended by the validation data into dest - * @param nonce: needs to be different for every packet - * @param src @param src_len message to encrypt - * @param dest needs to point to a memory region at least @param src_len + 16 bytes big - * Returns written data size (msg payload plus sign data) - */ - int authenticate_and_encrypt(const uint64_t& nonce,const uint8_t *src,int src_len,uint8_t* dest); - - /** - * For easy use - returns a buffer including (encrypted) payload plus validation data - */ - std::shared_ptr> authenticate_and_encrypt_buff(const uint64_t& nonce,const uint8_t *src,std::size_t src_len); - /** - * Disables encryption (to save cpu performance) but keeps packet validation functionality - * @param encryption_enabled - */ - void set_encryption_enabled(bool encryption_enabled){ - m_encrypt_data =encryption_enabled; - } - private: - // tx->rx keypair - const std::array tx_secretkey{}; - const std::array rx_publickey{}; - std::array session_key{}; - // use this one if you are worried about CPU usage when using encryption - bool m_encrypt_data= true; -}; - -class Decryptor { - public: - // enable a default deterministic encryption key by using std::nullopt - // else, pass path to file with encryption keys - explicit Decryptor(wb::Key key1); - static constexpr auto SESSION_VALID_NEW=0; - static constexpr auto SESSION_VALID_NOT_NEW=1; - static constexpr auto SESSION_NOT_VALID=-1; - /** - * Returns 0 if the session is a valid session in regards to the key-pairs AND the session is a new session - * Returns 1 if the session is a valid session in regards to the key-pairs but it is not a new session - * Returns -1 if the session is not a valid session in regards to the key-pairs - * - */ - int onNewPacketSessionKeyData(const std::array &sessionKeyNonce, - const std::array &sessionKeyData); - /** - * Decrypt (or validate only if encryption is disabled) the given message - * and writes the original message content into dest. - * Returns true on success, false otherwise (false== the message is not a valid message) - * @param dest needs to be at least @param encrypted - 16 bytes big. - */ - bool authenticate_and_decrypt(const uint64_t& nonce,const uint8_t* encrypted,int encrypted_size,uint8_t* dest); - - /** - * Easier to use, but usage might require memcpy - */ - std::shared_ptr> authenticate_and_decrypt_buff(const uint64_t& nonce,const uint8_t* encrypted,int encrypted_size); - /** - * Disables encryption (to save cpu performance) but keeps packet validation functionality - * @param encryption_enabled - */ - void set_encryption_enabled(bool encryption_enabled){ - m_encrypt_data =encryption_enabled; - } - // Set to true as soon as a valid session has been detected - bool has_valid_session() const{ - return m_has_valid_session; - } - private: - // use this one if you are worried about CPU usage when using encryption - bool m_encrypt_data= true; - const std::array rx_secretkey{}; - const std::array tx_publickey{}; - std::array session_key{}; - bool m_has_valid_session= false; -}; - -} // namespace wb end - - -#endif //ENCRYPTION_HPP \ No newline at end of file diff --git a/src/FECStream.cpp b/src/FECStream.cpp deleted file mode 100644 index 03b0652b..00000000 --- a/src/FECStream.cpp +++ /dev/null @@ -1,472 +0,0 @@ -// -// Created by consti10 on 30.06.23. -// - -#include "FECStream.h" - -#include - -#include "FEC.hpp" -#include "wifibroadcast_spdlog.h" -#include -#include - -void FECEncoder::encode_block( - std::vector>> data_packets, - int n_secondary_fragments) { - assert(data_packets.size()<=MAX_N_P_FRAGMENTS_PER_BLOCK); - assert(n_secondary_fragments<=MAX_N_S_FRAGMENTS_PER_BLOCK); - const auto n_primary_fragments=data_packets.size(); - // nice to have statistic - m_block_sizes.add(n_primary_fragments); - if(m_block_sizes.get_delta_since_last_reset()>=std::chrono::seconds(1)){ - //wifibroadcast::log::get_default()->debug("Block sizes: {}",m_block_sizes.getAvgReadable()); - m_curr_fec_block_sizes=m_block_sizes.getMinMaxAvg(); - m_block_sizes.reset(); - } - FECPayloadHdr header{}; - header.block_idx=m_curr_block_idx; - m_curr_block_idx++; - header.n_primary_fragments=n_primary_fragments; - // write and forward all the data packets first - // also calculate the size of the biggest data packet - size_t max_packet_size=0; - // Store a pointer where the FEC data begins for performing the FEC step later on - std::vector primary_fragments_data_p; - for(int i=0;idebug("In:{}",(int)data_fragment->size()); - assert(!data_fragment->empty()); - assert(data_fragment->size()<=FEC_PACKET_MAX_PAYLOAD_SIZE); - header.fragment_idx=i; - header.data_size=data_fragment->size(); - auto buffer_p=m_block_buffer[i].data(); - // copy over the header - memcpy(buffer_p,(uint8_t*)&header,sizeof(FECPayloadHdr)); - // write the actual data - memcpy(buffer_p + sizeof(FECPayloadHdr), data_fragment->data(),data_fragment->size()); - // zero out the remaining bytes such that FEC always sees zeroes - // same is done on the rx. These zero bytes are never transmitted via wifi - const auto writtenDataSize = sizeof(FECPayloadHdr) + data_fragment->size(); - memset(buffer_p + writtenDataSize, 0, MAX_PAYLOAD_BEFORE_FEC - writtenDataSize); - max_packet_size = std::max(max_packet_size, data_fragment->size()); - // we can forward the data packet immediately via the callback - if(outputDataCallback){ - outputDataCallback(buffer_p,writtenDataSize); - } - // NOTE: FECPayloadHdr::data_size needs to be included during the fec encode step - primary_fragments_data_p.push_back(buffer_p+sizeof(FECPayloadHdr)-sizeof(uint16_t)); - } - // then we create as many FEC packets as needed - if(n_secondary_fragments==0){ - //wifibroadcast::log::get_default()->debug("No FEC step performed"); - // no FEC step is actually performed, usefully for debugging / performance evaluation - return ; - } - const auto before=std::chrono::steady_clock::now(); - // Now we perform the actual FEC encode step - std::vector secondary_fragments_data_p; - for(int i=0;i=std::chrono::seconds(1)){ - //wifibroadcast::log::get_default()->debug("FEC encode time:{}",m_fec_block_encode_time.getAvgReadable()); - m_curr_fec_block_encode_time=m_fec_block_encode_time.getMinMaxAvg(); - m_fec_block_encode_time.reset(); - } - // and forward all the FEC correction packets - for(int i=0;i= - m_n_primary_fragments_in_block)return true; - return false; -} - -bool RxBlock::allPrimaryFragmentsAreAvailable() const { - if (m_n_primary_fragments_in_block == -1)return false; - return m_n_available_primary_fragments == m_n_primary_fragments_in_block; -} - -void RxBlock::addFragment(const uint8_t* data, const std::size_t dataLen) { - auto* hdr_p=(FECPayloadHdr*) data; - FECPayloadHdr& header=*hdr_p; - assert(!hasFragment(header.fragment_idx)); - assert(header.block_idx == blockIdx); - assert(fragment_map[header.fragment_idx] == FRAGMENT_STATUS_UNAVAILABLE); - assert(header.fragment_idx < blockBuffer.size()); - fragment_copy_payload(header.fragment_idx,data,dataLen); - // mark it as available - fragment_map[header.fragment_idx] = FRAGMENT_STATUS_AVAILABLE; - - // each fragment inside a block should report the same n of primary fragments - if(m_n_primary_fragments_in_block ==-1){ - m_n_primary_fragments_in_block =header.n_primary_fragments; - }else{ - assert(m_n_primary_fragments_in_block ==header.n_primary_fragments); - } - const bool is_primary_fragment=header.fragment_idx RxBlock::pullAvailablePrimaryFragments( - const bool discardMissingPackets) { - // note: when pulling the available fragments, we do not need to know how many primary fragments this block actually contains - std::vector ret; - for (int i = nAlreadyForwardedPrimaryFragments; i < m_n_available_primary_fragments; i++) { - if (fragment_map[i] == FRAGMENT_STATUS_UNAVAILABLE) { - if (discardMissingPackets) { - continue; - } else { - break; - } - } - ret.push_back(i); - } - // make sure these indices won't be returned again - nAlreadyForwardedPrimaryFragments += (int) ret.size(); - return ret; -} - -const uint8_t* RxBlock::get_primary_fragment_data_p(const int fragment_index) { - assert(fragment_map[fragment_index] == FRAGMENT_STATUS_AVAILABLE); - assert(m_n_primary_fragments_in_block !=-1); - assert(fragment_index< m_n_primary_fragments_in_block); - //return blockBuffer[fragment_index].data()+sizeof(FECPayloadHdr); - return blockBuffer[fragment_index].data()+sizeof(uint16_t); -} - -const int RxBlock::get_primary_fragment_data_size(const int fragment_index) { - assert(fragment_map[fragment_index] == FRAGMENT_STATUS_AVAILABLE); - assert(m_n_primary_fragments_in_block !=-1); - assert(fragment_index< m_n_primary_fragments_in_block); - uint16_t* len_p=(uint16_t*)blockBuffer[fragment_index].data(); - return *len_p; -} - -int RxBlock::reconstructAllMissingData() { - //wifibroadcast::log::get_default()->debug("reconstructAllMissingData"<=FEC_K - assert(m_n_primary_fragments_in_block != -1); - assert(m_size_of_secondary_fragments != -1); - // do not reconstruct if reconstruction is impossible - assert(getNAvailableFragments() >= m_n_primary_fragments_in_block); - // also do not reconstruct if reconstruction is not needed - // const int nMissingPrimaryFragments = m_n_primary_fragments_in_block- m_n_available_primary_fragments; - auto recoveredFragmentIndices = fecDecode(m_size_of_secondary_fragments, blockBuffer, - m_n_primary_fragments_in_block, fragment_map); - // now mark them as available - for (const auto idx: recoveredFragmentIndices) { - fragment_map[idx] = FRAGMENT_STATUS_AVAILABLE; - } - m_n_available_primary_fragments += recoveredFragmentIndices.size(); - // n of reconstructed packets - return recoveredFragmentIndices.size(); -} - -std::optional RxBlock::get_missing_primary_packets() const { - if(m_n_primary_fragments_in_block<=0)return std::nullopt; - return m_n_primary_fragments_in_block-getNAvailableFragments(); -} - -std::string RxBlock::get_missing_primary_packets_readable() const { - const auto tmp=get_missing_primary_packets(); - if(tmp==std::nullopt)return "?"; - return std::to_string(tmp.value()); -} -int RxBlock::get_n_primary_fragments() const { - return m_n_primary_fragments_in_block; -} - -bool FECDecoder::validate_packet_size(const int data_len) { - if(data_lenMAX_PAYLOAD_BEFORE_FEC){ - // packet is too big - return false; - } - return true; -} - -bool FECDecoder::process_valid_packet(const uint8_t* data, - int data_len) { - assert(validate_packet_size(data_len)); - // reconstruct the data layout - const FECPayloadHdr* header_p=(FECPayloadHdr*)data; - /* const uint8_t* payload_p=data+sizeof(FECPayloadHdr); - const int payload_size=data_len-sizeof(FECPayloadHdr);*/ - if (header_p->fragment_idx >= maxNFragmentsPerBlock) { - wifibroadcast::log::get_default()->warn("invalid fragment_idx: {}",header_p->fragment_idx); - return false; - } - process_with_rx_queue(*header_p,data,data_len); - return true; -} - -void FECDecoder::forwardMissingPrimaryFragmentsIfAvailable( - RxBlock& block, const bool discardMissingPackets) { - assert(mSendDecodedPayloadCallback); - // TODO remove me - if(discardMissingPackets){ - if(m_enable_log_debug){ - wifibroadcast::log::get_default()->warn("Forwarding block that is not yet fully finished: {} total: {} available: {} missing: {}", - block.getBlockIdx(),block.get_n_primary_fragments(),block.getNAvailableFragments(),block.get_missing_primary_packets_readable()); - } - } - const auto indices = block.pullAvailablePrimaryFragments(discardMissingPackets); - for (auto primaryFragmentIndex: indices) { - const uint8_t* data=block.get_primary_fragment_data_p(primaryFragmentIndex); - const int data_size=block.get_primary_fragment_data_size(primaryFragmentIndex); - if (data_size > FEC_PACKET_MAX_PAYLOAD_SIZE || data_size <= 0) { - wifibroadcast::log::get_default()->warn("corrupted packet on FECDecoder out ({}:{}) : {}B",block.getBlockIdx(),primaryFragmentIndex,data_size); - } else { - mSendDecodedPayloadCallback(data, data_size); - stats.count_bytes_forwarded+=data_size; - } - } -} - -void FECDecoder::rxQueuePopFront() { - assert(rx_queue.front() != nullptr); - if (!rx_queue.front()->allPrimaryFragmentsHaveBeenForwarded()) { - stats.count_blocks_lost++; - if(m_enable_log_debug){ - auto& block=*rx_queue.front(); - wifibroadcast::log::get_default()->debug("Removing block {} {}",block.getBlockIdx(),block.get_missing_primary_packets_readable()); - } - } - rx_queue.pop_front(); -} - -void FECDecoder::rxRingCreateNewSafe(const uint64_t blockIdx) { - // check: make sure to always put blocks into the queue in order ! - if (!rx_queue.empty()) { - // the newest block in the queue should be equal to block_idx -1 - // but it must not ?! - if (rx_queue.back()->getBlockIdx() != (blockIdx - 1)) { - // If we land here, one or more full blocks are missing, which can happen on bad rx links - //wifibroadcast::log::get_default()->debug("In queue: {} But new: {}",rx_queue.back()->getBlockIdx(),blockIdx); - } - //assert(rx_queue.back()->getBlockIdx() == (blockIdx - 1)); - } - // we can return early if this operation doesn't exceed the size limit - if (rx_queue.size() < RX_QUEUE_MAX_SIZE) { - rx_queue.push_back(std::make_unique(maxNFragmentsPerBlock, blockIdx)); - stats.count_blocks_total++; - return; - } - //Ring overflow. This means that there are more unfinished blocks than ring size - //Possible solutions: - //1. Increase ring size. Do this if you have large variance of packet travel time throught WiFi card or network stack. - // Some cards can do this due to packet reordering inside, diffent chipset and/or firmware or your RX hosts have different CPU power. - //2. Reduce packet injection speed or try to unify RX hardware. - - // forward remaining data for the (oldest) block, since we need to get rid of it - auto &oldestBlock = rx_queue.front(); - forwardMissingPrimaryFragmentsIfAvailable(*oldestBlock, true); - // and remove the block once done with it - rxQueuePopFront(); - - // now we are guaranteed to have space for one new block - rx_queue.push_back(std::make_unique(maxNFragmentsPerBlock, blockIdx)); - stats.count_blocks_total++; -} - -RxBlock* FECDecoder::rxRingFindCreateBlockByIdx(const uint64_t blockIdx) { - // check if block is already in the ring - auto found = std::find_if(rx_queue.begin(), rx_queue.end(), - [&blockIdx](const std::unique_ptr &block) { - return block->getBlockIdx() == blockIdx; - }); - if (found != rx_queue.end()) { - return found->get(); - } - // check if block is already known and not in the ring then it is already processed - if (last_known_block != (uint64_t) -1 && blockIdx <= last_known_block) { - return nullptr; - } - - // don't forget to increase the lost blocks counter if we do not add blocks here due to no space in the rx queue - // (can happen easily if the rx queue has a size of 1) - const auto n_needed_new_blocks = last_known_block != (uint64_t) -1 ? blockIdx - last_known_block : 1; - if(n_needed_new_blocks>RX_QUEUE_MAX_SIZE){ - if(m_enable_log_debug){ - wifibroadcast::log::get_default()->debug("Need {} blocks, exceeds {}",n_needed_new_blocks,RX_QUEUE_MAX_SIZE); - } - stats.count_blocks_lost+=n_needed_new_blocks-RX_QUEUE_MAX_SIZE; - } - // add as many blocks as we need ( the rx ring mustn't have any gaps between the block indices). - // but there is no point in adding more blocks than RX_RING_SIZE - const int new_blocks = (int) std::min(n_needed_new_blocks, - (uint64_t) FECDecoder::RX_QUEUE_MAX_SIZE); - last_known_block = blockIdx; - - for (int i = 0; i < new_blocks; i++) { - rxRingCreateNewSafe(blockIdx + i + 1 - new_blocks); - } - // the new block we've added is now the most recently added element (and since we always push to the back, the "back()" element) - assert(rx_queue.back()->getBlockIdx() == blockIdx); - return rx_queue.back().get(); -} - -void FECDecoder::process_with_rx_queue(const FECPayloadHdr& header, - const uint8_t* data, int data_size) { - auto blockP = rxRingFindCreateBlockByIdx(header.block_idx); - //ignore already processed blocks - if (blockP == nullptr) return; - // cannot be nullptr - RxBlock &block = *blockP; - // ignore already processed fragments - if (block.hasFragment(header.fragment_idx)) { - return; - } - block.addFragment(data,data_size); - if (block == *rx_queue.front()) { - //wifibroadcast::log::get_default()->debug("In front\n"; - // we are in the front of the queue (e.g. at the oldest block) - // forward packets until the first gap - forwardMissingPrimaryFragmentsIfAvailable(block); - // We are done with this block if either all fragments have been forwarded or it can be recovered - if (block.allPrimaryFragmentsHaveBeenForwarded()) { - // remove block when done with it - rxQueuePopFront(); - return; - } - if (block.allPrimaryFragmentsCanBeRecovered()) { - // apply fec for this block - const auto before_encode=std::chrono::steady_clock::now(); - stats.count_fragments_recovered += block.reconstructAllMissingData(); - stats.count_blocks_recovered++; - m_fec_decode_time.add(std::chrono::steady_clock::now()-before_encode); - if(m_fec_decode_time.get_delta_since_last_reset()>std::chrono::seconds(1)){ - //wifibroadcast::log::get_default()->debug("FEC decode took {}",m_fec_decode_time.getAvgReadable()); - stats.curr_fec_decode_time=m_fec_decode_time.getMinMaxAvg(); - m_fec_decode_time.reset(); - } - forwardMissingPrimaryFragmentsIfAvailable(block); - assert(block.allPrimaryFragmentsHaveBeenForwarded()); - // remove block when done with it - rxQueuePopFront(); - return; - } - return; - } else { - //wifibroadcast::log::get_default()->debug("Not in front\n"; - // we are not in the front of the queue but somewhere else - // If this block can be fully recovered or all primary fragments are available this triggers a flush - if (block.allPrimaryFragmentsAreAvailable() || block.allPrimaryFragmentsCanBeRecovered()) { - // send all queued packets in all unfinished blocks before and remove them - if(m_enable_log_debug){ - wifibroadcast::log::get_default()->debug("Block {} triggered a flush",block.getBlockIdx()); - } - while (block != *rx_queue.front()) { - forwardMissingPrimaryFragmentsIfAvailable(*rx_queue.front(), true); - rxQueuePopFront(); - } - // then process the block who is fully recoverable or has no gaps in the primary fragments - if (block.allPrimaryFragmentsAreAvailable()) { - forwardMissingPrimaryFragmentsIfAvailable(block); - assert(block.allPrimaryFragmentsHaveBeenForwarded()); - } else { - // apply fec for this block - stats.count_fragments_recovered += block.reconstructAllMissingData(); - stats.count_blocks_recovered++; - forwardMissingPrimaryFragmentsIfAvailable(block); - assert(block.allPrimaryFragmentsHaveBeenForwarded()); - } - // remove block - rxQueuePopFront(); - } - } -} - -void FECDecoder::reset_rx_queue() { - /*while (auto el=rx_queue.front() != nullptr){ - rxQueuePopFront(); - }*/ - rx_queue.resize(0); - last_known_block=((uint64_t) -1); -} - -uint32_t calculate_n_secondary_fragments(uint32_t n_primary_fragments, - uint32_t fec_overhead_perc) { - if(fec_overhead_perc<=0)return 0; - const float n_secondary=static_cast(n_primary_fragments) * static_cast(fec_overhead_perc) / 100.0f; - if(n_secondary<=1.0){ - // Always calculate at least one FEC packet - return 1; - } - return std::lroundf(n_secondary); -} - -unsigned int calculateN(const unsigned int k, const unsigned int percentage) { - return k + calculate_n_secondary_fragments(k,percentage); -} - -void fec_stream_print_fec_optimization_method() { - print_optimization_method(); -} diff --git a/src/FECStream.h b/src/FECStream.h deleted file mode 100644 index 558dd07b..00000000 --- a/src/FECStream.h +++ /dev/null @@ -1,276 +0,0 @@ -// -// Created by consti10 on 28.06.23. -// - -#ifndef WIFIBROADCAST_FECSTREAM_H -#define WIFIBROADCAST_FECSTREAM_H - -#include -#include -#include -#include -#include -#include -#include -#include - - -#include "HelperSources/TimeHelper.hpp" - -/** - * Encoder and Decoder pair for FEC protected block / packet based data streaming. - * adds sizeof(FECPayloadHdr) to each fec primary or secondary packet. - */ - -static_assert(__BYTE_ORDER == __LITTLE_ENDIAN, "This code is written for little endian only !"); - -struct FECPayloadHdr{ - // Most often each frame is encoded as one fec block - // rolling - uint32_t block_idx; - // each fragment inside a block has a fragment index - // uint8_t is enough, since we are limited to 128+128=256 fragments anyway by the FEC impl. - uint8_t fragment_idx; - // how many fragments make up the primary fragments part, the rest is secondary fragments - // note that we do not need to know how many secondary fragments have been created - as soon as we - // 'have enough', we can perform the FEC correction step if necessary - uint8_t n_primary_fragments; - // For FEC all data fragments have to be the same size. We pad the rest during encoding / decoding with 0, - // and do this when encoding / decoding such that the 0 bytes don't have to be transmitted. - // This needs to be included during the fec encode / decode step ! - uint16_t data_size; -}__attribute__ ((packed)); -static_assert(sizeof(FECPayloadHdr)==8); - -// See WBTxRx -static constexpr const auto MAX_PAYLOAD_BEFORE_FEC=1449; -// The FEC stream encode adds an overhead, leaving X bytes to the application -static constexpr const auto FEC_PACKET_MAX_PAYLOAD_SIZE=MAX_PAYLOAD_BEFORE_FEC-sizeof(FECPayloadHdr); -static_assert(FEC_PACKET_MAX_PAYLOAD_SIZE==1441); - -// max 255 primary and secondary fragments together for now. Theoretically, this implementation has enough bytes in the header for -// up to 15 bit fragment indices, 2^15=32768 -// Note: currently limited by the fec c implementation -static constexpr const uint16_t MAX_N_P_FRAGMENTS_PER_BLOCK = 128; -static constexpr const uint16_t MAX_N_S_FRAGMENTS_PER_BLOCK = 128; -static constexpr const uint16_t - MAX_TOTAL_FRAGMENTS_PER_BLOCK = MAX_N_P_FRAGMENTS_PER_BLOCK + MAX_N_S_FRAGMENTS_PER_BLOCK; - -/** - * For dynamic block sizes, we switched to a FEC overhead "percentage" value. - * e.g. the final data throughput ~= original data throughput * fec overhead percentage - * Rounds up / down (.5), but always at least 1 - */ -uint32_t calculate_n_secondary_fragments(uint32_t n_primary_fragments,uint32_t fec_overhead_perc); - -/** - * calculate n from k and percentage as used in FEC terms - * (k: number of primary fragments, n: primary + secondary fragments) - */ -unsigned int calculateN(unsigned int k, unsigned int percentage); - -void fec_stream_print_fec_optimization_method(); - -class FECEncoder { - public: - typedef std::function - OUTPUT_DATA_CALLBACK; - OUTPUT_DATA_CALLBACK outputDataCallback; - explicit FECEncoder()=default; - FECEncoder(const FECEncoder &other) = delete; - public: - /** - * Encodes a new block and forwards the packets for this block - * forwards data packets first, then generated fec packets - * (if needed) and forwards them after. - * @param data_packets the packets for this block - * @param n_secondary_fragments how many secondary fragments (FEC packets) should be created - */ - void encode_block(std::vector>> data_packets,int n_secondary_fragments); - // Pre-allocated to have space for storing primary fragments (they are needed once the fec step needs to be performed) - // and creating the wanted amount of secondary packets - std::array,MAX_TOTAL_FRAGMENTS_PER_BLOCK> m_block_buffer{}; - uint32_t m_curr_block_idx=0; - static_assert(sizeof(m_curr_block_idx)==sizeof(FECPayloadHdr::block_idx)); - AvgCalculator m_fec_block_encode_time; - MinMaxAvg m_curr_fec_block_encode_time{}; - BaseAvgCalculator m_block_sizes{}; - MinMaxAvg m_curr_fec_block_sizes{}; -}; - -// This encapsulates everything you need when working on a single FEC block on the receiver -// for example, addFragment() or pullAvailablePrimaryFragments() -// it also provides convenient methods to query if the block is fully forwarded -// or if it is ready for the FEC reconstruction step. -class RxBlock { - public: - // @param maxNFragmentsPerBlock max number of primary and secondary fragments for this block. - // you could just use MAX_TOTAL_FRAGMENTS_PER_BLOCK for that, but if your tx then uses (4:8) for example, you'd - // allocate much more memory every time for a new RX block than needed. - explicit RxBlock(unsigned int maxNFragmentsPerBlock, uint64_t blockIdx1); - // No copy constructor for safety - RxBlock(const RxBlock &) = delete; - // two blocks are the same if they refer to the same block idx: - constexpr bool operator==(const RxBlock &other) const { - return blockIdx == other.blockIdx; - } - // same for not equal operator - constexpr bool operator!=(const RxBlock &other) const { - return !(*this == other); - } - ~RxBlock() = default; - public: - // returns true if this fragment has been already received - bool hasFragment(int fragment_idx); - // returns true if we are "done with this block" aka all data has been already forwarded - bool allPrimaryFragmentsHaveBeenForwarded() const; - // returns true if enough FEC secondary fragments are available to replace all missing primary fragments - bool allPrimaryFragmentsCanBeRecovered() const; - // returns true as soon as all primary fragments are available - bool allPrimaryFragmentsAreAvailable() const; - // copy the fragment data and mark it as available - // you should check if it is already available with hasFragment() to avoid copying the same fragment multiple times - // when using multiple RX cards - void addFragment(const uint8_t *data, const std::size_t dataLen); - // util to copy the packet size and payload (and not more) - void fragment_copy_payload(const int fragment_idx,const uint8_t *data, const std::size_t dataLen); - /** - * @returns the indices for all primary fragments that have not yet been forwarded and are available (already received or reconstructed). - * Once an index is returned here, it won't be returned again - * (Therefore, as long as you immediately forward all primary fragments returned here,everything happens in order) - * @param discardMissingPackets : if true, gaps are ignored and fragments are forwarded even though this means the missing ones are irreversible lost - * Be carefully with this param, use it only before you need to get rid of a block */ - std::vector pullAvailablePrimaryFragments(const bool discardMissingPackets = false); - const uint8_t *get_primary_fragment_data_p(const int fragment_index); - const int get_primary_fragment_data_size(const int fragment_index); - - // returns the n of primary and secondary fragments for this block - int getNAvailableFragments() const { - return m_n_available_primary_fragments + m_n_available_secondary_fragments; - } - /** - * Reconstruct all missing primary fragments (data packets) by using the received secondary (FEC) packets - * NOTE: reconstructing only part of the missing data is not supported ! (That's a non-fixable technical detail of FEC) - * NOTE: Do not call this method unless it is needed - * @return the n of reconstructed packets - */ - int reconstructAllMissingData(); - [[nodiscard]] uint64_t getBlockIdx() const { - return blockIdx; - } - [[nodiscard]] std::optional getFirstFragmentTimePoint() const { - return firstFragmentTimePoint; - } - // Returns the number of missing primary packets (e.g. the n of actual data packets that are missing) - // This only works if we know the "fec_k" parameter - std::optional get_missing_primary_packets() const; - std::string get_missing_primary_packets_readable() const; - int get_n_primary_fragments()const; - private: - // the block idx marks which block this element refers to - const uint64_t blockIdx = 0; - // n of primary fragments that are already pulled out - int nAlreadyForwardedPrimaryFragments = 0; - // for each fragment (via fragment_idx) store if it has been received yet - std::vector fragment_map; - // holds all the data for all received fragments (if fragment_map says UNAVALIABLE at this position, content is undefined) - std::vector> blockBuffer; - // time point when the first fragment for this block was received (via addFragment() ) - std::optional firstFragmentTimePoint = std::nullopt; - // as soon as we know any of the fragments for this block, we know how many primary fragments this block contains - // (and therefore, how many primary or secondary fragments we need to fully reconstruct) - int m_n_primary_fragments_in_block =-1; - // for the fec step, we need the size of the fec secondary fragments, which should be equal for all secondary fragments - int m_size_of_secondary_fragments =-1; - int m_n_available_primary_fragments =0; - int m_n_available_secondary_fragments =0; -}; - -// Takes a continuous stream of packets (data and fec correction packets) and -// processes them such that the output is exactly (or as close as possible) to the -// Input stream fed to FECEncoder. -// Most importantly, it also handles re-ordering of packets and packet duplicates due to multiple rx cards -class FECDecoder { - public: - /** - * @param rx_queue_max_depth max size of rx queue - since in case of openhd, one frame is either one or two FEC blocks - * we don't need that big of an rx queue - * @param maxNFragmentsPerBlock memory per block is pre-allocated, reduce this value if you know the encoder doesn't ever exceed a given - * n of fragments per block - * @param enable_log_debug - */ - explicit FECDecoder(const unsigned int rx_queue_max_depth,const unsigned int maxNFragmentsPerBlock = MAX_TOTAL_FRAGMENTS_PER_BLOCK, - bool enable_log_debug=false) : - RX_QUEUE_MAX_SIZE(rx_queue_max_depth), - maxNFragmentsPerBlock(maxNFragmentsPerBlock), - m_enable_log_debug(enable_log_debug){ - assert(rx_queue_max_depth<20); - assert(rx_queue_max_depth>=1); - } - FECDecoder(const FECDecoder &other) = delete; - ~FECDecoder() = default; - // data forwarded on this callback is always in-order but possibly with gaps - typedef std::function SEND_DECODED_PACKET; - // WARNING: Don't forget to register this callback ! - SEND_DECODED_PACKET mSendDecodedPayloadCallback; - // A value too high doesn't really give much benefit and increases memory usage - const unsigned int RX_QUEUE_MAX_SIZE; - const unsigned int maxNFragmentsPerBlock; - const bool m_enable_log_debug; - AvgCalculator m_fec_decode_time{}; - public: - static bool validate_packet_size(int data_len); - // process a valid packet - bool process_valid_packet(const uint8_t* data,int data_len); - private: - // since we also need to search this data structure, a std::queue is not enough. - // since we have an upper limit on the size of this dequeue, it is basically a searchable ring buffer - std::deque> rx_queue; - uint64_t last_known_block = ((uint64_t) -1); //id of last known block - /** - * For this Block, - * starting at the primary fragment we stopped on last time, - * forward as many primary fragments as they are available until there is a gap - * @param discardMissingPackets : if true, gaps are ignored and fragments are forwarded even though this means the missing ones are irreversible lost - * Be carefully with this param, use it only before you need to get rid of a block - */ - void forwardMissingPrimaryFragmentsIfAvailable(RxBlock &block, const bool discardMissingPackets = false); - // also increase lost block count if block is not fully recovered - void rxQueuePopFront(); - // create a new RxBlock for the specified block_idx and push it into the queue - // NOTE: Checks first if this operation would increase the size of the queue over its max capacity - // In this case, the only solution is to remove the oldest block before adding the new one - void rxRingCreateNewSafe(const uint64_t blockIdx); - - // If block is already known and not in the queue anymore return nullptr - // else if block is inside the ring return pointer to it - // and if it is not inside the ring add as many blocks as needed, then return pointer to it - RxBlock *rxRingFindCreateBlockByIdx(const uint64_t blockIdx); - void process_with_rx_queue(const FECPayloadHdr& header,const uint8_t* data,int data_size); - public: - // matches FECDecoder - struct FECRxStats { - // total block count - uint64_t count_blocks_total = 0; - // a block counts as "lost" if it was removed before being fully received or recovered - uint64_t count_blocks_lost = 0; - // a block counts as "recovered" if it was recovered using FEC packets - uint64_t count_blocks_recovered = 0; - // n of primary fragments that were reconstructed during the recovery process of a block - uint64_t count_fragments_recovered = 0; - // n of forwarded bytes - uint64_t count_bytes_forwarded=0; - MinMaxAvg curr_fec_decode_time{}; - }; - FECRxStats stats{}; - void reset_rx_queue(); -}; - -// quick math regarding sequence numbers: -//uint32_t holds max 4294967295 . At 10 000 pps (packets per seconds) (which is already completely out of reach) this allows the tx to run for 429496.7295 seconds -// 429496.7295 / 60 / 60 = 119.304647083 hours which is also completely overkill for OpenHD (and after this time span, a "reset" of the sequence number happens anyways) -// unsigned 24 bits holds 16777215 . At 1000 blocks per second this allows the tx to create blocks for 16777.215 seconds or 4.6 hours. That should cover a flight (and after 4.6h a reset happens, -// which means you might lose a couple of blocks once every 4.6 h ) -// and 8 bits holds max 255. - -#endif // WIFIBROADCAST_FECSTREAM_H diff --git a/src/WBStreamRx.h b/src/WBStreamRx.h index 660596c9..8af633f0 100644 --- a/src/WBStreamRx.h +++ b/src/WBStreamRx.h @@ -7,7 +7,7 @@ #include "moodycamel/concurrentqueue/blockingconcurrentqueue.h" #include "moodycamel/readerwriterqueue/readerwritercircularbuffer.h" -#include "FECStream.h" +#include "fec/FEC.h" #include "HelperSources/Helper.hpp" #include "HelperSources/SequenceNumberDebugger.hpp" #include "HelperSources/TimeHelper.hpp" @@ -15,6 +15,7 @@ #include "SimpleStream.hpp" #include "WBTxRx.h" #include "wifibroadcast_spdlog.h" +#include "fec/FECDecoder.h" /** * Receiver for a (multiplexed) wifbroadcast stream diff --git a/src/WBStreamTx.h b/src/WBStreamTx.h index e9edae7a..dc4411be 100644 --- a/src/WBStreamTx.h +++ b/src/WBStreamTx.h @@ -11,10 +11,11 @@ #include "moodycamel/concurrentqueue/blockingconcurrentqueue.h" #include "moodycamel/readerwriterqueue/readerwritercircularbuffer.h" -#include "FECStream.h" +#include "fec/FEC.h" #include "SimpleStream.hpp" #include "HelperSources/TimeHelper.hpp" #include "WBTxRx.h" +#include "fec/FECEncoder.h" /** * Transmitter for a (multiplexed) wifbroadcast stream diff --git a/src/WBTxRx.h b/src/WBTxRx.h index ad748aca..14e834fa 100644 --- a/src/WBTxRx.h +++ b/src/WBTxRx.h @@ -15,16 +15,18 @@ #include #include -#include "Encryption.h" +#include "encryption/Encryption.h" #include "HelperSources/UINT16SeqNrHelper.hpp" #include "HelperSources/UINT64SeqNrHelper.hpp" #include "Ieee80211Header.hpp" #include "WiFiCard.h" +#include "encryption/Decryptor.h" #include "radiotap/RSSIAccumulator.hpp" #include "radiotap/RadiotapHeaderTx.hpp" #include "radiotap/RadiotapHeaderTxHolder.hpp" #include "radiotap/RadiotapRxRfAggregator.h" #include "radiotap/SignalQualityAccumulator.hpp" +#include "encryption/Encryptor.h" /** * This class exists to provide a clean, working interface to create a diff --git a/src/encryption/Decryptor.cpp b/src/encryption/Decryptor.cpp new file mode 100644 index 00000000..6062d27e --- /dev/null +++ b/src/encryption/Decryptor.cpp @@ -0,0 +1,76 @@ +#include "Decryptor.h" + +#include + +#include +#include + +#include "Encryption.h" +#include "../wifibroadcast_spdlog.h" + +wb::Decryptor::Decryptor(wb::Key key1) + : rx_secretkey(key1.secret_key), tx_publickey(key1.public_key) { + memset(session_key.data(), 0, sizeof(session_key)); +} + +int wb::Decryptor::onNewPacketSessionKeyData( + const std::array& sessionKeyNonce, + const std::array& sessionKeyData) { + std::array new_session_key{}; + if (crypto_box_open_easy(new_session_key.data(), sessionKeyData.data(), + sessionKeyData.size(), sessionKeyNonce.data(), + tx_publickey.data(), rx_secretkey.data()) != 0) { + // this basically should just never happen, and is an error + wifibroadcast::log::get_default()->warn("unable to decrypt session key"); + return SESSION_NOT_VALID; + } + if (memcmp(session_key.data(), new_session_key.data(), sizeof(session_key)) != + 0) { + wifibroadcast::log::get_default()->info("Decryptor-New session detected"); + session_key = new_session_key; + m_has_valid_session = true; + return SESSION_VALID_NEW; + } + // this is NOT an error, the same session key is sent multiple times ! + return SESSION_VALID_NOT_NEW; +} + +bool wb::Decryptor::authenticate_and_decrypt(const uint64_t& nonce, + const uint8_t* encrypted, + int encrypted_size, + uint8_t* dest) { + if (!m_encrypt_data) { + const auto payload_size = encrypted_size - crypto_onetimeauth_BYTES; + assert(payload_size > 0); + const uint8_t* sign = encrypted + payload_size; + // const int + // res=crypto_auth_hmacsha256_verify(sign,msg,payload_size,session_key.data()); + const auto sub_key = wb::create_onetimeauth_subkey(nonce, session_key); + const int res = crypto_onetimeauth_verify(sign, encrypted, payload_size, + sub_key.data()); + if (res != -1) { + memcpy(dest, encrypted, payload_size); + return true; + } + return false; + } + unsigned long long mlen; + int res = crypto_aead_chacha20poly1305_decrypt( + dest, &mlen, nullptr, encrypted, encrypted_size, nullptr, 0, + (uint8_t*)(&nonce), session_key.data()); + return res != -1; +} + +std::shared_ptr> +wb::Decryptor::authenticate_and_decrypt_buff(const uint64_t& nonce, + const uint8_t* encrypted, + int encrypted_size) { + auto ret = std::make_shared>( + encrypted_size - crypto_aead_chacha20poly1305_ABYTES); + const auto res = + authenticate_and_decrypt(nonce, encrypted, encrypted_size, ret->data()); + if (res) { + return ret; + } + return nullptr; +} \ No newline at end of file diff --git a/src/encryption/Decryptor.h b/src/encryption/Decryptor.h new file mode 100644 index 00000000..c473c897 --- /dev/null +++ b/src/encryption/Decryptor.h @@ -0,0 +1,71 @@ +#ifndef DECRIPTOR_HPP +#define DECRIPTOR_HPP +#include +#include + +#include +#include +#include +#include + +#include "Key.hpp" + + +namespace wb { +class Decryptor { + public: + // enable a default deterministic encryption key by using std::nullopt + // else, pass path to file with encryption keys + explicit Decryptor(wb::Key key1); + static constexpr auto SESSION_VALID_NEW = 0; + static constexpr auto SESSION_VALID_NOT_NEW = 1; + static constexpr auto SESSION_NOT_VALID = -1; + /** + * Returns 0 if the session is a valid session in regards to the key-pairs AND + * the session is a new session Returns 1 if the session is a valid session in + * regards to the key-pairs but it is not a new session Returns -1 if the + * session is not a valid session in regards to the key-pairs + * + */ + int onNewPacketSessionKeyData( + const std::array& sessionKeyNonce, + const std::array& sessionKeyData); + /** + * Decrypt (or validate only if encryption is disabled) the given message + * and writes the original message content into dest. + * Returns true on success, false otherwise (false== the message is not a + * valid message) + * @param dest needs to be at least @param encrypted - 16 bytes big. + */ + bool authenticate_and_decrypt(const uint64_t& nonce, const uint8_t* encrypted, + int encrypted_size, uint8_t* dest); + + /** + * Easier to use, but usage might require memcpy + */ + std::shared_ptr> authenticate_and_decrypt_buff( + const uint64_t& nonce, const uint8_t* encrypted, int encrypted_size); + /** + * Disables encryption (to save cpu performance) but keeps packet validation + * functionality + * @param encryption_enabled + */ + void set_encryption_enabled(bool encryption_enabled) { + m_encrypt_data = encryption_enabled; + } + // Set to true as soon as a valid session has been detected + bool has_valid_session() const { return m_has_valid_session; } + + private: + // use this one if you are worried about CPU usage when using encryption + bool m_encrypt_data = true; + const std::array rx_secretkey{}; + const std::array tx_publickey{}; + std::array session_key{}; + bool m_has_valid_session = false; +}; + +} // namespace wb + +#endif // DECRIPTOR_HPP \ No newline at end of file diff --git a/src/encryption/Encryption.cpp b/src/encryption/Encryption.cpp new file mode 100644 index 00000000..90615a46 --- /dev/null +++ b/src/encryption/Encryption.cpp @@ -0,0 +1,62 @@ +// +// Created by consti10 on 13.08.23. +// + +#include "Encryption.h" + +#include +#include +#include + +#include +#include "../wifibroadcast_spdlog.h" + +wb::KeyPairTxRx wb::generate_keypair_random() { + KeyPairTxRx ret{}; + crypto_box_keypair(ret.key_1.public_key.data(), ret.key_1.secret_key.data()); + crypto_box_keypair(ret.key_2.public_key.data(), ret.key_2.secret_key.data()); + return ret; +} + +// Salts generated once using https://www.random.org/cgi-bin/randbyte?nbytes=16&format=d +// We want deterministic seed from a pw, and are only interested in making it impossible to reverse the process (even though the salt is plain text) +static constexpr std::array OHD_SALT_AIR{192,189,216,102,56,153,154,92,228,26,49,209,157,7,128,207}; +static constexpr std::array OHD_SALT_GND{179,30,150,20,17,200,225,82,48,64,18,130,89,62,83,234}; + +std::array +wb::create_seed_from_password_openhd_salt(const std::string& pw, + bool use_salt_air) { + const auto salt = use_salt_air ? OHD_SALT_AIR : OHD_SALT_GND; + std::array key{}; + if (crypto_pwhash(key.data(), key.size(), pw.c_str(), pw.length(), salt.data(), + crypto_pwhash_OPSLIMIT_INTERACTIVE, crypto_pwhash_MEMLIMIT_INTERACTIVE, + crypto_pwhash_ALG_DEFAULT) != 0) { + std::cerr<<"ERROR: cannot create_seed_from_password_openhd_salt"< wb::create_onetimeauth_subkey( + const uint64_t& nonce, const std::array& session_key) { + // sub-key for this packet + std::array subkey{}; + // We only have an 8 byte nonce, this should be enough entropy + std::array nonce_buf{0}; + memcpy(nonce_buf.data(),(uint8_t*)&nonce,8); + crypto_core_hchacha20(subkey.data(),nonce_buf.data(),session_key.data(), nullptr); + return subkey; +} \ No newline at end of file diff --git a/src/encryption/Encryption.h b/src/encryption/Encryption.h new file mode 100644 index 00000000..a5256571 --- /dev/null +++ b/src/encryption/Encryption.h @@ -0,0 +1,57 @@ +#ifndef ENCRYPTION_HPP +#define ENCRYPTION_HPP + +#include +#include +#include +#include + +#include + +#include "KeyPairTxRx.hpp" + +// Namespace that can be used to add encryption+packet validation +// (Or packet validation only to save CPU resources) +// to a lossy unidirectional link +// Packet validation is quite important, to make sure only openhd packets (and not standard wifi packets) are used in OpenHD +// The Encryption / Decryption name(s) are legacy - +// The more difficult part is dealing with the session key stuff, and this class makes it a bit easier to use + +// one time authentication and encryption nicely are really similar +static_assert(crypto_onetimeauth_BYTES==crypto_aead_chacha20poly1305_ABYTES); +// Encryption (or packet validation) adds this many bytes to the end of the message +static constexpr auto ENCRYPTION_ADDITIONAL_VALIDATION_DATA=crypto_aead_chacha20poly1305_ABYTES; + +namespace wb{ + +/** + * Generates a new keypair. Non-deterministic, 100% secure. + */ +KeyPairTxRx generate_keypair_random(); + +/** + * See https://libsodium.gitbook.io/doc/password_hashing + * Deterministic seed from password, but hides password itself (non-reversible) + * Uses a pre-defined salt to be deterministic + */ +std::array +create_seed_from_password_openhd_salt(const std::string& pw,bool use_salt_air); + +// We always use the same bind phrase by default +static constexpr auto DEFAULT_BIND_PHRASE="openhd"; +/** + * Generates 2 new (deterministic) tx rx keys, using the seed created from the pw. + * @param bind_phrase the password / bind phrase + */ +KeyPairTxRx generate_keypair_from_bind_phrase(const std::string& bind_phrase=DEFAULT_BIND_PHRASE); + +/** + * https://libsodium.gitbook.io/doc/key_derivation + * UINT16SeqNrHelper since we both support encryption and one time validation to save cpu performance + */ +std::array create_onetimeauth_subkey(const uint64_t& nonce,const std::array& session_key); + +} // namespace wb end + + +#endif //ENCRYPTION_HPP \ No newline at end of file diff --git a/src/encryption/EncryptionFsUtils.cpp b/src/encryption/EncryptionFsUtils.cpp new file mode 100644 index 00000000..57305e67 --- /dev/null +++ b/src/encryption/EncryptionFsUtils.cpp @@ -0,0 +1,45 @@ +// +// Created by consti10 on 13.08.23. +// + +#include "EncryptionFsUtils.h" + +#include + +#include +#include +#include + +#include +#include "../wifibroadcast_spdlog.h" + +int wb::write_keypair_to_file(const wb::KeyPairTxRx& keypair_txrx, + const std::string& filename) { + FILE *fp; + if ((fp = fopen(filename.c_str(), "w")) == nullptr) { + std::cerr<<"Unable to save "< + +namespace wb { + +/** + * Saves the KeyPairTxRx as a raw file + */ +int write_keypair_to_file(const KeyPairTxRx& keypair_txrx, + const std::string& filename); + +/** + * Reads a raw KeyPairTxRx from a raw file previusly generated. + */ +KeyPairTxRx read_keypair_from_file(const std::string& filename); + +} // namespace wb end + +#endif // ENCRYPTION_FS_UTILS_HPP \ No newline at end of file diff --git a/src/encryption/Encryptor.cpp b/src/encryption/Encryptor.cpp new file mode 100644 index 00000000..9a20ea93 --- /dev/null +++ b/src/encryption/Encryptor.cpp @@ -0,0 +1,50 @@ +#include "Encryptor.h" + +#include + +#include +#include +#include + +#include "Encryption.h" + +void wb::Encryptor::makeNewSessionKey( + std::array& sessionKeyNonce, + std::array& sessionKeyData) { + randombytes_buf(session_key.data(), sizeof(session_key)); + randombytes_buf(sessionKeyNonce.data(), sizeof(sessionKeyNonce)); + if (crypto_box_easy(sessionKeyData.data(), session_key.data(), + sizeof(session_key), sessionKeyNonce.data(), + rx_publickey.data(), tx_secretkey.data()) != 0) { + throw std::runtime_error("Unable to make session key!"); + } +} + +int wb::Encryptor::authenticate_and_encrypt(const uint64_t& nonce, + const uint8_t* src, int src_len, + uint8_t* dest) { + if (!m_encrypt_data) { // Only sign message + memcpy(dest, src, src_len); + uint8_t* sign = dest + src_len; + const auto sub_key = wb::create_onetimeauth_subkey(nonce, session_key); + crypto_onetimeauth(sign, src, src_len, sub_key.data()); + return src_len + crypto_onetimeauth_BYTES; + } + // sign and encrypt all together + long long unsigned int ciphertext_len; + crypto_aead_chacha20poly1305_encrypt(dest, &ciphertext_len, src, src_len, + (uint8_t*)nullptr, 0, nullptr, + (uint8_t*)&nonce, session_key.data()); + return (int)ciphertext_len; +} + +std::shared_ptr> +wb::Encryptor::authenticate_and_encrypt_buff(const uint64_t& nonce, + const uint8_t* src, + std::size_t src_len) { + auto ret = std::make_shared>( + src_len + ENCRYPTION_ADDITIONAL_VALIDATION_DATA); + const auto size = authenticate_and_encrypt(nonce, src, src_len, ret->data()); + assert(size == ret->size()); + return ret; +} \ No newline at end of file diff --git a/src/encryption/Encryptor.h b/src/encryption/Encryptor.h new file mode 100644 index 00000000..b9418bbf --- /dev/null +++ b/src/encryption/Encryptor.h @@ -0,0 +1,73 @@ +#ifndef ENCRIPTOR_HPP +#define ENCRIPTOR_HPP + +#include +#include + +#include +#include +#include +#include + +#include "Key.hpp" + +namespace wb { +class Encryptor { + public: + /** + * + * @param key1 encryption key, otherwise enable a default deterministic + * encryption key by using std::nullopt + * @param DISABLE_ENCRYPTION_FOR_PERFORMANCE only validate, do not encrypt + * (less CPU usage) + */ + explicit Encryptor(wb::Key key1) + : tx_secretkey(key1.secret_key), rx_publickey(key1.public_key) {} + /** + * Creates a new session key, simply put, the data we can send publicly + * @param sessionKeyNonce filled with public nonce + * @param sessionKeyData filled with public data + */ + void makeNewSessionKey( + std::array &sessionKeyNonce, + std::array &sessionKeyData); + /** + * Encrypt the given message of size @param src_len + * (Or if encryption is disabled, only calculate the message sign) + * and write the (encrypted) data appended by the validation data into dest + * @param nonce: needs to be different for every packet + * @param src @param src_len message to encrypt + * @param dest needs to point to a memory region at least @param src_len + 16 + * bytes big Returns written data size (msg payload plus sign data) + */ + int authenticate_and_encrypt(const uint64_t &nonce, const uint8_t *src, + int src_len, uint8_t *dest); + + /** + * For easy use - returns a buffer including (encrypted) payload plus + * validation data + */ + std::shared_ptr> authenticate_and_encrypt_buff( + const uint64_t &nonce, const uint8_t *src, std::size_t src_len); + /** + * Disables encryption (to save cpu performance) but keeps packet validation + * functionality + * @param encryption_enabled + */ + void set_encryption_enabled(bool encryption_enabled) { + m_encrypt_data = encryption_enabled; + } + + private: + // tx->rx keypair + const std::array tx_secretkey{}; + const std::array rx_publickey{}; + std::array session_key{}; + // use this one if you are worried about CPU usage when using encryption + bool m_encrypt_data = true; +}; + +} // namespace wb + +#endif // ENCRIPTOR_HPP \ No newline at end of file diff --git a/src/encryption/Key.hpp b/src/encryption/Key.hpp new file mode 100644 index 00000000..f0855e38 --- /dev/null +++ b/src/encryption/Key.hpp @@ -0,0 +1,19 @@ +#ifndef KEY_HPP +#define KEY_HPP +#include + +#include +#include + +namespace wb { + + +// A wb key consists of a public and secret key +struct Key { + std::array public_key; + std::array secret_key; +}; + +} // namespace wb + +#endif // KEY_HPP diff --git a/src/encryption/KeyPairTxRx.hpp b/src/encryption/KeyPairTxRx.hpp new file mode 100644 index 00000000..1ca4b63a --- /dev/null +++ b/src/encryption/KeyPairTxRx.hpp @@ -0,0 +1,20 @@ +#ifndef KEY_PAIR_TX_RX_HPP +#define KEY_PAIR_TX_RX_HPP + +#include "Key.hpp" + +namespace wb { + +// A wb keypair are 2 keys, one for transmitting, one for receiving +// (Since both ground and air unit talk bidirectional) +// We use a different key for the down-link / uplink, respective +struct KeyPairTxRx { + Key key_1; + Key key_2; + Key get_tx_key(bool is_air) { return is_air ? key_1 : key_2; } + Key get_rx_key(bool is_air) { return is_air ? key_2 : key_1; } +}; + +} // namespace wb + +#endif // KEY_PAIR_TX_RX_HPP \ No newline at end of file diff --git a/src/fec/FEC.cpp b/src/fec/FEC.cpp new file mode 100644 index 00000000..705be952 --- /dev/null +++ b/src/fec/FEC.cpp @@ -0,0 +1,29 @@ +// +// Created by consti10 on 30.06.23. +// + +#include + +#include "FEC.h" +#include "RxBlock.h" +#include "../wifibroadcast_spdlog.h" +#include + +uint32_t calculate_n_secondary_fragments(uint32_t n_primary_fragments, + uint32_t fec_overhead_perc) { + if(fec_overhead_perc<=0)return 0; + const float n_secondary=static_cast(n_primary_fragments) * static_cast(fec_overhead_perc) / 100.0f; + if(n_secondary<=1.0){ + // Always calculate at least one FEC packet + return 1; + } + return std::lroundf(n_secondary); +} + +unsigned int calculateN(const unsigned int k, const unsigned int percentage) { + return k + calculate_n_secondary_fragments(k,percentage); +} + +void fec_stream_print_fec_optimization_method() { + print_optimization_method(); +} diff --git a/src/FEC.hpp b/src/fec/FEC.h similarity index 65% rename from src/FEC.hpp rename to src/fec/FEC.h index 408028ef..97c746a7 100644 --- a/src/FEC.hpp +++ b/src/fec/FEC.h @@ -9,8 +9,9 @@ #include #include -#include "HelperSources/Helper.hpp" -#include "external/fec/fec_base.h" +#include "FECConstants.hpp" +#include "../HelperSources/Helper.hpp" +#include "../external/fec/fec_base.h" // c++ wrapper around fec library // NOTE: When working with FEC, people seem to use the terms block, fragments and more in different context(s). @@ -53,9 +54,6 @@ void fecEncode(unsigned int fragmentSize, //std::cout<<"fec_encode step took:"<(delta).count()<<"us\n"; } -static constexpr auto FRAGMENT_STATUS_UNAVAILABLE=false; -static constexpr auto FRAGMENT_STATUS_AVAILABLE=true; - /** * @param fragmentSize size of each fragment * @param blockBuffer blockBuffer (big) data buffer. The nth element is to be treated as the nth fragment of the block, either as primary or secondary fragment. @@ -103,58 +101,31 @@ std::vector fecDecode(unsigned int fragmentSize, return indicesMissingPrimaryFragments; } -// randomly select a possible combination of received indices (either primary or secondary). -static void testFecCPlusPlusWrapperY(const int nPrimaryFragments, const int nSecondaryFragments) { - srand(time(NULL)); - constexpr auto FRAGMENT_SIZE = 1446; - - auto txBlockBuffer = GenericHelper::createRandomDataBuffers(nPrimaryFragments + nSecondaryFragments); - std::cout << "XSelected nPrimaryFragments:" << nPrimaryFragments << " nSecondaryFragments:" << nSecondaryFragments - << "\n"; - - fecEncode(FRAGMENT_SIZE, txBlockBuffer, nPrimaryFragments, nSecondaryFragments); - std::cout << "Encode done\n"; - - for (int test = 0; test < 100; test++) { - // takes nPrimaryFragments random (possible) indices without duplicates - // NOTE: Perhaps you could calculate all possible permutations, but these would be quite a lot. - // Therefore, I just use n random selections of received indices - auto receivedFragmentIndices = GenericHelper::takeNRandomElements( - GenericHelper::createIndices(nPrimaryFragments + nSecondaryFragments), - nPrimaryFragments); - assert(receivedFragmentIndices.size() == nPrimaryFragments); - std::cout << "(Emulated) receivedFragmentIndices" << StringHelper::vectorAsString(receivedFragmentIndices) << "\n"; - - auto rxBlockBuffer = std::vector>(nPrimaryFragments + nSecondaryFragments); - std::vector fragmentMap(nPrimaryFragments + nSecondaryFragments, FRAGMENT_STATUS_UNAVAILABLE); - for (const auto idx: receivedFragmentIndices) { - rxBlockBuffer[idx] = txBlockBuffer[idx]; - fragmentMap[idx] = FRAGMENT_STATUS_AVAILABLE; - } - - fecDecode(FRAGMENT_SIZE, rxBlockBuffer, nPrimaryFragments, fragmentMap); - - for (unsigned int i = 0; i < nPrimaryFragments; i++) { - //std::cout<<"Comparing fragment:"< MAX_PAYLOAD_BEFORE_FEC) { + // packet is too big + return false; + } + return true; +} + +bool FECDecoder::process_valid_packet(const uint8_t* data, int data_len) { + assert(validate_packet_size(data_len)); + // reconstruct the data layout + const FECPayloadHdr* header_p = (FECPayloadHdr*)data; + /* const uint8_t* payload_p=data+sizeof(FECPayloadHdr); + const int payload_size=data_len-sizeof(FECPayloadHdr);*/ + if (header_p->fragment_idx >= maxNFragmentsPerBlock) { + wifibroadcast::log::get_default()->warn("invalid fragment_idx: {}", + header_p->fragment_idx); + return false; + } + process_with_rx_queue(*header_p, data, data_len); + return true; +} + +void FECDecoder::forwardMissingPrimaryFragmentsIfAvailable( + RxBlock& block, const bool discardMissingPackets) { + assert(mSendDecodedPayloadCallback); + // TODO remove me + if (discardMissingPackets) { + if (m_enable_log_debug) { + wifibroadcast::log::get_default()->warn( + "Forwarding block that is not yet fully finished: {} total: {} " + "available: {} missing: {}", + block.getBlockIdx(), block.get_n_primary_fragments(), + block.getNAvailableFragments(), + block.get_missing_primary_packets_readable()); + } + } + const auto indices = + block.pullAvailablePrimaryFragments(discardMissingPackets); + for (auto primaryFragmentIndex : indices) { + const uint8_t* data = + block.get_primary_fragment_data_p(primaryFragmentIndex); + const int data_size = + block.get_primary_fragment_data_size(primaryFragmentIndex); + if (data_size > FEC_PACKET_MAX_PAYLOAD_SIZE || data_size <= 0) { + wifibroadcast::log::get_default()->warn( + "corrupted packet on FECDecoder out ({}:{}) : {}B", + block.getBlockIdx(), primaryFragmentIndex, data_size); + } else { + mSendDecodedPayloadCallback(data, data_size); + stats.count_bytes_forwarded += data_size; + } + } +} + +void FECDecoder::rxQueuePopFront() { + assert(rx_queue.front() != nullptr); + if (!rx_queue.front()->allPrimaryFragmentsHaveBeenForwarded()) { + stats.count_blocks_lost++; + if (m_enable_log_debug) { + auto& block = *rx_queue.front(); + wifibroadcast::log::get_default()->debug( + "Removing block {} {}", block.getBlockIdx(), + block.get_missing_primary_packets_readable()); + } + } + rx_queue.pop_front(); +} + +void FECDecoder::rxRingCreateNewSafe(const uint64_t blockIdx) { + // check: make sure to always put blocks into the queue in order ! + if (!rx_queue.empty()) { + // the newest block in the queue should be equal to block_idx -1 + // but it must not ?! + if (rx_queue.back()->getBlockIdx() != (blockIdx - 1)) { + // If we land here, one or more full blocks are missing, which can happen + // on bad rx links + // wifibroadcast::log::get_default()->debug("In queue: {} But new: + // {}",rx_queue.back()->getBlockIdx(),blockIdx); + } + // assert(rx_queue.back()->getBlockIdx() == (blockIdx - 1)); + } + // we can return early if this operation doesn't exceed the size limit + if (rx_queue.size() < RX_QUEUE_MAX_SIZE) { + rx_queue.push_back( + std::make_unique(maxNFragmentsPerBlock, blockIdx)); + stats.count_blocks_total++; + return; + } + // Ring overflow. This means that there are more unfinished blocks than ring + // size Possible solutions: + // 1. Increase ring size. Do this if you have large variance of packet travel + // time throught WiFi card or network stack. + // Some cards can do this due to packet reordering inside, diffent chipset + // and/or firmware or your RX hosts have different CPU power. + // 2. Reduce packet injection speed or try to unify RX hardware. + + // forward remaining data for the (oldest) block, since we need to get rid of + // it + auto& oldestBlock = rx_queue.front(); + forwardMissingPrimaryFragmentsIfAvailable(*oldestBlock, true); + // and remove the block once done with it + rxQueuePopFront(); + + // now we are guaranteed to have space for one new block + rx_queue.push_back( + std::make_unique(maxNFragmentsPerBlock, blockIdx)); + stats.count_blocks_total++; +} + +RxBlock* FECDecoder::rxRingFindCreateBlockByIdx(const uint64_t blockIdx) { + // check if block is already in the ring + auto found = std::find_if(rx_queue.begin(), rx_queue.end(), + [&blockIdx](const std::unique_ptr& block) { + return block->getBlockIdx() == blockIdx; + }); + if (found != rx_queue.end()) { + return found->get(); + } + // check if block is already known and not in the ring then it is already + // processed + if (last_known_block != (uint64_t)-1 && blockIdx <= last_known_block) { + return nullptr; + } + + // don't forget to increase the lost blocks counter if we do not add blocks + // here due to no space in the rx queue (can happen easily if the rx queue has + // a size of 1) + const auto n_needed_new_blocks = + last_known_block != (uint64_t)-1 ? blockIdx - last_known_block : 1; + if (n_needed_new_blocks > RX_QUEUE_MAX_SIZE) { + if (m_enable_log_debug) { + wifibroadcast::log::get_default()->debug( + "Need {} blocks, exceeds {}", n_needed_new_blocks, RX_QUEUE_MAX_SIZE); + } + stats.count_blocks_lost += n_needed_new_blocks - RX_QUEUE_MAX_SIZE; + } + // add as many blocks as we need ( the rx ring mustn't have any gaps between + // the block indices). but there is no point in adding more blocks than + // RX_RING_SIZE + const int new_blocks = (int)std::min(n_needed_new_blocks, + (uint64_t)FECDecoder::RX_QUEUE_MAX_SIZE); + last_known_block = blockIdx; + + for (int i = 0; i < new_blocks; i++) { + rxRingCreateNewSafe(blockIdx + i + 1 - new_blocks); + } + // the new block we've added is now the most recently added element (and since + // we always push to the back, the "back()" element) + assert(rx_queue.back()->getBlockIdx() == blockIdx); + return rx_queue.back().get(); +} + +void FECDecoder::process_with_rx_queue(const FECPayloadHdr& header, + const uint8_t* data, int data_size) { + auto blockP = rxRingFindCreateBlockByIdx(header.block_idx); + // ignore already processed blocks + if (blockP == nullptr) return; + // cannot be nullptr + RxBlock& block = *blockP; + // ignore already processed fragments + if (block.hasFragment(header.fragment_idx)) { + return; + } + block.addFragment(data, data_size); + if (block == *rx_queue.front()) { + // wifibroadcast::log::get_default()->debug("In front\n"; + // we are in the front of the queue (e.g. at the oldest block) + // forward packets until the first gap + forwardMissingPrimaryFragmentsIfAvailable(block); + // We are done with this block if either all fragments have been forwarded + // or it can be recovered + if (block.allPrimaryFragmentsHaveBeenForwarded()) { + // remove block when done with it + rxQueuePopFront(); + return; + } + if (block.allPrimaryFragmentsCanBeRecovered()) { + // apply fec for this block + const auto before_encode = std::chrono::steady_clock::now(); + stats.count_fragments_recovered += block.reconstructAllMissingData(); + stats.count_blocks_recovered++; + m_fec_decode_time.add(std::chrono::steady_clock::now() - before_encode); + if (m_fec_decode_time.get_delta_since_last_reset() > + std::chrono::seconds(1)) { + // wifibroadcast::log::get_default()->debug("FEC decode took + // {}",m_fec_decode_time.getAvgReadable()); + stats.curr_fec_decode_time = m_fec_decode_time.getMinMaxAvg(); + m_fec_decode_time.reset(); + } + forwardMissingPrimaryFragmentsIfAvailable(block); + assert(block.allPrimaryFragmentsHaveBeenForwarded()); + // remove block when done with it + rxQueuePopFront(); + return; + } + return; + } else { + // wifibroadcast::log::get_default()->debug("Not in front\n"; + // we are not in the front of the queue but somewhere else + // If this block can be fully recovered or all primary fragments are + // available this triggers a flush + if (block.allPrimaryFragmentsAreAvailable() || + block.allPrimaryFragmentsCanBeRecovered()) { + // send all queued packets in all unfinished blocks before and remove them + if (m_enable_log_debug) { + wifibroadcast::log::get_default()->debug("Block {} triggered a flush", + block.getBlockIdx()); + } + while (block != *rx_queue.front()) { + forwardMissingPrimaryFragmentsIfAvailable(*rx_queue.front(), true); + rxQueuePopFront(); + } + // then process the block who is fully recoverable or has no gaps in the + // primary fragments + if (block.allPrimaryFragmentsAreAvailable()) { + forwardMissingPrimaryFragmentsIfAvailable(block); + assert(block.allPrimaryFragmentsHaveBeenForwarded()); + } else { + // apply fec for this block + stats.count_fragments_recovered += block.reconstructAllMissingData(); + stats.count_blocks_recovered++; + forwardMissingPrimaryFragmentsIfAvailable(block); + assert(block.allPrimaryFragmentsHaveBeenForwarded()); + } + // remove block + rxQueuePopFront(); + } + } +} + +void FECDecoder::reset_rx_queue() { + /*while (auto el=rx_queue.front() != nullptr){ + rxQueuePopFront(); + }*/ + rx_queue.resize(0); + last_known_block = ((uint64_t)-1); +} \ No newline at end of file diff --git a/src/fec/FECDecoder.h b/src/fec/FECDecoder.h new file mode 100644 index 00000000..7340f8d3 --- /dev/null +++ b/src/fec/FECDecoder.h @@ -0,0 +1,113 @@ +#ifndef FEC_DECODER_HPP +#define FEC_DECODER_HPP + +#include + +#include "FECConstants.hpp" +#include +#include +#include + +#include "RxBlock.h" +#include "TimeHelper.hpp" + +// Takes a continuous stream of packets (data and fec correction packets) and +// processes them such that the output is exactly (or as close as possible) to +// the Input stream fed to FECEncoder. Most importantly, it also handles +// re-ordering of packets and packet duplicates due to multiple rx cards +class FECDecoder { + public: + /** + * @param rx_queue_max_depth max size of rx queue - since in case of openhd, + * one frame is either one or two FEC blocks we don't need that big of an rx + * queue + * @param maxNFragmentsPerBlock memory per block is pre-allocated, reduce this + * value if you know the encoder doesn't ever exceed a given n of fragments + * per block + * @param enable_log_debug + */ + explicit FECDecoder( + const unsigned int rx_queue_max_depth, + const unsigned int maxNFragmentsPerBlock = MAX_TOTAL_FRAGMENTS_PER_BLOCK, + bool enable_log_debug = false) + : RX_QUEUE_MAX_SIZE(rx_queue_max_depth), + maxNFragmentsPerBlock(maxNFragmentsPerBlock), + m_enable_log_debug(enable_log_debug) { + assert(rx_queue_max_depth < 20); + assert(rx_queue_max_depth >= 1); + } + FECDecoder(const FECDecoder &other) = delete; + ~FECDecoder() = default; + // data forwarded on this callback is always in-order but possibly with gaps + typedef std::function + SEND_DECODED_PACKET; + // WARNING: Don't forget to register this callback ! + SEND_DECODED_PACKET mSendDecodedPayloadCallback; + // A value too high doesn't really give much benefit and increases memory + // usage + const unsigned int RX_QUEUE_MAX_SIZE; + const unsigned int maxNFragmentsPerBlock; + const bool m_enable_log_debug; + AvgCalculator m_fec_decode_time{}; + + public: + static bool validate_packet_size(int data_len); + // process a valid packet + bool process_valid_packet(const uint8_t *data, int data_len); + + private: + // since we also need to search this data structure, a std::queue is not + // enough. since we have an upper limit on the size of this dequeue, it is + // basically a searchable ring buffer + std::deque> rx_queue; + uint64_t last_known_block = ((uint64_t)-1); // id of last known block + /** + * For this Block, + * starting at the primary fragment we stopped on last time, + * forward as many primary fragments as they are available until there is a + * gap + * @param discardMissingPackets : if true, gaps are ignored and fragments are + * forwarded even though this means the missing ones are irreversible lost Be + * carefully with this param, use it only before you need to get rid of a + * block + */ + void forwardMissingPrimaryFragmentsIfAvailable( + RxBlock &block, const bool discardMissingPackets = false); + // also increase lost block count if block is not fully recovered + void rxQueuePopFront(); + // create a new RxBlock for the specified block_idx and push it into the queue + // NOTE: Checks first if this operation would increase the size of the queue + // over its max capacity In this case, the only solution is to remove the + // oldest block before adding the new one + void rxRingCreateNewSafe(const uint64_t blockIdx); + + // If block is already known and not in the queue anymore return nullptr + // else if block is inside the ring return pointer to it + // and if it is not inside the ring add as many blocks as needed, then return + // pointer to it + RxBlock *rxRingFindCreateBlockByIdx(const uint64_t blockIdx); + void process_with_rx_queue(const FECPayloadHdr &header, const uint8_t *data, + int data_size); + + public: + // matches FECDecoder + struct FECRxStats { + // total block count + uint64_t count_blocks_total = 0; + // a block counts as "lost" if it was removed before being fully received or + // recovered + uint64_t count_blocks_lost = 0; + // a block counts as "recovered" if it was recovered using FEC packets + uint64_t count_blocks_recovered = 0; + // n of primary fragments that were reconstructed during the recovery + // process of a block + uint64_t count_fragments_recovered = 0; + // n of forwarded bytes + uint64_t count_bytes_forwarded = 0; + MinMaxAvg curr_fec_decode_time{}; + }; + FECRxStats stats{}; + void reset_rx_queue(); +}; + +#endif // FEC_DECODER_HPP diff --git a/src/fec/FECEncoder.cpp b/src/fec/FECEncoder.cpp new file mode 100644 index 00000000..5bc1575b --- /dev/null +++ b/src/fec/FECEncoder.cpp @@ -0,0 +1,102 @@ +#include "FECEncoder.h" + +#include +#include +#include +#include + +#include "../external/fec/fec_base.h" + +#include "FECConstants.hpp" + +void FECEncoder::encode_block( + std::vector>> data_packets, + int n_secondary_fragments) { + assert(data_packets.size() <= MAX_N_P_FRAGMENTS_PER_BLOCK); + assert(n_secondary_fragments <= MAX_N_S_FRAGMENTS_PER_BLOCK); + const auto n_primary_fragments = data_packets.size(); + // nice to have statistic + m_block_sizes.add(n_primary_fragments); + if (m_block_sizes.get_delta_since_last_reset() >= std::chrono::seconds(1)) { + // wifibroadcast::log::get_default()->debug("Block sizes: + // {}",m_block_sizes.getAvgReadable()); + m_curr_fec_block_sizes = m_block_sizes.getMinMaxAvg(); + m_block_sizes.reset(); + } + FECPayloadHdr header{}; + header.block_idx = m_curr_block_idx; + m_curr_block_idx++; + header.n_primary_fragments = n_primary_fragments; + // write and forward all the data packets first + // also calculate the size of the biggest data packet + size_t max_packet_size = 0; + // Store a pointer where the FEC data begins for performing the FEC step later + // on + std::vector primary_fragments_data_p; + for (int i = 0; i < data_packets.size(); i++) { + const auto& data_fragment = data_packets[i]; + // wifibroadcast::log::get_default()->debug("In:{}",(int)data_fragment->size()); + assert(!data_fragment->empty()); + assert(data_fragment->size() <= FEC_PACKET_MAX_PAYLOAD_SIZE); + header.fragment_idx = i; + header.data_size = data_fragment->size(); + auto buffer_p = m_block_buffer[i].data(); + // copy over the header + memcpy(buffer_p, (uint8_t*)&header, sizeof(FECPayloadHdr)); + // write the actual data + memcpy(buffer_p + sizeof(FECPayloadHdr), data_fragment->data(), + data_fragment->size()); + // zero out the remaining bytes such that FEC always sees zeroes + // same is done on the rx. These zero bytes are never transmitted via wifi + const auto writtenDataSize = sizeof(FECPayloadHdr) + data_fragment->size(); + memset(buffer_p + writtenDataSize, 0, + MAX_PAYLOAD_BEFORE_FEC - writtenDataSize); + max_packet_size = std::max(max_packet_size, data_fragment->size()); + // we can forward the data packet immediately via the callback + if (outputDataCallback) { + outputDataCallback(buffer_p, writtenDataSize); + } + // NOTE: FECPayloadHdr::data_size needs to be included during the fec encode + // step + primary_fragments_data_p.push_back(buffer_p + sizeof(FECPayloadHdr) - + sizeof(uint16_t)); + } + // then we create as many FEC packets as needed + if (n_secondary_fragments == 0) { + // wifibroadcast::log::get_default()->debug("No FEC step performed"); + // no FEC step is actually performed, usefully for debugging / performance + // evaluation + return; + } + const auto before = std::chrono::steady_clock::now(); + // Now we perform the actual FEC encode step + std::vector secondary_fragments_data_p; + for (int i = 0; i < n_secondary_fragments; i++) { + auto fragment_index = i + n_primary_fragments; + auto buffer_p = m_block_buffer[fragment_index].data(); + header.fragment_idx = fragment_index; + // copy over the header + memcpy(buffer_p, (uint8_t*)&header, sizeof(FECPayloadHdr)); + // where the FEC packet correction data is written to + secondary_fragments_data_p.push_back(buffer_p + sizeof(FECPayloadHdr) - + sizeof(uint16_t)); + } + fec_encode2(max_packet_size + sizeof(uint16_t), primary_fragments_data_p, + secondary_fragments_data_p); + m_fec_block_encode_time.add(std::chrono::steady_clock::now() - before); + if (m_fec_block_encode_time.get_delta_since_last_reset() >= + std::chrono::seconds(1)) { + // wifibroadcast::log::get_default()->debug("FEC encode + // time:{}",m_fec_block_encode_time.getAvgReadable()); + m_curr_fec_block_encode_time = m_fec_block_encode_time.getMinMaxAvg(); + m_fec_block_encode_time.reset(); + } + // and forward all the FEC correction packets + for (int i = 0; i < n_secondary_fragments; i++) { + auto fragment_index = i + n_primary_fragments; + if (outputDataCallback) { + outputDataCallback(m_block_buffer[fragment_index].data(), + sizeof(FECPayloadHdr) + max_packet_size); + } + } +} diff --git a/src/fec/FECEncoder.h b/src/fec/FECEncoder.h new file mode 100644 index 00000000..199725bb --- /dev/null +++ b/src/fec/FECEncoder.h @@ -0,0 +1,44 @@ +#ifndef FEC_ENCODER_HPP +#define FEC_ENCODER_HPP + +#include +#include + +#include "FECConstants.hpp" +#include "TimeHelper.hpp" + +class FECEncoder { + public: + typedef std::function + OUTPUT_DATA_CALLBACK; + OUTPUT_DATA_CALLBACK outputDataCallback; + explicit FECEncoder() = default; + FECEncoder(const FECEncoder& other) = delete; + + public: + /** + * Encodes a new block and forwards the packets for this block + * forwards data packets first, then generated fec packets + * (if needed) and forwards them after. + * @param data_packets the packets for this block + * @param n_secondary_fragments how many secondary fragments (FEC packets) + * should be created + */ + void encode_block( + std::vector>> data_packets, + int n_secondary_fragments); + // Pre-allocated to have space for storing primary fragments (they are needed + // once the fec step needs to be performed) and creating the wanted amount of + // secondary packets + std::array, + MAX_TOTAL_FRAGMENTS_PER_BLOCK> + m_block_buffer{}; + uint32_t m_curr_block_idx = 0; + static_assert(sizeof(m_curr_block_idx) == sizeof(FECPayloadHdr::block_idx)); + AvgCalculator m_fec_block_encode_time; + MinMaxAvg m_curr_fec_block_encode_time{}; + BaseAvgCalculator m_block_sizes{}; + MinMaxAvg m_curr_fec_block_sizes{}; +}; + +#endif // FEC_ENCODER_HPP \ No newline at end of file diff --git a/src/fec/FECPayloadHdr.hpp b/src/fec/FECPayloadHdr.hpp new file mode 100644 index 00000000..b9f30895 --- /dev/null +++ b/src/fec/FECPayloadHdr.hpp @@ -0,0 +1,36 @@ +#ifndef FEC_PAYLOAD_HDR_HPP +#define FEC_PAYLOAD_HDR_HPP + +#include +#include "endian.h" +/** + * Encoder and Decoder pair for FEC protected block / packet based data + * streaming. adds sizeof(FECPayloadHdr) to each fec primary or secondary + * packet. + */ + +static_assert(__BYTE_ORDER == __LITTLE_ENDIAN, + "This code is written for little endian only !"); + +struct FECPayloadHdr { + // Most often each frame is encoded as one fec block + // rolling + uint32_t block_idx; + // each fragment inside a block has a fragment index + // uint8_t is enough, since we are limited to 128+128=256 fragments anyway by + // the FEC impl. + uint8_t fragment_idx; + // how many fragments make up the primary fragments part, the rest is + // secondary fragments note that we do not need to know how many secondary + // fragments have been created - as soon as we 'have enough', we can perform + // the FEC correction step if necessary + uint8_t n_primary_fragments; + // For FEC all data fragments have to be the same size. We pad the rest during + // encoding / decoding with 0, and do this when encoding / decoding such that + // the 0 bytes don't have to be transmitted. This needs to be included during + // the fec encode / decode step ! + uint16_t data_size; +} __attribute__((packed)); +static_assert(sizeof(FECPayloadHdr) == 8); + +#endif // FEC_PAYLOAD_HDR_HPP \ No newline at end of file diff --git a/src/fec/RxBlock.cpp b/src/fec/RxBlock.cpp new file mode 100644 index 00000000..4a8d8c29 --- /dev/null +++ b/src/fec/RxBlock.cpp @@ -0,0 +1,165 @@ +#include "RxBlock.h" +#include "FEC.h" + +RxBlock::RxBlock(const unsigned int maxNFragmentsPerBlock, + const uint64_t blockIdx1) + : blockIdx(blockIdx1), + fragment_map( + maxNFragmentsPerBlock, + FRAGMENT_STATUS_UNAVAILABLE), // after creation of the RxBlock every + // f. is marked as unavailable + blockBuffer(maxNFragmentsPerBlock) { + assert(fragment_map.size() == blockBuffer.size()); +} + +bool RxBlock::hasFragment(const int fragment_idx) { + assert(fragment_idx < fragment_map.size()); + return fragment_map[fragment_idx] == FRAGMENT_STATUS_AVAILABLE; +} + +bool RxBlock::allPrimaryFragmentsHaveBeenForwarded() const { + if (m_n_primary_fragments_in_block == -1) return false; + return nAlreadyForwardedPrimaryFragments == m_n_primary_fragments_in_block; +} + +bool RxBlock::allPrimaryFragmentsCanBeRecovered() const { + // return false if k is not known for this block yet (which means we didn't + // get a secondary fragment yet, since each secondary fragment contains k) + if (m_n_primary_fragments_in_block == -1) return false; + // ready for FEC step if we have as many secondary fragments as we are missing + // on primary fragments + if (m_n_available_primary_fragments + m_n_available_secondary_fragments >= + m_n_primary_fragments_in_block) + return true; + return false; +} + +bool RxBlock::allPrimaryFragmentsAreAvailable() const { + if (m_n_primary_fragments_in_block == -1) return false; + return m_n_available_primary_fragments == m_n_primary_fragments_in_block; +} + +void RxBlock::addFragment(const uint8_t* data, const std::size_t dataLen) { + auto* hdr_p = (FECPayloadHdr*)data; + FECPayloadHdr& header = *hdr_p; + assert(!hasFragment(header.fragment_idx)); + assert(header.block_idx == blockIdx); + assert(fragment_map[header.fragment_idx] == FRAGMENT_STATUS_UNAVAILABLE); + assert(header.fragment_idx < blockBuffer.size()); + fragment_copy_payload(header.fragment_idx, data, dataLen); + // mark it as available + fragment_map[header.fragment_idx] = FRAGMENT_STATUS_AVAILABLE; + + // each fragment inside a block should report the same n of primary fragments + if (m_n_primary_fragments_in_block == -1) { + m_n_primary_fragments_in_block = header.n_primary_fragments; + } else { + assert(m_n_primary_fragments_in_block == header.n_primary_fragments); + } + const bool is_primary_fragment = + header.fragment_idx < header.n_primary_fragments; + if (is_primary_fragment) { + m_n_available_primary_fragments++; + } else { + m_n_available_secondary_fragments++; + const auto payload_len_including_size = + dataLen - sizeof(FECPayloadHdr) + sizeof(uint16_t); + // all secondary fragments shall have the same size + if (m_size_of_secondary_fragments == -1) { + m_size_of_secondary_fragments = payload_len_including_size; + } else { + assert(m_size_of_secondary_fragments == payload_len_including_size); + } + } + if (firstFragmentTimePoint == std::nullopt) { + firstFragmentTimePoint = std::chrono::steady_clock::now(); + } +} + +void RxBlock::fragment_copy_payload(const int fragment_idx, const uint8_t* data, + const std::size_t dataLen) { + uint8_t* buff = blockBuffer[fragment_idx].data(); + // NOTE: FECPayloadHdr::data_size needs to be included during the fec decode + // step + const uint8_t* payload_p = data + sizeof(FECPayloadHdr) - sizeof(uint16_t); + auto payload_s = dataLen - sizeof(FECPayloadHdr) + sizeof(uint16_t); + // write the data (doesn't matter if FEC data or correction packet) + memcpy(buff, payload_p, payload_s); + // set the rest to zero such that FEC works + memset(buff + payload_s, 0, MAX_PAYLOAD_BEFORE_FEC - payload_s); +} + +std::vector RxBlock::pullAvailablePrimaryFragments( + const bool discardMissingPackets) { + // note: when pulling the available fragments, we do not need to know how many + // primary fragments this block actually contains + std::vector ret; + for (int i = nAlreadyForwardedPrimaryFragments; + i < m_n_available_primary_fragments; i++) { + if (fragment_map[i] == FRAGMENT_STATUS_UNAVAILABLE) { + if (discardMissingPackets) { + continue; + } else { + break; + } + } + ret.push_back(i); + } + // make sure these indices won't be returned again + nAlreadyForwardedPrimaryFragments += (int)ret.size(); + return ret; +} + +const uint8_t* RxBlock::get_primary_fragment_data_p(const int fragment_index) { + assert(fragment_map[fragment_index] == FRAGMENT_STATUS_AVAILABLE); + assert(m_n_primary_fragments_in_block != -1); + assert(fragment_index < m_n_primary_fragments_in_block); + // return blockBuffer[fragment_index].data()+sizeof(FECPayloadHdr); + return blockBuffer[fragment_index].data() + sizeof(uint16_t); +} + +const int RxBlock::get_primary_fragment_data_size(const int fragment_index) { + assert(fragment_map[fragment_index] == FRAGMENT_STATUS_AVAILABLE); + assert(m_n_primary_fragments_in_block != -1); + assert(fragment_index < m_n_primary_fragments_in_block); + uint16_t* len_p = (uint16_t*)blockBuffer[fragment_index].data(); + return *len_p; +} + +int RxBlock::reconstructAllMissingData() { + // wifibroadcast::log::get_default()->debug("reconstructAllMissingData"<=FEC_K + assert(m_n_primary_fragments_in_block != -1); + assert(m_size_of_secondary_fragments != -1); + // do not reconstruct if reconstruction is impossible + assert(getNAvailableFragments() >= m_n_primary_fragments_in_block); + // also do not reconstruct if reconstruction is not needed + // const int nMissingPrimaryFragments = m_n_primary_fragments_in_block- + // m_n_available_primary_fragments; + auto recoveredFragmentIndices = + fecDecode(m_size_of_secondary_fragments, blockBuffer, + m_n_primary_fragments_in_block, fragment_map); + // now mark them as available + for (const auto idx : recoveredFragmentIndices) { + fragment_map[idx] = FRAGMENT_STATUS_AVAILABLE; + } + m_n_available_primary_fragments += recoveredFragmentIndices.size(); + // n of reconstructed packets + return recoveredFragmentIndices.size(); +} + +std::optional RxBlock::get_missing_primary_packets() const { + if (m_n_primary_fragments_in_block <= 0) return std::nullopt; + return m_n_primary_fragments_in_block - getNAvailableFragments(); +} + +std::string RxBlock::get_missing_primary_packets_readable() const { + const auto tmp = get_missing_primary_packets(); + if (tmp == std::nullopt) return "?"; + return std::to_string(tmp.value()); +} + +int RxBlock::get_n_primary_fragments() const { + return m_n_primary_fragments_in_block; +} \ No newline at end of file diff --git a/src/fec/RxBlock.h b/src/fec/RxBlock.h new file mode 100644 index 00000000..55b063ac --- /dev/null +++ b/src/fec/RxBlock.h @@ -0,0 +1,118 @@ +#ifndef RX_BLOCK_HPP +#define RX_BLOCK_HPP + +#include +#include +#include +#include +#include +#include + +#include "FECConstants.hpp" + + +// This encapsulates everything you need when working on a single FEC block on +// the receiver for example, addFragment() or pullAvailablePrimaryFragments() it +// also provides convenient methods to query if the block is fully forwarded or +// if it is ready for the FEC reconstruction step. +class RxBlock { + public: + // @param maxNFragmentsPerBlock max number of primary and secondary fragments + // for this block. you could just use MAX_TOTAL_FRAGMENTS_PER_BLOCK for that, + // but if your tx then uses (4:8) for example, you'd allocate much more memory + // every time for a new RX block than needed. + explicit RxBlock(unsigned int maxNFragmentsPerBlock, uint64_t blockIdx1); + // No copy constructor for safety + RxBlock(const RxBlock &) = delete; + // two blocks are the same if they refer to the same block idx: + constexpr bool operator==(const RxBlock &other) const { + return blockIdx == other.blockIdx; + } + // same for not equal operator + constexpr bool operator!=(const RxBlock &other) const { + return !(*this == other); + } + ~RxBlock() = default; + + public: + // returns true if this fragment has been already received + bool hasFragment(int fragment_idx); + // returns true if we are "done with this block" aka all data has been already + // forwarded + bool allPrimaryFragmentsHaveBeenForwarded() const; + // returns true if enough FEC secondary fragments are available to replace all + // missing primary fragments + bool allPrimaryFragmentsCanBeRecovered() const; + // returns true as soon as all primary fragments are available + bool allPrimaryFragmentsAreAvailable() const; + // copy the fragment data and mark it as available + // you should check if it is already available with hasFragment() to avoid + // copying the same fragment multiple times when using multiple RX cards + void addFragment(const uint8_t *data, const std::size_t dataLen); + // util to copy the packet size and payload (and not more) + void fragment_copy_payload(const int fragment_idx, const uint8_t *data, + const std::size_t dataLen); + /** + * @returns the indices for all primary fragments that have not yet been + * forwarded and are available (already received or reconstructed). Once an + * index is returned here, it won't be returned again (Therefore, as long as + * you immediately forward all primary fragments returned here,everything + * happens in order) + * @param discardMissingPackets : if true, gaps are ignored and fragments are + * forwarded even though this means the missing ones are irreversible lost Be + * carefully with this param, use it only before you need to get rid of a + * block */ + std::vector pullAvailablePrimaryFragments( + const bool discardMissingPackets = false); + const uint8_t *get_primary_fragment_data_p(const int fragment_index); + const int get_primary_fragment_data_size(const int fragment_index); + + // returns the n of primary and secondary fragments for this block + int getNAvailableFragments() const { + return m_n_available_primary_fragments + m_n_available_secondary_fragments; + } + /** + * Reconstruct all missing primary fragments (data packets) by using the + * received secondary (FEC) packets NOTE: reconstructing only part of the + * missing data is not supported ! (That's a non-fixable technical detail of + * FEC) NOTE: Do not call this method unless it is needed + * @return the n of reconstructed packets + */ + int reconstructAllMissingData(); + [[nodiscard]] uint64_t getBlockIdx() const { return blockIdx; } + [[nodiscard]] std::optional + getFirstFragmentTimePoint() const { + return firstFragmentTimePoint; + } + // Returns the number of missing primary packets (e.g. the n of actual data + // packets that are missing) This only works if we know the "fec_k" parameter + std::optional get_missing_primary_packets() const; + std::string get_missing_primary_packets_readable() const; + int get_n_primary_fragments() const; + + private: + // the block idx marks which block this element refers to + const uint64_t blockIdx = 0; + // n of primary fragments that are already pulled out + int nAlreadyForwardedPrimaryFragments = 0; + // for each fragment (via fragment_idx) store if it has been received yet + std::vector fragment_map; + // holds all the data for all received fragments (if fragment_map says + // UNAVALIABLE at this position, content is undefined) + std::vector> blockBuffer; + // time point when the first fragment for this block was received (via + // addFragment() ) + std::optional firstFragmentTimePoint = + std::nullopt; + // as soon as we know any of the fragments for this block, we know how many + // primary fragments this block contains (and therefore, how many primary or + // secondary fragments we need to fully reconstruct) + int m_n_primary_fragments_in_block = -1; + // for the fec step, we need the size of the fec secondary fragments, which + // should be equal for all secondary fragments + int m_size_of_secondary_fragments = -1; + int m_n_available_primary_fragments = 0; + int m_n_available_secondary_fragments = 0; +}; + +#endif // RX_BLOCK_HPP \ No newline at end of file diff --git a/src/wifibroadcast_spdlog.cpp b/src/wifibroadcast_spdlog.cpp index f3eb1f84..27f94ce8 100644 --- a/src/wifibroadcast_spdlog.cpp +++ b/src/wifibroadcast_spdlog.cpp @@ -4,9 +4,9 @@ #include "wifibroadcast_spdlog.h" -#include #include +#include #include std::shared_ptr wifibroadcast::log::create_or_get( diff --git a/src/wifibroadcast_spdlog.h b/src/wifibroadcast_spdlog.h index 9a208012..5d0ab097 100644 --- a/src/wifibroadcast_spdlog.h +++ b/src/wifibroadcast_spdlog.h @@ -7,9 +7,7 @@ //#include "wifibroadcast-spdlog-fake.h" -//#include -//#include -#include +#include #include namespace wifibroadcast::log{