diff --git a/include/spdlog/async_logger-inl.h b/include/spdlog/async_logger-inl.h index 499800d8..655007c9 100644 --- a/include/spdlog/async_logger-inl.h +++ b/include/spdlog/async_logger-inl.h @@ -32,28 +32,30 @@ SPDLOG_INLINE spdlog::async_logger::async_logger(std::string logger_name, // send the log message to the thread pool SPDLOG_INLINE void spdlog::async_logger::sink_it_(const details::log_msg &msg){ - SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){ - pool_ptr->post_log(shared_from_this(), msg, overflow_policy_); -} -else { - throw_spdlog_ex("async log: thread pool doesn't exist anymore"); -} -} -SPDLOG_LOGGER_CATCH(msg.source) + SPDLOG_TRY { + if (auto pool_ptr = thread_pool_.lock()){ + pool_ptr->post_log(shared_from_this(), msg, overflow_policy_); + } + else { + throw_spdlog_ex("async log: thread pool doesn't exist anymore"); + } + } + SPDLOG_LOGGER_CATCH(msg.source) } // send flush request to the thread pool -SPDLOG_INLINE void spdlog::async_logger::flush_(){SPDLOG_TRY{auto pool_ptr = thread_pool_.lock(); -if (!pool_ptr) { - throw_spdlog_ex("async flush: thread pool doesn't exist anymore"); -} +SPDLOG_INLINE void spdlog::async_logger::flush_() { + SPDLOG_TRY { + auto pool_ptr = thread_pool_.lock(); + if (!pool_ptr) { + throw_spdlog_ex("async flush: thread pool doesn't exist anymore"); + } -std::future future = pool_ptr->post_flush(shared_from_this(), overflow_policy_); -// Wait for the flush operation to complete. -// This might throw exception if the flush message get dropped because of overflow. -future.get(); -} -SPDLOG_LOGGER_CATCH(source_loc()) + // Wait for the flush operation to complete. + // This might throw exception if the flush message get dropped because of overflow. + pool_ptr->post_and_wait_for_flush(shared_from_this(), overflow_policy_); + } + SPDLOG_LOGGER_CATCH(source_loc()) } // diff --git a/include/spdlog/details/thread_pool-inl.h b/include/spdlog/details/thread_pool-inl.h index ccc1dc97..b41afc93 100644 --- a/include/spdlog/details/thread_pool-inl.h +++ b/include/spdlog/details/thread_pool-inl.h @@ -62,13 +62,25 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr, post_async_msg_(std::move(async_m), overflow_policy); } -std::future SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr, +void SPDLOG_INLINE thread_pool::post_and_wait_for_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy) { - std::promise promise; - std::future future = promise.get_future(); - post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, std::move(promise)), - overflow_policy); - return future; + std::mutex m; + std::unique_lock l(m); + std::condition_variable cv; + std::atomic cv_flag{async_msg_flush::not_synced}; + post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, [&cv, &cv_flag](async_msg_flush flushed) { + cv_flag.store(flushed, std::memory_order_relaxed); + cv.notify_all(); + }), overflow_policy); + while(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::not_synced) { + cv.wait_for(l, std::chrono::milliseconds(100), [&cv_flag]() { + return cv_flag.load(std::memory_order_relaxed) != async_msg_flush::not_synced; + }); + } + + if(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::synced_not_flushed) { + throw spdlog_ex("Request for flushing got dropped."); + } } size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); } @@ -112,7 +124,10 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() { } case async_msg_type::flush: { incoming_async_msg.worker_ptr->backend_flush_(); - incoming_async_msg.flush_promise.set_value(); + if(incoming_async_msg.flush_callback) { + incoming_async_msg.flush_callback(async_msg_flush::synced_flushed); + incoming_async_msg.flush_callback = nullptr; + } return true; } diff --git a/include/spdlog/details/thread_pool.h b/include/spdlog/details/thread_pool.h index 0d56a091..4d7e1d75 100644 --- a/include/spdlog/details/thread_pool.h +++ b/include/spdlog/details/thread_pool.h @@ -9,7 +9,6 @@ #include #include -#include #include #include #include @@ -23,55 +22,60 @@ using async_logger_ptr = std::shared_ptr; enum class async_msg_type { log, flush, terminate }; +enum class async_msg_flush { not_synced, synced_flushed, synced_not_flushed }; + // Async msg to move to/from the queue // Movable only. should never be copied struct async_msg : log_msg_buffer { async_msg_type msg_type{async_msg_type::log}; async_logger_ptr worker_ptr; - std::promise flush_promise; + std::function flush_callback; async_msg() = default; - ~async_msg() = default; + ~async_msg() { + if (flush_callback) { + flush_callback(async_msg_flush::synced_not_flushed); + flush_callback = nullptr; + } + } // should only be moved in or out of the queue.. async_msg(const async_msg &) = delete; -// support for vs2013 move -#if defined(_MSC_VER) && _MSC_VER <= 1800 - async_msg(async_msg &&other) + async_msg(async_msg &&other) SPDLOG_NOEXCEPT : log_msg_buffer(std::move(other)), msg_type(other.msg_type), - worker_ptr(std::move(other.worker_ptr)) {} - - async_msg &operator=(async_msg &&other) { - *static_cast(this) = std::move(other); + worker_ptr(std::move(other.worker_ptr)), + flush_callback(std::move(other.flush_callback)) { + other.flush_callback = nullptr; + } + async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT { + *static_cast(this) = static_cast(other); msg_type = other.msg_type; worker_ptr = std::move(other.worker_ptr); + flush_callback = std::move(other.flush_callback); + other.flush_callback = nullptr; return *this; } -#else // (_MSC_VER) && _MSC_VER <= 1800 - async_msg(async_msg &&) = default; - async_msg &operator=(async_msg &&) = default; -#endif // construct from log_msg with given type async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m) : log_msg_buffer{m}, msg_type{the_type}, worker_ptr{std::move(worker)}, - flush_promise{} {} + flush_callback{} {} async_msg(async_logger_ptr &&worker, async_msg_type the_type) : log_msg_buffer{}, msg_type{the_type}, worker_ptr{std::move(worker)}, - flush_promise{} {} + flush_callback{} {} - async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::promise &&promise) + async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::function &&callback) : log_msg_buffer{}, msg_type{the_type}, worker_ptr{std::move(worker)}, - flush_promise{std::move(promise)} {} + flush_callback{std::move(callback)} {} explicit async_msg(async_msg_type the_type) : async_msg{nullptr, the_type} {} @@ -98,7 +102,7 @@ public: void post_log(async_logger_ptr &&worker_ptr, const details::log_msg &msg, async_overflow_policy overflow_policy); - std::future post_flush(async_logger_ptr &&worker_ptr, + void post_and_wait_for_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy); size_t overrun_counter(); void reset_overrun_counter();