Agora  1.2.0
Agora project
mac_sender.h
Go to the documentation of this file.
1 
5 #ifndef MAC_SENDER_H_
6 #define MAC_SENDER_H_
7 
8 #include <cstdint>
9 #include <functional>
10 #include <thread>
11 #include <vector>
12 
13 #include "concurrentqueue.h"
14 #include "config.h"
15 #include "mac_data_receiver.h"
16 #include "memory_manage.h"
17 #include "message.h"
18 
19 class MacSender {
20  public:
21  static constexpr size_t kDequeueBulkSize = 4;
22  static constexpr size_t kMessageQueueSize = 1024;
23 
46  MacSender(Config* cfg, std::string& data_filename, size_t mac_packet_length,
47  size_t mac_payload_max_length, size_t packets_per_frame,
48  std::string server_address, size_t server_rx_port,
49  std::function<size_t(size_t)> get_data_symbol_id,
50  size_t core_offset = 30, size_t worker_thread_num = 1,
51  size_t update_thread_num = 1, size_t frame_duration_us = 0,
52  size_t inter_frame_delay = 0, size_t enable_slow_start = 1,
53  bool create_thread_for_master = false);
54  ~MacSender();
55 
56  void StartTx();
57 
58  // in_frame_start and in_frame_end must have space for at least
59  // kNumStatsFrames entries
60  void StartTxfromMain(double* in_frame_start, double* in_frame_end);
61 
62  private:
63  void* MasterThread(size_t tid);
64  void* WorkerThread(size_t tid);
65  void* DataUpdateThread(size_t tid, size_t num_data_sources);
66 
67  // Get number of CPU ticks for a symbol given a frame index
68  uint64_t GetTicksForFrame(size_t frame_id) const;
69  size_t GetMaxSymbolId() const;
70 
71  // Launch threads to run worker with thread IDs from tid_start to tid_end
72  void CreateWorkerThreads(size_t num_workers);
73 
74  void UpdateTxBuffer(MacDataReceiver* data_source, gen_tag_t tag);
75  void WriteStatsToFile(size_t tx_frame_count) const;
76 
77  void ScheduleFrame(size_t frame);
78  void LoadFrame(size_t frame);
79  size_t TagToTxBuffersIndex(gen_tag_t tag) const;
80 
82  const double freq_ghz_; // RDTSC frequency in GHz
83  const double ticks_per_usec_; // RDTSC frequency in GHz
84  const size_t worker_thread_num_; // Number of worker threads sending pkts
85  const size_t update_thread_num_; // Number of Tx buffer update threads
86  const size_t enable_slow_start_; // If 1, send frames slowly at first
87 
88  // The master thread runs on core core_offset. Worker threads use cores
89  // {core_offset + 1, ..., core_offset + thread_num - 1}
90  const size_t core_offset_;
92  const size_t inter_frame_delay_;
93 
94  // RDTSC clock ticks between the start of transmission of two symbols in
95  // the steady state
96  uint64_t ticks_all_;
97 
98  // ticks_wnd_1 and ticks_wnd_2 are the RDTSC clock ticks between the start
99  // of transmission of two symbols for the first several frames
100  uint64_t ticks_wnd1_;
101  uint64_t ticks_wnd2_;
102 
103  // RDTSC clock ticks between the end of a frame and the start of the next
104  // frame
105  const uint64_t ticks_inter_frame_;
106 
112 
113  std::vector<moodycamel::ConcurrentQueue<size_t>> data_update_queue_;
114 
115  double* frame_start_;
116  double* frame_end_;
117 
118  std::vector<std::thread> threads_;
119 
122  std::string data_filename_;
123 
127  const std::string server_address_;
128  const size_t server_rx_port_;
130  const bool has_master_thread_;
131 };
132 
133 #endif // MAC_SENDER_H_
mac_data_receiver.h
Declaration file for the MacDataReceiver interface class.
MacSender::GetTicksForFrame
uint64_t GetTicksForFrame(size_t frame_id) const
Definition: mac_sender.cc:434
fmt::v8::detail::byte
byte
Definition: core.h:388
moodycamel::ProducerToken
Definition: concurrentqueue.h:630
file_receiver.h
Declaration file for the FileReceiver class.
Config::UeAntNum
size_t UeAntNum() const
Definition: config.h:41
MacDataReceiver::Load
virtual size_t Load(unsigned char *destination, size_t requested_bytes)=0
kDebugSenderReceiver
static constexpr bool kDebugSenderReceiver
Definition: symbols.h:354
PinToCoreWithOffset
void PinToCoreWithOffset(ThreadType thread_type, size_t core_offset, size_t thread_id, bool allow_reuse, bool verbose)
Definition: utils.cc:157
MacSender::threads_
std::vector< std::thread > threads_
Definition: mac_sender.h:118
Config::SampsPerSymbol
size_t SampsPerSymbol() const
Definition: config.h:234
datatype_conversion.h
Config::Rate
double Rate() const
Definition: config.h:58
MacSender::update_thread_num_
const size_t update_thread_num_
Definition: mac_sender.h:85
moodycamel::ConcurrentQueue::try_dequeue_bulk_from_producer
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
Definition: concurrentqueue.h:1292
MacSender::frame_start_
double * frame_start_
Definition: mac_sender.h:115
MacSender::freq_ghz_
const double freq_ghz_
Definition: mac_sender.h:82
kBufferInit
static constexpr size_t kBufferInit
Definition: mac_sender.cc:25
kFrameLoadAdvance
static constexpr size_t kFrameLoadAdvance
Definition: mac_sender.cc:24
kFrameWnd
static constexpr size_t kFrameWnd
Definition: symbols.h:18
MacSender::WorkerThread
void * WorkerThread(size_t tid)
Definition: mac_sender.cc:311
Agora_memory::PaddedAlignedAlloc
void * PaddedAlignedAlloc(Alignment_t alignment, size_t size)
Definition: memory_manage.cc:15
MacSender::CreateWorkerThreads
void CreateWorkerThreads(size_t num_workers)
Definition: mac_sender.cc:446
GetTime::GetTimeUs
static double GetTimeUs()
Definition: gettime.h:14
MacSender::frame_end_
double * frame_end_
Definition: mac_sender.h:116
memory_manage.h
MacSender::has_master_thread_
const bool has_master_thread_
Definition: mac_sender.h:130
MacSender::GetMaxSymbolId
size_t GetMaxSymbolId() const
MacSender::ScheduleFrame
void ScheduleFrame(size_t frame)
Definition: mac_sender.cc:202
ThreadType::kMasterTX
@ kMasterTX
MacSender::ticks_inter_frame_
const uint64_t ticks_inter_frame_
Definition: mac_sender.h:105
MacSender::data_update_queue_
std::vector< moodycamel::ConcurrentQueue< size_t > > data_update_queue_
Definition: mac_sender.h:113
video_receiver.h
Declaration file for the VideoReceiver class.
AGORA_LOG_ERROR
#define AGORA_LOG_ERROR(...)
Definition: logger.h:39
TOSTRING
#define TOSTRING(x)
Definition: symbols.h:14
Config::Frame
const FrameStats & Frame() const
Definition: config.h:340
GetTime::CyclesToUs
static double CyclesToUs(size_t cycles, double freq_ghz)
Definition: gettime.h:97
AGORA_LOG_TRACE
#define AGORA_LOG_TRACE(...)
Definition: logger.h:92
gen_tag_t::ue_id_
uint16_t ue_id_
Definition: message.h:37
MacSender::core_offset_
const size_t core_offset_
Definition: mac_sender.h:90
gen_tag_t::FrmSymUe
static gen_tag_t FrmSymUe(size_t frame_id, size_t symbol_id, size_t ue_id)
Definition: message.h:84
Table< uint8_t >
MacSender::enable_slow_start_
const size_t enable_slow_start_
Definition: mac_sender.h:86
gen_tag_t
Definition: message.h:22
filename
filename
Definition: parse_all_dl.m:14
num_workers_ready_atomic
static std::atomic< size_t > num_workers_ready_atomic(0)
MacSender
Definition: mac_sender.h:19
MacSender::mac_payload_max_length_
size_t mac_payload_max_length_
Definition: mac_sender.h:125
MacSender::inter_frame_delay_
const size_t inter_frame_delay_
Definition: mac_sender.h:92
MacSender::kMessageQueueSize
static constexpr size_t kMessageQueueSize
Definition: mac_sender.h:22
mac_sender.h
Declaration file for the simple mac sender class.
gen_tag_t::ant_id_
uint16_t ant_id_
Definition: message.h:38
message.h
Self defined functions for message storage and passing.
Table::Free
void Free()
Definition: memory_manage.h:84
MacSender::data_filename_
std::string data_filename_
Definition: mac_sender.h:122
gen_tag_t::FrmSymAnt
static gen_tag_t FrmSymAnt(size_t frame_id, size_t symbol_id, size_t ant_id)
Definition: message.h:106
nlohmann::json_v3_11_1NLOHMANN_JSON_ABI_TAG_LEGACY_DISCARDED_VALUE_COMPARISON::detail2::begin
begin_tag begin(T &&...)
kMacSendFromPort
static constexpr uint16_t kMacSendFromPort
Definition: mac_sender.cc:20
i
for i
Definition: generate_data.m:107
GetTime
Definition: gettime.h:11
kDebugPrintSender
static constexpr bool kDebugPrintSender
Definition: mac_sender.cc:23
moodycamel::ConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: concurrentqueue.h:974
kSlowStartThresh1
static constexpr size_t kSlowStartThresh1
Definition: mac_sender.cc:28
MacSender::mac_packet_length_
size_t mac_packet_length_
Definition: mac_sender.h:124
Catch::cout
std::ostream & cout()
u
Plot Rx waveform for u
Definition: inspect_single_frame.m:108
GetTime::Rdtsc
static size_t Rdtsc()
Return the TSC.
Definition: gettime.h:25
MacSender::kDequeueBulkSize
static constexpr size_t kDequeueBulkSize
Definition: mac_sender.h:21
MacSender::ticks_wnd2_
uint64_t ticks_wnd2_
Definition: mac_sender.h:101
Agora_memory::Alignment_t::kAlign64
@ kAlign64
MacSender::LoadFrame
void LoadFrame(size_t frame)
Definition: mac_sender.cc:195
kSlowStartMulStage2
static constexpr size_t kSlowStartMulStage2
Definition: mac_sender.cc:31
ThreadType::kWorkerMacTXRX
@ kWorkerMacTXRX
MacSender::DataUpdateThread
void * DataUpdateThread(size_t tid, size_t num_data_sources)
Definition: mac_sender.cc:453
MacSender::ticks_all_
uint64_t ticks_all_
Definition: mac_sender.h:96
gen_tag_t::tag_
size_t tag_
Definition: message.h:43
VideoReceiver::kVideoStreamRxPort
static constexpr size_t kVideoStreamRxPort
Definition: video_receiver.h:20
start
end start
Definition: inspect_agora_results.m:95
MacSender::tx_buffers_
Table< uint8_t > tx_buffers_
Definition: mac_sender.h:120
Config::FramesToTest
size_t FramesToTest() const
Definition: config.h:309
MacSender::send_queue_
moodycamel::ConcurrentQueue< size_t > send_queue_
Definition: mac_sender.h:107
MacSender::cfg_
Config * cfg_
Definition: mac_sender.h:81
UDPClient::Send
void Send(const std::string &rem_hostname, uint16_t rem_port, const std::byte *msg, size_t len)
Send one UDP packet to a remote server. The client caches the the remote server's addrinfo after reso...
Definition: udp_client.h:50
kNumStatsFrames
static constexpr size_t kNumStatsFrames
Definition: symbols.h:300
MacDataReceiver
The MacDataReceiver interface class.
Definition: mac_data_receiver.h:13
moodycamel::ConcurrentQueue::enqueue_bulk
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1016
MacSender::WriteStatsToFile
void WriteStatsToFile(size_t tx_frame_count) const
Definition: mac_sender.cc:554
MacSender::packets_per_frame_
size_t packets_per_frame_
Definition: mac_sender.h:126
moodycamel::ConcurrentQueue< size_t >
MacSender::worker_thread_num_
const size_t worker_thread_num_
Definition: mac_sender.h:84
GetTime::MeasureRdtscFreq
static double MeasureRdtscFreq()
Definition: gettime.h:51
function
function[avg_proc_duration, std_proc_duration]
Definition: parse_dl_file.m:1
MacSender::StartTxfromMain
void StartTxfromMain(double *in_frame_start, double *in_frame_end)
Definition: mac_sender.cc:188
MacSender::~MacSender
~MacSender()
Definition: mac_sender.cc:159
MacSender::frame_duration_us_
size_t frame_duration_us_
Definition: mac_sender.h:91
Config
Definition: config.h:26
AGORA_LOG_INFO
#define AGORA_LOG_INFO(...)
Definition: logger.h:62
kMacSendFromAddress
static const std::string kMacSendFromAddress
Definition: mac_sender.cc:19
InterruptHandler
void InterruptHandler(int)
Definition: mac_sender.cc:39
MacSender::server_address_
const std::string server_address_
Definition: mac_sender.h:127
kTxBufferElementAlignment
static constexpr size_t kTxBufferElementAlignment
Definition: mac_sender.cc:26
std
Definition: json.hpp:5213
MacSender::tx_buffer_pkt_offset_
size_t tx_buffer_pkt_offset_
Definition: mac_sender.h:121
MacSender::get_data_symbol_id_
std::function< size_t(size_t)> get_data_symbol_id_
Definition: mac_sender.h:129
AGORA_LOG_FRAME
#define AGORA_LOG_FRAME(...)
Definition: logger.h:72
Table::Malloc
void Malloc(size_t dim1, size_t dim2, Agora_memory::Alignment_t alignment)
Definition: memory_manage.h:37
RtAssert
static void RtAssert(bool condition, const char *throw_str)
Definition: utils.h:104
MacSender::TagToTxBuffersIndex
size_t TagToTxBuffersIndex(gen_tag_t tag) const
Definition: mac_sender.cc:50
AGORA_LOG_WARN
#define AGORA_LOG_WARN(...)
Definition: logger.h:53
MacSender::MasterThread
void * MasterThread(size_t tid)
Definition: mac_sender.cc:218
gen_tag_t::FrmSym
static gen_tag_t FrmSym(size_t frame_id, size_t symbol_id)
Definition: message.h:128
kMasterThreadId
static constexpr size_t kMasterThreadId
Definition: mac_sender.cc:32
ThreadType::kWorkerTX
@ kWorkerTX
kSlowStartThresh2
static constexpr size_t kSlowStartThresh2
Definition: mac_sender.cc:29
config.h
Declaration file for the configuration class which importants json configuration values into class va...
MacSender::MacSender
MacSender(Config *cfg, std::string &data_filename, size_t mac_packet_length, size_t mac_payload_max_length, size_t packets_per_frame, std::string server_address, size_t server_rx_port, std::function< size_t(size_t)> get_data_symbol_id, size_t core_offset=30, size_t worker_thread_num=1, size_t update_thread_num=1, size_t frame_duration_us=0, size_t inter_frame_delay=0, size_t enable_slow_start=1, bool create_thread_for_master=false)
Create and optionally start a Sender Client that sends data packets to a agora mac interface.
Definition: mac_sender.cc:56
moodycamel::ConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1104
MacPacketPacked
Definition: message.h:274
gettime.h
DelayTicks
void DelayTicks(uint64_t start, uint64_t ticks)
Definition: mac_sender.cc:44
UDPClient
Definition: udp_client.h:13
keep_running
static std::atomic< bool > keep_running(true)
MacSender::task_ptok_
moodycamel::ProducerToken ** task_ptok_
Definition: mac_sender.h:111
Config::PacketLength
size_t PacketLength() const
Definition: config.h:238
MacSender::completion_queue_
moodycamel::ConcurrentQueue< size_t > completion_queue_
Definition: mac_sender.h:109
gen_tag_t::frame_id_
uint32_t frame_id_
Definition: message.h:32
MacSender::ticks_wnd1_
uint64_t ticks_wnd1_
Definition: mac_sender.h:100
MacSender::UpdateTxBuffer
void UpdateTxBuffer(MacDataReceiver *data_source, gen_tag_t tag)
Definition: mac_sender.cc:523
max
max(y1, y1_1)
kSlowStartMulStage1
static constexpr size_t kSlowStartMulStage1
Definition: mac_sender.cc:30
MacSender::StartTx
void StartTx()
Definition: mac_sender.cc:175
kDebugPrintPerFrameDone
static constexpr bool kDebugPrintPerFrameDone
Definition: symbols.h:196
concurrentqueue.h
nlohmann::json_v3_11_1NLOHMANN_JSON_ABI_TAG_LEGACY_DISCARDED_VALUE_COMPARISON::detail2::end
end_tag end(T &&...)
MacSender::ticks_per_usec_
const double ticks_per_usec_
Definition: mac_sender.h:83
FrameStats::NumTotalSyms
size_t NumTotalSyms() const
Definition: framestats.cc:93
MacSender::server_rx_port_
const size_t server_rx_port_
Definition: mac_sender.h:128