Agora  1.2.0
Agora project
txrx_worker.h
Go to the documentation of this file.
1 
5 #ifndef TXRX_WORKER_H_
6 #define TXRX_WORKER_H_
7 
8 #include <atomic>
9 #include <condition_variable>
10 #include <mutex>
11 #include <thread>
12 #include <vector>
13 
14 #include "concurrentqueue.h"
15 #include "config.h"
16 #include "message.h"
17 
// TxRx worker thread definition: the parent / interface class for TX/RX
// workers. DoTxRx() is pure virtual, so this class cannot be instantiated
// directly and a concrete worker must derive from it.
// NOTE(review): this listing is incomplete -- the embedded line numbers skip
// (24->27, 48->50, 57->59, 61->63, 70->72, 72->74->77->79->81). The symbol
// index that follows the listing names members that are not shown here
// (GetRxPacket, channels_per_interface_, rx_memory_idx_, tx_memory_,
// event_notify_q_, tx_pending_q_, tx_producer_token_,
// notify_producer_token_). Consult the real header before editing.
18 class TxRxWorker {
19  public:
// Compile-time debug switch, currently disabled.
// NOTE(review): what it gates is not visible in this header -- confirm in
// txrx_worker.cc.
20  static constexpr bool kDebugTxMemory = false;
21 
// Constructs a worker. The mutex, condition variable and atomic flag are
// taken by reference, and the class stores members of reference type
// (mutex_, cond_, can_proceed_, rx_memory_), so the referenced objects
// presumably must outlive the worker -- TODO confirm against the .cc file.
// NOTE(review): two parameters (source lines 25-26) are missing from this
// listing.
22  TxRxWorker(size_t core_offset, size_t tid, size_t interface_count,
23  size_t interface_offset, size_t channels_per_interface,
24  Config* const config, size_t* rx_frame_start,
27  moodycamel::ProducerToken& tx_producer,
28  moodycamel::ProducerToken& notify_producer,
29  std::vector<RxPacket>& rx_memory, std::byte* const tx_memory,
30  std::mutex& sync_mutex, std::condition_variable& sync_cond,
31  std::atomic<bool>& can_proceed);
// No default construction: every worker must be given its full context.
32  TxRxWorker() = delete;
33 
// Virtual destructor so derived workers can be destroyed through a base
// pointer.
34  virtual ~TxRxWorker();
35 
// Start / Stop are overridable; DoTxRx is the pure virtual entry point that
// each concrete worker implements.
36  virtual void Start();
37  virtual void Stop();
38  virtual void DoTxRx() = 0;
39 
// Read-only accessors for the worker id and its state flags.
// NOTE(review): started_ and running_ are plain bool; if they are read from a
// different thread than the one that writes them they would need to be
// std::atomic -- confirm usage in the .cc file.
40  inline size_t Id() const { return tid_; }
41  inline bool Started() const { return started_; }
42  inline bool Running() const { return running_; }
43 
44  protected:
// Using a latch might be better but adds a C++20 requirement.
45  void WaitSync();
// Returns the stored (non-owning) configuration pointer.
46  inline Config* Configuration() { return cfg_; }
47  bool NotifyComplete(const EventData& complete_event);
// NOTE(review): the meaning of the default max_events = 0 is not visible
// here -- confirm in txrx_worker.cc.
48  std::vector<EventData> GetPendingTxEvents(size_t max_events = 0);
50  void ReturnRxPacket(RxPacket& unused_packet);
51  Packet* GetTxPacket(size_t frame, size_t symbol, size_t ant);
52  Packet* GetUlTxPacket(size_t frame, size_t symbol, size_t ant);
53 
// Immutable per-worker parameters (const data members).
54  const size_t tid_;
55  const size_t core_offset_;
56  const size_t num_interfaces_;
57  const size_t interface_offset_;
// Const pointer to mutable size_t data; the pointer itself cannot be reseated.
59  size_t* const rx_frame_start_;
60  bool running_;
61 
// Owned by the parent TxRx object for sync.
63  std::mutex& mutex_;
64  std::condition_variable& cond_;
65  std::atomic<bool>& can_proceed_;
66 
67  private:
68  Config* const cfg_;
69  std::thread thread_;
70 
// Reference to a packet vector supplied from outside this class.
72  std::vector<RxPacket>& rx_memory_;
74 
// NOTE(review): the two comments below describe members whose declarations
// (source lines 78 and 80) are missing from this listing.
77  //foreign producer of tx messages (used for TX only)
79  //local producer of notification messages (used for TX and RX)
81 
82  bool started_;
83 };
84 #endif // TXRX_WORKER_H_
fmt::v8::detail::byte
byte
Definition: core.h:388
FrameStats::GetDLSymbolIdx
size_t GetDLSymbolIdx(size_t symbol_number) const
Definition: framestats.cc:152
moodycamel::ProducerToken
Definition: concurrentqueue.h:630
Config::Running
void Running(bool value)
Definition: config.h:33
Config::UeAntNum
size_t UeAntNum() const
Definition: config.h:41
TxRxWorker::kDebugTxMemory
static constexpr bool kDebugTxMemory
Definition: txrx_worker.h:20
TxRxWorker::tx_memory_
std::byte *const tx_memory_
Definition: txrx_worker.h:73
TxRxWorker::rx_frame_start_
size_t *const rx_frame_start_
Definition: txrx_worker.h:59
TxRxWorker::DoTxRx
virtual void DoTxRx()=0
moodycamel::ConcurrentQueue::try_dequeue_bulk_from_producer
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
Definition: concurrentqueue.h:1292
Config::GetTotalDataSymbolIdxUl
size_t GetTotalDataSymbolIdxUl(size_t frame_id, size_t symbol_idx_ul) const
Definition: config.h:421
TxRxWorker::rx_memory_idx_
size_t rx_memory_idx_
Definition: txrx_worker.h:71
TxRxWorker::mutex_
std::mutex & mutex_
Owned by the parent TxRx object for sync.
Definition: txrx_worker.h:63
FrameStats::GetULSymbolIdx
size_t GetULSymbolIdx(size_t symbol_number) const
Definition: framestats.cc:156
TxRxWorker::started_
bool started_
Definition: txrx_worker.h:82
TxRxWorker::Id
size_t Id() const
Definition: txrx_worker.h:40
Config::BsAntNum
size_t BsAntNum() const
Definition: config.h:35
TxRxWorker::GetTxPacket
Packet * GetTxPacket(size_t frame, size_t symbol, size_t ant)
Definition: txrx_worker.cc:160
TxRxWorker::Running
bool Running() const
Definition: txrx_worker.h:42
AGORA_LOG_ERROR
#define AGORA_LOG_ERROR(...)
Definition: logger.h:39
TxRxWorker::~TxRxWorker
virtual ~TxRxWorker()
Definition: txrx_worker.cc:42
Config::Frame
const FrameStats & Frame() const
Definition: config.h:340
TxRxWorker::GetUlTxPacket
Packet * GetUlTxPacket(size_t frame, size_t symbol, size_t ant)
Definition: txrx_worker.cc:173
TxRxWorker::Started
bool Started() const
Definition: txrx_worker.h:41
AGORA_LOG_TRACE
#define AGORA_LOG_TRACE(...)
Definition: logger.h:92
RxPacket::Use
void Use()
Definition: message.h:215
TxRxWorker::WaitSync
void WaitSync()
Using a latch might be better but adds a C++20 requirement.
Definition: txrx_worker.cc:64
TxRxWorker::ReturnRxPacket
void ReturnRxPacket(RxPacket &unused_packet)
Definition: txrx_worker.cc:129
TxRxWorker::cond_
std::condition_variable & cond_
Definition: txrx_worker.h:64
TxRxWorker::can_proceed_
std::atomic< bool > & can_proceed_
Definition: txrx_worker.h:65
TxRxWorker::GetPendingTxEvents
std::vector< EventData > GetPendingTxEvents(size_t max_events=0)
Definition: txrx_worker.cc:85
EventData
Definition: message.h:142
Config::DlPacketLength
size_t DlPacketLength() const
Definition: config.h:242
message.h
Self-defined functions for message storage and passing.
TxRxWorker::Start
virtual void Start()
Definition: txrx_worker.cc:44
moodycamel::ConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: concurrentqueue.h:974
TxRxWorker::notify_producer_token_
moodycamel::ProducerToken & notify_producer_token_
Definition: txrx_worker.h:80
TxRxWorker::tx_pending_q_
moodycamel::ConcurrentQueue< EventData > * tx_pending_q_
Definition: txrx_worker.h:76
Packet
Definition: message.h:164
TxRxWorker::Configuration
Config * Configuration()
Definition: txrx_worker.h:46
TxRxWorker::NotifyComplete
bool NotifyComplete(const EventData &complete_event)
Definition: txrx_worker.cc:75
TxRxWorker
Definition: txrx_worker.h:18
TxRxWorker::rx_memory_
std::vector< RxPacket > & rx_memory_
Definition: txrx_worker.h:72
TxRxWorker::Stop
virtual void Stop()
Definition: txrx_worker.cc:55
TxRxWorker::GetRxPacket
RxPacket & GetRxPacket()
Definition: txrx_worker.cc:104
TxRxWorker::channels_per_interface_
const size_t channels_per_interface_
Definition: txrx_worker.h:58
moodycamel::ConcurrentQueue< EventData >
TxRxWorker::tid_
const size_t tid_
Definition: txrx_worker.h:54
Config
Definition: config.h:26
AGORA_LOG_INFO
#define AGORA_LOG_INFO(...)
Definition: logger.h:62
TxRxWorker::cfg_
Config *const cfg_
Definition: txrx_worker.h:68
TxRxWorker::tx_producer_token_
moodycamel::ProducerToken & tx_producer_token_
Definition: txrx_worker.h:78
AGORA_LOG_FRAME
#define AGORA_LOG_FRAME(...)
Definition: logger.h:72
AGORA_LOG_WARN
#define AGORA_LOG_WARN(...)
Definition: logger.h:53
Config::GetTotalDataSymbolIdxDl
size_t GetTotalDataSymbolIdxDl(size_t frame_id, size_t symbol_idx_dl) const
Definition: config.h:428
RxPacket
Definition: message.h:186
config.h
Declaration file for the configuration class which imports JSON configuration values into class va...
TxRxWorker::interface_offset_
const size_t interface_offset_
Definition: txrx_worker.h:57
TxRxWorker::TxRxWorker
TxRxWorker()=delete
Config::PacketLength
size_t PacketLength() const
Definition: config.h:238
TxRxWorker::event_notify_q_
moodycamel::ConcurrentQueue< EventData > * event_notify_q_
Definition: txrx_worker.h:75
TxRxWorker::num_interfaces_
const size_t num_interfaces_
Definition: txrx_worker.h:56
txrx_worker.h
txrx worker thread definition. This is the parent / interface
TxRxWorker::core_offset_
const size_t core_offset_
Definition: txrx_worker.h:55
RxPacket::Free
void Free()
Definition: message.h:216
concurrentqueue.h
TxRxWorker::thread_
std::thread thread_
Definition: txrx_worker.h:69
TxRxWorker::running_
bool running_
Definition: txrx_worker.h:60
RxPacket::Empty
bool Empty() const
Definition: message.h:214