diff --git a/include/spdlog/sinks/async_sink.h b/include/spdlog/sinks/async_sink.h index 03e986ae..37831d95 100644 --- a/include/spdlog/sinks/async_sink.h +++ b/include/spdlog/sinks/async_sink.h @@ -42,9 +42,46 @@ public: err_handler custom_err_handler = nullptr; }; - explicit async_sink(const config &async_config); + explicit async_sink(config async_config); + async_sink(const async_sink &) = delete; + async_sink &operator=(const async_sink &) = delete; + ~async_sink() override; - // create an async_sink with one backend sink + // sink interface implementation + void log(const details::log_msg &msg) override; + void set_pattern(const std::string &pattern) override; + void set_formatter(std::unique_ptr sink_formatter) override; + // enqueue flush request to the worker thread and return immediately(default) + // if you need to wait for the actual flush to finish, call wait_for_all() after flush() + void flush() override; + + // async_sink specific methods + + // wait until all logs were processed up to timeout milliseconds. + // returns true if all messages were processed, false if timeout was reached + [[nodiscard]] bool wait_all(std::chrono::milliseconds timeout) const; + + // wait until all logs were processed + void wait_all() const; + + // return the number of overrun messages (effective only if policy is overrun_oldest) + [[nodiscard]] size_t get_overrun_counter() const; + + // reset the overrun counter + void reset_overrun_counter() const; + + // return the number of discarded messages (effective only if policy is discard_new) + [[nodiscard]] size_t get_discard_counter() const; + + // reset the discard counter + void reset_discard_counter() const; + + // return the current async_sink configuration + [[nodiscard]] const config &get_config() const; + + // create an async_sink with one backend sink constructed with the given args. + // example: + // auto async_file = async_sink::with("mylog.txt"); template static std::shared_ptr with(SinkArgs &&...sink_args) { config cfg{}; @@ -52,20 +89,6 @@ public: return std::make_shared(cfg); } - ~async_sink() override; - - // sink interface implementation - void log(const details::log_msg &msg) override; - void flush() override; - void set_pattern(const std::string &pattern) override; - void set_formatter(std::unique_ptr sink_formatter) override; - - // async sink specific methods - [[nodiscard]] size_t get_overrun_counter() const; - void reset_overrun_counter() const; - [[nodiscard]] size_t get_discard_counter() const; - void reset_discard_counter() const; - [[nodiscard]] const config &get_config() const; private: using async_log_msg = details::async_log_msg; @@ -80,6 +103,8 @@ private: std::unique_ptr q_; std::thread worker_thread_; details::err_helper err_helper_; + std::atomic_size_t flush_requests_ = 0; + std::atomic_bool terminate_worker_ = false; }; } // namespace sinks diff --git a/src/sinks/async_sink.cpp b/src/sinks/async_sink.cpp index a3d99ea2..e75d6fd1 100644 --- a/src/sinks/async_sink.cpp +++ b/src/sinks/async_sink.cpp @@ -3,8 +3,10 @@ #include "spdlog/sinks/async_sink.h" +#include #include #include +#include #include "spdlog/common.h" #include "spdlog/details/mpmc_blocking_q.h" @@ -14,8 +16,8 @@ namespace spdlog { namespace sinks { -async_sink::async_sink(const config &async_config) - : config_(async_config) { +async_sink::async_sink(config async_config) + : config_(std::move(async_config)) { if (config_.queue_size == 0 || config_.queue_size > max_queue_size) { throw spdlog_ex("async_sink: invalid queue size"); } @@ -36,13 +38,18 @@ async_sink::~async_sink() { q_->enqueue(async_log_msg(async_log_msg::type::terminate)); worker_thread_.join(); } catch (...) { - printf("Exception in ~async_sink()\n"); + terminate_worker_ = true; // as last resort, stop the worker thread using terminate_worker_ flag. + #ifndef NDEBUG + printf("Exception in ~async_sink()\n"); + #endif } } void async_sink::log(const details::log_msg &msg) { send_message_(async_log_msg::type::log, msg); } -void async_sink::flush() { send_message_(async_log_msg::type::flush, details::log_msg()); } +void async_sink::flush() { + send_message_(async_log_msg::type::flush, details::log_msg()); +} void async_sink::set_pattern(const std::string &pattern) { set_formatter(std::make_unique(pattern)); } @@ -58,6 +65,24 @@ void async_sink::set_formatter(std::unique_ptr formatter) { } } +bool async_sink::wait_all(const std::chrono::milliseconds timeout) const { + using std::chrono::steady_clock; + constexpr std::chrono::milliseconds sleep_duration(5); + const auto start_time = steady_clock::now(); + while (q_->size() > 0) { + auto elapsed = steady_clock::now() - start_time; + if (elapsed > timeout) { + return false; + } + std::this_thread::sleep_for(std::min(sleep_duration, timeout)); + } + return true; +} + +void async_sink::wait_all() const { + while (!wait_all(std::chrono::milliseconds(10))) { /* empty */ } +} + size_t async_sink::get_overrun_counter() const { return q_->overrun_counter(); } void async_sink::reset_overrun_counter() const { q_->reset_overrun_counter(); } @@ -88,7 +113,7 @@ void async_sink::send_message_(async_log_msg::type msg_type, const details::log_ void async_sink::backend_loop_() { details::async_log_msg incoming_msg; - for (;;) { + while (!terminate_worker_) { q_->dequeue(incoming_msg); switch (incoming_msg.message_type()) { case async_log_msg::type::log: @@ -105,7 +130,7 @@ void async_sink::backend_loop_() { } } -void async_sink::backend_log_(const details::log_msg &msg) { +void async_sink::backend_log_(const details::log_msg &msg) { for (const auto &sink : config_.sinks) { if (sink->should_log(msg.log_level)) { try { diff --git a/tests/test_async.cpp b/tests/test_async.cpp index 74e92f17..062c66d6 100644 --- a/tests/test_async.cpp +++ b/tests/test_async.cpp @@ -309,3 +309,49 @@ TEST_CASE("custom_err_handler", "[async]") { // lvalue logger so will be destructed here already so all messages were processed REQUIRE(error_called); } + +// test wait_all +TEST_CASE("wait_all", "[async]") { + auto test_sink = std::make_shared(); + auto delay = std::chrono::milliseconds(10); + test_sink->set_delay(delay); + async_sink::config config; + config.sinks.push_back(test_sink); + size_t messages = 10; + auto as = std::make_shared(config); + auto logger = std::make_shared("async_logger", as); + for (size_t i = 0; i < messages; i++) { + logger->info("Hello message"); + } + REQUIRE_FALSE(as->wait_all(std::chrono::milliseconds(-10))); + REQUIRE_FALSE(as->wait_all(std::chrono::milliseconds(0))); + auto start = std::chrono::steady_clock::now(); + REQUIRE_FALSE(as->wait_all(delay)); + + // should have waited approx 10ms before giving up + auto elapsed = std::chrono::steady_clock::now() - start; + REQUIRE(elapsed >= delay); + REQUIRE(elapsed < delay * 3); + // wait enough time for all messages to be processed + REQUIRE(as->wait_all(std::chrono::milliseconds(messages * delay))); + REQUIRE(as->wait_all(std::chrono::milliseconds(-10))); // no more messages + REQUIRE(as->wait_all(std::chrono::milliseconds(0))); // no more messages + REQUIRE(as->wait_all(std::chrono::milliseconds(10))); // no more messages +} + +// test wait_all without timeout +TEST_CASE("wait_all2", "[async]") { + auto test_sink = std::make_shared(); + auto delay = std::chrono::milliseconds(10); + test_sink->set_delay(delay); + async_sink::config config; + config.sinks.push_back(test_sink); + size_t messages = 10; + auto as = std::make_shared(config); + auto logger = std::make_shared("async_logger", as); + for (size_t i = 0; i < messages; i++) { + logger->info("Hello message"); + } + as->wait_all(); + REQUIRE(test_sink->msg_counter() == messages); +}