Agora  1.2.0
Agora project
shared_counters.h
Go to the documentation of this file.
1 #ifndef SHARED_COUNTERS_INC_
2 #define SHARED_COUNTERS_INC_
3 
4 #include <mutex>
5 #include <sstream>
6 #include <vector>
7 
8 #include "config.h"
9 #include "logger.h"
10 #include "symbols.h"
11 #include "utils.h"
12 
13 // We use one RxStatus object to track packet reception status.
14 // This object is shared between socket threads and subcarrier workers.
15 class RxStatus {
16  public:
18  : num_pilot_pkts_per_frame_(cfg->Frame().NumPilotSyms() *
19  cfg->BsAntNum()),
20  num_pilot_symbols_per_frame_(cfg->Frame().NumPilotSyms()),
21  num_data_symbol_per_frame_(cfg->Frame().NumDataSyms()),
22  num_pkts_per_symbol_(cfg->BsAntNum()),
23  num_decode_tasks_per_frame_(cfg->get_num_ues_to_process()) {}
24 
25  // When a new packet is received, record it here
26  bool add_new_packet(const Packet* pkt) {
27  if (pkt->frame_id >= cur_frame_ + kFrameWnd) {
29  "SharedCounters RxStatus error: Received packet for future "
30  "frame %u beyond frame window (%zu + %zu). This can "
31  "happen if Agora is running slowly, e.g., in debug mode. "
32  "Full packet = %s.\n",
33  pkt->frame_id, cur_frame_, kFrameWnd, pkt->to_string().c_str());
34  return false;
35  }
36 
37  const size_t frame_slot = pkt->frame_id % kFrameWnd;
38  num_pkts_[frame_slot]++;
39  if (num_pkts_[frame_slot] ==
43  "SharedCounters: received all packets in frame: %u. "
44  "Pilot pkts = %zu of %zu\n",
45  pkt->frame_id, num_pilot_pkts_[frame_slot].load(),
47  }
48 
49  if (pkt->symbol_id < num_pilot_symbols_per_frame_) {
50  num_pilot_pkts_[frame_slot]++;
51  if (num_pilot_pkts_[frame_slot] == num_pilot_pkts_per_frame_) {
52  std::printf("SharedCounters: received all pilots in frame: %u\n",
53  pkt->frame_id);
54  }
55  } else {
56  num_data_pkts_[frame_slot][pkt->symbol_id]++;
57  }
58  if (pkt->frame_id > latest_frame_) {
59  // TODO: race condition could happen here but the impact is small
60  latest_frame_ = pkt->frame_id;
61  }
62  return true;
63  }
64 
65  // Check whether all pilot packets are received for a frame
66  // used by CSI
67  bool received_all_pilots(size_t frame_id) {
68  if (frame_id < cur_frame_ || frame_id >= cur_frame_ + kFrameWnd) {
69  return false;
70  }
72  }
73 
74  // Check whether demodulation can proceed for a symbol in a frame
75  bool is_demod_ready(size_t frame_id, size_t symbol_id) {
76  if (frame_id < cur_frame_ || frame_id >= cur_frame_ + kFrameWnd) {
77  return false;
78  }
79  return num_data_pkts_[frame_id % kFrameWnd][symbol_id] ==
81  }
82 
83  // When decoding is done for a frame from one decoder, call this function
84  // This function will increase cur_frame when this frame is decoded so that
85  // we can move on to decoding the next frame and release the resources used by
86  // this frame
87  void decode_done(size_t frame_id) {
88  rt_assert(frame_id == cur_frame_, "Wrong completed decode task!");
89  decode_mutex_.lock();
92  cur_frame_++;
94  size_t frame_slot = frame_id % kFrameWnd;
95  num_pkts_[frame_slot] = 0;
96  num_pilot_pkts_[frame_slot] = 0;
97  for (size_t j = 0; j < kMaxSymbols; j++) {
98  num_data_pkts_[frame_slot][j] = 0;
99  }
100  std::printf("Main thread: Decode done frame: %lu\n", cur_frame_ - 1);
101  }
102  decode_mutex_.unlock();
103  }
104 
105  // TODO: Instead of having all-atomic counter arrays, can we just make
106  // the entire class atomic?
107 
108  // num_pkts[i % kFrameWnd] is the total number of packets
109  // received for frame i (may be removed if not used)
110  std::array<std::atomic<size_t>, kFrameWnd> num_pkts_ = {};
111 
112  // num_pilot_pkts[i % kFrameWnd] is the total number of pilot
113  // packets received for frame i
114  std::array<std::atomic<size_t>, kFrameWnd> num_pilot_pkts_ = {};
115 
116  // num_data_pkts[i % kFrameWnd][j] is the total number of data
117  // packets received for frame i and symbol j
118  std::array<std::array<std::atomic<size_t>, kMaxSymbols>, kFrameWnd>
120 
121  // cur_frame is the first frame for which decoding is incomplete
122  size_t cur_frame_ = 0;
123 
124  // The max frame number for which socket I/O threads have received any packet
125  size_t latest_frame_ = 0;
126 
127  // Atomic counter for # completed decode tasks
128  // cur_frame will be incremented once all tasks are completed
130  std::mutex decode_mutex_;
131 
132  // Copies of Config variables
136  const size_t num_pkts_per_symbol_;
138 };
139 
140 // We use DemulStatus to track # completed demul tasks for each symbol
141 // This object is shared between all dosubcarriers and dodecoders
142 class DemulStatus {
143  public:
145  : num_demul_tasks_required_(cfg->get_num_sc_per_server() /
146  cfg->DemulBlockSize()) {
147  for (size_t i = 0; i < kFrameWnd; i++) {
148  for (size_t j = 0; j < kMaxSymbols; j++) {
150  }
151  }
152  max_frame_ = 0;
153  }
154 
155  // Mark [num_tasks] demodulation tasks for this frame and symbol as complete
156  void demul_complete(size_t frame_id, size_t symbol_id, size_t num_tasks) {
157  max_frame_mutex_.lock();
158  if (frame_id > max_frame_) {
159  max_frame_++;
160  for (size_t i = 0; i < kMaxSymbols; i++) {
162  }
163  }
164  max_frame_mutex_.unlock();
165  rt_assert(frame_id <= max_frame_ && frame_id + kFrameWnd > max_frame_,
166  "Complete a wrong frame in demul!");
167  num_demul_tasks_completed_[frame_id % kFrameWnd][symbol_id] += num_tasks;
168  }
169 
170  // Return true iff demodulation has been completed for all subcarriers in
171  // this symbol
172  bool ready_to_decode(size_t frame_id, size_t symbol_id) {
173  rt_assert(frame_id + kFrameWnd > max_frame_, "Decode too slow!");
174  if (frame_id > max_frame_) {
175  return false;
176  }
177  return num_demul_tasks_completed_[frame_id % kFrameWnd][symbol_id] ==
179  }
180 
181  // num_demul_tasks_completed[i % kFrameWnd][j] is
182  // the number of subcarriers completed for demul tasks in
183  // frame i and symbol j
184  std::array<std::array<std::atomic<size_t>, kMaxSymbols>, kFrameWnd>
186 
187  // Number of subcarriers required to demodulate for each symbol
189 
190  size_t max_frame_;
191  std::mutex max_frame_mutex_;
192 };
193 
195  public:
197  : cfg_(cfg), num_demod_data_required_(cfg->server_addr_list.size()) {
198  cur_frame_ = new size_t[cfg->get_num_ues_to_process()];
199  std::memset(cur_frame_, 0, sizeof(size_t) * cfg->get_num_ues_to_process());
200  cur_symbol_ = new size_t[cfg->get_num_ues_to_process()];
201  std::memset(cur_symbol_, 0, sizeof(size_t) * cfg->get_num_ues_to_process());
202 
204  new std::array<std::array<size_t, kMaxSymbols>,
205  kFrameWnd>[cfg->get_num_ues_to_process()];
206  for (size_t i = 0; i < cfg->get_num_ues_to_process(); i++) {
207  for (size_t j = 0; j < kFrameWnd; j++) {
208  num_demod_data_received_[i][j].fill(0);
209  }
210  }
211  }
212 
213  void receive_demod_data(size_t ue_id, size_t frame_id, size_t symbol_id) {
214  num_demod_data_received_[ue_id - cfg_->ue_start][frame_id % kFrameWnd]
215  [symbol_id]++;
216  }
217 
218  bool received_all_demod_data(size_t ue_id, size_t frame_id,
219  size_t symbol_id) {
220  if (num_demod_data_received_[ue_id - cfg_->ue_start][frame_id % kFrameWnd]
221  [symbol_id] == num_demod_data_required_) {
222  num_demod_data_received_[ue_id - cfg_->ue_start][frame_id % kFrameWnd]
223  [symbol_id] = 0;
224  return true;
225  }
226  return false;
227  }
228 
229  private:
231  size_t* cur_frame_;
232  size_t* cur_symbol_; // symbol ID for UL data
234 
235  std::array<std::array<size_t, kMaxSymbols>, kFrameWnd>*
237 };
238 
239 #endif // SHARED_COUNTERS_INC_
DecodeStatus::cfg_
Config * cfg_
Definition: shared_counters.h:230
RxStatus::num_data_pkts_
std::array< std::array< std::atomic< size_t >, kMaxSymbols >, kFrameWnd > num_data_pkts_
Definition: shared_counters.h:119
DemulStatus::max_frame_
size_t max_frame_
Definition: shared_counters.h:190
RxStatus::latest_frame_
size_t latest_frame_
Definition: shared_counters.h:125
size
end IFFT Reshape the symbol vector into two different spatial streams size
Definition: generate_data.m:73
fmt::v8::printf
auto printf(const S &fmt, const T &... args) -> int
Definition: printf.h:631
kFrameWnd
static constexpr size_t kFrameWnd
Definition: symbols.h:18
RxStatus::num_pilot_pkts_
std::array< std::atomic< size_t >, kFrameWnd > num_pilot_pkts_
Definition: shared_counters.h:114
DecodeStatus::cur_frame_
size_t * cur_frame_
Definition: shared_counters.h:231
RxStatus::num_decode_tasks_completed_
size_t num_decode_tasks_completed_
Definition: shared_counters.h:129
AGORA_LOG_ERROR
#define AGORA_LOG_ERROR(...)
Definition: logger.h:39
DemulStatus::DemulStatus
DemulStatus(Config *cfg)
Definition: shared_counters.h:144
RxStatus::add_new_packet
bool add_new_packet(const Packet *pkt)
Definition: shared_counters.h:26
DemulStatus::demul_complete
void demul_complete(size_t frame_id, size_t symbol_id, size_t num_tasks)
Definition: shared_counters.h:156
RxStatus::RxStatus
RxStatus(Config *cfg)
Definition: shared_counters.h:17
RxStatus
Definition: shared_counters.h:15
DecodeStatus::num_demod_data_required_
const size_t num_demod_data_required_
Definition: shared_counters.h:233
RxStatus::num_decode_tasks_per_frame_
const size_t num_decode_tasks_per_frame_
Definition: shared_counters.h:137
RxStatus::num_pilot_symbols_per_frame_
const size_t num_pilot_symbols_per_frame_
Definition: shared_counters.h:134
i
for i
Definition: generate_data.m:107
RxStatus::is_demod_ready
bool is_demod_ready(size_t frame_id, size_t symbol_id)
Definition: shared_counters.h:75
DecodeStatus::num_demod_data_received_
std::array< std::array< size_t, kMaxSymbols >, kFrameWnd > * num_demod_data_received_
Definition: shared_counters.h:236
RxStatus::decode_mutex_
std::mutex decode_mutex_
Definition: shared_counters.h:130
DemulStatus
Definition: shared_counters.h:142
Packet
Definition: message.h:164
rt_assert
static void rt_assert(bool condition, const char *throw_str)
Definition: bench.cc:19
RxStatus::received_all_pilots
bool received_all_pilots(size_t frame_id)
Definition: shared_counters.h:67
RxStatus::decode_done
void decode_done(size_t frame_id)
Definition: shared_counters.h:87
DemulStatus::num_demul_tasks_required_
const size_t num_demul_tasks_required_
Definition: shared_counters.h:188
DemulStatus::max_frame_mutex_
std::mutex max_frame_mutex_
Definition: shared_counters.h:191
symbols.h
DecodeStatus::cur_symbol_
size_t * cur_symbol_
Definition: shared_counters.h:232
RxStatus::num_pilot_pkts_per_frame_
const size_t num_pilot_pkts_per_frame_
Definition: shared_counters.h:133
DecodeStatus::receive_demod_data
void receive_demod_data(size_t ue_id, size_t frame_id, size_t symbol_id)
Definition: shared_counters.h:213
Config
Definition: config.h:26
RxStatus::num_pkts_per_symbol_
const size_t num_pkts_per_symbol_
Definition: shared_counters.h:136
RxStatus::num_data_symbol_per_frame_
const size_t num_data_symbol_per_frame_
Definition: shared_counters.h:135
RxStatus::num_pkts_
std::array< std::atomic< size_t >, kFrameWnd > num_pkts_
Definition: shared_counters.h:110
DecodeStatus::received_all_demod_data
bool received_all_demod_data(size_t ue_id, size_t frame_id, size_t symbol_id)
Definition: shared_counters.h:218
config.h
Declaration file for the configuration class which imports json configuration values into class va...
DecodeStatus
Definition: shared_counters.h:194
utils.h
Utility functions for file and text processing.
kMaxSymbols
static constexpr size_t kMaxSymbols
Definition: symbols.h:280
DemulStatus::num_demul_tasks_completed_
std::array< std::array< std::atomic< size_t >, kMaxSymbols >, kFrameWnd > num_demul_tasks_completed_
Definition: shared_counters.h:185
DemulStatus::ready_to_decode
bool ready_to_decode(size_t frame_id, size_t symbol_id)
Definition: shared_counters.h:172
DecodeStatus::DecodeStatus
DecodeStatus(Config *cfg)
Definition: shared_counters.h:196
RxStatus::cur_frame_
size_t cur_frame_
Definition: shared_counters.h:122