wip async sink

This commit is contained in:
gabime 2024-12-07 16:41:54 +02:00
parent ccde9db84c
commit 6cc6a7b655
3 changed files with 94 additions and 44 deletions

View File

@ -15,10 +15,10 @@ namespace details {
class SPDLOG_API async_log_msg : public log_msg { class SPDLOG_API async_log_msg : public log_msg {
public: public:
enum class msg_type:std::uint8_t { log, flush, terminate }; enum class type:std::uint8_t { log, flush, terminate };
async_log_msg() = default; async_log_msg() = default;
explicit async_log_msg(msg_type type); explicit async_log_msg(type type);
async_log_msg(msg_type type, const log_msg &orig_msg); async_log_msg(type type, const log_msg &orig_msg);
~async_log_msg() = default; ~async_log_msg() = default;
async_log_msg(const async_log_msg &other); async_log_msg(const async_log_msg &other);
@ -26,9 +26,9 @@ public:
async_log_msg &operator=(const async_log_msg &other); async_log_msg &operator=(const async_log_msg &other);
async_log_msg &operator=(async_log_msg &&other) noexcept; async_log_msg &operator=(async_log_msg &&other) noexcept;
msg_type message_type() const {return msg_type_;} type message_type() const {return msg_type_;}
private: private:
msg_type msg_type_{msg_type::log}; type msg_type_{type::log};
memory_buf_t buffer_; memory_buf_t buffer_;
void update_string_views(); void update_string_views();
}; };

View File

@ -1,10 +1,17 @@
#pragma once #pragma once
#include <cstdint>
#include <thread> #include <thread>
#include <atomic>
#include <cassert>
#include "../details/async_log_msg.h" #include "../details/async_log_msg.h"
#include "../details/mpmc_blocking_q.h" #include "../details/mpmc_blocking_q.h"
#include "./dist_sink.h" #include "dist_sink.h"
// async_sink is a sink that sends log messages to a dist_sink in a separate thread using a queue.
// The worker thread dequeues the messages and sends them to the dist_sink to perform the actual logging.
// The worker thread is terminated when the async_sink is destroyed.
namespace spdlog { namespace spdlog {
namespace sinks { namespace sinks {
@ -12,52 +19,96 @@ namespace sinks {
template <typename Mutex> template <typename Mutex>
class async_sink final : public dist_sink<Mutex> { class async_sink final : public dist_sink<Mutex> {
public: public:
using base_t = dist_sink<Mutex>; using base_t = dist_sink<Mutex>;
using queue_t = details::mpmc_blocking_queue<details::async_log_msg>; using async_message_t = details::async_log_msg;
async_sink() { using queue_t = details::mpmc_blocking_queue<async_message_t>;
q_ = std::make_unique<queue_t>(8192); enum { max_queue_size = 1024 * 1024 * 10 };
worker_thread_ = std::thread([this] {
details::async_log_msg incoming_msg; // Async overflow policy - block by default.
for (;;) { enum class overflow_policy : std::uint8_t {
q_->dequeue(incoming_msg); block, // Block until message can be enqueued (default
if (incoming_msg.message_type() == details::async_log_msg::msg_type::terminate) { overrun_oldest, // Discard oldest message in the queue if full when trying to
break; // add new item.
discard_new // Discard new message if the queue is full when trying to add new item.
};
explicit async_sink(size_t queue_size) {
if (queue_size == 0 || queue_size > max_queue_size) {
throw spdlog_ex("async_sink: invalid queue size");
} }
base_t::sink_it_(incoming_msg); // printf("........... Allocating queue: slot: %zu X %zu bytes ====> %lld KB ..............\n",
} // queue_size, sizeof(details::async_log_msg), (sizeof(details::async_log_msg) * queue_size)/1024);
}); q_ = std::make_unique<queue_t>(queue_size);
}
~async_sink() override {
try {
q_->enqueue(details::async_log_msg(details::async_log_msg::msg_type::terminate));
worker_thread_.join();
} catch (...) {}
};
async_sink(const async_sink &) = delete; worker_thread_ = std::thread([this] {
async_sink &operator=(const async_sink &) = delete; details::async_log_msg incoming_msg;
async_sink(async_sink &&) = default; for (;;) {
async_sink &operator=(async_sink &&) = default; q_->dequeue(incoming_msg);
if (incoming_msg.message_type() == async_message_t::type::terminate) {
break;
}
base_t::sink_it_(incoming_msg);
}
});
}
~async_sink() override {
try {
q_->enqueue(async_message_t(async_message_t::type::terminate));
worker_thread_.join();
} catch (...) {
}
};
async_sink(const async_sink &) = delete;
async_sink &operator=(const async_sink &) = delete;
async_sink(async_sink &&) = default;
async_sink &operator=(async_sink &&) = default;
void set_overflow_policy(overflow_policy policy) { overflow_policy_ = policy; }
[[nodiscard]] overflow_policy get_overflow_policy() const { return overflow_policy_; }
[[nodiscard]] size_t get_overrun_counter() const { return q_->overrun_counter(); }
void reset_overrun_counter() { q_->reset_overrun_counter(); }
[[nodiscard]] size_t get_discard_counter() const { return q_->discard_counter(); }
void reset_discard_counter() { q_->reset_discard_counter(); }
private: private:
void sink_it_(const details::log_msg &msg) override { void sink_it_(const details::log_msg &msg) override {
// Asynchronously send the log message to the base sink send_message_(async_message_t::type::log, msg);
q_->enqueue(details::async_log_msg(details::async_log_msg::msg_type::log, msg)); }
}
void flush_() override {
// Asynchronously flush the base sink
q_->enqueue(details::async_log_msg(details::async_log_msg::msg_type::flush));
}
std::thread worker_thread_; void flush_() override {
std::unique_ptr<queue_t> q_; send_message_(async_message_t::type::flush, details::log_msg());
}
// asynchronously send the log message to the worker thread using the queue.
// take into account the configured overflow policy.
void send_message_(const async_message_t::type msg_type, const details::log_msg &msg) {
switch (overflow_policy_) {
case overflow_policy::block:
q_->enqueue(async_message_t(msg_type, msg));
break;
case overflow_policy::overrun_oldest:
q_->enqueue_nowait(async_message_t(msg_type, msg));
break;
case overflow_policy::discard_new:
q_->enqueue_if_have_room(async_message_t(msg_type, msg));
break;
default:
assert(false);
throw spdlog_ex("async_sink: invalid overflow policy");
}
}
std::atomic<overflow_policy> overflow_policy_ = overflow_policy::block;
std::unique_ptr<queue_t> q_;
std::thread worker_thread_;
}; };
using async_sink_mt = async_sink<std::mutex>; using async_sink_mt = async_sink<std::mutex>;
using async_sink_st = async_sink<details::null_mutex>; using async_sink_st = async_sink<details::null_mutex>;
} // namespace sinks } // namespace sinks
} // namespace spdlog } // namespace spdlog

View File

@ -7,14 +7,14 @@ namespace spdlog {
namespace details { namespace details {
async_log_msg::async_log_msg(const msg_type type) async_log_msg::async_log_msg(const type type)
: msg_type_{type} {} : msg_type_{type} {}
// copy logger name and payload to buffer so can be used asynchronously // copy logger name and payload to buffer so can be used asynchronously
// note: source location pointers are copied without allocation since they // note: source location pointers are copied without allocation since they
// are compiler generated const chars* (__FILE__, __LINE__, __FUNCTION__) // are compiler generated const chars* (__FILE__, __LINE__, __FUNCTION__)
// if you pass custom strings to source location, make sure they outlive the async_log_msg // if you pass custom strings to source location, make sure they outlive the async_log_msg
async_log_msg::async_log_msg(const msg_type type, const log_msg &orig_msg) async_log_msg::async_log_msg(const type type, const log_msg &orig_msg)
: log_msg{orig_msg}, msg_type_(type) { : log_msg{orig_msg}, msg_type_(type) {
buffer_.append(logger_name); buffer_.append(logger_name);
buffer_.append(payload); buffer_.append(payload);
@ -23,7 +23,6 @@ async_log_msg::async_log_msg(const msg_type type, const log_msg &orig_msg)
async_log_msg::async_log_msg(const async_log_msg &other) async_log_msg::async_log_msg(const async_log_msg &other)
: log_msg{other}, msg_type_{other.msg_type_} { : log_msg{other}, msg_type_{other.msg_type_} {
buffer_.append(logger_name); buffer_.append(logger_name);
buffer_.append(payload); buffer_.append(payload);
update_string_views(); update_string_views();