Agora  1.2.0
Agora project
thread_pool-inl.h
Go to the documentation of this file.
1 // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
3 
4 #pragma once
5 
6 #ifndef SPDLOG_HEADER_ONLY
8 #endif
9 
10 #include <spdlog/common.h>
11 #include <cassert>
12 
13 namespace spdlog {
14 namespace details {
15 
17  size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start, std::function<void()> on_thread_stop)
18  : q_(q_max_items)
19 {
20  if (threads_n == 0 || threads_n > 1000)
21  {
22  throw_spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid "
23  "range is 1-1000)");
24  }
25  for (size_t i = 0; i < threads_n; i++)
26  {
27  threads_.emplace_back([this, on_thread_start, on_thread_stop] {
28  on_thread_start();
30  on_thread_stop();
31  });
32  }
33 }
34 
35 SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n, std::function<void()> on_thread_start)
36  : thread_pool(q_max_items, threads_n, on_thread_start, [] {})
37 {}
38 
39 SPDLOG_INLINE thread_pool::thread_pool(size_t q_max_items, size_t threads_n)
40  : thread_pool(
41  q_max_items, threads_n, [] {}, [] {})
42 {}
43 
44 // message all threads to terminate gracefully join them
46 {
48  {
49  for (size_t i = 0; i < threads_.size(); i++)
50  {
52  }
53 
54  for (auto &t : threads_)
55  {
56  t.join();
57  }
58  }
60 }
61 
63 {
64  async_msg async_m(std::move(worker_ptr), async_msg_type::log, msg);
65  post_async_msg_(std::move(async_m), overflow_policy);
66 }
67 
69 {
70  post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
71 }
72 
74 {
75  return q_.overrun_counter();
76 }
77 
79 {
80  return q_.size();
81 }
82 
84 {
85  if (overflow_policy == async_overflow_policy::block)
86  {
87  q_.enqueue(std::move(new_msg));
88  }
89  else
90  {
91  q_.enqueue_nowait(std::move(new_msg));
92  }
93 }
94 
96 {
97  while (process_next_msg_()) {}
98 }
99 
100 // process next message in the queue
101 // return true if this thread should still be active (while no terminate msg
102 // was received)
104 {
105  async_msg incoming_async_msg;
106  bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
107  if (!dequeued)
108  {
109  return true;
110  }
111 
112  switch (incoming_async_msg.msg_type)
113  {
114  case async_msg_type::log: {
115  incoming_async_msg.worker_ptr->backend_sink_it_(incoming_async_msg);
116  return true;
117  }
118  case async_msg_type::flush: {
119  incoming_async_msg.worker_ptr->backend_flush_();
120  return true;
121  }
122 
124  return false;
125  }
126 
127  default: {
128  assert(false);
129  }
130  }
131 
132  return true;
133 }
134 
135 } // namespace details
136 } // namespace spdlog
spdlog::details::thread_pool::post_log
void post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy)
Definition: thread_pool-inl.h:62
spdlog::details::async_msg::msg_type
async_msg_type msg_type
Definition: thread_pool.h:34
spdlog::details::thread_pool::queue_size
size_t queue_size()
Definition: thread_pool-inl.h:78
spdlog::details::thread_pool::post_flush
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy)
Definition: thread_pool-inl.h:68
thread_pool.h
spdlog::details::thread_pool::overrun_counter
size_t overrun_counter()
Definition: thread_pool-inl.h:73
spdlog::details::mpmc_blocking_queue::overrun_counter
size_t overrun_counter()
Definition: mpmc_blocking_q.h:107
spdlog::details::async_logger_ptr
std::shared_ptr< spdlog::async_logger > async_logger_ptr
Definition: thread_pool.h:21
SPDLOG_CATCH_STD
#define SPDLOG_CATCH_STD
Definition: common.h:104
spdlog::details::thread_pool
Definition: thread_pool.h:81
spdlog::details::thread_pool::threads_
std::vector< std::thread > threads_
Definition: thread_pool.h:105
spdlog
Definition: async.h:25
spdlog::throw_spdlog_ex
SPDLOG_INLINE void throw_spdlog_ex(const std::string &msg, int last_errno)
Definition: common-inl.h:72
spdlog::details::thread_pool::thread_pool
thread_pool(size_t q_max_items, size_t threads_n, std::function< void()> on_thread_start, std::function< void()> on_thread_stop)
Definition: thread_pool-inl.h:16
spdlog::details::mpmc_blocking_queue::size
size_t size()
Definition: mpmc_blocking_q.h:113
i
for i
Definition: generate_data.m:107
spdlog::details::mpmc_blocking_queue::dequeue_for
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
Definition: mpmc_blocking_q.h:54
SPDLOG_INLINE
#define SPDLOG_INLINE
Definition: common.h:42
spdlog::details::async_msg
Definition: thread_pool.h:32
spdlog::details::async_msg::worker_ptr
async_logger_ptr worker_ptr
Definition: thread_pool.h:35
spdlog::details::thread_pool::worker_loop_
void worker_loop_()
Definition: thread_pool-inl.h:95
spdlog::async_overflow_policy::block
@ block
function
function[avg_proc_duration, std_proc_duration]
Definition: parse_dl_file.m:1
common.h
spdlog::details::thread_pool::q_
q_type q_
Definition: thread_pool.h:103
spdlog::details::log_msg
Definition: log_msg.h:11
spdlog::details::thread_pool::process_next_msg_
bool process_next_msg_()
Definition: thread_pool-inl.h:103
spdlog::details::async_msg_type::flush
@ flush
spdlog::details::async_msg_type::log
@ log
spdlog::details::mpmc_blocking_queue::enqueue
void enqueue(T &&item)
Definition: mpmc_blocking_q.h:32
SPDLOG_TRY
#define SPDLOG_TRY
Definition: common.h:102
spdlog::details::thread_pool::post_async_msg_
void post_async_msg_(async_msg &&new_msg, async_overflow_policy overflow_policy)
Definition: thread_pool-inl.h:83
spdlog::details::thread_pool::~thread_pool
~thread_pool()
Definition: thread_pool-inl.h:45
spdlog::async_overflow_policy
async_overflow_policy
Definition: async_logger.h:22
spdlog::details::async_msg_type::terminate
@ terminate
spdlog::details::mpmc_blocking_queue::enqueue_nowait
void enqueue_nowait(T &&item)
Definition: mpmc_blocking_q.h:43