Agora  1.2.0
Agora project
recorder_thread.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2018-2020
3  RENEW OPEN SOURCE LICENSE: http://renew-wireless.org/license
4 
5 ----------------------------------------------------------------------
6 Event based message queue thread class for the recorder worker
7 ---------------------------------------------------------------------
8 */
9 #ifndef AGORA_RECORDER_THREAD_H_
10 #define AGORA_RECORDER_THREAD_H_
11 
12 #include <condition_variable>
13 #include <mutex>
14 #include <vector>
15 
16 #include "concurrentqueue.h"
17 #include "recorder_worker.h"
18 
19 namespace Agora_recorder {
20 
22  public:
23  RecorderThread(const Config *in_cfg, size_t thread_id, int core,
24  size_t queue_size, size_t antenna_offset, size_t num_antennas,
25  size_t interval, Direction rx_direction,
26  const std::vector<RecorderWorker::RecorderWorkerTypes> &types,
27  bool wait_signal = true);
29 
30  void Start();
31  void Stop();
32  bool DispatchWork(const EventData &event);
33 
34  private:
35  /* Main threading loop */
36  void DoRecording();
37  void HandleEvent(const EventData &event);
38  void Finalize();
39 
40  // 1 - Producer (dispatcher), 1 - Consumer
43  std::vector<std::unique_ptr<RecorderWorker>> workers_;
44  std::thread thread_;
45 
46  size_t id_;
47 
48  /* >= 0 to assign a core to the thread
49  * < 0 to disable thread core assignment */
51 
52  /* Synchronization for startup and sleeping */
53  /* Setting wait signal to false disables the thread's wait for new messages,
54  * which may cause excessive CPU load when messages are infrequent.
55  * However, when the message processing time ~= queue posting time, the mutex
56  * could become unnecessary work.
57  */
59  std::mutex sync_;
60  std::condition_variable condition_;
61  bool running_;
62 };
63 }; // namespace Agora_recorder
64 
65 #endif /* AGORA_RECORDER_THREAD_H_ */
Agora_recorder::RecorderThread
Definition: recorder_thread.h:21
ThreadType::kRecorderWorker
@ kRecorderWorker
Agora_recorder::RecorderThread::sync_
std::mutex sync_
Definition: recorder_thread.h:59
moodycamel::ProducerToken
Definition: concurrentqueue.h:630
Agora_recorder::RecorderThread::DoRecording
void DoRecording()
Definition: recorder_thread.cc:94
moodycamel::details::thread_id
thread_id_t thread_id()
Definition: concurrentqueue.h:157
Agora_recorder::RecorderThread::DispatchWork
bool DispatchWork(const EventData &event)
Definition: recorder_thread.cc:73
PinToCoreWithOffset
void PinToCoreWithOffset(ThreadType thread_type, size_t core_offset, size_t thread_id, bool allow_reuse, bool verbose)
Definition: utils.cc:157
EventType::kThreadTermination
@ kThreadTermination
Agora_recorder::RecorderThread::RecorderThread
RecorderThread(const Config *in_cfg, size_t thread_id, int core, size_t queue_size, size_t antenna_offset, size_t num_antennas, size_t interval, Direction rx_direction, const std::vector< RecorderWorker::RecorderWorkerTypes > &types, bool wait_signal=true)
Definition: recorder_thread.cc:17
Agora_recorder::RecorderThread::wait_signal_
bool wait_signal_
Definition: recorder_thread.h:58
Direction
Direction
Definition: symbols.h:39
Agora_recorder::RecorderThread::Finalize
void Finalize()
Definition: recorder_thread.cc:63
recorder_worker.h
Recorder worker interface factory.
Agora_recorder::RecorderThread::HandleEvent
void HandleEvent(const EventData &event)
Definition: recorder_thread.cc:141
EventData::tags_
std::array< size_t, kMaxTags > tags_
Definition: message.h:146
AGORA_LOG_ERROR
#define AGORA_LOG_ERROR(...)
Definition: logger.h:39
Agora_recorder
Definition: hdf5_lib.cc:17
AGORA_LOG_TRACE
#define AGORA_LOG_TRACE(...)
Definition: logger.h:92
rx_tag_t
Definition: message.h:227
EventData::event_type_
EventType event_type_
Definition: message.h:144
Agora_recorder::RecorderThread::~RecorderThread
~RecorderThread()
Definition: recorder_thread.cc:41
EventData
Definition: message.h:142
Agora_recorder::RecorderThread::running_
bool running_
Definition: recorder_thread.h:61
Agora_recorder::RecorderThread::id_
size_t id_
Definition: recorder_thread.h:46
message.h
Self defined functions for message storage and passing.
moodycamel::ConcurrentQueue::enqueue
bool enqueue(T const &item)
Definition: concurrentqueue.h:974
Agora_recorder::RecorderThread::workers_
std::vector< std::unique_ptr< RecorderWorker > > workers_
Definition: recorder_thread.h:43
u
Plot Rx waveform for u
Definition: inspect_single_frame.m:108
Agora_recorder::RecorderThread::core_alloc_
int core_alloc_
Definition: recorder_thread.h:50
Agora_recorder::RecorderThread::event_queue_
moodycamel::ConcurrentQueue< EventData > event_queue_
Definition: recorder_thread.h:41
EventType::kPacketRX
@ kPacketRX
Agora_recorder::RecorderThread::thread_
std::thread thread_
Definition: recorder_thread.h:44
moodycamel::ConcurrentQueue< EventData >
Config
Definition: config.h:26
AGORA_LOG_INFO
#define AGORA_LOG_INFO(...)
Definition: logger.h:62
recorder_thread.h
AGORA_LOG_WARN
#define AGORA_LOG_WARN(...)
Definition: logger.h:53
Agora_recorder::RecorderThread::Start
void Start()
Definition: recorder_thread.cc:45
Agora_recorder::RecorderThread::Stop
void Stop()
Definition: recorder_thread.cc:57
moodycamel::ConcurrentQueue::try_dequeue
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1104
moodycamel::ConsumerToken
Definition: concurrentqueue.h:695
moodycamel::ConcurrentQueue::try_enqueue
bool try_enqueue(T const &item)
Definition: concurrentqueue.h:1039
Agora_recorder::RecorderThread::condition_
std::condition_variable condition_
Definition: recorder_thread.h:60
Agora_recorder::RecorderWorker::Create
static std::unique_ptr< RecorderWorker > Create(RecorderWorkerTypes type, const Config *in_cfg, size_t antenna_offset, size_t num_antennas, size_t record_interval, Direction rx_direction)
Factory function to make concrete worker.
Definition: recorder_worker.cc:14
RxPacket::Free
void Free()
Definition: message.h:216
concurrentqueue.h
Agora_recorder::RecorderThread::producer_token_
moodycamel::ProducerToken producer_token_
Definition: recorder_thread.h:42
rx_tag_t::rx_packet_
RxPacket * rx_packet_
Definition: message.h:228