From 63d1884215d8914adcf1c11241936b42c2c35c2d Mon Sep 17 00:00:00 2001 From: Gabi Melman Date: Fri, 1 Nov 2024 11:26:03 +0200 Subject: [PATCH] Gabime/async flush (#3235) * Revert "Ensure flush callback gets called in move-assign operator (#3232)" This reverts commit b6da59447f165ad70a4e3ca1c575b14ea66d92c9. * Revert "Exchange promise for condition_variable when flushing (fixes #3221) (#3228)" This reverts commit 16e0d2e77c9b8c741b0b23d9def2db04de6b1b41. * Revert PR #3049 --- include/spdlog/async_logger-inl.h | 38 +++++++++----------- include/spdlog/details/thread_pool-inl.h | 26 ++------------ include/spdlog/details/thread_pool.h | 43 ++++++++--------------- tests/test_async.cpp | 44 ------------------------ 4 files changed, 34 insertions(+), 117 deletions(-) diff --git a/include/spdlog/async_logger-inl.h b/include/spdlog/async_logger-inl.h index 655007c9..1e794798 100644 --- a/include/spdlog/async_logger-inl.h +++ b/include/spdlog/async_logger-inl.h @@ -32,30 +32,26 @@ SPDLOG_INLINE spdlog::async_logger::async_logger(std::string logger_name, // send the log message to the thread pool SPDLOG_INLINE void spdlog::async_logger::sink_it_(const details::log_msg &msg){ - SPDLOG_TRY { - if (auto pool_ptr = thread_pool_.lock()){ - pool_ptr->post_log(shared_from_this(), msg, overflow_policy_); - } - else { - throw_spdlog_ex("async log: thread pool doesn't exist anymore"); - } - } - SPDLOG_LOGGER_CATCH(msg.source) + SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){ + pool_ptr->post_log(shared_from_this(), msg, overflow_policy_); +} +else { + throw_spdlog_ex("async log: thread pool doesn't exist anymore"); +} +} +SPDLOG_LOGGER_CATCH(msg.source) } // send flush request to the thread pool -SPDLOG_INLINE void spdlog::async_logger::flush_() { - SPDLOG_TRY { - auto pool_ptr = thread_pool_.lock(); - if (!pool_ptr) { - throw_spdlog_ex("async flush: thread pool doesn't exist anymore"); - } - - // Wait for the flush operation to complete. - // This might throw exception if the flush message get dropped because of overflow. - pool_ptr->post_and_wait_for_flush(shared_from_this(), overflow_policy_); - } - SPDLOG_LOGGER_CATCH(source_loc()) +SPDLOG_INLINE void spdlog::async_logger::flush_(){ + SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){ + pool_ptr->post_flush(shared_from_this(), overflow_policy_); +} +else { + throw_spdlog_ex("async flush: thread pool doesn't exist anymore"); +} +} +SPDLOG_LOGGER_CATCH(source_loc()) } // diff --git a/include/spdlog/details/thread_pool-inl.h b/include/spdlog/details/thread_pool-inl.h index b41afc93..17e01c09 100644 --- a/include/spdlog/details/thread_pool-inl.h +++ b/include/spdlog/details/thread_pool-inl.h @@ -62,25 +62,9 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr, post_async_msg_(std::move(async_m), overflow_policy); } -void SPDLOG_INLINE thread_pool::post_and_wait_for_flush(async_logger_ptr &&worker_ptr, - async_overflow_policy overflow_policy) { - std::mutex m; - std::unique_lock l(m); - std::condition_variable cv; - std::atomic cv_flag{async_msg_flush::not_synced}; - post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, [&cv, &cv_flag](async_msg_flush flushed) { - cv_flag.store(flushed, std::memory_order_relaxed); - cv.notify_all(); - }), overflow_policy); - while(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::not_synced) { - cv.wait_for(l, std::chrono::milliseconds(100), [&cv_flag]() { - return cv_flag.load(std::memory_order_relaxed) != async_msg_flush::not_synced; - }); - } - - if(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::synced_not_flushed) { - throw spdlog_ex("Request for flushing got dropped."); - } +void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, + async_overflow_policy overflow_policy) { + post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy); } size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); } @@ -124,10 +108,6 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() { } case async_msg_type::flush: { incoming_async_msg.worker_ptr->backend_flush_(); - if(incoming_async_msg.flush_callback) { - incoming_async_msg.flush_callback(async_msg_flush::synced_flushed); - incoming_async_msg.flush_callback = nullptr; - } return true; } diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index 84661371..f22b0782 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -22,60 +22,46 @@ using async_logger_ptr = std::shared_ptr; enum class async_msg_type { log, flush, terminate }; -enum class async_msg_flush { not_synced, synced_flushed, synced_not_flushed }; - // Async msg to move to/from the queue // Movable only. should never be copied struct async_msg : log_msg_buffer { async_msg_type msg_type{async_msg_type::log}; async_logger_ptr worker_ptr; - std::function flush_callback; async_msg() = default; - ~async_msg() { - if (flush_callback) { - flush_callback(async_msg_flush::synced_not_flushed); - flush_callback = nullptr; - } - } + ~async_msg() = default; // should only be moved in or out of the queue.. async_msg(const async_msg &) = delete; - async_msg(async_msg &&other) SPDLOG_NOEXCEPT +// support for vs2013 move +#if defined(_MSC_VER) && _MSC_VER <= 1800 + async_msg(async_msg &&other) : log_msg_buffer(std::move(other)), msg_type(other.msg_type), - worker_ptr(std::move(other.worker_ptr)), - flush_callback(std::move(other.flush_callback)) { - other.flush_callback = nullptr; - } + worker_ptr(std::move(other.worker_ptr)) {} - async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT { - *static_cast(this) = static_cast(other); + async_msg &operator=(async_msg &&other) { + *static_cast(this) = std::move(other); msg_type = other.msg_type; worker_ptr = std::move(other.worker_ptr); - std::swap(flush_callback, other.flush_callback); return *this; } +#else // (_MSC_VER) && _MSC_VER <= 1800 + async_msg(async_msg &&) = default; + async_msg &operator=(async_msg &&) = default; +#endif // construct from log_msg with given type async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m) : log_msg_buffer{m}, msg_type{the_type}, - worker_ptr{std::move(worker)}, - flush_callback{} {} + worker_ptr{std::move(worker)} {} async_msg(async_logger_ptr &&worker, async_msg_type the_type) : log_msg_buffer{}, msg_type{the_type}, - worker_ptr{std::move(worker)}, - flush_callback{} {} - - async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::function &&callback) - : log_msg_buffer{}, - msg_type{the_type}, - worker_ptr{std::move(worker)}, - flush_callback{std::move(callback)} {} + worker_ptr{std::move(worker)} {} explicit async_msg(async_msg_type the_type) : async_msg{nullptr, the_type} {} @@ -102,8 +88,7 @@ public: void post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy); - void post_and_wait_for_flush(async_logger_ptr &&worker_ptr, - async_overflow_policy overflow_policy); + void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy); size_t overrun_counter(); void reset_overrun_counter(); size_t discard_counter(); diff --git a/tests/test_async.cpp b/tests/test_async.cpp index 3fa3b440..76fdd7c6 100644 --- a/tests/test_async.cpp +++ b/tests/test_async.cpp @@ -93,50 +93,6 @@ TEST_CASE("flush", "[async]") { REQUIRE(test_sink->flush_counter() == 1); } -TEST_CASE("multithread flush", "[async]") { - auto test_sink = std::make_shared(); - size_t queue_size = 2; - size_t messages = 10; - size_t n_threads = 10; - size_t flush_count = 1024; - std::mutex mtx; - std::vector errmsgs; - { - auto tp = std::make_shared(queue_size, 1); - auto logger = std::make_shared( - "as", test_sink, tp, spdlog::async_overflow_policy::discard_new); - - logger->set_error_handler([&](const std::string &) { - std::unique_lock lock(mtx); - errmsgs.push_back("Broken promise"); - }); - - for (size_t i = 0; i < messages; i++) { - logger->info("Hello message #{}", i); - } - - std::vector threads; - for (size_t i = 0; i < n_threads; i++) { - threads.emplace_back([logger, flush_count] { - for (size_t j = 0; j < flush_count; j++) { - // flush does not throw exception even if failed. - // Instead, the error handler is invoked. - logger->flush(); - } - }); - } - - for (auto &t : threads) { - t.join(); - } - } - REQUIRE(test_sink->flush_counter() >= 1); - REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count); - if (errmsgs.size() > 0) { - REQUIRE(errmsgs[0] == "Broken promise"); - } -} - TEST_CASE("async periodic flush", "[async]") { auto logger = spdlog::create_async("as"); auto test_sink = std::static_pointer_cast(logger->sinks()[0]);