/* * Copyright 2016 Tobias Frust * * DetectorModule.cpp * * Created on: 29.06.2016 * Author: Tobias Frust */ #include "DetectorModule.h" #include "../ConfigReader/ConfigReader.h" #include #include #include void timer_start(std::function func, unsigned int interval, unsigned int max_packets){ std::thread([func, interval, max_packets]() { int packets = 1; auto next = std::chrono::high_resolution_clock::now(); while (true) { func(packets); next += std::chrono::microseconds(packets * interval); auto now = std::chrono::high_resolution_clock::now(); if (now > next) { std::chrono::nanoseconds late = now - next; packets = 1 + (late.count() / interval / 1000); if (packets > max_packets) packets = max_packets; } else { packets = 1; } std::this_thread::sleep_until(next); // std::this_thread::sleep_for(std::chrono::microseconds(interval)); } }).detach(); } DetectorModule::DetectorModule(const int detectorID, const std::string& address, const int port, const std::string& configPath) : detectorID_{detectorID}, numberOfDetectorsPerModule_{16}, index_{0u}, client_{address, port}, max_packets_{1000u}{ if (readConfig(configPath)) { throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!"); } sendBuffer_.resize(max_packets_); for(auto &it: sendBuffer_) { it.resize(numberOfProjectionsPerPacket_ * numberOfDetectorsPerModule_ * sizeof(unsigned short) + sizeof(size_t) + sizeof(short int)); } //read the input data from the file corresponding to the detectorModuleID readInput(); //unsigned int sinoSize = numberOfDetectorsPerModule_*numberOfProjections_; //std::copy(((char*)buffer_.data()), ((char*)buffer_.data())+sinoSize*sizeof(unsigned short), sendBuffer_.begin()+sizeof(std::size_t)); printf("Created %d\n", detectorID); } auto DetectorModule::send(int packets = 1) -> void{ BOOST_LOG_TRIVIAL(debug) << "Detectormodule " << detectorID_ << " :sending udp packet with index " << index_ << "."; int numberOfParts = numberOfProjections_/numberOfProjectionsPerPacket_; // sendBuffer_[0] = (sizeof(std::size_t)) & 0xff; // sendBuffer_[1] = (sizeof(std::size_t) >> 8) & 0xff; // sendBuffer_[2] = (sizeof(std::size_t) >> 16) & 0xff; // sendBuffer_[3] = (sizeof(std::size_t) >> 24) & 0xff; // sendBuffer_[4] = (sizeof(std::size_t) >> 32) & 0xff; // sendBuffer_[5] = (sizeof(std::size_t) >> 40) & 0xff; // sendBuffer_[6] = (sizeof(std::size_t) >> 48) & 0xff; // sendBuffer_[7] = (sizeof(std::size_t) >> 56) & 0xff; struct mmsghdr msg[packets]; struct iovec msgvec[packets]; unsigned int hdrSize = sizeof(size_t) + sizeof(short int); unsigned int sinoSize = numberOfDetectorsPerModule_ * numberOfProjectionsPerPacket_; memset(msg, 0, sizeof(msg)); memset(msgvec, 0, sizeof(msgvec)); for (int i = 0; i < packets; i++) { unsigned int bufferSizeIndex = index_ % 1000; char *ptr = sendBuffer_[i].data(); msgvec[i].iov_base = sendBuffer_[i].data(); msgvec[i].iov_len = sendBuffer_[i].size(); msg[i].msg_hdr.msg_iov = &msgvec[i]; msg[i].msg_hdr.msg_iovlen = 1; *reinterpret_cast(ptr) = index_ * numberOfParts + partID_; *reinterpret_cast(ptr + sizeof(size_t)) = partID_; memcpy(ptr + hdrSize, buffer_.data() + sinoSize * (bufferSizeIndex * numberOfParts + partID_), sinoSize * sizeof(unsigned short)); partID_ = (partID_ + 1) % numberOfParts; if (partID_ == 0) ++index_; } client_.msend(packets, msg); auto ts = std::chrono::high_resolution_clock::now(); std::chrono::nanoseconds d = ts - ts_; counter_ += packets; if (d.count() >= 1000000000) { printf("Packets %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", counter_, sendBuffer_[0].size(), 8. * counter_ * sendBuffer_[0].size() / 1024 / 1024 / 1024, 1. * d.count() / 1000000); counter_ = 0; ts_ = ts; } } auto DetectorModule::sendPeriodically(unsigned int timeIntervall) -> void { counter_ = 0; ips_ = 1000000. / ((double)timeIntervall); ts_ = std::chrono::high_resolution_clock::now(); std::function f = [=](int packets = 1) { this->send(packets); }; timer_start(f, timeIntervall, max_packets_); } auto DetectorModule::readInput() -> void { if(path_.back() != '/') path_.append("/"); //open file const std::string filePath = path_ + fileName_ + std::to_string(detectorID_+1) + fileEnding_; BOOST_LOG_TRIVIAL(debug) << "DetectorModule: Path = " << filePath; std::ifstream input(filePath, std::ios::in | std::ios::binary); if(input){ //allocate memory in vector std::streampos fileSize; input.seekg(0, std::ios::end); fileSize = input.tellg(); input.seekg(0, std::ios::beg); buffer_.resize(fileSize / sizeof(unsigned short)); input.read((char*) &buffer_[0], fileSize); }else{ throw std::runtime_error("File not found."); } } auto DetectorModule::readConfig(const std::string& configFile) -> bool { ConfigReader configReader = ConfigReader(configFile.data()); int samplingRate, scanRate; if (configReader.lookupValue("numberOfFanDetectors", numberOfDetectors_) && configReader.lookupValue("dataInputPath", path_) && configReader.lookupValue("dataFileName", fileName_) && configReader.lookupValue("dataFileEnding", fileEnding_) && configReader.lookupValue("numberOfPlanes", numberOfPlanes_) && configReader.lookupValue("samplingRate", samplingRate) && configReader.lookupValue("scanRate", scanRate) && configReader.lookupValue("numberOfDataFrames", numberOfFrames_) && configReader.lookupValue("numberOfProjectionsPerPacket", numberOfProjectionsPerPacket_) && configReader.lookupValue("numberOfDetectorsPerModule", numberOfDetectorsPerModule_)) { numberOfProjections_ = samplingRate * 1000000 / scanRate; return EXIT_SUCCESS; } return EXIT_FAILURE; }