Gabime/async flush (#3235)
Some checks are pending
ci / ${{ matrix.config.compiler}} ${{ matrix.config.version }} (C++${{ matrix.config.cppstd }}, ${{ matrix.config.build_type }}) (map[asan:OFF build_type:Debug compiler:clang cppstd:17 version:12]) (push) Waiting to run
ci / ${{ matrix.config.compiler}} ${{ matrix.config.version }} (C++${{ matrix.config.cppstd }}, ${{ matrix.config.build_type }}) (map[asan:OFF build_type:Release compiler:clang cppstd:20 version:15]) (push) Waiting to run
ci / ${{ matrix.config.compiler}} ${{ matrix.config.version }} (C++${{ matrix.config.cppstd }}, ${{ matrix.config.build_type }}) (map[build_type:Debug compiler:gcc cppstd:20 version:11]) (push) Waiting to run
ci / ${{ matrix.config.compiler}} ${{ matrix.config.version }} (C++${{ matrix.config.cppstd }}, ${{ matrix.config.build_type }}) (map[build_type:Release compiler:gcc cppstd:11 version:7]) (push) Waiting to run
ci / ${{ matrix.config.compiler}} ${{ matrix.config.version }} (C++${{ matrix.config.cppstd }}, ${{ matrix.config.build_type }}) (map[build_type:Release compiler:gcc cppstd:17 version:9]) (push) Waiting to run
ci / ${{ matrix.config.compiler}} ${{ matrix.config.version }} (C++${{ matrix.config.cppstd }}, ${{ matrix.config.build_type }}) (map[build_type:Release compiler:gcc cppstd:20 version:12]) (push) Waiting to run
ci / OS X Clang (C++11, Release) (push) Waiting to run

* Revert "Ensure flush callback gets called in move-assign operator (#3232)"

This reverts commit b6da59447f.

* Revert "Exchange promise for condition_variable when flushing (fixes #3221) (#3228)"

This reverts commit 16e0d2e77c.

* Revert PR #3049
This commit is contained in:
Gabi Melman 2024-11-01 11:26:03 +02:00 committed by GitHub
parent b6da59447f
commit 63d1884215
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 34 additions and 117 deletions

View File

@ -32,30 +32,26 @@ SPDLOG_INLINE spdlog::async_logger::async_logger(std::string logger_name,
// send the log message to the thread pool
SPDLOG_INLINE void spdlog::async_logger::sink_it_(const details::log_msg &msg){
SPDLOG_TRY {
if (auto pool_ptr = thread_pool_.lock()){
pool_ptr->post_log(shared_from_this(), msg, overflow_policy_);
}
else {
throw_spdlog_ex("async log: thread pool doesn't exist anymore");
}
}
SPDLOG_LOGGER_CATCH(msg.source)
SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){
pool_ptr->post_log(shared_from_this(), msg, overflow_policy_);
}
else {
throw_spdlog_ex("async log: thread pool doesn't exist anymore");
}
}
SPDLOG_LOGGER_CATCH(msg.source)
}
// send flush request to the thread pool
SPDLOG_INLINE void spdlog::async_logger::flush_() {
SPDLOG_TRY {
auto pool_ptr = thread_pool_.lock();
if (!pool_ptr) {
throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
}
// Wait for the flush operation to complete.
// This might throw exception if the flush message get dropped because of overflow.
pool_ptr->post_and_wait_for_flush(shared_from_this(), overflow_policy_);
}
SPDLOG_LOGGER_CATCH(source_loc())
SPDLOG_INLINE void spdlog::async_logger::flush_(){
SPDLOG_TRY{if (auto pool_ptr = thread_pool_.lock()){
pool_ptr->post_flush(shared_from_this(), overflow_policy_);
}
else {
throw_spdlog_ex("async flush: thread pool doesn't exist anymore");
}
}
SPDLOG_LOGGER_CATCH(source_loc())
}
//

View File

@ -62,25 +62,9 @@ void SPDLOG_INLINE thread_pool::post_log(async_logger_ptr &&worker_ptr,
post_async_msg_(std::move(async_m), overflow_policy);
}
void SPDLOG_INLINE thread_pool::post_and_wait_for_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy) {
std::mutex m;
std::unique_lock<std::mutex> l(m);
std::condition_variable cv;
std::atomic<async_msg_flush> cv_flag{async_msg_flush::not_synced};
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush, [&cv, &cv_flag](async_msg_flush flushed) {
cv_flag.store(flushed, std::memory_order_relaxed);
cv.notify_all();
}), overflow_policy);
while(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::not_synced) {
cv.wait_for(l, std::chrono::milliseconds(100), [&cv_flag]() {
return cv_flag.load(std::memory_order_relaxed) != async_msg_flush::not_synced;
});
}
if(cv_flag.load(std::memory_order_relaxed) == async_msg_flush::synced_not_flushed) {
throw spdlog_ex("Request for flushing got dropped.");
}
void SPDLOG_INLINE thread_pool::post_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy) {
post_async_msg_(async_msg(std::move(worker_ptr), async_msg_type::flush), overflow_policy);
}
size_t SPDLOG_INLINE thread_pool::overrun_counter() { return q_.overrun_counter(); }
@ -124,10 +108,6 @@ bool SPDLOG_INLINE thread_pool::process_next_msg_() {
}
case async_msg_type::flush: {
incoming_async_msg.worker_ptr->backend_flush_();
if(incoming_async_msg.flush_callback) {
incoming_async_msg.flush_callback(async_msg_flush::synced_flushed);
incoming_async_msg.flush_callback = nullptr;
}
return true;
}

