From a1a71804f4af2ebd7bb6b5060159e5fe027629fa Mon Sep 17 00:00:00 2001 From: gabime Date: Sat, 19 May 2018 14:41:53 +0300 Subject: [PATCH 1/2] replaced the lockfree queue with bounded, locked queue --- example/bench.cpp | 33 +- example/example.vcxproj | 15 +- include/spdlog/details/async_log_helper.h | 527 ++++++++++----------- include/spdlog/details/async_logger_impl.h | 4 +- include/spdlog/details/mpmc_blocking_q.h | 89 ++++ include/spdlog/details/mpmc_bounded_q.h | 183 ------- include/spdlog/sinks/test_sink.h | 40 ++ 7 files changed, 408 insertions(+), 483 deletions(-) create mode 100644 include/spdlog/details/mpmc_blocking_q.h delete mode 100644 include/spdlog/details/mpmc_bounded_q.h create mode 100644 include/spdlog/sinks/test_sink.h diff --git a/example/bench.cpp b/example/bench.cpp index 49385e0f..d92e20bc 100644 --- a/example/bench.cpp +++ b/example/bench.cpp @@ -7,8 +7,7 @@ // bench.cpp : spdlog benchmarks // #include "spdlog/async_logger.h" -#include "spdlog/sinks/file_sinks.h" -#include "spdlog/sinks/null_sink.h" +#include "spdlog/sinks/test_sink.h" #include "spdlog/spdlog.h" #include "utils.h" #include @@ -30,12 +29,10 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count int main(int argc, char *argv[]) { - int queue_size = 1048576; + int queue_size = 1024*1024; int howmany = 1000000; int threads = 10; - int file_size = 30 * 1024 * 1024; - int rotating_files = 5; - + try { @@ -45,7 +42,7 @@ int main(int argc, char *argv[]) threads = atoi(argv[2]); if (argc > 3) queue_size = atoi(argv[3]); - + /* cout << "*******************************************************************************\n"; cout << "Single thread, " << format(howmany) << " iterations" << endl; cout << "*******************************************************************************\n"; @@ -67,17 +64,32 @@ int main(int argc, char *argv[]) bench_mt(howmany, daily_mt, threads); bench(howmany, spdlog::create("null_mt")); + */ + cout << "\n*******************************************************************************\n"; cout << "async logging.. " << threads << " threads sharing same logger, " << format(howmany) << " iterations " << endl; cout << "*******************************************************************************\n"; spdlog::set_async_mode(queue_size); - for (int i = 0; i < 3; ++i) + + for (int i = 0; i < 300; ++i) { - auto as = spdlog::daily_logger_st("as", "logs/daily_async.log"); - bench_mt(howmany, as, threads); + //auto as = spdlog::daily_logger_mt("as", "logs/daily_async.log"); + auto test_sink = std::make_shared(); + //auto as = spdlog::basic_logger_mt("as", "logs/async.log", true); + auto as = std::make_shared("as", test_sink, queue_size, async_overflow_policy::block_retry, nullptr, std::chrono::milliseconds(2000)); + bench_mt(howmany, as, threads); + as.reset(); spdlog::drop("as"); + + auto msg_counter = test_sink->msg_counter(); + cout << "Count:" << msg_counter << endl; + if (msg_counter != howmany) + { + cout << "ERROR! Expected " << howmany; + exit(0); + } } } catch (std::exception &ex) @@ -119,6 +131,7 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count if (counter > howmany) break; log->info("Hello logger: msg number {}", counter); + //std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } })); } diff --git a/example/example.vcxproj b/example/example.vcxproj index 63db2b5d..9827f685 100644 --- a/example/example.vcxproj +++ b/example/example.vcxproj @@ -1,5 +1,5 @@  - + Debug @@ -10,9 +10,6 @@ Win32 - - - @@ -21,7 +18,6 @@ - @@ -46,23 +42,26 @@ + + + {9E5AB93A-0CCE-4BAC-9FCB-0FC9CB5EB8D2} Win32Proj . - 8.1 + 10.0.16299.0 Application true - v120 + v141 Unicode Application false - v120 + v141 true Unicode diff --git a/include/spdlog/details/async_log_helper.h b/include/spdlog/details/async_log_helper.h index a7780740..e1d1952d 100644 --- a/include/spdlog/details/async_log_helper.h +++ b/include/spdlog/details/async_log_helper.h @@ -14,11 +14,12 @@ #include "../common.h" #include "../details/log_msg.h" -#include "../details/mpmc_bounded_q.h" +#include "../details/mpmc_blocking_q.h" #include "../details/os.h" #include "../formatter.h" #include "../sinks/sink.h" +#include #include #include #include @@ -27,367 +28,333 @@ #include #include #include +#include namespace spdlog { -namespace details { + namespace details { -class async_log_helper -{ - // Async msg to move to/from the queue - // Movable only. should never be copied - enum class async_msg_type - { - log, - flush, - terminate - }; + class async_log_helper + { + // Async msg to move to/from the queue + // Movable only. should never be copied + enum class async_msg_type + { + log, + flush, + terminate + }; - struct async_msg - { - std::string logger_name; - level::level_enum level; - log_clock::time_point time; - size_t thread_id; - std::string txt; - async_msg_type msg_type; - size_t msg_id; + struct async_msg + { + std::string logger_name; + level::level_enum level; + log_clock::time_point time; + size_t thread_id; + std::string txt; + async_msg_type msg_type; + size_t msg_id; - async_msg() = default; - ~async_msg() = default; + async_msg() = default; + ~async_msg() = default; - explicit async_msg(async_msg_type m_type) - : level(level::info) - , thread_id(0) - , msg_type(m_type) - , msg_id(0) - { - } + explicit async_msg(async_msg_type m_type) + : level(level::info) + , thread_id(0) + , msg_type(m_type) + , msg_id(0) + { + } - async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)), - level(std::move(other.level)), - time(std::move(other.time)), - thread_id(other.thread_id), - txt(std::move(other.txt)), - msg_type(std::move(other.msg_type)), - msg_id(other.msg_id) - { - } + async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)), + level(std::move(other.level)), + time(std::move(other.time)), + thread_id(other.thread_id), + txt(std::move(other.txt)), + msg_type(std::move(other.msg_type)), + msg_id(other.msg_id) + { + } - async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT - { - logger_name = std::move(other.logger_name); - level = other.level; - time = std::move(other.time); - thread_id = other.thread_id; - txt = std::move(other.txt); - msg_type = other.msg_type; - msg_id = other.msg_id; - return *this; - } + async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT + { + logger_name = std::move(other.logger_name); + level = other.level; + time = std::move(other.time); + thread_id = other.thread_id; + txt = std::move(other.txt); + msg_type = other.msg_type; + msg_id = other.msg_id; + return *this; + } - // never copy or assign. should only be moved.. - async_msg(const async_msg &) = delete; - async_msg &operator=(const async_msg &other) = delete; + // never copy or assign. should only be moved.. + async_msg(const async_msg &) = delete; + async_msg &operator=(const async_msg &other) = delete; - // construct from log_msg - explicit async_msg(const details::log_msg &m) - : level(m.level) - , time(m.time) - , thread_id(m.thread_id) - , txt(m.raw.data(), m.raw.size()) - , msg_type(async_msg_type::log) - , msg_id(m.msg_id) - { + // construct from log_msg + explicit async_msg(const details::log_msg &m) + : level(m.level) + , time(m.time) + , thread_id(m.thread_id) + , txt(m.raw.data(), m.raw.size()) + , msg_type(async_msg_type::log) + , msg_id(m.msg_id) + { #ifndef SPDLOG_NO_NAME - logger_name = *m.logger_name; + logger_name = *m.logger_name; #endif - } + } - // copy into log_msg - void fill_log_msg(log_msg &msg) - { - msg.logger_name = &logger_name; - msg.level = level; - msg.time = time; - msg.thread_id = thread_id; - msg.raw << txt; - msg.msg_id = msg_id; - } - }; + // copy into log_msg + void fill_log_msg(log_msg &msg) + { + msg.logger_name = &logger_name; + msg.level = level; + msg.time = time; + msg.thread_id = thread_id; + msg.raw << txt; + msg.msg_id = msg_id; + } + }; -public: - using item_type = async_msg; - using q_type = details::mpmc_bounded_queue; + public: + using item_type = async_msg; + using q_type = details::mpmc_bounded_queue ; - using clock = std::chrono::steady_clock; + using clock = std::chrono::steady_clock; - async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, const log_err_handler err_handler, - const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function worker_warmup_cb = nullptr, - const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), - std::function worker_teardown_cb = nullptr); + async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, const log_err_handler err_handler, + const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function worker_warmup_cb = nullptr, + const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), + std::function worker_teardown_cb = nullptr); - void log(const details::log_msg &msg); + void log(const details::log_msg &msg); - // stop logging and join the back thread - ~async_log_helper(); + // stop logging and join the back thread + ~async_log_helper(); - async_log_helper(const async_log_helper &) = delete; - async_log_helper &operator=(const async_log_helper &) = delete; + async_log_helper(const async_log_helper &) = delete; + async_log_helper &operator=(const async_log_helper &) = delete; - void set_formatter(formatter_ptr msg_formatter); + void set_formatter(formatter_ptr msg_formatter); - void flush(bool wait_for_q); + void flush(); - void set_error_handler(spdlog::log_err_handler err_handler); + void set_error_handler(spdlog::log_err_handler err_handler); -private: - formatter_ptr _formatter; - std::vector> _sinks; + private: + formatter_ptr _formatter; + std::vector> _sinks; - // queue of messages to log - q_type _q; + // queue of messages to log + q_type _q; - log_err_handler _err_handler; + log_err_handler _err_handler; - bool _flush_requested; + std::chrono::time_point _last_flush; - bool _terminate_requested; + // overflow policy + const async_overflow_policy _overflow_policy; - // overflow policy - const async_overflow_policy _overflow_policy; + // worker thread warmup callback - one can set thread priority, affinity, etc + const std::function _worker_warmup_cb; - // worker thread warmup callback - one can set thread priority, affinity, etc - const std::function _worker_warmup_cb; + // auto periodic sink flush parameter + const std::chrono::milliseconds _flush_interval_ms; - // auto periodic sink flush parameter - const std::chrono::milliseconds _flush_interval_ms; + // worker thread teardown callback + const std::function _worker_teardown_cb; - // worker thread teardown callback - const std::function _worker_teardown_cb; + std::mutex null_mutex_; + //null_mutex null_mutex_; + std::condition_variable_any not_empty_cv_; + std::condition_variable_any not_full_cv_; - // worker thread - std::thread _worker_thread; + // worker thread + std::thread _worker_thread; - void push_msg(async_msg &&new_msg); + void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy); - // worker thread main loop - void worker_loop(); + // worker thread main loop + void worker_loop(); - // pop next message from the queue and process it. will set the last_pop to the pop time - // return false if termination of the queue is required - bool process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush); + // dequeue next message from the queue and process it. + // return false if termination of the queue is required + bool process_next_msg(); - void handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush); + void handle_flush_interval(); - // sleep,yield or return immediately using the time passed since last message as a hint - static void sleep_or_yield(const spdlog::log_clock::time_point &now, const log_clock::time_point &last_op_time); - - // wait until the queue is empty - void wait_empty_q(); -}; -} // namespace details + void flush_sinks(); + + }; + } // namespace details } // namespace spdlog /////////////////////////////////////////////////////////////////////////////// // async_sink class implementation /////////////////////////////////////////////////////////////////////////////// inline spdlog::details::async_log_helper::async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, - log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, - const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) - : _formatter(std::move(formatter)) - , _sinks(std::move(sinks)) - , _q(queue_size) - , _err_handler(std::move(err_handler)) - , _flush_requested(false) - , _terminate_requested(false) - , _overflow_policy(overflow_policy) - , _worker_warmup_cb(std::move(worker_warmup_cb)) - , _flush_interval_ms(flush_interval_ms) - , _worker_teardown_cb(std::move(worker_teardown_cb)) + log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, + const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) + : _formatter(std::move(formatter)) + , _sinks(std::move(sinks)) + , _q(queue_size) + , _err_handler(std::move(err_handler)) + , _last_flush(os::now()) + , _overflow_policy(overflow_policy) + , _worker_warmup_cb(std::move(worker_warmup_cb)) + , _flush_interval_ms(flush_interval_ms) + , _worker_teardown_cb(std::move(worker_teardown_cb)) { - _worker_thread = std::thread(&async_log_helper::worker_loop, this); + _worker_thread = std::thread(&async_log_helper::worker_loop, this); } // Send to the worker thread termination message(level=off) // and wait for it to finish gracefully inline spdlog::details::async_log_helper::~async_log_helper() { - try - { - push_msg(async_msg(async_msg_type::terminate)); - _worker_thread.join(); - } - catch (...) // don't crash in destructor - { - } + try + { + enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); + _worker_thread.join(); + } + catch (...) // don't crash in destructor + { + } } // Try to push and block until succeeded (if the policy is not to discard when the queue is full) inline void spdlog::details::async_log_helper::log(const details::log_msg &msg) -{ - push_msg(async_msg(msg)); +{ + enqueue_msg(async_msg(msg), _overflow_policy); } -inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg &&new_msg) +inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy) { - if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg) - { - auto last_op_time = details::os::now(); - auto now = last_op_time; - do - { - now = details::os::now(); - sleep_or_yield(now, last_op_time); - } while (!_q.enqueue(std::move(new_msg))); - } + + // block until succeeded pushing to the queue + if (policy == async_overflow_policy::block_retry) + { + _q.enqueue(std::move(new_msg)); + } + else + { + _q.enqueue_nowait(std::move(new_msg)); + } + } // optionally wait for the queue be empty and request flush from the sinks -inline void spdlog::details::async_log_helper::flush(bool wait_for_q) +inline void spdlog::details::async_log_helper::flush() { - push_msg(async_msg(async_msg_type::flush)); - if (wait_for_q) - { - wait_empty_q(); // return when queue is empty - } + enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy); } inline void spdlog::details::async_log_helper::worker_loop() { - if (_worker_warmup_cb) - { - _worker_warmup_cb(); - } - auto last_pop = details::os::now(); - auto last_flush = last_pop; - auto active = true; - while (active) - { - try - { - active = process_next_msg(last_pop, last_flush); - } - catch (const std::exception &ex) - { - _err_handler(ex.what()); - } - catch (...) - { - _err_handler("Unknown exeption in async logger worker loop."); - } - } - if (_worker_teardown_cb) - { - _worker_teardown_cb(); - } + if (_worker_warmup_cb) + { + _worker_warmup_cb(); + } + auto active = true; + while (active) + { + try + { + active = process_next_msg(); + } + catch (const std::exception &ex) + { + _err_handler(ex.what()); + } + catch (...) + { + _err_handler("Unknown exeption in async logger worker loop."); + } + } + if (_worker_teardown_cb) + { + _worker_teardown_cb(); + } } // process next message in the queue // return true if this thread should still be active (while no terminate msg was received) -inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush) +inline bool spdlog::details::async_log_helper::process_next_msg() { - async_msg incoming_async_msg; + async_msg incoming_async_msg; + bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::milliseconds(1000)); + if (!dequeued) + { + handle_flush_interval(); + return true; + } + + switch (incoming_async_msg.msg_type) + { + case async_msg_type::flush: + flush_sinks(); + return true; + - if (_q.dequeue(incoming_async_msg)) - { - last_pop = details::os::now(); - switch (incoming_async_msg.msg_type) - { - case async_msg_type::flush: - _flush_requested = true; - break; + case async_msg_type::terminate: + //flush_sinks(); + return false; + - case async_msg_type::terminate: - _flush_requested = true; - _terminate_requested = true; - break; + default: + log_msg incoming_log_msg; + incoming_async_msg.fill_log_msg(incoming_log_msg); + _formatter->format(incoming_log_msg); + for (auto &s : _sinks) + { + if (s->should_log(incoming_log_msg.level)) + { + s->log(incoming_log_msg); + } + } + return true; + } + assert(false); + return true; // should not be reached - default: - log_msg incoming_log_msg; - incoming_async_msg.fill_log_msg(incoming_log_msg); - _formatter->format(incoming_log_msg); - for (auto &s : _sinks) - { - if (s->should_log(incoming_log_msg.level)) - { - s->log(incoming_log_msg); - } - } - } - return true; - } - - // Handle empty queue.. - // This is the only place where the queue can terminate or flush to avoid losing messages already in the queue - auto now = details::os::now(); - handle_flush_interval(now, last_flush); - sleep_or_yield(now, last_pop); - return !_terminate_requested; } -// flush all sinks if _flush_interval_ms has expired -inline void spdlog::details::async_log_helper::handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush) -{ - auto should_flush = - _flush_requested || (_flush_interval_ms != std::chrono::milliseconds::zero() && now - last_flush >= _flush_interval_ms); - if (should_flush) - { - for (auto &s : _sinks) - { - s->flush(); - } - now = last_flush = details::os::now(); - _flush_requested = false; - } +// flush all sinks if _flush_interval_ms has expired. only called if queue is empty +inline void spdlog::details::async_log_helper::handle_flush_interval() +{ + if (_flush_interval_ms == std::chrono::milliseconds::zero()) + { + return; + } + auto delta = details::os::now() - _last_flush;; + if (delta >= _flush_interval_ms) + { + flush_sinks(); + } } + inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter) { - _formatter = std::move(msg_formatter); + _formatter = std::move(msg_formatter); } -// spin, yield or sleep. use the time passed since last message as a hint -inline void spdlog::details::async_log_helper::sleep_or_yield( - const spdlog::log_clock::time_point &now, const spdlog::log_clock::time_point &last_op_time) -{ - using std::chrono::microseconds; - using std::chrono::milliseconds; - - auto time_since_op = now - last_op_time; - - // spin upto 50 micros - if (time_since_op <= microseconds(50)) - { - return; - } - - // yield upto 150 micros - if (time_since_op <= microseconds(100)) - { - return std::this_thread::yield(); - } - - // sleep for 20 ms upto 200 ms - if (time_since_op <= milliseconds(200)) - { - return details::os::sleep_for_millis(20); - } - - // sleep for 500 ms - return details::os::sleep_for_millis(500); -} - -// wait for the queue to be empty -inline void spdlog::details::async_log_helper::wait_empty_q() -{ - auto last_op = details::os::now(); - while (!_q.is_empty()) - { - sleep_or_yield(details::os::now(), last_op); - } -} inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler) { - _err_handler = std::move(err_handler); + _err_handler = std::move(err_handler); } + + +// flush all sinks if _flush_interval_ms has expired. only called if queue is empty +inline void spdlog::details::async_log_helper::flush_sinks() +{ + printf("FLUSH!\n"); + for (auto &s : _sinks) + { + s->flush(); + } + _last_flush = os::now(); +} + diff --git a/include/spdlog/details/async_logger_impl.h b/include/spdlog/details/async_logger_impl.h index 748a53ac..b4f68afd 100644 --- a/include/spdlog/details/async_logger_impl.h +++ b/include/spdlog/details/async_logger_impl.h @@ -44,7 +44,7 @@ inline spdlog::async_logger::async_logger(const std::string &logger_name, sink_p inline void spdlog::async_logger::flush() { - _async_log_helper->flush(true); + _async_log_helper->flush(); } // Error handler @@ -80,7 +80,7 @@ inline void spdlog::async_logger::_sink_it(details::log_msg &msg) _async_log_helper->log(msg); if (_should_flush_on(msg)) { - _async_log_helper->flush(false); // do async flush + _async_log_helper->flush(); // do async flush } } catch (const std::exception &ex) diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h new file mode 100644 index 00000000..0fc10f2c --- /dev/null +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -0,0 +1,89 @@ +#pragma once + +// +// Copyright(c) 2018 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +// async log helper : +// multi producer-multi consumer blocking queue +// enqueue(..) - will block until room found to put the new message +// enqueue_nowait(..) - will return immediatly with false if no room left in the queue +// dequeue_for(..) - will block until the queue is not empty or timeout passed + + +#include +#include +#include + + +namespace spdlog { + namespace details { + + template + class mpmc_bounded_queue + { + public: + + using item_type = T; + explicit mpmc_bounded_queue(size_t max_items) : max_items_(max_items) {} + + // try to enqueue and block if no room left + void enqueue(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + pop_cv_.wait(lock, [this] {return this->q_.size() <= this->max_items_; }); + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + } + + // try to enqueue and return immdeialty false if no room left + bool enqueue_nowait(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + if (this->q_.size() >= this->max_items_) + { + return false; + } + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + return true; + } + + // try to dequeue item. if no item found. wait upto timeout and try again + // Return true, if succeeded dequeue item, false otherwise + bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) + { + { + std::unique_lock lock(queue_mutex_); + //push_cv_.wait(lock, [this] {return this->q_.size() > 0; }); + bool found_msg = push_cv_.wait_for(lock, wait_duration, [this] {return this->q_.size() > 0; }); + if (!found_msg) + { + return false; + } + popped_item = std::move(q_.front()); + q_.pop(); + } + pop_cv_.notify_one(); + return true; + } + + + + + private: + size_t max_items_; + std::mutex queue_mutex_; + std::condition_variable push_cv_; + std::condition_variable pop_cv_; + + std::queue q_; + + }; + } +} diff --git a/include/spdlog/details/mpmc_bounded_q.h b/include/spdlog/details/mpmc_bounded_q.h deleted file mode 100644 index fd41e4c8..00000000 --- a/include/spdlog/details/mpmc_bounded_q.h +++ /dev/null @@ -1,183 +0,0 @@ -/* -A modified version of Bounded MPMC queue by Dmitry Vyukov. - -Original code from: -http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -licensed by Dmitry Vyukov under the terms below: - -Simplified BSD license - -Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of -conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, this list -of conditions and the following disclaimer in the documentation and/or other materials -provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED -WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT -SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, -OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and -should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov. -*/ - -/* -The code in its current form adds the license below: - -Copyright(c) 2015 Gabi Melman. -Distributed under the MIT License (http://opensource.org/licenses/MIT) - -*/ - -#pragma once - -#include "../common.h" - -#include -#include - -namespace spdlog { -namespace details { - -template -class mpmc_bounded_queue -{ -public: - using item_type = T; - - explicit mpmc_bounded_queue(size_t buffer_size) - : max_size_(buffer_size) - , buffer_(new cell_t[buffer_size]) - , buffer_mask_(buffer_size - 1) - { - // queue size must be power of two - if (!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0))) - { - throw spdlog_ex("async logger queue size must be power of two"); - } - - for (size_t i = 0; i != buffer_size; i += 1) - { - buffer_[i].sequence_.store(i, std::memory_order_relaxed); - } - enqueue_pos_.store(0, std::memory_order_relaxed); - dequeue_pos_.store(0, std::memory_order_relaxed); - } - - ~mpmc_bounded_queue() - { - delete[] buffer_; - } - - mpmc_bounded_queue(mpmc_bounded_queue const &) = delete; - void operator=(mpmc_bounded_queue const &) = delete; - - bool enqueue(T &&data) - { - cell_t *cell; - size_t pos = enqueue_pos_.load(std::memory_order_relaxed); - for (;;) - { - cell = &buffer_[pos & buffer_mask_]; - size_t seq = cell->sequence_.load(std::memory_order_acquire); - intptr_t dif = static_cast(seq) - static_cast(pos); - if (dif == 0) - { - if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if (dif < 0) - { - return false; - } - else - { - pos = enqueue_pos_.load(std::memory_order_relaxed); - } - } - cell->data_ = std::move(data); - cell->sequence_.store(pos + 1, std::memory_order_release); - return true; - } - - bool dequeue(T &data) - { - cell_t *cell; - size_t pos = dequeue_pos_.load(std::memory_order_relaxed); - for (;;) - { - cell = &buffer_[pos & buffer_mask_]; - size_t seq = cell->sequence_.load(std::memory_order_acquire); - intptr_t dif = static_cast(seq) - static_cast(pos + 1); - if (dif == 0) - { - if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if (dif < 0) - { - return false; - } - else - { - pos = dequeue_pos_.load(std::memory_order_relaxed); - } - } - data = std::move(cell->data_); - cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release); - return true; - } - - bool is_empty() - { - size_t front, front1, back; - // try to take a consistent snapshot of front/tail. - do - { - front = enqueue_pos_.load(std::memory_order_acquire); - back = dequeue_pos_.load(std::memory_order_acquire); - front1 = enqueue_pos_.load(std::memory_order_relaxed); - } while (front != front1); - return back == front; - } - -private: - struct cell_t - { - std::atomic sequence_; - T data_; - }; - - size_t const max_size_; - - static size_t const cacheline_size = 64; - using cacheline_pad_t = char[cacheline_size]; - - cacheline_pad_t pad0_; - cell_t *const buffer_; - size_t const buffer_mask_; - cacheline_pad_t pad1_; - std::atomic enqueue_pos_; - cacheline_pad_t pad2_; - std::atomic dequeue_pos_; - cacheline_pad_t pad3_; -}; - -} // namespace details -} // namespace spdlog diff --git a/include/spdlog/sinks/test_sink.h b/include/spdlog/sinks/test_sink.h new file mode 100644 index 00000000..ba7336de --- /dev/null +++ b/include/spdlog/sinks/test_sink.h @@ -0,0 +1,40 @@ +// +// Copyright(c) 2015 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +#pragma once + +#include "../details/null_mutex.h" +#include "base_sink.h" + +#include + +namespace spdlog { +namespace sinks { + +template +class test_sink : public base_sink +{ +public: + size_t msg_counter() + { + return msg_counter_; + } + +protected: + void _sink_it(const details::log_msg &) override + { + msg_counter_++; + } + + void _flush() override {} + size_t msg_counter_{ 0 }; + +}; + +using test_sink_mt = test_sink; +using test_sink_st = test_sink; + +} // namespace sinks +} // namespace spdlog From f57fc1b2fad328f7149beca07214af1480251613 Mon Sep 17 00:00:00 2001 From: gabime Date: Sat, 19 May 2018 16:06:57 +0300 Subject: [PATCH 2/2] fixed flush interval in async helper --- example/bench.cpp | 33 +- include/spdlog/contrib/sinks/step_file_sink.h | 17 +- include/spdlog/details/async_log_helper.h | 475 +++++++++--------- include/spdlog/details/mpmc_blocking_q.h | 125 +++-- include/spdlog/sinks/test_sink.h | 19 +- 5 files changed, 324 insertions(+), 345 deletions(-) diff --git a/example/bench.cpp b/example/bench.cpp index d92e20bc..49385e0f 100644 --- a/example/bench.cpp +++ b/example/bench.cpp @@ -7,7 +7,8 @@ // bench.cpp : spdlog benchmarks // #include "spdlog/async_logger.h" -#include "spdlog/sinks/test_sink.h" +#include "spdlog/sinks/file_sinks.h" +#include "spdlog/sinks/null_sink.h" #include "spdlog/spdlog.h" #include "utils.h" #include @@ -29,10 +30,12 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count int main(int argc, char *argv[]) { - int queue_size = 1024*1024; + int queue_size = 1048576; int howmany = 1000000; int threads = 10; - + int file_size = 30 * 1024 * 1024; + int rotating_files = 5; + try { @@ -42,7 +45,7 @@ int main(int argc, char *argv[]) threads = atoi(argv[2]); if (argc > 3) queue_size = atoi(argv[3]); - /* + cout << "*******************************************************************************\n"; cout << "Single thread, " << format(howmany) << " iterations" << endl; cout << "*******************************************************************************\n"; @@ -64,32 +67,17 @@ int main(int argc, char *argv[]) bench_mt(howmany, daily_mt, threads); bench(howmany, spdlog::create("null_mt")); - */ - cout << "\n*******************************************************************************\n"; cout << "async logging.. " << threads << " threads sharing same logger, " << format(howmany) << " iterations " << endl; cout << "*******************************************************************************\n"; spdlog::set_async_mode(queue_size); - - for (int i = 0; i < 300; ++i) + for (int i = 0; i < 3; ++i) { - //auto as = spdlog::daily_logger_mt("as", "logs/daily_async.log"); - auto test_sink = std::make_shared(); - //auto as = spdlog::basic_logger_mt("as", "logs/async.log", true); - auto as = std::make_shared("as", test_sink, queue_size, async_overflow_policy::block_retry, nullptr, std::chrono::milliseconds(2000)); - bench_mt(howmany, as, threads); - as.reset(); + auto as = spdlog::daily_logger_st("as", "logs/daily_async.log"); + bench_mt(howmany, as, threads); spdlog::drop("as"); - - auto msg_counter = test_sink->msg_counter(); - cout << "Count:" << msg_counter << endl; - if (msg_counter != howmany) - { - cout << "ERROR! Expected " << howmany; - exit(0); - } } } catch (std::exception &ex) @@ -131,7 +119,6 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count if (counter > howmany) break; log->info("Hello logger: msg number {}", counter); - //std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } })); } diff --git a/include/spdlog/contrib/sinks/step_file_sink.h b/include/spdlog/contrib/sinks/step_file_sink.h index 2becf29e..7e90e0ad 100644 --- a/include/spdlog/contrib/sinks/step_file_sink.h +++ b/include/spdlog/contrib/sinks/step_file_sink.h @@ -17,8 +17,10 @@ // // Create a file logger which creates new files with a specified time step and fixed file size: // -// std::shared_ptr step_logger_mt(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const filename_t &tmp_ext = ".tmp", unsigned max_file_size = std::numeric_limits::max()); -// std::shared_ptr step_logger_st(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const filename_t &tmp_ext = ".tmp", unsigned max_file_size = std::numeric_limits::max()); +// std::shared_ptr step_logger_mt(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const +// filename_t &tmp_ext = ".tmp", unsigned max_file_size = std::numeric_limits::max()); std::shared_ptr +// step_logger_st(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const filename_t &tmp_ext = ".tmp", +// unsigned max_file_size = std::numeric_limits::max()); // Example for spdlog_impl.h // Create a file logger that creates new files with a specified increment @@ -76,7 +78,7 @@ public: { throw spdlog_ex("step_file_sink: Invalid max log size in ctor"); } - + _tp = _next_tp(); std::tie(_current_filename, _ext) = FileNameCalc::calc_filename(_base_filename, _tmp_ext); @@ -88,7 +90,7 @@ public: _file_helper.open(_current_filename); _current_size = _file_helper.size(); // expensive. called only once } - + ~step_file_sink() { try @@ -96,7 +98,8 @@ public: close_current_file(); } catch (...) - {} + { + } } protected: @@ -130,7 +133,7 @@ private: { using details::os::filename_to_str; - filename_t src =_current_filename, target; + filename_t src = _current_filename, target; std::tie(target, std::ignore) = details::file_helper::split_by_extenstion(src); target += _ext; @@ -149,7 +152,7 @@ private: filename_t _current_filename; filename_t _ext; unsigned _current_size; - + details::file_helper _file_helper; }; diff --git a/include/spdlog/details/async_log_helper.h b/include/spdlog/details/async_log_helper.h index e1d1952d..7139328b 100644 --- a/include/spdlog/details/async_log_helper.h +++ b/include/spdlog/details/async_log_helper.h @@ -19,342 +19,337 @@ #include "../formatter.h" #include "../sinks/sink.h" -#include #include +#include #include #include +#include #include #include #include #include #include -#include namespace spdlog { - namespace details { +namespace details { - class async_log_helper - { - // Async msg to move to/from the queue - // Movable only. should never be copied - enum class async_msg_type - { - log, - flush, - terminate - }; +class async_log_helper +{ + // Async msg to move to/from the queue + // Movable only. should never be copied + enum class async_msg_type + { + log, + flush, + terminate + }; - struct async_msg - { - std::string logger_name; - level::level_enum level; - log_clock::time_point time; - size_t thread_id; - std::string txt; - async_msg_type msg_type; - size_t msg_id; + struct async_msg + { + std::string logger_name; + level::level_enum level; + log_clock::time_point time; + size_t thread_id; + std::string txt; + async_msg_type msg_type; + size_t msg_id; - async_msg() = default; - ~async_msg() = default; + async_msg() = default; + ~async_msg() = default; - explicit async_msg(async_msg_type m_type) - : level(level::info) - , thread_id(0) - , msg_type(m_type) - , msg_id(0) - { - } + explicit async_msg(async_msg_type m_type) + : level(level::info) + , thread_id(0) + , msg_type(m_type) + , msg_id(0) + { + } - async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)), - level(std::move(other.level)), - time(std::move(other.time)), - thread_id(other.thread_id), - txt(std::move(other.txt)), - msg_type(std::move(other.msg_type)), - msg_id(other.msg_id) - { - } + async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)), + level(std::move(other.level)), + time(std::move(other.time)), + thread_id(other.thread_id), + txt(std::move(other.txt)), + msg_type(std::move(other.msg_type)), + msg_id(other.msg_id) + { + } - async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT - { - logger_name = std::move(other.logger_name); - level = other.level; - time = std::move(other.time); - thread_id = other.thread_id; - txt = std::move(other.txt); - msg_type = other.msg_type; - msg_id = other.msg_id; - return *this; - } + async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT + { + logger_name = std::move(other.logger_name); + level = other.level; + time = std::move(other.time); + thread_id = other.thread_id; + txt = std::move(other.txt); + msg_type = other.msg_type; + msg_id = other.msg_id; + return *this; + } - // never copy or assign. should only be moved.. - async_msg(const async_msg &) = delete; - async_msg &operator=(const async_msg &other) = delete; + // never copy or assign. should only be moved.. + async_msg(const async_msg &) = delete; + async_msg &operator=(const async_msg &other) = delete; - // construct from log_msg - explicit async_msg(const details::log_msg &m) - : level(m.level) - , time(m.time) - , thread_id(m.thread_id) - , txt(m.raw.data(), m.raw.size()) - , msg_type(async_msg_type::log) - , msg_id(m.msg_id) - { + // construct from log_msg + explicit async_msg(const details::log_msg &m) + : level(m.level) + , time(m.time) + , thread_id(m.thread_id) + , txt(m.raw.data(), m.raw.size()) + , msg_type(async_msg_type::log) + , msg_id(m.msg_id) + { #ifndef SPDLOG_NO_NAME - logger_name = *m.logger_name; + logger_name = *m.logger_name; #endif - } + } - // copy into log_msg - void fill_log_msg(log_msg &msg) - { - msg.logger_name = &logger_name; - msg.level = level; - msg.time = time; - msg.thread_id = thread_id; - msg.raw << txt; - msg.msg_id = msg_id; - } - }; + // copy into log_msg + void fill_log_msg(log_msg &msg) + { + msg.logger_name = &logger_name; + msg.level = level; + msg.time = time; + msg.thread_id = thread_id; + msg.raw << txt; + msg.msg_id = msg_id; + } + }; - public: - using item_type = async_msg; - using q_type = details::mpmc_bounded_queue ; +public: + using item_type = async_msg; + using q_type = details::mpmc_bounded_queue; - using clock = std::chrono::steady_clock; + using clock = std::chrono::steady_clock; - async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, const log_err_handler err_handler, - const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function worker_warmup_cb = nullptr, - const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), - std::function worker_teardown_cb = nullptr); + async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, const log_err_handler err_handler, + const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function worker_warmup_cb = nullptr, + const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), + std::function worker_teardown_cb = nullptr); - void log(const details::log_msg &msg); + void log(const details::log_msg &msg); - // stop logging and join the back thread - ~async_log_helper(); + // stop logging and join the back thread + ~async_log_helper(); - async_log_helper(const async_log_helper &) = delete; - async_log_helper &operator=(const async_log_helper &) = delete; + async_log_helper(const async_log_helper &) = delete; + async_log_helper &operator=(const async_log_helper &) = delete; - void set_formatter(formatter_ptr msg_formatter); + void set_formatter(formatter_ptr msg_formatter); - void flush(); + void flush(); - void set_error_handler(spdlog::log_err_handler err_handler); + void set_error_handler(spdlog::log_err_handler err_handler); - private: - formatter_ptr _formatter; - std::vector> _sinks; +private: + formatter_ptr _formatter; + std::vector> _sinks; - // queue of messages to log - q_type _q; + // queue of messages to log + q_type _q; - log_err_handler _err_handler; + log_err_handler _err_handler; - std::chrono::time_point _last_flush; + std::chrono::time_point _last_flush; - // overflow policy - const async_overflow_policy _overflow_policy; + // overflow policy + const async_overflow_policy _overflow_policy; - // worker thread warmup callback - one can set thread priority, affinity, etc - const std::function _worker_warmup_cb; + // worker thread warmup callback - one can set thread priority, affinity, etc + const std::function _worker_warmup_cb; - // auto periodic sink flush parameter - const std::chrono::milliseconds _flush_interval_ms; + // auto periodic sink flush parameter + const std::chrono::milliseconds _flush_interval_ms; - // worker thread teardown callback - const std::function _worker_teardown_cb; + // worker thread teardown callback + const std::function _worker_teardown_cb; - std::mutex null_mutex_; - //null_mutex null_mutex_; - std::condition_variable_any not_empty_cv_; - std::condition_variable_any not_full_cv_; + std::mutex null_mutex_; + // null_mutex null_mutex_; + std::condition_variable_any not_empty_cv_; + std::condition_variable_any not_full_cv_; - // worker thread - std::thread _worker_thread; + // worker thread + std::thread _worker_thread; - void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy); + void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy); - // worker thread main loop - void worker_loop(); + // worker thread main loop + void worker_loop(); - // dequeue next message from the queue and process it. - // return false if termination of the queue is required - bool process_next_msg(); + // dequeue next message from the queue and process it. + // return false if termination of the queue is required + bool process_next_msg(); - void handle_flush_interval(); + void handle_flush_interval(); - void flush_sinks(); - - }; - } // namespace details + void flush_sinks(); +}; +} // namespace details } // namespace spdlog /////////////////////////////////////////////////////////////////////////////// // async_sink class implementation /////////////////////////////////////////////////////////////////////////////// inline spdlog::details::async_log_helper::async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, - log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, - const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) - : _formatter(std::move(formatter)) - , _sinks(std::move(sinks)) - , _q(queue_size) - , _err_handler(std::move(err_handler)) - , _last_flush(os::now()) - , _overflow_policy(overflow_policy) - , _worker_warmup_cb(std::move(worker_warmup_cb)) - , _flush_interval_ms(flush_interval_ms) - , _worker_teardown_cb(std::move(worker_teardown_cb)) + log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, + const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) + : _formatter(std::move(formatter)) + , _sinks(std::move(sinks)) + , _q(queue_size) + , _err_handler(std::move(err_handler)) + , _last_flush(os::now()) + , _overflow_policy(overflow_policy) + , _worker_warmup_cb(std::move(worker_warmup_cb)) + , _flush_interval_ms(flush_interval_ms) + , _worker_teardown_cb(std::move(worker_teardown_cb)) { - _worker_thread = std::thread(&async_log_helper::worker_loop, this); + _worker_thread = std::thread(&async_log_helper::worker_loop, this); } // Send to the worker thread termination message(level=off) // and wait for it to finish gracefully inline spdlog::details::async_log_helper::~async_log_helper() { - try - { - enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); - _worker_thread.join(); - } - catch (...) // don't crash in destructor - { - } + try + { + enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); + _worker_thread.join(); + } + catch (...) // don't crash in destructor + { + } } // Try to push and block until succeeded (if the policy is not to discard when the queue is full) inline void spdlog::details::async_log_helper::log(const details::log_msg &msg) -{ - enqueue_msg(async_msg(msg), _overflow_policy); +{ + enqueue_msg(async_msg(msg), _overflow_policy); } inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy) { - // block until succeeded pushing to the queue - if (policy == async_overflow_policy::block_retry) - { - _q.enqueue(std::move(new_msg)); - } - else - { - _q.enqueue_nowait(std::move(new_msg)); - } - + // block until succeeded pushing to the queue + if (policy == async_overflow_policy::block_retry) + { + _q.enqueue(std::move(new_msg)); + } + else + { + _q.enqueue_nowait(std::move(new_msg)); + } } // optionally wait for the queue be empty and request flush from the sinks inline void spdlog::details::async_log_helper::flush() { - enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy); + enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy); } inline void spdlog::details::async_log_helper::worker_loop() { - if (_worker_warmup_cb) - { - _worker_warmup_cb(); - } - auto active = true; - while (active) - { - try - { - active = process_next_msg(); - } - catch (const std::exception &ex) - { - _err_handler(ex.what()); - } - catch (...) - { - _err_handler("Unknown exeption in async logger worker loop."); - } - } - if (_worker_teardown_cb) - { - _worker_teardown_cb(); - } + if (_worker_warmup_cb) + { + _worker_warmup_cb(); + } + auto active = true; + while (active) + { + try + { + active = process_next_msg(); + } + catch (const std::exception &ex) + { + _err_handler(ex.what()); + } + catch (...) + { + _err_handler("Unknown exeption in async logger worker loop."); + } + } + if (_worker_teardown_cb) + { + _worker_teardown_cb(); + } } // process next message in the queue // return true if this thread should still be active (while no terminate msg was received) inline bool spdlog::details::async_log_helper::process_next_msg() { - async_msg incoming_async_msg; - bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::milliseconds(1000)); - if (!dequeued) - { + async_msg incoming_async_msg; + bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(2)); + if (!dequeued) + { + handle_flush_interval(); + return true; + } + + switch (incoming_async_msg.msg_type) + { + case async_msg_type::flush: + flush_sinks(); + return true; + + case async_msg_type::terminate: + flush_sinks(); + return false; + + default: + log_msg incoming_log_msg; + incoming_async_msg.fill_log_msg(incoming_log_msg); + _formatter->format(incoming_log_msg); + for (auto &s : _sinks) + { + if (s->should_log(incoming_log_msg.level)) + { + s->log(incoming_log_msg); + } + } handle_flush_interval(); - return true; - } - - switch (incoming_async_msg.msg_type) - { - case async_msg_type::flush: - flush_sinks(); - return true; - - - case async_msg_type::terminate: - //flush_sinks(); - return false; - - - default: - log_msg incoming_log_msg; - incoming_async_msg.fill_log_msg(incoming_log_msg); - _formatter->format(incoming_log_msg); - for (auto &s : _sinks) - { - if (s->should_log(incoming_log_msg.level)) - { - s->log(incoming_log_msg); - } - } - return true; - } - assert(false); - return true; // should not be reached - + return true; + } + assert(false); + return true; // should not be reached } -// flush all sinks if _flush_interval_ms has expired. only called if queue is empty -inline void spdlog::details::async_log_helper::handle_flush_interval() -{ - if (_flush_interval_ms == std::chrono::milliseconds::zero()) - { - return; - } - auto delta = details::os::now() - _last_flush;; - if (delta >= _flush_interval_ms) - { - flush_sinks(); - } -} inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter) { - _formatter = std::move(msg_formatter); + _formatter = std::move(msg_formatter); } - inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler) { - _err_handler = std::move(err_handler); + _err_handler = std::move(err_handler); } +// flush all sinks if _flush_interval_ms has expired. +inline void spdlog::details::async_log_helper::handle_flush_interval() +{ + if (_flush_interval_ms == std::chrono::milliseconds::zero()) + { + return; + } + auto delta = details::os::now() - _last_flush; + ; + if (delta >= _flush_interval_ms) + { + flush_sinks(); + } +} + // flush all sinks if _flush_interval_ms has expired. only called if queue is empty inline void spdlog::details::async_log_helper::flush_sinks() -{ - printf("FLUSH!\n"); - for (auto &s : _sinks) - { - s->flush(); - } - _last_flush = os::now(); +{ + for (auto &s : _sinks) + { + s->flush(); + } + _last_flush = os::now(); } - diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h index 0fc10f2c..d4a8a41c 100644 --- a/include/spdlog/details/mpmc_blocking_q.h +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -6,84 +6,79 @@ // // async log helper : -// multi producer-multi consumer blocking queue +// multi producer-multi consumer blocking queue // enqueue(..) - will block until room found to put the new message // enqueue_nowait(..) - will return immediatly with false if no room left in the queue // dequeue_for(..) - will block until the queue is not empty or timeout passed - #include #include #include - namespace spdlog { - namespace details { +namespace details { - template - class mpmc_bounded_queue - { - public: +template +class mpmc_bounded_queue +{ +public: + using item_type = T; + explicit mpmc_bounded_queue(size_t max_items) + : max_items_(max_items) + { + } - using item_type = T; - explicit mpmc_bounded_queue(size_t max_items) : max_items_(max_items) {} + // try to enqueue and block if no room left + void enqueue(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + pop_cv_.wait(lock, [this] { return this->q_.size() < this->max_items_; }); + q_.push(std::move(item)); + } + push_cv_.notify_one(); + } - // try to enqueue and block if no room left - void enqueue(T &&item) - { - { - std::unique_lock lock(queue_mutex_); - pop_cv_.wait(lock, [this] {return this->q_.size() <= this->max_items_; }); - q_.push(std::forward(item)); - } - push_cv_.notify_one(); - } + // try to enqueue and return immdeialty false if no room left + bool enqueue_nowait(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + if (q_.size() == this->max_items_) + { + return false; + } + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + return true; + } - // try to enqueue and return immdeialty false if no room left - bool enqueue_nowait(T &&item) - { - { - std::unique_lock lock(queue_mutex_); - if (this->q_.size() >= this->max_items_) - { - return false; - } - q_.push(std::forward(item)); - } - push_cv_.notify_one(); - return true; - } + // try to dequeue item. if no item found. wait upto timeout and try again + // Return true, if succeeded dequeue item, false otherwise + bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) + { + { + std::unique_lock lock(queue_mutex_); + if (!push_cv_.wait_for(lock, wait_duration, [this] { return this->q_.size() > 0; })) + { + return false; + } - // try to dequeue item. if no item found. wait upto timeout and try again - // Return true, if succeeded dequeue item, false otherwise - bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) - { - { - std::unique_lock lock(queue_mutex_); - //push_cv_.wait(lock, [this] {return this->q_.size() > 0; }); - bool found_msg = push_cv_.wait_for(lock, wait_duration, [this] {return this->q_.size() > 0; }); - if (!found_msg) - { - return false; - } - popped_item = std::move(q_.front()); - q_.pop(); - } - pop_cv_.notify_one(); - return true; - } + popped_item = std::move(q_.front()); + q_.pop(); + } + pop_cv_.notify_one(); + return true; + } +private: + size_t max_items_; + std::mutex queue_mutex_; + std::condition_variable push_cv_; + std::condition_variable pop_cv_; - - - private: - size_t max_items_; - std::mutex queue_mutex_; - std::condition_variable push_cv_; - std::condition_variable pop_cv_; - - std::queue q_; - - }; - } -} + std::queue q_; +}; +} // namespace details +} // namespace spdlog diff --git a/include/spdlog/sinks/test_sink.h b/include/spdlog/sinks/test_sink.h index ba7336de..f25654cd 100644 --- a/include/spdlog/sinks/test_sink.h +++ b/include/spdlog/sinks/test_sink.h @@ -17,20 +17,19 @@ template class test_sink : public base_sink { public: - size_t msg_counter() - { - return msg_counter_; - } + size_t msg_counter() + { + return msg_counter_; + } protected: - void _sink_it(const details::log_msg &) override - { - msg_counter_++; - } + void _sink_it(const details::log_msg &) override + { + msg_counter_++; + } void _flush() override {} - size_t msg_counter_{ 0 }; - + size_t msg_counter_{0}; }; using test_sink_mt = test_sink;