Agora  1.2.0
Agora project
rx_status_tracker.h
Go to the documentation of this file.
1 
5 #ifndef RX_STATUS_TRACKER_H_
6 #define RX_STATUS_TRACKER_H_
7 
8 #include <complex>
9 #include <cstddef>
10 #include <vector>
11 
12 #include "logger.h"
13 #include "message.h"
14 
15 namespace TxRxWorkerRx {
16 
18  public:
21  size_t sample_offset_ = 0;
22  };
23 
24  explicit RxStatusTracker(size_t number_channels)
25  : tracking_(number_channels) {
26  for (auto& tracker : tracking_) {
27  tracker.sample_offset_ = 0;
28  tracker.rx_packet_memory_ = nullptr;
29  }
30  }
31 
32  void Reset(std::vector<RxPacket*> new_packets) {
35  for (size_t i = 0; i < tracking_.size(); i++) {
36  auto& tracker = tracking_.at(i);
37  tracker.sample_offset_ = 0;
38  tracker.rx_packet_memory_ = new_packets.at(i);
39  }
40  }
41 
42  //Reset, reusing the same rx packet memory locations
43  void Reset() {
44  std::vector<RxPacket*> new_packets;
45  for (auto& tracker : tracking_) {
46  new_packets.emplace_back(tracker.rx_packet_memory_);
47  }
48  Reset(new_packets);
49  }
50 
53  void DiscardOld(size_t new_samples, long long sample_rx_start) {
54  const size_t num_bytes_in_sample = sizeof(std::complex<int16_t>);
55  //rx_loc is the start of the new samples
56  for (auto* rx_loc : GetRxPtrs()) {
57  auto* buf_start =
58  reinterpret_cast<std::complex<int16_t>*>(rx_loc) - samples_available_;
60  "DiscardOld - Shifting %zu samples to start. Ignoring %zu, Current "
61  "location %ld, Start Location %ld\n",
62  new_samples, samples_available_, reinterpret_cast<intptr_t>(rx_loc),
63  reinterpret_cast<intptr_t>(buf_start));
64  ::memmove(buf_start, rx_loc, new_samples * num_bytes_in_sample);
65  }
66  Reset();
67  Update(new_samples, sample_rx_start);
68  }
69 
70  std::vector<RxPacket*> GetRxPackets() const {
71  const size_t num_packets = tracking_.size();
72  std::vector<RxPacket*> rx_packets;
73  rx_packets.reserve(num_packets);
74  for (const auto& rx_channel : tracking_) {
75  rx_packets.emplace_back(rx_channel.rx_packet_memory_);
76  }
77  return rx_packets;
78  }
79 
80  //Append new samples to data_set tracker
81  void Update(size_t new_samples, long long sample_rx_start) {
82  if (samples_available_ == 0) {
83  RtAssert(
85  "RxStatusTracker::Update - Expected samples start time to be 0\n");
86  sample_start_rx_time_ = sample_rx_start;
87  } else {
88  const bool is_continuous = CheckContinuity(sample_rx_start);
89  //New start == 0, means there was frags left from soapy (typically)
90  if (is_continuous == false) {
92  "RxStatusTracker::Update - Available %zu Rx Start %lld, New Start "
93  "%lld\n",
94  samples_available_, sample_start_rx_time_, sample_rx_start);
95  throw std::runtime_error("Unexpected sample rx time");
96  }
97  }
98 
99  //Update (incrememt) the rx memory locations by the number of new samples
100  for (auto& tracker : tracking_) {
101  tracker.sample_offset_ += new_samples;
102  RtAssert(tracker.rx_packet_memory_ != nullptr,
103  "RxStatusTracker::Update - Rx packet memory to be assigned\n");
104  }
105  samples_available_ += new_samples;
106  }
107 
108  bool CheckContinuity(long long sample_rx_start) const {
109  bool has_continuity = true;
110  if (samples_available_ > 0 && (sample_rx_start != 0)) {
111  const long long expected_start =
113 
114  if (expected_start != sample_rx_start) {
115  has_continuity = false;
116  }
117  }
118  return has_continuity;
119  }
120 
121  inline size_t SamplesAvailable() const { return samples_available_; }
122  inline size_t NumChannels() const { return tracking_.size(); }
123  inline long long StartTime() const { return sample_start_rx_time_; }
124  //Get memory locations for Rx calls
125  inline std::vector<void*> GetRxPtrs() const {
126  const size_t num_locations = tracking_.size();
127  std::vector<void*> rx_locations;
128  rx_locations.reserve(num_locations);
129  for (const auto& rx_channel : tracking_) {
130  rx_locations.emplace_back(
131  static_cast<void*>(&reinterpret_cast<std::complex<int16_t>*>(
132  rx_channel.rx_packet_memory_->RawPacket()
133  ->data_)[rx_channel.sample_offset_]));
134  }
135  return rx_locations;
136  }
137 
138  private:
139  size_t samples_available_ = 0;
140  long long sample_start_rx_time_ = 0;
141  // For each channel
142  std::vector<RxStatusPerChannelTracker> tracking_;
143 };
144 } // namespace TxRxWorkerRx
145 
146 #endif // RX_STATUS_TRACKER_H_
TxRxWorkerRx::RxStatusTracker::RxStatusPerChannelTracker::sample_offset_
size_t sample_offset_
Definition: rx_status_tracker.h:21
TxRxWorkerRx::RxStatusTracker::DiscardOld
void DiscardOld(size_t new_samples, long long sample_rx_start)
Definition: rx_status_tracker.h:53
TxRxWorkerRx::RxStatusTracker::RxStatusPerChannelTracker
Definition: rx_status_tracker.h:19
TxRxWorkerRx::RxStatusTracker::GetRxPackets
std::vector< RxPacket * > GetRxPackets() const
Definition: rx_status_tracker.h:70
message.h
Self defined functions for message storage and passing.
i
for i
Definition: generate_data.m:107
TxRxWorkerRx::RxStatusTracker
Definition: rx_status_tracker.h:17
TxRxWorkerRx::RxStatusTracker::RxStatusTracker
RxStatusTracker(size_t number_channels)
Definition: rx_status_tracker.h:24
TxRxWorkerRx::RxStatusTracker::GetRxPtrs
std::vector< void * > GetRxPtrs() const
Definition: rx_status_tracker.h:125
TxRxWorkerRx::RxStatusTracker::tracking_
std::vector< RxStatusPerChannelTracker > tracking_
Definition: rx_status_tracker.h:142
TxRxWorkerRx::RxStatusTracker::RxStatusPerChannelTracker::rx_packet_memory_
RxPacket * rx_packet_memory_
Definition: rx_status_tracker.h:20
TxRxWorkerRx::RxStatusTracker::Update
void Update(size_t new_samples, long long sample_rx_start)
Definition: rx_status_tracker.h:81
TxRxWorkerRx::RxStatusTracker::NumChannels
size_t NumChannels() const
Definition: rx_status_tracker.h:122
AGORA_LOG_INFO
#define AGORA_LOG_INFO(...)
Definition: logger.h:62
TxRxWorkerRx
Definition: rx_status_tracker.h:15
RtAssert
static void RtAssert(bool condition, const char *throw_str)
Definition: utils.h:104
AGORA_LOG_WARN
#define AGORA_LOG_WARN(...)
Definition: logger.h:53
TxRxWorkerRx::RxStatusTracker::samples_available_
size_t samples_available_
Definition: rx_status_tracker.h:139
RxPacket
Definition: message.h:186
TxRxWorkerRx::RxStatusTracker::SamplesAvailable
size_t SamplesAvailable() const
Definition: rx_status_tracker.h:121
TxRxWorkerRx::RxStatusTracker::Reset
void Reset()
Definition: rx_status_tracker.h:43
TxRxWorkerRx::RxStatusTracker::StartTime
long long StartTime() const
Definition: rx_status_tracker.h:123
TxRxWorkerRx::RxStatusTracker::CheckContinuity
bool CheckContinuity(long long sample_rx_start) const
Definition: rx_status_tracker.h:108
TxRxWorkerRx::RxStatusTracker::sample_start_rx_time_
long long sample_start_rx_time_
Definition: rx_status_tracker.h:140
TxRxWorkerRx::RxStatusTracker::Reset
void Reset(std::vector< RxPacket * > new_packets)
Definition: rx_status_tracker.h:32