View File

@ -22,60 +22,46 @@ using async_logger_ptr = std::shared_ptr<spdlog::async_logger>;
enum class async_msg_type { log, flush, terminate };
enum class async_msg_flush { not_synced, synced_flushed, synced_not_flushed };
// Async msg to move to/from the queue
// Movable only. should never be copied
struct async_msg : log_msg_buffer {
async_msg_type msg_type{async_msg_type::log};
async_logger_ptr worker_ptr;
std::function<void(async_msg_flush)> flush_callback;
async_msg() = default;
~async_msg() {
if (flush_callback) {
flush_callback(async_msg_flush::synced_not_flushed);
flush_callback = nullptr;
}
}
~async_msg() = default;
// should only be moved in or out of the queue..
async_msg(const async_msg &) = delete;
async_msg(async_msg &&other) SPDLOG_NOEXCEPT
// support for vs2013 move
#if defined(_MSC_VER) && _MSC_VER <= 1800
async_msg(async_msg &&other)
: log_msg_buffer(std::move(other)),
msg_type(other.msg_type),
worker_ptr(std::move(other.worker_ptr)),
flush_callback(std::move(other.flush_callback)) {
other.flush_callback = nullptr;
}
worker_ptr(std::move(other.worker_ptr)) {}
async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT {
*static_cast<log_msg_buffer *>(this) = static_cast<log_msg_buffer&&>(other);
async_msg &operator=(async_msg &&other) {
*static_cast<log_msg_buffer *>(this) = std::move(other);
msg_type = other.msg_type;
worker_ptr = std::move(other.worker_ptr);
std::swap(flush_callback, other.flush_callback);
return *this;
}
#else // (_MSC_VER) && _MSC_VER <= 1800
async_msg(async_msg &&) = default;
async_msg &operator=(async_msg &&) = default;
#endif
// construct from log_msg with given type
async_msg(async_logger_ptr &&worker, async_msg_type the_type, const details::log_msg &m)
: log_msg_buffer{m},
msg_type{the_type},
worker_ptr{std::move(worker)},
flush_callback{} {}
worker_ptr{std::move(worker)} {}
async_msg(async_logger_ptr &&worker, async_msg_type the_type)
: log_msg_buffer{},
msg_type{the_type},
worker_ptr{std::move(worker)},
flush_callback{} {}
async_msg(async_logger_ptr &&worker, async_msg_type the_type, std::function<void(async_msg_flush)> &&callback)
: log_msg_buffer{},
msg_type{the_type},
worker_ptr{std::move(worker)},
flush_callback{std::move(callback)} {}
worker_ptr{std::move(worker)} {}
explicit async_msg(async_msg_type the_type)
: async_msg{nullptr, the_type} {}
@ -102,8 +88,7 @@ public:
void post_log(async_logger_ptr &&worker_ptr,
const details::log_msg &msg,
async_overflow_policy overflow_policy);
void post_and_wait_for_flush(async_logger_ptr &&worker_ptr,
async_overflow_policy overflow_policy);
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy);
size_t overrun_counter();
void reset_overrun_counter();
size_t discard_counter();

View File

@ -93,50 +93,6 @@ TEST_CASE("flush", "[async]") {
REQUIRE(test_sink->flush_counter() == 1);
}
TEST_CASE("multithread flush", "[async]") {
auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
size_t queue_size = 2;
size_t messages = 10;
size_t n_threads = 10;
size_t flush_count = 1024;
std::mutex mtx;
std::vector<std::string> errmsgs;
{
auto tp = std::make_shared<spdlog::details::thread_pool>(queue_size, 1);
auto logger = std::make_shared<spdlog::async_logger>(
"as", test_sink, tp, spdlog::async_overflow_policy::discard_new);
logger->set_error_handler([&](const std::string &) {
std::unique_lock<std::mutex> lock(mtx);
errmsgs.push_back("Broken promise");
});
for (size_t i = 0; i < messages; i++) {
logger->info("Hello message #{}", i);
}
std::vector<std::thread> threads;
for (size_t i = 0; i < n_threads; i++) {
threads.emplace_back([logger, flush_count] {
for (size_t j = 0; j < flush_count; j++) {
// flush does not throw exception even if failed.
// Instead, the error handler is invoked.
logger->flush();
}
});
}
for (auto &t : threads) {
t.join();
}
}
REQUIRE(test_sink->flush_counter() >= 1);
REQUIRE(test_sink->flush_counter() + errmsgs.size() == n_threads * flush_count);
if (errmsgs.size() > 0) {
REQUIRE(errmsgs[0] == "Broken promise");
}
}
TEST_CASE("async periodic flush", "[async]") {
auto logger = spdlog::create_async<spdlog::sinks::test_sink_mt>("as");
auto test_sink = std::static_pointer_cast<spdlog::sinks::test_sink_mt>(logger->sinks()[0]);