Agora  1.2.0
Agora project
mpmc_blocking_q.h
Go to the documentation of this file.
1 // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
3 
4 #pragma once
5 
6 // multi producer-multi consumer blocking queue.
7 // enqueue(..) - will block until room found to put the new message.
8 // enqueue_nowait(..) - will return immediately; if no room is left, the oldest
9 // message in the queue is overrun (the function returns void).
10 // dequeue_for(..) - will block until the queue is not empty or the timeout has
11 // passed.
12 
14 
15 #include <condition_variable>
16 #include <mutex>
17 
18 namespace spdlog {
19 namespace details {
20 
21 template<typename T>
23 {
24 public:
25  using item_type = T;
26  explicit mpmc_blocking_queue(size_t max_items)
27  : q_(max_items)
28  {}
29 
30 #ifndef __MINGW32__
31  // try to enqueue and block if no room left
32  void enqueue(T &&item)
33  {
34  {
35  std::unique_lock<std::mutex> lock(queue_mutex_);
36  pop_cv_.wait(lock, [this] { return !this->q_.full(); });
37  q_.push_back(std::move(item));
38  }
39  push_cv_.notify_one();
40  }
41 
42  // enqueue immediately. overrun oldest message in the queue if no room left.
43  void enqueue_nowait(T &&item)
44  {
45  {
46  std::unique_lock<std::mutex> lock(queue_mutex_);
47  q_.push_back(std::move(item));
48  }
49  push_cv_.notify_one();
50  }
51 
52  // try to dequeue item. if no item found. wait up to timeout and try again
53  // Return true, if succeeded dequeue item, false otherwise
54  bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
55  {
56  {
57  std::unique_lock<std::mutex> lock(queue_mutex_);
58  if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
59  {
60  return false;
61  }
62  popped_item = std::move(q_.front());
63  q_.pop_front();
64  }
65  pop_cv_.notify_one();
66  return true;
67  }
68 
69 #else
70  // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
71  // so release the mutex at the very end of each function.
72 
73  // try to enqueue and block if no room left
74  void enqueue(T &&item)
75  {
76  std::unique_lock<std::mutex> lock(queue_mutex_);
77  pop_cv_.wait(lock, [this] { return !this->q_.full(); });
78  q_.push_back(std::move(item));
79  push_cv_.notify_one();
80  }
81 
82  // enqueue immediately. overrun oldest message in the queue if no room left.
83  void enqueue_nowait(T &&item)
84  {
85  std::unique_lock<std::mutex> lock(queue_mutex_);
86  q_.push_back(std::move(item));
87  push_cv_.notify_one();
88  }
89 
90  // try to dequeue item. if no item found. wait up to timeout and try again
91  // Return true, if succeeded dequeue item, false otherwise
92  bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
93  {
94  std::unique_lock<std::mutex> lock(queue_mutex_);
95  if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
96  {
97  return false;
98  }
99  popped_item = std::move(q_.front());
100  q_.pop_front();
101  pop_cv_.notify_one();
102  return true;
103  }
104 
105 #endif
106 
108  {
109  std::unique_lock<std::mutex> lock(queue_mutex_);
110  return q_.overrun_counter();
111  }
112 
113  size_t size()
114  {
115  std::unique_lock<std::mutex> lock(queue_mutex_);
116  return q_.size();
117  }
118 
119 private:
120  std::mutex queue_mutex_;
121  std::condition_variable push_cv_;
122  std::condition_variable pop_cv_;
124 };
125 } // namespace details
126 } // namespace spdlog
spdlog::details::mpmc_blocking_queue::mpmc_blocking_queue
mpmc_blocking_queue(size_t max_items)
Definition: mpmc_blocking_q.h:26
circular_q.h
spdlog::details::mpmc_blocking_queue::q_
spdlog::details::circular_q< T > q_
Definition: mpmc_blocking_q.h:123
spdlog::details::mpmc_blocking_queue::pop_cv_
std::condition_variable pop_cv_
Definition: mpmc_blocking_q.h:122
spdlog::details::mpmc_blocking_queue::push_cv_
std::condition_variable push_cv_
Definition: mpmc_blocking_q.h:121
spdlog::details::mpmc_blocking_queue::overrun_counter
size_t overrun_counter()
Definition: mpmc_blocking_q.h:107
spdlog::details::mpmc_blocking_queue
Definition: mpmc_blocking_q.h:22
T
T
Definition: simulate_performance.m:4
spdlog
Definition: async.h:25
spdlog::details::mpmc_blocking_queue::size
size_t size()
Definition: mpmc_blocking_q.h:113
spdlog::details::mpmc_blocking_queue::dequeue_for
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
Definition: mpmc_blocking_q.h:54
spdlog::details::mpmc_blocking_queue< item_type >::item_type
item_type item_type
Definition: mpmc_blocking_q.h:25
spdlog::details::mpmc_blocking_queue::enqueue
void enqueue(T &&item)
Definition: mpmc_blocking_q.h:32
spdlog::details::mpmc_blocking_queue::queue_mutex_
std::mutex queue_mutex_
Definition: mpmc_blocking_q.h:120
spdlog::details::circular_q
Definition: circular_q.h:13
spdlog::details::mpmc_blocking_queue::enqueue_nowait
void enqueue_nowait(T &&item)
Definition: mpmc_blocking_q.h:43