Agora  1.2.0
Agora project
txrx_worker_dpdk.h
Go to the documentation of this file.
1 
6 #ifndef TXRX_WORKER_DPDK_H_
7 #define TXRX_WORKER_DPDK_H_
8 
9 #include <cstddef>
10 #include <cstdint>
11 #include <vector>
12 
13 #include "dpdk_transport.h"
14 #include "txrx_worker.h"
15 
// DPDK-based TxRx worker. Derives from TxRxWorker and overrides DoTxRx(),
// Start() and Stop(); all three (and the destructor) are marked final.
// NOTE(review): std::mutex, std::condition_variable, std::atomic and std::pair
// are used in this declaration, but <mutex>, <condition_variable>, <atomic> and
// <utility> are not among the includes visible in this header - presumably they
// arrive transitively via "txrx_worker.h"; consider including them directly.
16 class TxRxWorkerDpdk : public TxRxWorker {
17  public:
    // No default construction: a worker must be built with the full argument set below.
18  TxRxWorkerDpdk() = delete;
    // NOTE(review): this listing jumps from source line 21 to source line 24, so
    // two lines of the parameter list are not shown here. Treat the signature
    // below as incomplete and confirm it against the real txrx_worker_dpdk.h.
    // dpdk_phy is a list of (uint16_t, uint16_t) pairs and mbuf_pool a dpdk
    // memory pool; presumably they initialise dpdk_phy_port_queues_ and
    // mbuf_pool_ respectively - confirm in txrx_worker_dpdk.cc.
19  TxRxWorkerDpdk(size_t core_offset, size_t tid, size_t interface_count,
20  size_t interface_offset, Config* const config,
21  size_t* rx_frame_start,
24  moodycamel::ProducerToken& tx_producer,
25  moodycamel::ProducerToken& notify_producer,
26  std::vector<RxPacket>& rx_memory, std::byte* const tx_memory,
27  std::mutex& sync_mutex, std::condition_variable& sync_cond,
28  std::atomic<bool>& can_proceed,
29  std::vector<std::pair<uint16_t, uint16_t>> dpdk_phy,
30  rte_mempool* mbuf_pool);
31  ~TxRxWorkerDpdk() final;
    // Overrides of the TxRxWorker interface (implemented in txrx_worker_dpdk.cc).
32  void DoTxRx() final;
33  void Start() final;
34  void Stop() final;
35 
36  private:
    // Receive path for one dpdk port / queue pair. Returns Packet pointers;
    // presumably the packets that were received and enqueued - TODO confirm.
37  std::vector<Packet*> RecvEnqueue(uint16_t port_id, uint16_t queue_id);
    // Transmit path. Returns a count (size_t); presumably the number of
    // dequeued events / packets sent - TODO confirm against the implementation.
38  size_t DequeueSend();
39  // Returns true if the packet should be ignored - will garbage collect. Handles ARP requests.
40  bool Filter(rte_mbuf* packet, uint16_t port_id, uint16_t queue_id);
41 
42  uint32_t bs_rru_addr_; // IPv4 address of the simulator sender
43  uint32_t bs_server_addr_; // IPv4 address of the Agora server
44  // dpdk port_id / device : queue_id
45  const std::vector<std::pair<uint16_t, uint16_t>> dpdk_phy_port_queues_;
46  // Shared memory pool for rx and tx
47  rte_mempool* mbuf_pool_;
    // Source / destination Ethernet (MAC) addresses. How these vectors are
    // indexed is not visible in this header - presumably one entry per
    // port / queue pair; verify in txrx_worker_dpdk.cc.
48  std::vector<rte_ether_addr> src_mac_;
49  std::vector<rte_ether_addr> dest_mac_;
50 };
51 #endif // TXRX_WORKER_DPDK_H_
DpdkTransport::InstallFlowRule
static void InstallFlowRule(uint16_t port_id, uint16_t rx_q, uint32_t src_ip, uint32_t dest_ip, uint16_t src_port, uint16_t dst_port)
__attribute__
class RadioSocket __attribute__
TxRxWorkerDpdk::RecvEnqueue
std::vector< Packet * > RecvEnqueue(uint16_t port_id, uint16_t queue_id)
Definition: txrx_worker_dpdk.cc:153
TxRxWorkerDpdk::DequeueSend
size_t DequeueSend()
Definition: txrx_worker_dpdk.cc:222
fmt::v8::detail::byte
byte
Definition: core.h:388
moodycamel::ProducerToken
Definition: concurrentqueue.h:630
Config::Running
void Running(bool value)
Definition: config.h:33
Packet::frame_id_
uint32_t frame_id_
Definition: message.h:168
TxRxWorkerDpdk::bs_rru_addr_
uint32_t bs_rru_addr_
Definition: txrx_worker_dpdk.h:42
fmt::v8::printf
auto printf(const S &fmt, const T &... args) -> int
Definition: printf.h:631
DpdkTransport::AllocUdp
static rte_mbuf * AllocUdp(rte_mempool *mbuf_pool, rte_ether_addr src_mac_addr, rte_ether_addr dst_mac_addr, uint32_t src_ip_addr, uint32_t dst_ip_addr, uint16_t src_udp_port, uint16_t dst_udp_port, size_t buffer_length, uint16_t pkt_id)
src_port
uint16_t src_port
Definition: eth_common.h:60
TxRxWorker::rx_frame_start_
size_t *const rx_frame_start_
Definition: txrx_worker.h:59
gen_tag_t::symbol_id_
uint16_t symbol_id_
Definition: message.h:33
Packet::ant_id_
uint32_t ant_id_
Definition: message.h:171
EventType::kPacketTX
@ kPacketTX
TxRxWorkerDpdk::src_mac_
std::vector< rte_ether_addr > src_mac_
Definition: txrx_worker_dpdk.h:48
TxRxWorkerDpdk::dest_mac_
std::vector< rte_ether_addr > dest_mac_
Definition: txrx_worker_dpdk.h:49
TxRxWorker::GetTxPacket
Packet * GetTxPacket(size_t frame, size_t symbol, size_t ant)
Definition: txrx_worker.cc:160
TxRxWorker::Running
bool Running() const
Definition: txrx_worker.h:42
AGORA_LOG_ERROR
#define AGORA_LOG_ERROR(...)
Definition: logger.h:39
TxRxWorkerDpdk::DoTxRx
void DoTxRx() final
Definition: txrx_worker_dpdk.cc:105
AGORA_LOG_TRACE
#define AGORA_LOG_TRACE(...)
Definition: logger.h:92
rx_tag_t
Definition: message.h:227
TxRxWorker::WaitSync
void WaitSync()
Using a latch might be better, but it adds a C++20 requirement.
Definition: txrx_worker.cc:64
TxRxWorkerDpdk::TxRxWorkerDpdk
TxRxWorkerDpdk()=delete
txrx_worker_dpdk.h
TxRx worker DPDK definition. This is the simulator declaration with DPDK enhancements.
DpdkTransport::PrintPkt
static void PrintPkt(rte_be32_t src_ip, rte_be32_t dst_ip, rte_be16_t src_port, rte_be16_t dst_port, size_t len, size_t tid)
TxRxWorkerDpdk::Start
void Start() final
Definition: txrx_worker_dpdk.cc:89
gen_tag_t
Definition: message.h:22
kDebugPrintInTask
static constexpr bool kDebugPrintInTask
Definition: symbols.h:201
TxRxWorker::GetPendingTxEvents
std::vector< EventData > GetPendingTxEvents(size_t max_events=0)
Definition: txrx_worker.cc:85
TxRxWorkerDpdk::Stop
void Stop() final
Definition: txrx_worker_dpdk.cc:99
EventData
Definition: message.h:142
Config::BsRruPort
int BsRruPort() const
Definition: config.h:305
gen_tag_t::ant_id_
uint16_t ant_id_
Definition: message.h:38
message.h
Self-defined functions for message storage and passing.
unlikely
#define unlikely(x)
Definition: utils.h:16
Config::BsServerPort
int BsServerPort() const
Definition: config.h:304
i
for i
Definition: generate_data.m:107
TxRxWorkerDpdk::mbuf_pool_
rte_mempool * mbuf_pool_
Definition: txrx_worker_dpdk.h:47
GetTime::Rdtsc
static size_t Rdtsc()
Return the TSC.
Definition: gettime.h:25
TxRxWorkerDpdk::~TxRxWorkerDpdk
~TxRxWorkerDpdk() final
Definition: txrx_worker_dpdk.cc:79
Config::BsServerAddr
std::string BsServerAddr() const
Definition: config.h:299
Packet
Definition: message.h:164
TxRxWorker::Configuration
Config * Configuration()
Definition: txrx_worker.h:46
EventType::kPacketRX
@ kPacketRX
Packet::symbol_id_
uint32_t symbol_id_
Definition: message.h:169
gen_tag_t::tag_
size_t tag_
Definition: message.h:43
kNumStatsFrames
static constexpr size_t kNumStatsFrames
Definition: symbols.h:300
TxRxWorkerDpdk::dpdk_phy_port_queues_
const std::vector< std::pair< uint16_t, uint16_t > > dpdk_phy_port_queues_
Definition: txrx_worker_dpdk.h:45
TxRxWorker::NotifyComplete
bool NotifyComplete(const EventData &complete_event)
Definition: txrx_worker.cc:75
TxRxWorker
Definition: txrx_worker.h:18
kIsWorkerTimingEnabled
static constexpr bool kIsWorkerTimingEnabled
Definition: symbols.h:303
TxRxWorker::GetRxPacket
RxPacket & GetRxPacket()
Definition: txrx_worker.cc:104
TxRxWorker::channels_per_interface_
const size_t channels_per_interface_
Definition: txrx_worker.h:58
moodycamel::ConcurrentQueue< EventData >
Config::CellId
const std::vector< size_t > & CellId() const
Definition: config.h:368
TxRxWorker::tid_
const size_t tid_
Definition: txrx_worker.h:54
TxRxWorkerDpdk::bs_server_addr_
uint32_t bs_server_addr_
Definition: txrx_worker_dpdk.h:43
Config
Definition: config.h:26
AGORA_LOG_INFO
#define AGORA_LOG_INFO(...)
Definition: logger.h:62
std
Definition: json.hpp:5213
AGORA_LOG_FRAME
#define AGORA_LOG_FRAME(...)
Definition: logger.h:72
RtAssert
static void RtAssert(bool condition, const char *throw_str)
Definition: utils.h:104
TxRxWorkerDpdk
Definition: txrx_worker_dpdk.h:16
AGORA_LOG_WARN
#define AGORA_LOG_WARN(...)
Definition: logger.h:53
eth_type
uint16_t eth_type
Definition: eth_common.h:62
kDebugDPDK
static constexpr bool kDebugDPDK
Definition: txrx_worker_dpdk.cc:19
kPayloadOffset
static constexpr size_t kPayloadOffset
Offset to the payload starting from the beginning of the UDP frame.
Definition: dpdk_transport.h:48
TxRxWorkerDpdk::Filter
bool Filter(rte_mbuf *packet, uint16_t port_id, uint16_t queue_id)
Definition: txrx_worker_dpdk.cc:285
kRxBatchSize
static constexpr size_t kRxBatchSize
Maximum number of packets received in rx_burst.
Definition: dpdk_transport.h:40
kTxBatchSize
static constexpr size_t kTxBatchSize
Definition: dpdk_transport.h:41
TxRxWorker::interface_offset_
const size_t interface_offset_
Definition: txrx_worker.h:57
gettime.h
dpdk_transport.h
Declaration file for the DpdkTransport class.
TxRxWorker::num_interfaces_
const size_t num_interfaces_
Definition: txrx_worker.h:56
gen_tag_t::frame_id_
uint32_t frame_id_
Definition: message.h:32
txrx_worker.h
TxRx worker thread definition. This is the parent / interface.
TxRxWorker::running_
bool running_
Definition: txrx_worker.h:60
Config::BsRruAddr
std::string BsRruAddr() const
Definition: config.h:302
ClassFunctioWrapper
static void ClassFunctioWrapper(TxRxWorkerDpdk *context)
Definition: txrx_worker_dpdk.cc:83