From cdf8aac7e3a88df0fb93586bbf47b17c192ae2fc Mon Sep 17 00:00:00 2001 From: Tobias Frust Date: Thu, 21 Jul 2016 09:33:42 +0200 Subject: added ReceiverThreads for tests with 40GBE --- src/CMakeLists.txt | 1 + src/CMakeLists.txt~ | 14 ++++++++-- src/DetectorModule/DetectorModule.cpp | 2 +- src/ReceiverThreads/ReceiverThreads.cpp | 48 +++++++++++++++++++++++++++++++++ src/ReceiverThreads/ReceiverThreads.h | 35 ++++++++++++++++++++++++ src/UDPServer/UDPServer.cpp | 26 +++++------------- src/main_server.cpp | 33 ++++++++++++----------- 7 files changed, 122 insertions(+), 37 deletions(-) create mode 100644 src/ReceiverThreads/ReceiverThreads.cpp create mode 100644 src/ReceiverThreads/ReceiverThreads.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index d77a039..d4ee49d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -45,6 +45,7 @@ set(SOURCES_CLIENT ) set(SOURCES_SERVER + "${CMAKE_SOURCE_DIR}/ReceiverThreads/ReceiverThreads.cpp" "${CMAKE_SOURCE_DIR}/UDPServer/UDPServer.cpp" "${CMAKE_SOURCE_DIR}/main_server.cpp" ) diff --git a/src/CMakeLists.txt~ b/src/CMakeLists.txt~ index 80196d9..ebe2f50 100644 --- a/src/CMakeLists.txt~ +++ b/src/CMakeLists.txt~ @@ -31,9 +31,18 @@ include_directories( ${BOOST_INCLUDE_DIRS} ) +set(LINK_LIBRARIES ${LINK_LIBRARIES} + ${LIBCONFIGPP_LIBRARY} + ${Boost_LIBRARIES} +) + set(SOURCES_CLIENT + "${CMAKE_SOURCE_DIR}/ConfigReader/ConfigReader.cpp" "${CMAKE_SOURCE_DIR}/UDPClient/UDPClient.cpp" - "${CMAKE_SOURCE_DIR}/main.cpp" + "${CMAKE_SOURCE_DIR}/DetectorModule/DetectorModule.cpp" + "${CMAKE_SOURCE_DIR}/Detector/Detector.cpp" + "${CMAKE_SOURCE_DIR}/main_client.cpp" + "${CMAKE_SOURCE_DIR}/ReceiverThreads/ReceiverThreads.cpp" ) set(SOURCES_SERVER @@ -43,5 +52,6 @@ set(SOURCES_SERVER add_executable(onlineDetectorSimulatorServer ${SOURCES_SERVER}) add_executable(onlineDetectorSimulatorClient ${SOURCES_CLIENT}) - +target_link_libraries(onlineDetectorSimulatorClient ${LINK_LIBRARIES}) +target_link_libraries(onlineDetectorSimulatorServer ${LINK_LIBRARIES}) diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp index bee50e9..9c3d98f 100644 --- a/src/DetectorModule/DetectorModule.cpp +++ b/src/DetectorModule/DetectorModule.cpp @@ -28,7 +28,7 @@ void timer_start(std::function func, unsigned int interval){ DetectorModule::DetectorModule(const int detectorID, const std::string& address, const std::string& configPath) : detectorID_{detectorID}, numberOfDetectorsPerModule_{16}, - index_{0}, + index_{1}, client_{address, detectorID+4000} { printf("Creating %d\n", detectorID); diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp new file mode 100644 index 0000000..6f389f2 --- /dev/null +++ b/src/ReceiverThreads/ReceiverThreads.cpp @@ -0,0 +1,48 @@ +/* + * Copyright 2016 + * + * ReceiverThreads.cpp + * + * Created on: 21.07.2016 + * Author: Tobias Frust + */ + +#include "ReceiverThreads.h" +#include "../UDPServer/UDPServer.h" + +#include + +ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules) + : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} { + + for(auto i = 0; i < numberOfDetectorModules; i++){ + receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i); + } + + for(auto i = 0; i < numberOfDetectorModules; i++){ + receiverModules_[i].join(); + } + +} + +auto ReceiverThreads::receiverThread(const int port) -> void { + UDPServer server = UDPServer(address_, port); + std::vector buf(16000); + std::size_t lastIndex{0}; + BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; + while(true){ + int bytes = server.timed_recv((char*)buf.data(), buf.size()*sizeof(unsigned short), timeIntervall_); + if(bytes < 0){ + break; + } + std::size_t index = *((std::size_t *)buf.data()); + int diff = index - lastIndex - 1; + if(diff > 0){ + BOOST_LOG_TRIVIAL(warning) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; + } + loss_ += diff; + lastIndex = index; + } + BOOST_LOG_TRIVIAL(info) << "Lost " << loss_ << " from " << lastIndex << " packets; (" << loss_/(double)lastIndex << "%)"; +} + diff --git a/src/ReceiverThreads/ReceiverThreads.h b/src/ReceiverThreads/ReceiverThreads.h new file mode 100644 index 0000000..7cb04c0 --- /dev/null +++ b/src/ReceiverThreads/ReceiverThreads.h @@ -0,0 +1,35 @@ +/* + * Copyright 2016 + * + * ReceiverThreads.h + * + * Created on: 21.07.2016 + * Author: Tobias Frust + */ + +#ifndef RECEIVERTHREADS_H_ +#define RECEIVERTHREADS_H_ + +#include +#include + +class ReceiverThreads { +public: + ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules); + + auto run() -> void; +private: + auto receiverThread(const int port) -> void; + + std::vector receiverModules_; + + std::size_t loss_; + + int timeIntervall_; + int numberOfDetectorModules_; + + std::string address_; + +}; + +#endif /* RECEIVERTHREADS_H_ */ diff --git a/src/UDPServer/UDPServer.cpp b/src/UDPServer/UDPServer.cpp index 3a50d0c..8c9decf 100644 --- a/src/UDPServer/UDPServer.cpp +++ b/src/UDPServer/UDPServer.cpp @@ -168,31 +168,19 @@ int UDPServer::recv(char *msg, size_t max_size) * * \param[in] msg The buffer where the message will be saved. * \param[in] max_size The size of the \p msg buffer in bytes. - * \param[in] max_wait_ms The maximum number of milliseconds to wait for a message. + * \param[in] max_wait_s The maximum number of seconds to wait for a message. * * \return -1 if an error occurs or the function timed out, the number of bytes received otherwise. */ -int UDPServer::timed_recv(char *msg, size_t max_size, int max_wait_ms) +int UDPServer::timed_recv(char *msg, size_t max_size, int max_wait_s) { fd_set s; FD_ZERO(&s); FD_SET(f_socket, &s); struct timeval timeout; - timeout.tv_sec = max_wait_ms / 1000; - timeout.tv_usec = (max_wait_ms % 1000) * 1000; - int retval = select(f_socket + 1, &s, &s, &s, &timeout); - if(retval == -1) - { - // select() set errno accordingly - return -1; - } - if(retval > 0) - { - // our socket has data - return ::recv(f_socket, msg, max_size, 0); - } - - // our socket has no data - errno = EAGAIN; - return -1; + timeout.tv_sec = max_wait_s; + timeout.tv_usec = 0; + setsockopt(f_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout,sizeof(struct timeval)); + // our socket has data + return ::recv(f_socket, msg, max_size, 0); } diff --git a/src/main_server.cpp b/src/main_server.cpp index 6e936e4..90b3835 100644 --- a/src/main_server.cpp +++ b/src/main_server.cpp @@ -1,4 +1,5 @@ #include "UDPServer/UDPServer.h" +#include "ReceiverThreads/ReceiverThreads.h" #include #include @@ -30,9 +31,9 @@ int main (int argc, char *argv[]){ initLog(); std::string address = "localhost"; - int port = 4002; - - UDPServer server = UDPServer(address, port); +// int port = 4002; +// +// UDPServer server = UDPServer(address, port); std::size_t length{32768}; std::size_t lastIndex{0}; @@ -41,6 +42,8 @@ int main (int argc, char *argv[]){ std::cout << "Receiving UDP packages: " << std::endl; + ReceiverThreads(address, 10, 27); + // for(auto i = 0; i < 27; i++){ // std::function f = [=]() { // server.recv(); @@ -48,18 +51,18 @@ int main (int argc, char *argv[]){ // start(); // } - while(true){ - int bytes = server.recv((char*)buf.data(), length); - std::size_t index = *((std::size_t *)buf.data()); - if(index%1000 == 99) printf("%lu\n", index); - - if(lastIndex != (index-1)) - BOOST_LOG_TRIVIAL(warning) << "Packet loss or wrong order!"; - - lastIndex = index; - - BOOST_LOG_TRIVIAL(debug) << "Server: Received " << bytes << " Bytes with Index " << index; - } +// while(true){ +// int bytes = server.recv((char*)buf.data(), length); +// std::size_t index = *((std::size_t *)buf.data()); +// if(index%1000 == 99) printf("%lu\n", index); +// +// if(lastIndex != (index-1)) +// BOOST_LOG_TRIVIAL(warning) << "Packet loss or wrong order!"; +// +// lastIndex = index; +// +// BOOST_LOG_TRIVIAL(debug) << "Server: Received " << bytes << " Bytes with Index " << index; +// } return 0; -- cgit v1.2.3