From 6cc6a7b655f7c1aaeb2e1dc42ab4f2ff858b4f2a Mon Sep 17 00:00:00 2001 From: gabime Date: Sat, 7 Dec 2024 16:41:54 +0200 Subject: [PATCH] wip async sink --- include/spdlog/details/async_log_msg.h | 10 +- include/spdlog/sinks/async_sink.h | 123 +++++++++++++++++-------- src/details/async_log_msg.cpp | 5 +- 3 files changed, 94 insertions(+), 44 deletions(-) diff --git a/include/spdlog/details/async_log_msg.h b/include/spdlog/details/async_log_msg.h index f9782f87..52dee73d 100644 --- a/include/spdlog/details/async_log_msg.h +++ b/include/spdlog/details/async_log_msg.h @@ -15,10 +15,10 @@ namespace details { class SPDLOG_API async_log_msg : public log_msg { public: - enum class msg_type:std::uint8_t { log, flush, terminate }; + enum class type:std::uint8_t { log, flush, terminate }; async_log_msg() = default; - explicit async_log_msg(msg_type type); - async_log_msg(msg_type type, const log_msg &orig_msg); + explicit async_log_msg(type type); + async_log_msg(type type, const log_msg &orig_msg); ~async_log_msg() = default; async_log_msg(const async_log_msg &other); @@ -26,9 +26,9 @@ public: async_log_msg &operator=(const async_log_msg &other); async_log_msg &operator=(async_log_msg &&other) noexcept; - msg_type message_type() const {return msg_type_;} + type message_type() const {return msg_type_;} private: - msg_type msg_type_{msg_type::log}; + type msg_type_{type::log}; memory_buf_t buffer_; void update_string_views(); }; diff --git a/include/spdlog/sinks/async_sink.h b/include/spdlog/sinks/async_sink.h index 773275a1..359475ed 100644 --- a/include/spdlog/sinks/async_sink.h +++ b/include/spdlog/sinks/async_sink.h @@ -1,10 +1,17 @@ #pragma once +#include #include +#include +#include #include "../details/async_log_msg.h" #include "../details/mpmc_blocking_q.h" -#include "./dist_sink.h" +#include "dist_sink.h" + +// async_sink is a sink that sends log messages to a dist_sink in a separate thread using a queue. +// The worker thread dequeues the messages and sends them to the dist_sink to perform the actual logging. +// The worker thread is terminated when the async_sink is destroyed. namespace spdlog { namespace sinks { @@ -12,52 +19,96 @@ namespace sinks { template class async_sink final : public dist_sink { public: - using base_t = dist_sink; - using queue_t = details::mpmc_blocking_queue; - async_sink() { - q_ = std::make_unique(8192); - worker_thread_ = std::thread([this] { - details::async_log_msg incoming_msg; - for (;;) { - q_->dequeue(incoming_msg); - if (incoming_msg.message_type() == details::async_log_msg::msg_type::terminate) { - break; + using base_t = dist_sink; + using async_message_t = details::async_log_msg; + using queue_t = details::mpmc_blocking_queue; + enum { max_queue_size = 1024 * 1024 * 10 }; + + // Async overflow policy - block by default. + enum class overflow_policy : std::uint8_t { + block, // Block until message can be enqueued (default + overrun_oldest, // Discard oldest message in the queue if full when trying to + // add new item. + discard_new // Discard new message if the queue is full when trying to add new item. + }; + + explicit async_sink(size_t queue_size) { + if (queue_size == 0 || queue_size > max_queue_size) { + throw spdlog_ex("async_sink: invalid queue size"); } - base_t::sink_it_(incoming_msg); - } - }); - } - ~async_sink() override { - try { - q_->enqueue(details::async_log_msg(details::async_log_msg::msg_type::terminate)); - worker_thread_.join(); - } catch (...) {} - }; + // printf("........... Allocating queue: slot: %zu X %zu bytes ====> %lld KB ..............\n", + // queue_size, sizeof(details::async_log_msg), (sizeof(details::async_log_msg) * queue_size)/1024); + q_ = std::make_unique(queue_size); - async_sink(const async_sink &) = delete; - async_sink &operator=(const async_sink &) = delete; - async_sink(async_sink &&) = default; - async_sink &operator=(async_sink &&) = default; + worker_thread_ = std::thread([this] { + details::async_log_msg incoming_msg; + for (;;) { + q_->dequeue(incoming_msg); + if (incoming_msg.message_type() == async_message_t::type::terminate) { + break; + } + base_t::sink_it_(incoming_msg); + } + }); + } + ~async_sink() override { + try { + q_->enqueue(async_message_t(async_message_t::type::terminate)); + worker_thread_.join(); + } catch (...) { + } + }; + async_sink(const async_sink &) = delete; + async_sink &operator=(const async_sink &) = delete; + async_sink(async_sink &&) = default; + async_sink &operator=(async_sink &&) = default; + void set_overflow_policy(overflow_policy policy) { overflow_policy_ = policy; } + [[nodiscard]] overflow_policy get_overflow_policy() const { return overflow_policy_; } + [[nodiscard]] size_t get_overrun_counter() const { return q_->overrun_counter(); } + void reset_overrun_counter() { q_->reset_overrun_counter(); } + + [[nodiscard]] size_t get_discard_counter() const { return q_->discard_counter(); } + void reset_discard_counter() { q_->reset_discard_counter(); } private: - void sink_it_(const details::log_msg &msg) override { - // Asynchronously send the log message to the base sink - q_->enqueue(details::async_log_msg(details::async_log_msg::msg_type::log, msg)); - } - void flush_() override { - // Asynchronously flush the base sink - q_->enqueue(details::async_log_msg(details::async_log_msg::msg_type::flush)); - } + void sink_it_(const details::log_msg &msg) override { + send_message_(async_message_t::type::log, msg); + } - std::thread worker_thread_; - std::unique_ptr q_; + void flush_() override { + send_message_(async_message_t::type::flush, details::log_msg()); + } + + // asynchronously send the log message to the worker thread using the queue. + // take into account the configured overflow policy. + void send_message_(const async_message_t::type msg_type, const details::log_msg &msg) { + switch (overflow_policy_) { + case overflow_policy::block: + q_->enqueue(async_message_t(msg_type, msg)); + break; + case overflow_policy::overrun_oldest: + q_->enqueue_nowait(async_message_t(msg_type, msg)); + break; + case overflow_policy::discard_new: + q_->enqueue_if_have_room(async_message_t(msg_type, msg)); + break; + default: + assert(false); + throw spdlog_ex("async_sink: invalid overflow policy"); + } + + } + + std::atomic overflow_policy_ = overflow_policy::block; + std::unique_ptr q_; + std::thread worker_thread_; }; using async_sink_mt = async_sink; using async_sink_st = async_sink; -} // namespace sinks +} // namespace sinks } // namespace spdlog diff --git a/src/details/async_log_msg.cpp b/src/details/async_log_msg.cpp index 81632cb9..2d22a212 100644 --- a/src/details/async_log_msg.cpp +++ b/src/details/async_log_msg.cpp @@ -7,14 +7,14 @@ namespace spdlog { namespace details { -async_log_msg::async_log_msg(const msg_type type) +async_log_msg::async_log_msg(const type type) : msg_type_{type} {} // copy logger name and payload to buffer so can be used asynchronously // note: source location pointers are copied without allocation since they // are compiler generated const chars* (__FILE__, __LINE__, __FUNCTION__) // if you pass custom strings to source location, make sure they outlive the async_log_msg -async_log_msg::async_log_msg(const msg_type type, const log_msg &orig_msg) +async_log_msg::async_log_msg(const type type, const log_msg &orig_msg) : log_msg{orig_msg}, msg_type_(type) { buffer_.append(logger_name); buffer_.append(payload); @@ -23,7 +23,6 @@ async_log_msg::async_log_msg(const msg_type type, const log_msg &orig_msg) async_log_msg::async_log_msg(const async_log_msg &other) : log_msg{other}, msg_type_{other.msg_type_} { - buffer_.append(logger_name); buffer_.append(payload); update_string_views();