From e69aafc73793a3d262e27dc4a80380ab41e7723c Mon Sep 17 00:00:00 2001 From: gabime Date: Sat, 19 May 2018 16:06:57 +0300 Subject: [PATCH] fixed flush interval in async helper --- example/bench.cpp | 33 +- include/spdlog/contrib/sinks/step_file_sink.h | 17 +- include/spdlog/details/async_log_helper.h | 475 +++++++++--------- include/spdlog/details/mpmc_blocking_q.h | 125 +++-- include/spdlog/sinks/test_sink.h | 19 +- 5 files changed, 324 insertions(+), 345 deletions(-) diff --git a/example/bench.cpp b/example/bench.cpp index d92e20bc..49385e0f 100644 --- a/example/bench.cpp +++ b/example/bench.cpp @@ -7,7 +7,8 @@ // bench.cpp : spdlog benchmarks // #include "spdlog/async_logger.h" -#include "spdlog/sinks/test_sink.h" +#include "spdlog/sinks/file_sinks.h" +#include "spdlog/sinks/null_sink.h" #include "spdlog/spdlog.h" #include "utils.h" #include @@ -29,10 +30,12 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count int main(int argc, char *argv[]) { - int queue_size = 1024*1024; + int queue_size = 1048576; int howmany = 1000000; int threads = 10; - + int file_size = 30 * 1024 * 1024; + int rotating_files = 5; + try { @@ -42,7 +45,7 @@ int main(int argc, char *argv[]) threads = atoi(argv[2]); if (argc > 3) queue_size = atoi(argv[3]); - /* + cout << "*******************************************************************************\n"; cout << "Single thread, " << format(howmany) << " iterations" << endl; cout << "*******************************************************************************\n"; @@ -64,32 +67,17 @@ int main(int argc, char *argv[]) bench_mt(howmany, daily_mt, threads); bench(howmany, spdlog::create("null_mt")); - */ - cout << "\n*******************************************************************************\n"; cout << "async logging.. " << threads << " threads sharing same logger, " << format(howmany) << " iterations " << endl; cout << "*******************************************************************************\n"; spdlog::set_async_mode(queue_size); - - for (int i = 0; i < 300; ++i) + for (int i = 0; i < 3; ++i) { - //auto as = spdlog::daily_logger_mt("as", "logs/daily_async.log"); - auto test_sink = std::make_shared(); - //auto as = spdlog::basic_logger_mt("as", "logs/async.log", true); - auto as = std::make_shared("as", test_sink, queue_size, async_overflow_policy::block_retry, nullptr, std::chrono::milliseconds(2000)); - bench_mt(howmany, as, threads); - as.reset(); + auto as = spdlog::daily_logger_st("as", "logs/daily_async.log"); + bench_mt(howmany, as, threads); spdlog::drop("as"); - - auto msg_counter = test_sink->msg_counter(); - cout << "Count:" << msg_counter << endl; - if (msg_counter != howmany) - { - cout << "ERROR! Expected " << howmany; - exit(0); - } } } catch (std::exception &ex) @@ -131,7 +119,6 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count if (counter > howmany) break; log->info("Hello logger: msg number {}", counter); - //std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } })); } diff --git a/include/spdlog/contrib/sinks/step_file_sink.h b/include/spdlog/contrib/sinks/step_file_sink.h index 2becf29e..7e90e0ad 100644 --- a/include/spdlog/contrib/sinks/step_file_sink.h +++ b/include/spdlog/contrib/sinks/step_file_sink.h @@ -17,8 +17,10 @@ // // Create a file logger which creates new files with a specified time step and fixed file size: // -// std::shared_ptr step_logger_mt(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const filename_t &tmp_ext = ".tmp", unsigned max_file_size = std::numeric_limits::max()); -// std::shared_ptr step_logger_st(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const filename_t &tmp_ext = ".tmp", unsigned max_file_size = std::numeric_limits::max()); +// std::shared_ptr step_logger_mt(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const +// filename_t &tmp_ext = ".tmp", unsigned max_file_size = std::numeric_limits::max()); std::shared_ptr +// step_logger_st(const std::string &logger_name, const filename_t &filename, unsigned seconds = 60, const filename_t &tmp_ext = ".tmp", +// unsigned max_file_size = std::numeric_limits::max()); // Example for spdlog_impl.h // Create a file logger that creates new files with a specified increment @@ -76,7 +78,7 @@ public: { throw spdlog_ex("step_file_sink: Invalid max log size in ctor"); } - + _tp = _next_tp(); std::tie(_current_filename, _ext) = FileNameCalc::calc_filename(_base_filename, _tmp_ext); @@ -88,7 +90,7 @@ public: _file_helper.open(_current_filename); _current_size = _file_helper.size(); // expensive. called only once } - + ~step_file_sink() { try @@ -96,7 +98,8 @@ public: close_current_file(); } catch (...) - {} + { + } } protected: @@ -130,7 +133,7 @@ private: { using details::os::filename_to_str; - filename_t src =_current_filename, target; + filename_t src = _current_filename, target; std::tie(target, std::ignore) = details::file_helper::split_by_extenstion(src); target += _ext; @@ -149,7 +152,7 @@ private: filename_t _current_filename; filename_t _ext; unsigned _current_size; - + details::file_helper _file_helper; }; diff --git a/include/spdlog/details/async_log_helper.h b/include/spdlog/details/async_log_helper.h index e1d1952d..7139328b 100644 --- a/include/spdlog/details/async_log_helper.h +++ b/include/spdlog/details/async_log_helper.h @@ -19,342 +19,337 @@ #include "../formatter.h" #include "../sinks/sink.h" -#include #include +#include #include #include +#include #include #include #include #include #include -#include namespace spdlog { - namespace details { +namespace details { - class async_log_helper - { - // Async msg to move to/from the queue - // Movable only. should never be copied - enum class async_msg_type - { - log, - flush, - terminate - }; +class async_log_helper +{ + // Async msg to move to/from the queue + // Movable only. should never be copied + enum class async_msg_type + { + log, + flush, + terminate + }; - struct async_msg - { - std::string logger_name; - level::level_enum level; - log_clock::time_point time; - size_t thread_id; - std::string txt; - async_msg_type msg_type; - size_t msg_id; + struct async_msg + { + std::string logger_name; + level::level_enum level; + log_clock::time_point time; + size_t thread_id; + std::string txt; + async_msg_type msg_type; + size_t msg_id; - async_msg() = default; - ~async_msg() = default; + async_msg() = default; + ~async_msg() = default; - explicit async_msg(async_msg_type m_type) - : level(level::info) - , thread_id(0) - , msg_type(m_type) - , msg_id(0) - { - } + explicit async_msg(async_msg_type m_type) + : level(level::info) + , thread_id(0) + , msg_type(m_type) + , msg_id(0) + { + } - async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)), - level(std::move(other.level)), - time(std::move(other.time)), - thread_id(other.thread_id), - txt(std::move(other.txt)), - msg_type(std::move(other.msg_type)), - msg_id(other.msg_id) - { - } + async_msg(async_msg &&other) SPDLOG_NOEXCEPT : logger_name(std::move(other.logger_name)), + level(std::move(other.level)), + time(std::move(other.time)), + thread_id(other.thread_id), + txt(std::move(other.txt)), + msg_type(std::move(other.msg_type)), + msg_id(other.msg_id) + { + } - async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT - { - logger_name = std::move(other.logger_name); - level = other.level; - time = std::move(other.time); - thread_id = other.thread_id; - txt = std::move(other.txt); - msg_type = other.msg_type; - msg_id = other.msg_id; - return *this; - } + async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT + { + logger_name = std::move(other.logger_name); + level = other.level; + time = std::move(other.time); + thread_id = other.thread_id; + txt = std::move(other.txt); + msg_type = other.msg_type; + msg_id = other.msg_id; + return *this; + } - // never copy or assign. should only be moved.. - async_msg(const async_msg &) = delete; - async_msg &operator=(const async_msg &other) = delete; + // never copy or assign. should only be moved.. + async_msg(const async_msg &) = delete; + async_msg &operator=(const async_msg &other) = delete; - // construct from log_msg - explicit async_msg(const details::log_msg &m) - : level(m.level) - , time(m.time) - , thread_id(m.thread_id) - , txt(m.raw.data(), m.raw.size()) - , msg_type(async_msg_type::log) - , msg_id(m.msg_id) - { + // construct from log_msg + explicit async_msg(const details::log_msg &m) + : level(m.level) + , time(m.time) + , thread_id(m.thread_id) + , txt(m.raw.data(), m.raw.size()) + , msg_type(async_msg_type::log) + , msg_id(m.msg_id) + { #ifndef SPDLOG_NO_NAME - logger_name = *m.logger_name; + logger_name = *m.logger_name; #endif - } + } - // copy into log_msg - void fill_log_msg(log_msg &msg) - { - msg.logger_name = &logger_name; - msg.level = level; - msg.time = time; - msg.thread_id = thread_id; - msg.raw << txt; - msg.msg_id = msg_id; - } - }; + // copy into log_msg + void fill_log_msg(log_msg &msg) + { + msg.logger_name = &logger_name; + msg.level = level; + msg.time = time; + msg.thread_id = thread_id; + msg.raw << txt; + msg.msg_id = msg_id; + } + }; - public: - using item_type = async_msg; - using q_type = details::mpmc_bounded_queue ; +public: + using item_type = async_msg; + using q_type = details::mpmc_bounded_queue; - using clock = std::chrono::steady_clock; + using clock = std::chrono::steady_clock; - async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, const log_err_handler err_handler, - const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function worker_warmup_cb = nullptr, - const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), - std::function worker_teardown_cb = nullptr); + async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, const log_err_handler err_handler, + const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, std::function worker_warmup_cb = nullptr, + const std::chrono::milliseconds &flush_interval_ms = std::chrono::milliseconds::zero(), + std::function worker_teardown_cb = nullptr); - void log(const details::log_msg &msg); + void log(const details::log_msg &msg); - // stop logging and join the back thread - ~async_log_helper(); + // stop logging and join the back thread + ~async_log_helper(); - async_log_helper(const async_log_helper &) = delete; - async_log_helper &operator=(const async_log_helper &) = delete; + async_log_helper(const async_log_helper &) = delete; + async_log_helper &operator=(const async_log_helper &) = delete; - void set_formatter(formatter_ptr msg_formatter); + void set_formatter(formatter_ptr msg_formatter); - void flush(); + void flush(); - void set_error_handler(spdlog::log_err_handler err_handler); + void set_error_handler(spdlog::log_err_handler err_handler); - private: - formatter_ptr _formatter; - std::vector> _sinks; +private: + formatter_ptr _formatter; + std::vector> _sinks; - // queue of messages to log - q_type _q; + // queue of messages to log + q_type _q; - log_err_handler _err_handler; + log_err_handler _err_handler; - std::chrono::time_point _last_flush; + std::chrono::time_point _last_flush; - // overflow policy - const async_overflow_policy _overflow_policy; + // overflow policy + const async_overflow_policy _overflow_policy; - // worker thread warmup callback - one can set thread priority, affinity, etc - const std::function _worker_warmup_cb; + // worker thread warmup callback - one can set thread priority, affinity, etc + const std::function _worker_warmup_cb; - // auto periodic sink flush parameter - const std::chrono::milliseconds _flush_interval_ms; + // auto periodic sink flush parameter + const std::chrono::milliseconds _flush_interval_ms; - // worker thread teardown callback - const std::function _worker_teardown_cb; + // worker thread teardown callback + const std::function _worker_teardown_cb; - std::mutex null_mutex_; - //null_mutex null_mutex_; - std::condition_variable_any not_empty_cv_; - std::condition_variable_any not_full_cv_; + std::mutex null_mutex_; + // null_mutex null_mutex_; + std::condition_variable_any not_empty_cv_; + std::condition_variable_any not_full_cv_; - // worker thread - std::thread _worker_thread; + // worker thread + std::thread _worker_thread; - void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy); + void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy); - // worker thread main loop - void worker_loop(); + // worker thread main loop + void worker_loop(); - // dequeue next message from the queue and process it. - // return false if termination of the queue is required - bool process_next_msg(); + // dequeue next message from the queue and process it. + // return false if termination of the queue is required + bool process_next_msg(); - void handle_flush_interval(); + void handle_flush_interval(); - void flush_sinks(); - - }; - } // namespace details + void flush_sinks(); +}; +} // namespace details } // namespace spdlog /////////////////////////////////////////////////////////////////////////////// // async_sink class implementation /////////////////////////////////////////////////////////////////////////////// inline spdlog::details::async_log_helper::async_log_helper(formatter_ptr formatter, std::vector sinks, size_t queue_size, - log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, - const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) - : _formatter(std::move(formatter)) - , _sinks(std::move(sinks)) - , _q(queue_size) - , _err_handler(std::move(err_handler)) - , _last_flush(os::now()) - , _overflow_policy(overflow_policy) - , _worker_warmup_cb(std::move(worker_warmup_cb)) - , _flush_interval_ms(flush_interval_ms) - , _worker_teardown_cb(std::move(worker_teardown_cb)) + log_err_handler err_handler, const async_overflow_policy overflow_policy, std::function worker_warmup_cb, + const std::chrono::milliseconds &flush_interval_ms, std::function worker_teardown_cb) + : _formatter(std::move(formatter)) + , _sinks(std::move(sinks)) + , _q(queue_size) + , _err_handler(std::move(err_handler)) + , _last_flush(os::now()) + , _overflow_policy(overflow_policy) + , _worker_warmup_cb(std::move(worker_warmup_cb)) + , _flush_interval_ms(flush_interval_ms) + , _worker_teardown_cb(std::move(worker_teardown_cb)) { - _worker_thread = std::thread(&async_log_helper::worker_loop, this); + _worker_thread = std::thread(&async_log_helper::worker_loop, this); } // Send to the worker thread termination message(level=off) // and wait for it to finish gracefully inline spdlog::details::async_log_helper::~async_log_helper() { - try - { - enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); - _worker_thread.join(); - } - catch (...) // don't crash in destructor - { - } + try + { + enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry); + _worker_thread.join(); + } + catch (...) // don't crash in destructor + { + } } // Try to push and block until succeeded (if the policy is not to discard when the queue is full) inline void spdlog::details::async_log_helper::log(const details::log_msg &msg) -{ - enqueue_msg(async_msg(msg), _overflow_policy); +{ + enqueue_msg(async_msg(msg), _overflow_policy); } inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy) { - // block until succeeded pushing to the queue - if (policy == async_overflow_policy::block_retry) - { - _q.enqueue(std::move(new_msg)); - } - else - { - _q.enqueue_nowait(std::move(new_msg)); - } - + // block until succeeded pushing to the queue + if (policy == async_overflow_policy::block_retry) + { + _q.enqueue(std::move(new_msg)); + } + else + { + _q.enqueue_nowait(std::move(new_msg)); + } } // optionally wait for the queue be empty and request flush from the sinks inline void spdlog::details::async_log_helper::flush() { - enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy); + enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy); } inline void spdlog::details::async_log_helper::worker_loop() { - if (_worker_warmup_cb) - { - _worker_warmup_cb(); - } - auto active = true; - while (active) - { - try - { - active = process_next_msg(); - } - catch (const std::exception &ex) - { - _err_handler(ex.what()); - } - catch (...) - { - _err_handler("Unknown exeption in async logger worker loop."); - } - } - if (_worker_teardown_cb) - { - _worker_teardown_cb(); - } + if (_worker_warmup_cb) + { + _worker_warmup_cb(); + } + auto active = true; + while (active) + { + try + { + active = process_next_msg(); + } + catch (const std::exception &ex) + { + _err_handler(ex.what()); + } + catch (...) + { + _err_handler("Unknown exeption in async logger worker loop."); + } + } + if (_worker_teardown_cb) + { + _worker_teardown_cb(); + } } // process next message in the queue // return true if this thread should still be active (while no terminate msg was received) inline bool spdlog::details::async_log_helper::process_next_msg() { - async_msg incoming_async_msg; - bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::milliseconds(1000)); - if (!dequeued) - { + async_msg incoming_async_msg; + bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(2)); + if (!dequeued) + { + handle_flush_interval(); + return true; + } + + switch (incoming_async_msg.msg_type) + { + case async_msg_type::flush: + flush_sinks(); + return true; + + case async_msg_type::terminate: + flush_sinks(); + return false; + + default: + log_msg incoming_log_msg; + incoming_async_msg.fill_log_msg(incoming_log_msg); + _formatter->format(incoming_log_msg); + for (auto &s : _sinks) + { + if (s->should_log(incoming_log_msg.level)) + { + s->log(incoming_log_msg); + } + } handle_flush_interval(); - return true; - } - - switch (incoming_async_msg.msg_type) - { - case async_msg_type::flush: - flush_sinks(); - return true; - - - case async_msg_type::terminate: - //flush_sinks(); - return false; - - - default: - log_msg incoming_log_msg; - incoming_async_msg.fill_log_msg(incoming_log_msg); - _formatter->format(incoming_log_msg); - for (auto &s : _sinks) - { - if (s->should_log(incoming_log_msg.level)) - { - s->log(incoming_log_msg); - } - } - return true; - } - assert(false); - return true; // should not be reached - + return true; + } + assert(false); + return true; // should not be reached } -// flush all sinks if _flush_interval_ms has expired. only called if queue is empty -inline void spdlog::details::async_log_helper::handle_flush_interval() -{ - if (_flush_interval_ms == std::chrono::milliseconds::zero()) - { - return; - } - auto delta = details::os::now() - _last_flush;; - if (delta >= _flush_interval_ms) - { - flush_sinks(); - } -} inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter) { - _formatter = std::move(msg_formatter); + _formatter = std::move(msg_formatter); } - inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler) { - _err_handler = std::move(err_handler); + _err_handler = std::move(err_handler); } +// flush all sinks if _flush_interval_ms has expired. +inline void spdlog::details::async_log_helper::handle_flush_interval() +{ + if (_flush_interval_ms == std::chrono::milliseconds::zero()) + { + return; + } + auto delta = details::os::now() - _last_flush; + ; + if (delta >= _flush_interval_ms) + { + flush_sinks(); + } +} + // flush all sinks if _flush_interval_ms has expired. only called if queue is empty inline void spdlog::details::async_log_helper::flush_sinks() -{ - printf("FLUSH!\n"); - for (auto &s : _sinks) - { - s->flush(); - } - _last_flush = os::now(); +{ + for (auto &s : _sinks) + { + s->flush(); + } + _last_flush = os::now(); } - diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h index 0fc10f2c..d4a8a41c 100644 --- a/include/spdlog/details/mpmc_blocking_q.h +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -6,84 +6,79 @@ // // async log helper : -// multi producer-multi consumer blocking queue +// multi producer-multi consumer blocking queue // enqueue(..) - will block until room found to put the new message // enqueue_nowait(..) - will return immediatly with false if no room left in the queue // dequeue_for(..) - will block until the queue is not empty or timeout passed - #include #include #include - namespace spdlog { - namespace details { +namespace details { - template - class mpmc_bounded_queue - { - public: +template +class mpmc_bounded_queue +{ +public: + using item_type = T; + explicit mpmc_bounded_queue(size_t max_items) + : max_items_(max_items) + { + } - using item_type = T; - explicit mpmc_bounded_queue(size_t max_items) : max_items_(max_items) {} + // try to enqueue and block if no room left + void enqueue(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + pop_cv_.wait(lock, [this] { return this->q_.size() < this->max_items_; }); + q_.push(std::move(item)); + } + push_cv_.notify_one(); + } - // try to enqueue and block if no room left - void enqueue(T &&item) - { - { - std::unique_lock lock(queue_mutex_); - pop_cv_.wait(lock, [this] {return this->q_.size() <= this->max_items_; }); - q_.push(std::forward(item)); - } - push_cv_.notify_one(); - } + // try to enqueue and return immdeialty false if no room left + bool enqueue_nowait(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + if (q_.size() == this->max_items_) + { + return false; + } + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + return true; + } - // try to enqueue and return immdeialty false if no room left - bool enqueue_nowait(T &&item) - { - { - std::unique_lock lock(queue_mutex_); - if (this->q_.size() >= this->max_items_) - { - return false; - } - q_.push(std::forward(item)); - } - push_cv_.notify_one(); - return true; - } + // try to dequeue item. if no item found. wait upto timeout and try again + // Return true, if succeeded dequeue item, false otherwise + bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) + { + { + std::unique_lock lock(queue_mutex_); + if (!push_cv_.wait_for(lock, wait_duration, [this] { return this->q_.size() > 0; })) + { + return false; + } - // try to dequeue item. if no item found. wait upto timeout and try again - // Return true, if succeeded dequeue item, false otherwise - bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) - { - { - std::unique_lock lock(queue_mutex_); - //push_cv_.wait(lock, [this] {return this->q_.size() > 0; }); - bool found_msg = push_cv_.wait_for(lock, wait_duration, [this] {return this->q_.size() > 0; }); - if (!found_msg) - { - return false; - } - popped_item = std::move(q_.front()); - q_.pop(); - } - pop_cv_.notify_one(); - return true; - } + popped_item = std::move(q_.front()); + q_.pop(); + } + pop_cv_.notify_one(); + return true; + } +private: + size_t max_items_; + std::mutex queue_mutex_; + std::condition_variable push_cv_; + std::condition_variable pop_cv_; - - - private: - size_t max_items_; - std::mutex queue_mutex_; - std::condition_variable push_cv_; - std::condition_variable pop_cv_; - - std::queue q_; - - }; - } -} + std::queue q_; +}; +} // namespace details +} // namespace spdlog diff --git a/include/spdlog/sinks/test_sink.h b/include/spdlog/sinks/test_sink.h index ba7336de..f25654cd 100644 --- a/include/spdlog/sinks/test_sink.h +++ b/include/spdlog/sinks/test_sink.h @@ -17,20 +17,19 @@ template class test_sink : public base_sink { public: - size_t msg_counter() - { - return msg_counter_; - } + size_t msg_counter() + { + return msg_counter_; + } protected: - void _sink_it(const details::log_msg &) override - { - msg_counter_++; - } + void _sink_it(const details::log_msg &) override + { + msg_counter_++; + } void _flush() override {} - size_t msg_counter_{ 0 }; - + size_t msg_counter_{0}; }; using test_sink_mt = test_sink;