Added with_all to async sink improved destructor

This commit is contained in:
gabime 2025-01-17 16:17:11 +02:00
parent bc2eed7913
commit af7b061773
3 changed files with 118 additions and 22 deletions

View File

@ -42,9 +42,46 @@ public:
err_handler custom_err_handler = nullptr;
};
explicit async_sink(const config &async_config);
explicit async_sink(config async_config);
async_sink(const async_sink &) = delete;
async_sink &operator=(const async_sink &) = delete;
~async_sink() override;
// create an async_sink with one backend sink
// sink interface implementation
void log(const details::log_msg &msg) override;
void set_pattern(const std::string &pattern) override;
void set_formatter(std::unique_ptr<formatter> sink_formatter) override;
// enqueue flush request to the worker thread and return immediately(default)
// if you need to wait for the actual flush to finish, call wait_for_all() after flush()
void flush() override;
// async_sink specific methods
// wait until all logs were processed up to timeout milliseconds.
// returns true if all messages were processed, false if timeout was reached
[[nodiscard]] bool wait_all(std::chrono::milliseconds timeout) const;
// wait until all logs were processed
void wait_all() const;
// return the number of overrun messages (effective only if policy is overrun_oldest)
[[nodiscard]] size_t get_overrun_counter() const;
// reset the overrun counter
void reset_overrun_counter() const;
// return the number of discarded messages (effective only if policy is discard_new)
[[nodiscard]] size_t get_discard_counter() const;
// reset the discard counter
void reset_discard_counter() const;
// return the current async_sink configuration
[[nodiscard]] const config &get_config() const;
// create an async_sink with one backend sink constructed with the given args.
// example:
// auto async_file = async_sink::with<spdlog::sinks::basic_file_sink_st>("mylog.txt");
template <typename Sink, typename... SinkArgs>
static std::shared_ptr<async_sink> with(SinkArgs &&...sink_args) {
config cfg{};
@ -52,20 +89,6 @@ public:
return std::make_shared<async_sink>(cfg);
}
~async_sink() override;
// sink interface implementation
void log(const details::log_msg &msg) override;
void flush() override;
void set_pattern(const std::string &pattern) override;
void set_formatter(std::unique_ptr<formatter> sink_formatter) override;
// async sink specific methods
[[nodiscard]] size_t get_overrun_counter() const;
void reset_overrun_counter() const;
[[nodiscard]] size_t get_discard_counter() const;
void reset_discard_counter() const;
[[nodiscard]] const config &get_config() const;
private:
using async_log_msg = details::async_log_msg;
@ -80,6 +103,8 @@ private:
std::unique_ptr<queue_t> q_;
std::thread worker_thread_;
details::err_helper err_helper_;
std::atomic_size_t flush_requests_ = 0;
std::atomic_bool terminate_worker_ = false;
};
} // namespace sinks

View File

