Agora  1.2.0
Agora project
sender.h
Go to the documentation of this file.
1 
5 #ifndef SENDER_H_
6 #define SENDER_H_
7 
8 #include <emmintrin.h>
9 #include <immintrin.h>
10 #include <netinet/ether.h>
11 #include <sys/types.h>
12 #include <unistd.h>
13 
14 #include <algorithm>
15 #include <chrono>
16 #include <iostream>
17 #include <numeric>
18 #include <thread>
19 #include <vector>
20 
21 #include "common_typedef_sdk.h"
22 #include "concurrentqueue.h"
23 #include "config.h"
24 #include "datatype_conversion.h"
25 #include "gettime.h"
26 #include "memory_manage.h"
27 #include "message.h"
28 #include "mkl_dfti.h"
29 #include "symbols.h"
30 #include "utils.h"
31 
32 #if defined(USE_DPDK)
33 #include "dpdk_transport.h"
34 #endif
35 
36 class Sender {
37  public:
38  static constexpr size_t kDequeueBulkSize = 4;
39 
58  Sender(Config* cfg, size_t socket_thread_num, size_t core_offset = 30,
59  size_t frame_duration = 1000, size_t inter_frame_delay = 0,
60  size_t enable_slow_start = 1,
61  const std::string& server_mac_addr_str = "ff:ff:ff:ff:ff:ff",
62  bool create_thread_for_master = false);
63 
64  ~Sender();
65 
66  void StartTx();
67 
68  // in_frame_start and in_frame_end must have space for at least
69  // kNumStatsFrames entries
70  void StartTxfromMain(double* in_frame_start, double* in_frame_end);
71 
72  private:
73  void* MasterThread(int tid);
74  void* WorkerThread(int tid);
75 
83  void InitIqFromFile(const std::string& filename);
84 
85  // Get number of CPU ticks for a symbol given a frame index
86  uint64_t GetTicksForFrame(size_t frame_id) const;
87  size_t GetMaxSymbolId() const;
88 
89  // Launch threads to run worker with thread IDs from tid_start to tid_end
90  void CreateWorkerThreads(size_t num_workers);
91 
92  void DelayForSymbol(size_t tx_frame_count, uint64_t tick_start);
93  void DelayForFrame(size_t tx_frame_count, uint64_t tick_start);
94 
95  void WriteStatsToFile(size_t tx_frame_count) const;
96 
97  size_t FindNextSymbol(size_t start_symbol);
98  void ScheduleSymbol(size_t frame, size_t symbol_id);
99 
100  // Run FFT on the data field in pkt, output to fft_inout
101  // Recombine pkt header data and fft output data into payload
102  void RunFft(Packet* pkt, complex_float* fft_inout,
103  DFTI_DESCRIPTOR_HANDLE mkl_handle) const;
104 
106  const double freq_ghz_; // RDTSC frequency in GHz
107  const double ticks_per_usec_; // RDTSC frequency in GHz
108  const size_t socket_thread_num_; // Number of worker threads sending pkts
109  const size_t enable_slow_start_; // If 1, send frames slowly at first
110 
111  // The master thread runs on core core_offset. Worker threads use cores
112  // {core_offset + 1, ..., core_offset + thread_num - 1}
113  const size_t core_offset_;
115  const size_t inter_frame_delay_;
116 
117  // RDTSC clock ticks between the start of transmission of two symbols in
118  // the steady state
119  uint64_t ticks_all_;
120 
121  // ticks_wnd_1 and ticks_wnd_2 are the RDTSC clock ticks between the start
122  // of transmission of two symbols for the first several frames
123  uint64_t ticks_wnd1_;
124  uint64_t ticks_wnd2_;
125 
126  // RDTSC clock ticks between the end of a frame and the start of the next
127  // frame
128  const uint64_t ticks_inter_frame_;
129 
135 
136  // First dimension: symbol_num_perframe * BS_ANT_NUM
137  // Second dimension: (CP_LEN + OFDM_CA_NUM) * 2
139 
140  // Number of packets transmitted for each symbol in a frame
142 
143  double* frame_start_;
144  double* frame_end_;
145 
146  std::vector<std::thread> threads_;
147 
148 #if defined(USE_DPDK)
149  std::vector<uint16_t> port_ids_;
150  struct rte_mempool* mbuf_pool_;
151  uint32_t bs_rru_addr_; // IPv4 address of this data sender
152  uint32_t bs_server_addr_; // IPv4 address of the remote target Agora server
153  // MAC addresses of this data sender
154  std::vector<rte_ether_addr> sender_mac_addr_;
155  // MAC addresses of the remote target Agora server
156  std::vector<rte_ether_addr> server_mac_addr_;
157 #endif
158 };
159 
160 #endif // SENDER_H_
FrameStats::NumPilotSyms
size_t NumPilotSyms() const
Definition: framestats.cc:91
fmt::v8::detail::byte
byte
Definition: core.h:388
Sender::core_offset_
const size_t core_offset_
Definition: sender.h:113
Sender::GetTicksForFrame
uint64_t GetTicksForFrame(size_t frame_id) const
Definition: sender.cc:539
moodycamel::ProducerToken
Definition: concurrentqueue.h:630
Sender::iq_data_short_
Table< short > iq_data_short_
Definition: sender.h:138
Sender::ticks_wnd2_
uint64_t ticks_wnd2_
Definition: sender.h:124
Sender::DelayForFrame
void DelayForFrame(size_t tx_frame_count, uint64_t tick_start)
ant_num
for ant_num
Definition: parse_all_dl.m:12
Packet::frame_id_
uint32_t frame_id_
Definition: message.h:168
Sender::ticks_inter_frame_
const uint64_t ticks_inter_frame_
Definition: sender.h:128
Table::Calloc
void Calloc(size_t dim1, size_t dim2, Agora_memory::Alignment_t alignment)
Definition: memory_manage.h:45
complex_float
Definition: test_transpose.cc:22
kDebugSenderReceiver
static constexpr bool kDebugSenderReceiver
Definition: symbols.h:354
PinToCoreWithOffset
void PinToCoreWithOffset(ThreadType thread_type, size_t core_offset, size_t thread_id, bool allow_reuse, bool verbose)
Definition: utils.cc:157
Sender::task_ptok_
moodycamel::ProducerToken ** task_ptok_
Definition: sender.h:134
Config::SampsPerSymbol
size_t SampsPerSymbol() const
Definition: config.h:234
Sender::send_queue_
moodycamel::ConcurrentQueue< size_t > send_queue_
Definition: sender.h:130
fmt::v8::printf
auto printf(const S &fmt, const T &... args) -> int
Definition: printf.h:631
datatype_conversion.h
Config::Rate
double Rate() const
Definition: config.h:58
DpdkTransport::AllocUdp
static rte_mbuf * AllocUdp(rte_mempool *mbuf_pool, rte_ether_addr src_mac_addr, rte_ether_addr dst_mac_addr, uint32_t src_ip_addr, uint32_t dst_ip_addr, uint16_t src_udp_port, uint16_t dst_udp_port, size_t buffer_length, uint16_t pkt_id)
sender.h
Declaration file for the sender class.
Config::NumChannels
size_t NumChannels() const
Definition: config.h:98
Sender::RunFft
void RunFft(Packet *pkt, complex_float *fft_inout, DFTI_DESCRIPTOR_HANDLE mkl_handle) const
Definition: sender.cc:609
kUse12BitIQ
static constexpr bool kUse12BitIQ
Definition: symbols.h:178
moodycamel::ConcurrentQueue::try_dequeue_bulk_from_producer
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
Definition: concurrentqueue.h:1292
DpdkTransport::GetPortIDFromMacAddr
static std::vector< uint16_t > GetPortIDFromMacAddr(size_t port_num, const std::string &mac_addrs)
Sender::completion_queue_
moodycamel::ConcurrentQueue< size_t > completion_queue_
Definition: sender.h:132
SymbolType::kPilot
@ kPilot
gen_tag_t::symbol_id_
uint16_t symbol_id_
Definition: message.h:33
Config::CpLen
size_t CpLen() const
Definition: config.h:46
Sender::FindNextSymbol
size_t FindNextSymbol(size_t start_symbol)
Definition: sender.cc:195
kFrameWnd
static constexpr size_t kFrameWnd
Definition: symbols.h:18
unused
#define unused(x)
Definition: utils.h:14
Sender::WorkerThread
void * WorkerThread(int tid)
Definition: sender.cc:333
Agora_memory::PaddedAlignedAlloc
void * PaddedAlignedAlloc(Alignment_t alignment, size_t size)
Definition: memory_manage.cc:15
Packet::data_
short data_[]
Definition: message.h:173
GetTime::GetTimeUs
static double GetTimeUs()
Definition: gettime.h:14
Sender::StartTx
void StartTx()
Definition: sender.cc:176
Sender::socket_thread_num_
const size_t socket_thread_num_
Definition: sender.h:108
memory_manage.h
Sender::threads_
std::vector< std::thread > threads_
Definition: sender.h:146
Config::BsAntNum
size_t BsAntNum() const
Definition: config.h:35
Packet::ant_id_
uint32_t ant_id_
Definition: message.h:171
ThreadType::kMasterTX
@ kMasterTX
fclose
fclose(fileID)
AGORA_LOG_ERROR
#define AGORA_LOG_ERROR(...)
Definition: logger.h:39
TOSTRING
#define TOSTRING(x)
Definition: symbols.h:14
Config::Frame
const FrameStats & Frame() const
Definition: config.h:340
GetTime::CyclesToUs
static double CyclesToUs(size_t cycles, double freq_ghz)
Definition: gettime.h:97
AGORA_LOG_TRACE
#define AGORA_LOG_TRACE(...)
Definition: logger.h:92
Sender::StartTxfromMain
void StartTxfromMain(double *in_frame_start, double *in_frame_end)
Definition: sender.cc:188
Sender::DelayForSymbol
void DelayForSymbol(size_t tx_frame_count, uint64_t tick_start)
Sender::inter_frame_delay_
const size_t inter_frame_delay_
Definition: sender.h:115
Table< float >
DelayTicks
void DelayTicks(uint64_t start, uint64_t ticks)
Definition: sender.cc:33
Sender::enable_slow_start_
const size_t enable_slow_start_
Definition: sender.h:109
gen_tag_t
Definition: message.h:22
Sender::Sender
Sender(Config *cfg, size_t socket_thread_num, size_t core_offset=30, size_t frame_duration=1000, size_t inter_frame_delay=0, size_t enable_slow_start=1, const std::string &server_mac_addr_str="ff:ff:ff:ff:ff:ff", bool create_thread_for_master=false)
Create and optionally start a Sender that sends IQ packets to a server with MAC address [server_mac_a...
Definition: sender.cc:39
filename
filename
Definition: parse_all_dl.m:14
SimdConvertShortToFloat
static void SimdConvertShortToFloat(const short *in_buf, float *out_buf, size_t n_elems)
Definition: datatype_conversion.h:126
Config::FftInRru
bool FftInRru() const
Definition: config.h:311
Config::DpdkPortOffset
uint16_t DpdkPortOffset() const
Definition: config.h:314
Sender::~Sender
~Sender()
Definition: sender.cc:154
Config::BsRruPort
int BsRruPort() const
Definition: config.h:305
Sender::InitIqFromFile
void InitIqFromFile(const std::string &filename)
Read time-domain 32-bit floating-point IQ samples from [filename] and populate iq_data_short_ by conv...
Definition: sender.cc:551
Packet::cell_id_
uint32_t cell_id_
Definition: message.h:170
message.h
Self defined functions for message storage and passing.
Table::Free
void Free()
Definition: memory_manage.h:84
unlikely
#define unlikely(x)
Definition: utils.h:16
Config::BsServerPort
int BsServerPort() const
Definition: config.h:304
gen_tag_t::FrmSymAnt
static gen_tag_t FrmSymAnt(size_t frame_id, size_t symbol_id, size_t ant_id)
Definition: message.h:106
nlohmann::json_v3_11_1NLOHMANN_JSON_ABI_TAG_LEGACY_DISCARDED_VALUE_COMPARISON::detail2::begin
begin_tag begin(T &&...)
Sender::WriteStatsToFile
void WriteStatsToFile(size_t tx_frame_count) const
Definition: sender.cc:596
keep_running
static std::atomic< bool > keep_running
Definition: sender.cc:24
i
for i
Definition: generate_data.m:107
GetTime
Definition: gettime.h:11
Config::OfdmCaNum
size_t OfdmCaNum() const
Definition: config.h:45
Packet::kOffsetOfData
static constexpr size_t kOffsetOfData
Definition: message.h:166
moodycamel::ConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: concurrentqueue.h:974
Catch::cout
std::ostream & cout()
u
Plot Rx waveform for u
Definition: inspect_single_frame.m:108
GetTime::Rdtsc
static size_t Rdtsc()
Return the TSC.
Definition: gettime.h:25
fmt::v8::fprintf
auto fprintf(std::FILE *f, const S &fmt, const T &... args) -> int
Definition: printf.h:607
Sender::kDequeueBulkSize
static constexpr size_t kDequeueBulkSize
Definition: sender.h:38
Config::BsServerAddr
std::string BsServerAddr() const
Definition: config.h:299
Agora_memory::Alignment_t::kAlign64
@ kAlign64
Sender::ticks_per_usec_
const double ticks_per_usec_
Definition: sender.h:107
Packet
Definition: message.h:164
Packet::symbol_id_
uint32_t symbol_id_
Definition: message.h:169
SimdConvertFloat32ToFloat16
static void SimdConvertFloat32ToFloat16(float *out_buf, const float *in_buf, size_t n_elems)
Definition: datatype_conversion.h:576
gen_tag_t::tag_
size_t tag_
Definition: message.h:43
start
end start
Definition: inspect_agora_results.m:95
Config::FramesToTest
size_t FramesToTest() const
Definition: config.h:309
kNumStatsFrames
static constexpr size_t kNumStatsFrames
Definition: symbols.h:300
kDebugPrintSender
static constexpr bool kDebugPrintSender
Definition: sender.cc:22
DpdkTransport::CreateMempool
static rte_mempool * CreateMempool(size_t num_ports, size_t packet_length=kJumboFrameMaxSize)
moodycamel::ConcurrentQueue::enqueue_bulk
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1016
Sender::ScheduleSymbol
void ScheduleSymbol(size_t frame, size_t symbol_id)
Definition: sender.cc:208
symbols.h
Sender::ticks_all_
uint64_t ticks_all_
Definition: sender.h:119
moodycamel::ConcurrentQueue< size_t >
Config::GetSymbolType
SymbolType GetSymbolType(size_t symbol_id) const
Return the symbol type of this symbol in this frame.
Definition: config.cc:1557
Sender::frame_start_
double * frame_start_
Definition: sender.h:143
GetTime::MeasureRdtscFreq
static double MeasureRdtscFreq()
Definition: gettime.h:51
FrameStats::NumULSyms
size_t NumULSyms() const
Definition: framestats.cc:85
DpdkTransport::NicInit
static int NicInit(uint16_t port, struct rte_mempool *mbuf_pool, int thread_num, size_t pkt_len=kJumboFrameMaxSize)
Agora_memory::Alignment_t::kAlign32
@ kAlign32
Config
Definition: config.h:26
kMacAddrBtyes
static constexpr size_t kMacAddrBtyes
Definition: dpdk_transport.h:35
AGORA_LOG_INFO
#define AGORA_LOG_INFO(...)
Definition: logger.h:62
Sender::cfg_
Config * cfg_
Definition: sender.h:105
Config::DpdkMacAddrs
const std::string & DpdkMacAddrs() const
Definition: config.h:316
AGORA_LOG_FRAME
#define AGORA_LOG_FRAME(...)
Definition: logger.h:72
RtAssert
static void RtAssert(bool condition, const char *throw_str)
Definition: utils.h:104
num_workers_ready_atomic
static std::atomic< size_t > num_workers_ready_atomic
Definition: sender.cc:26
to_string
std::string to_string() const
Definition: eth_common.h:64
SimdConvertFloatToShort
static void SimdConvertFloatToShort(const float *in_buf, short *out_buf, size_t n_elems, size_t n_prefix=0, float scale_down_factor=1.0f)
Definition: datatype_conversion.h:266
ThreadType::kWorkerTX
@ kWorkerTX
Sender::CreateWorkerThreads
void CreateWorkerThreads(size_t num_workers)
Definition: sender.cc:590
Config::NumCells
size_t NumCells() const
Definition: config.h:94
config.h
Declaration file for the configuration class which importants json configuration values into class va...
SymbolType
SymbolType
Definition: symbols.h:261
Sender
Definition: sender.h:36
kPayloadOffset
static constexpr size_t kPayloadOffset
Offset to the payload starting from the beginning of the UDP frame.
Definition: dpdk_transport.h:48
Sender::frame_end_
double * frame_end_
Definition: sender.h:144
moodycamel::ConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1104
gettime.h
ConvertFloatTo12bitIq
static void ConvertFloatTo12bitIq(const float *in_buf, uint8_t *out_buf, size_t n_elems)
Definition: datatype_conversion.h:298
Config::PacketLength
size_t PacketLength() const
Definition: config.h:238
dpdk_transport.h
Declaration file for the DpdkTransport class.
Sender::frame_duration_
size_t frame_duration_
Definition: sender.h:114
gen_tag_t::frame_id_
uint32_t frame_id_
Definition: message.h:32
Config::NumRadios
size_t NumRadios() const
Definition: config.h:95
Sender::MasterThread
void * MasterThread(int tid)
Definition: sender.cc:224
max
max(y1, y1_1)
Sender::freq_ghz_
const double freq_ghz_
Definition: sender.h:106
SymbolType::kUL
@ kUL
frame_duration
frame_duration
Definition: parse_dl_file.m:27
DpdkTransport::DpdkInit
static void DpdkInit(uint16_t core_offset, size_t thread_num)
Init dpdk on core [core_offset:core_offset+thread_num].
Config::DpdkNumPorts
uint16_t DpdkNumPorts() const
Definition: config.h:313
kDebugPrintPerFrameDone
static constexpr bool kDebugPrintPerFrameDone
Definition: symbols.h:196
concurrentqueue.h
Sender::packet_count_per_symbol_
size_t * packet_count_per_symbol_[kFrameWnd]
Definition: sender.h:141
nlohmann::json_v3_11_1NLOHMANN_JSON_ABI_TAG_LEGACY_DISCARDED_VALUE_COMPARISON::detail2::end
end_tag end(T &&...)
Sender::GetMaxSymbolId
size_t GetMaxSymbolId() const
Config::BsRruAddr
std::string BsRruAddr() const
Definition: config.h:302
FrameStats::NumTotalSyms
size_t NumTotalSyms() const
Definition: framestats.cc:93
Sender::ticks_wnd1_
uint64_t ticks_wnd1_
Definition: sender.h:123
InterruptHandler
void InterruptHandler(int)
Definition: sender.cc:28