Make async_logger::flush() synchronous and wait for the flush to complete (#3049)

This commit is contained in:
Yubin 2024-03-23 21:52:32 +08:00 committed by GitHub
parent 6766f873d6
commit 6725584e27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 74 additions and 11 deletions

View File

@ -43,13 +43,15 @@ SPDLOG_LOGGER_CATCH(msg.source)
} }
// send flush request to the thread pool // send flush request to the thread pool
SPDLOG_INLINE void spdlog::async_logger::flush_(){ SPDLOG_INLINE void spdlog::async_logger::flush_(){SPDLOG_TRY{auto pool_ptr = thread_pool_.lock();
SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){ if (!pool_ptr) {
pool_ptr->post_flush(shared_from_this(), overflow_policy_);
}
else {
throw_spdlog_ex("async flush: thread pool doesn't exist anymore"); throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
} }
std::future<void> future = pool_ptr->post_flush(shared_from_this(), overflow_policy_);
// Wait for the flush operation to complete.
// This might throw exception if the flush message get dropped because of overflow.
future.get();
} }
SPDLOG_LOGGER_CATCH(source_loc()) SPDLOG_LOGGER_CATCH(source_loc())
} }

View File

@ -62,9 +62,13 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr,
post_async_msg_(std::move(async_m), overflow_policy); post_async_msg_(std::move(async_m), overflow_policy);
} }
void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, std::future<void> SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy) { async_overflow_policy overflow_policy) {
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy); std::promise<void> promise;
std::future<void> future = promise.get_future();
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)),
overflow_policy);
return future;
} }
size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); } size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); }
@ -108,6 +112,7 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() {
} }
case async_msg_type::flush: { case async_msg_type::flush: {
incoming_async_msg.worker_ptr->backend_flush_(); incoming_async_msg.worker_ptr->backend_flush_();
incoming_async_msg.flush_promise.set_value();
return true; return true;
} }

View File

@ -9,6 +9,7 @@
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <future>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <vector> #include <vector>
@ -27,6 +28,7 @@ enum class async_msg_type { log, flush, terminate };
struct async_msg : log_msg_buffer { struct async_msg : log_msg_buffer {
async_msg_type msg_type{async_msg_type::log}; async_msg_type msg_type{async_msg_type::log};
async_logger_ptr worker_ptr; async_logger_ptr worker_ptr;
std::promise<void> flush_promise;
async_msg() = default; async_msg() = default;
~async_msg() = default; ~async_msg() = default;
@ -56,12 +58,22 @@ struct async_msg : log_msg_buffer {
async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m) async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m)
: log_msg_buffer{m}, : log_msg_buffer{m},
msg_type{the_type}, msg_type{the_type},
worker_ptr{std::move(worker)} {} worker_ptr{std::move(worker)},
flush_promise{} {}
async_msg(async_logger_ptr &&worker, async_msg_type the_type) async_msg(async_logger_ptr &&worker, async_msg_type the_type)
: log_msg_buffer{}, : log_msg_buffer{},
msg_type{the_type}, msg_type{the_type},
worker_ptr{std::move(worker)} {} worker_ptr{std::move(worker)},
flush_promise{} {}
async_msg(async_logger_ptr &&worker,
async_msg_type the_type,
std::promise<void> &&promise)
: log_msg_buffer{},
msg_type{the_type},
worker_ptr{std::move(worker)},
flush_promise{std::move(promise)} {}
explicit async_msg(async_msg_type the_type) explicit async_msg(async_msg_type the_type)
: async_msg{nullptr, the_type} {} : async_msg{nullptr, the_type} {}
@ -88,7 +100,8 @@ public:
void post_log(async_logger_ptr &&worker_ptr, void post_log(async_logger_ptr &&worker_ptr,
const details::log_msg &msg, const details::log_msg &msg,
async_overflow_policy overflow_policy); async_overflow_policy overflow_policy);
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy); std::future<void> post_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy);
size_t overrun_counter(); size_t overrun_counter();
void reset_overrun_counter(); void reset_overrun_counter();
size_t discard_counter(); size_t discard_counter();

View File

@ -93,6 +93,49 @@ TEST_CASE("flush", "[async]") {
REQUIRE(test_sink->flush_counter() == 1); REQUIRE(test_sink->flush_counter() == 1);
} }
TEST_CASE("multithread flush", "[async]") {
auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
size_t queue_size = 2;
size_t messages = 10;
size_t n_threads = 10;
size_t flush_count = 1024;
std::mutex mtx;
std::vector<std::string> errmsgs;
{
auto tp = std::make_shared<spdlog::details::thread_pool>(queue_size, 1);
auto logger = std::make_shared<spdlog::async_logger>(
"as", test_sink, tp, spdlog::async_overflow_policy::discard_new);
logger->set_error_handler([&](const std::string &) {
std::unique_lock<std::mutex> lock(mtx);
errmsgs.push_back("Broken promise");
});
for (size_t i = 0; i < messages; i++) {
logger->info("Hello message #{}", i);
}
std::vector<std::thread> threads;
for (size_t i = 0; i < n_threads; i++) {
threads.emplace_back([logger, flush_count] {
for (size_t j = 0; j < flush_count; j++) {
// flush does not throw exception even if failed.
// Instead, the error handler is invoked.
logger->flush();
}
});
}
for (auto &t : threads) {
t.join();
}
}
REQUIRE(test_sink->flush_counter() >= 1);
REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count);
REQUIRE(errmsgs.size() >= 1);
REQUIRE(errmsgs[0] == "Broken promise");
}
TEST_CASE("async periodic flush", "[async]") { TEST_CASE("async periodic flush", "[async]") {
auto logger = spdlog::create_async<spdlog::sinks::test_sink_mt>("as"); auto logger = spdlog::create_async<spdlog::sinks::test_sink_mt>("as");
auto test_sink = std::static_pointer_cast<spdlog::sinks::test_sink_mt>(logger->sinks()[0]); auto test_sink = std::static_pointer_cast<spdlog::sinks::test_sink_mt>(logger->sinks()[0]);