@ -3,8 +3,10 @@
#include "spdlog/sinks/async_sink.h"
#include <algorithm>
#include <cassert>
#include <memory>
#include <utility>
#include "spdlog/common.h"
#include "spdlog/details/mpmc_blocking_q.h"
@ -14,8 +16,8 @@
namespace spdlog {
namespace sinks {
async_sink::async_sink(const config &async_config)
: config_(async_config) {
async_sink::async_sink(config async_config)
: config_(std::move(async_config)) {
if (config_.queue_size == 0 || config_.queue_size > max_queue_size) {
throw spdlog_ex("async_sink: invalid queue size");
}
@ -36,13 +38,18 @@ async_sink::~async_sink() {
q_->enqueue(async_log_msg(async_log_msg::type::terminate));
worker_thread_.join();
} catch (...) {
printf("Exception in ~async_sink()\n");
terminate_worker_ = true; // as last resort, stop the worker thread using terminate_worker_ flag.
#ifndef NDEBUG
printf("Exception in ~async_sink()\n");
#endif
}
}
void async_sink::log(const details::log_msg &msg) { send_message_(async_log_msg::type::log, msg); }
void async_sink::flush() { send_message_(async_log_msg::type::flush, details::log_msg()); }
void async_sink::flush() {
send_message_(async_log_msg::type::flush, details::log_msg());
}
void async_sink::set_pattern(const std::string &pattern) { set_formatter(std::make_unique<pattern_formatter>(pattern)); }
@ -58,6 +65,24 @@ void async_sink::set_formatter(std::unique_ptr<formatter> formatter) {
}
}
bool async_sink::wait_all(const std::chrono::milliseconds timeout) const {
using std::chrono::steady_clock;
constexpr std::chrono::milliseconds sleep_duration(5);
const auto start_time = steady_clock::now();
while (q_->size() > 0) {
auto elapsed = steady_clock::now() - start_time;
if (elapsed > timeout) {
return false;
}
std::this_thread::sleep_for(std::min(sleep_duration, timeout));
}
return true;
}
void async_sink::wait_all() const {
while (!wait_all(std::chrono::milliseconds(10))) { /* empty */ }
}
size_t async_sink::get_overrun_counter() const { return q_->overrun_counter(); }
void async_sink::reset_overrun_counter() const { q_->reset_overrun_counter(); }
@ -88,7 +113,7 @@ void async_sink::send_message_(async_log_msg::type msg_type, const details::log_
void async_sink::backend_loop_() {
details::async_log_msg incoming_msg;
for (;;) {
while (!terminate_worker_) {
q_->dequeue(incoming_msg);
switch (incoming_msg.message_type()) {
case async_log_msg::type::log:
@ -105,7 +130,7 @@ void async_sink::backend_loop_() {
}
}
void async_sink::backend_log_(const details::log_msg &msg) {
void async_sink::backend_log_(const details::log_msg &msg) {
for (const auto &sink : config_.sinks) {
if (sink->should_log(msg.log_level)) {
try {

View File

@ -309,3 +309,49 @@ TEST_CASE("custom_err_handler", "[async]") {
// lvalue logger so will be destructed here already so all messages were processed
REQUIRE(error_called);
}
// test wait_all
TEST_CASE("wait_all", "[async]") {
auto test_sink = std::make_shared<test_sink_mt>();
auto delay = std::chrono::milliseconds(10);
test_sink->set_delay(delay);
async_sink::config config;
config.sinks.push_back(test_sink);
size_t messages = 10;
auto as = std::make_shared<async_sink>(config);
auto logger = std::make_shared<spdlog::logger>("async_logger", as);
for (size_t i = 0; i < messages; i++) {
logger->info("Hello message");
}
REQUIRE_FALSE(as->wait_all(std::chrono::milliseconds(-10)));
REQUIRE_FALSE(as->wait_all(std::chrono::milliseconds(0)));
auto start = std::chrono::steady_clock::now();
REQUIRE_FALSE(as->wait_all(delay));
// should have waited approx 10ms before giving up
auto elapsed = std::chrono::steady_clock::now() - start;
REQUIRE(elapsed >= delay);
REQUIRE(elapsed < delay * 3);
// wait enough time for all messages to be processed
REQUIRE(as->wait_all(std::chrono::milliseconds(messages * delay)));
REQUIRE(as->wait_all(std::chrono::milliseconds(-10))); // no more messages
REQUIRE(as->wait_all(std::chrono::milliseconds(0))); // no more messages
REQUIRE(as->wait_all(std::chrono::milliseconds(10))); // no more messages
}
// test wait_all without timeout
TEST_CASE("wait_all2", "[async]") {
auto test_sink = std::make_shared<test_sink_mt>();
auto delay = std::chrono::milliseconds(10);
test_sink->set_delay(delay);
async_sink::config config;
config.sinks.push_back(test_sink);
size_t messages = 10;
auto as = std::make_shared<async_sink>(config);
auto logger = std::make_shared<spdlog::logger>("async_logger", as);
for (size_t i = 0; i < messages; i++) {
logger->info("Hello message");
}
as->wait_all();
REQUIRE(test_sink->msg_counter() == messages);
}