default_queue_size

This commit is contained in:
gabime 2024-12-07 16:47:22 +02:00
parent 16eef3ebf1
commit 43998c07f9

View File

@ -20,9 +20,9 @@ template <typename Mutex>
class async_sink final : public dist_sink<Mutex> { class async_sink final : public dist_sink<Mutex> {
public: public:
using base_t = dist_sink<Mutex>; using base_t = dist_sink<Mutex>;
using async_message_t = details::async_log_msg; using async_log_msg = details::async_log_msg;
using queue_t = details::mpmc_blocking_queue<async_message_t>; using queue_t = details::mpmc_blocking_queue<async_log_msg>;
enum { max_queue_size = 1024 * 1024 * 10 }; enum { default_queue_size = 8192, max_queue_size = 1024 * 1024 * 10 };
// Async overflow policy - block by default. // Async overflow policy - block by default.
enum class overflow_policy : std::uint8_t { enum class overflow_policy : std::uint8_t {
@ -32,7 +32,7 @@ public:
discard_new // Discard new message if the queue is full when trying to add new item. discard_new // Discard new message if the queue is full when trying to add new item.
}; };
explicit async_sink(size_t queue_size) { explicit async_sink(size_t queue_size = default_queue_size) {
if (queue_size == 0 || queue_size > max_queue_size) { if (queue_size == 0 || queue_size > max_queue_size) {
throw spdlog_ex("async_sink: invalid queue size"); throw spdlog_ex("async_sink: invalid queue size");
} }
@ -44,7 +44,7 @@ public:
details::async_log_msg incoming_msg; details::async_log_msg incoming_msg;
for (;;) { for (;;) {
q_->dequeue(incoming_msg); q_->dequeue(incoming_msg);
if (incoming_msg.message_type() == async_message_t::type::terminate) { if (incoming_msg.message_type() == async_log_msg::type::terminate) {
break; break;
} }
base_t::sink_it_(incoming_msg); base_t::sink_it_(incoming_msg);
@ -53,7 +53,7 @@ public:
} }
~async_sink() override { ~async_sink() override {
try { try {
q_->enqueue(async_message_t(async_message_t::type::terminate)); q_->enqueue(async_log_msg(async_log_msg::type::terminate));
worker_thread_.join(); worker_thread_.join();
} catch (...) { } catch (...) {
} }
@ -75,25 +75,25 @@ public:
private: private:
void sink_it_(const details::log_msg &msg) override { void sink_it_(const details::log_msg &msg) override {
send_message_(async_message_t::type::log, msg); send_message_(async_log_msg::type::log, msg);
} }
void flush_() override { void flush_() override {
send_message_(async_message_t::type::flush, details::log_msg()); send_message_(async_log_msg::type::flush, details::log_msg());
} }
// asynchronously send the log message to the worker thread using the queue. // asynchronously send the log message to the worker thread using the queue.
// take into account the configured overflow policy. // take into account the configured overflow policy.
void send_message_(const async_message_t::type msg_type, const details::log_msg &msg) { void send_message_(const async_log_msg::type msg_type, const details::log_msg &msg) {
switch (overflow_policy_) { switch (overflow_policy_) {
case overflow_policy::block: case overflow_policy::block:
q_->enqueue(async_message_t(msg_type, msg)); q_->enqueue(async_log_msg(msg_type, msg));
break; break;
case overflow_policy::overrun_oldest: case overflow_policy::overrun_oldest:
q_->enqueue_nowait(async_message_t(msg_type, msg)); q_->enqueue_nowait(async_log_msg(msg_type, msg));
break; break;
case overflow_policy::discard_new: case overflow_policy::discard_new:
q_->enqueue_if_have_room(async_message_t(msg_type, msg)); q_->enqueue_if_have_room(async_log_msg(msg_type, msg));
break; break;
default: default:
assert(false); assert(false);