From 8321449c447c7064c43895ab65af3ae1b39137c0 Mon Sep 17 00:00:00 2001 From: gabime Date: Sat, 19 May 2018 14:41:53 +0300 Subject: [PATCH] replaced the lockfree queue with bounded, locked queue --- example/bench.cpp | 33 ++-- example/example.vcxproj | 15 +- include/spdlog/details/async_logger_impl.h | 4 +- include/spdlog/details/mpmc_blocking_q.h | 89 ++++++++++ include/spdlog/details/mpmc_bounded_q.h | 183 --------------------- include/spdlog/sinks/test_sink.h | 40 +++++ 6 files changed, 161 insertions(+), 203 deletions(-) create mode 100644 include/spdlog/details/mpmc_blocking_q.h delete mode 100644 include/spdlog/details/mpmc_bounded_q.h create mode 100644 include/spdlog/sinks/test_sink.h diff --git a/example/bench.cpp b/example/bench.cpp index 49385e0f..d92e20bc 100644 --- a/example/bench.cpp +++ b/example/bench.cpp @@ -7,8 +7,7 @@ // bench.cpp : spdlog benchmarks // #include "spdlog/async_logger.h" -#include "spdlog/sinks/file_sinks.h" -#include "spdlog/sinks/null_sink.h" +#include "spdlog/sinks/test_sink.h" #include "spdlog/spdlog.h" #include "utils.h" #include @@ -30,12 +29,10 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count int main(int argc, char *argv[]) { - int queue_size = 1048576; + int queue_size = 1024*1024; int howmany = 1000000; int threads = 10; - int file_size = 30 * 1024 * 1024; - int rotating_files = 5; - + try { @@ -45,7 +42,7 @@ int main(int argc, char *argv[]) threads = atoi(argv[2]); if (argc > 3) queue_size = atoi(argv[3]); - + /* cout << "*******************************************************************************\n"; cout << "Single thread, " << format(howmany) << " iterations" << endl; cout << "*******************************************************************************\n"; @@ -67,17 +64,32 @@ int main(int argc, char *argv[]) bench_mt(howmany, daily_mt, threads); bench(howmany, spdlog::create("null_mt")); + */ + cout << "\n*******************************************************************************\n"; cout << "async logging.. " << threads << " threads sharing same logger, " << format(howmany) << " iterations " << endl; cout << "*******************************************************************************\n"; spdlog::set_async_mode(queue_size); - for (int i = 0; i < 3; ++i) + + for (int i = 0; i < 300; ++i) { - auto as = spdlog::daily_logger_st("as", "logs/daily_async.log"); - bench_mt(howmany, as, threads); + //auto as = spdlog::daily_logger_mt("as", "logs/daily_async.log"); + auto test_sink = std::make_shared(); + //auto as = spdlog::basic_logger_mt("as", "logs/async.log", true); + auto as = std::make_shared("as", test_sink, queue_size, async_overflow_policy::block_retry, nullptr, std::chrono::milliseconds(2000)); + bench_mt(howmany, as, threads); + as.reset(); spdlog::drop("as"); + + auto msg_counter = test_sink->msg_counter(); + cout << "Count:" << msg_counter << endl; + if (msg_counter != howmany) + { + cout << "ERROR! Expected " << howmany; + exit(0); + } } } catch (std::exception &ex) @@ -119,6 +131,7 @@ void bench_mt(int howmany, std::shared_ptr log, int thread_count if (counter > howmany) break; log->info("Hello logger: msg number {}", counter); + //std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } })); } diff --git a/example/example.vcxproj b/example/example.vcxproj index 63db2b5d..9827f685 100644 --- a/example/example.vcxproj +++ b/example/example.vcxproj @@ -1,5 +1,5 @@  - + Debug @@ -10,9 +10,6 @@ Win32 - - - @@ -21,7 +18,6 @@ - @@ -46,23 +42,26 @@ + + + {9E5AB93A-0CCE-4BAC-9FCB-0FC9CB5EB8D2} Win32Proj . - 8.1 + 10.0.16299.0 Application true - v120 + v141 Unicode Application false - v120 + v141 true Unicode diff --git a/include/spdlog/details/async_logger_impl.h b/include/spdlog/details/async_logger_impl.h index 0f7a9e4a..b4f68afd 100644 --- a/include/spdlog/details/async_logger_impl.h +++ b/include/spdlog/details/async_logger_impl.h @@ -44,7 +44,7 @@ inline spdlog::async_logger::async_logger(const std::string &logger_name, sink_p inline void spdlog::async_logger::flush() { - _async_log_helper->flush(false);//dont wat for the q to draing before returning from flush + _async_log_helper->flush(); } // Error handler @@ -80,7 +80,7 @@ inline void spdlog::async_logger::_sink_it(details::log_msg &msg) _async_log_helper->log(msg); if (_should_flush_on(msg)) { - _async_log_helper->flush(false); // do async flush + _async_log_helper->flush(); // do async flush } } catch (const std::exception &ex) diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h new file mode 100644 index 00000000..0fc10f2c --- /dev/null +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -0,0 +1,89 @@ +#pragma once + +// +// Copyright(c) 2018 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +// async log helper : +// multi producer-multi consumer blocking queue +// enqueue(..) - will block until room found to put the new message +// enqueue_nowait(..) - will return immediatly with false if no room left in the queue +// dequeue_for(..) - will block until the queue is not empty or timeout passed + + +#include +#include +#include + + +namespace spdlog { + namespace details { + + template + class mpmc_bounded_queue + { + public: + + using item_type = T; + explicit mpmc_bounded_queue(size_t max_items) : max_items_(max_items) {} + + // try to enqueue and block if no room left + void enqueue(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + pop_cv_.wait(lock, [this] {return this->q_.size() <= this->max_items_; }); + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + } + + // try to enqueue and return immdeialty false if no room left + bool enqueue_nowait(T &&item) + { + { + std::unique_lock lock(queue_mutex_); + if (this->q_.size() >= this->max_items_) + { + return false; + } + q_.push(std::forward(item)); + } + push_cv_.notify_one(); + return true; + } + + // try to dequeue item. if no item found. wait upto timeout and try again + // Return true, if succeeded dequeue item, false otherwise + bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) + { + { + std::unique_lock lock(queue_mutex_); + //push_cv_.wait(lock, [this] {return this->q_.size() > 0; }); + bool found_msg = push_cv_.wait_for(lock, wait_duration, [this] {return this->q_.size() > 0; }); + if (!found_msg) + { + return false; + } + popped_item = std::move(q_.front()); + q_.pop(); + } + pop_cv_.notify_one(); + return true; + } + + + + + private: + size_t max_items_; + std::mutex queue_mutex_; + std::condition_variable push_cv_; + std::condition_variable pop_cv_; + + std::queue q_; + + }; + } +} diff --git a/include/spdlog/details/mpmc_bounded_q.h b/include/spdlog/details/mpmc_bounded_q.h deleted file mode 100644 index fd41e4c8..00000000 --- a/include/spdlog/details/mpmc_bounded_q.h +++ /dev/null @@ -1,183 +0,0 @@ -/* -A modified version of Bounded MPMC queue by Dmitry Vyukov. - -Original code from: -http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -licensed by Dmitry Vyukov under the terms below: - -Simplified BSD license - -Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: -1. Redistributions of source code must retain the above copyright notice, this list of -conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, this list -of conditions and the following disclaimer in the documentation and/or other materials -provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED -WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT -SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, -OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF -ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -The views and conclusions contained in the software and documentation are those of the authors and -should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov. -*/ - -/* -The code in its current form adds the license below: - -Copyright(c) 2015 Gabi Melman. -Distributed under the MIT License (http://opensource.org/licenses/MIT) - -*/ - -#pragma once - -#include "../common.h" - -#include -#include - -namespace spdlog { -namespace details { - -template -class mpmc_bounded_queue -{ -public: - using item_type = T; - - explicit mpmc_bounded_queue(size_t buffer_size) - : max_size_(buffer_size) - , buffer_(new cell_t[buffer_size]) - , buffer_mask_(buffer_size - 1) - { - // queue size must be power of two - if (!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0))) - { - throw spdlog_ex("async logger queue size must be power of two"); - } - - for (size_t i = 0; i != buffer_size; i += 1) - { - buffer_[i].sequence_.store(i, std::memory_order_relaxed); - } - enqueue_pos_.store(0, std::memory_order_relaxed); - dequeue_pos_.store(0, std::memory_order_relaxed); - } - - ~mpmc_bounded_queue() - { - delete[] buffer_; - } - - mpmc_bounded_queue(mpmc_bounded_queue const &) = delete; - void operator=(mpmc_bounded_queue const &) = delete; - - bool enqueue(T &&data) - { - cell_t *cell; - size_t pos = enqueue_pos_.load(std::memory_order_relaxed); - for (;;) - { - cell = &buffer_[pos & buffer_mask_]; - size_t seq = cell->sequence_.load(std::memory_order_acquire); - intptr_t dif = static_cast(seq) - static_cast(pos); - if (dif == 0) - { - if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if (dif < 0) - { - return false; - } - else - { - pos = enqueue_pos_.load(std::memory_order_relaxed); - } - } - cell->data_ = std::move(data); - cell->sequence_.store(pos + 1, std::memory_order_release); - return true; - } - - bool dequeue(T &data) - { - cell_t *cell; - size_t pos = dequeue_pos_.load(std::memory_order_relaxed); - for (;;) - { - cell = &buffer_[pos & buffer_mask_]; - size_t seq = cell->sequence_.load(std::memory_order_acquire); - intptr_t dif = static_cast(seq) - static_cast(pos + 1); - if (dif == 0) - { - if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if (dif < 0) - { - return false; - } - else - { - pos = dequeue_pos_.load(std::memory_order_relaxed); - } - } - data = std::move(cell->data_); - cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release); - return true; - } - - bool is_empty() - { - size_t front, front1, back; - // try to take a consistent snapshot of front/tail. - do - { - front = enqueue_pos_.load(std::memory_order_acquire); - back = dequeue_pos_.load(std::memory_order_acquire); - front1 = enqueue_pos_.load(std::memory_order_relaxed); - } while (front != front1); - return back == front; - } - -private: - struct cell_t - { - std::atomic sequence_; - T data_; - }; - - size_t const max_size_; - - static size_t const cacheline_size = 64; - using cacheline_pad_t = char[cacheline_size]; - - cacheline_pad_t pad0_; - cell_t *const buffer_; - size_t const buffer_mask_; - cacheline_pad_t pad1_; - std::atomic enqueue_pos_; - cacheline_pad_t pad2_; - std::atomic dequeue_pos_; - cacheline_pad_t pad3_; -}; - -} // namespace details -} // namespace spdlog diff --git a/include/spdlog/sinks/test_sink.h b/include/spdlog/sinks/test_sink.h new file mode 100644 index 00000000..ba7336de --- /dev/null +++ b/include/spdlog/sinks/test_sink.h @@ -0,0 +1,40 @@ +// +// Copyright(c) 2015 Gabi Melman. +// Distributed under the MIT License (http://opensource.org/licenses/MIT) +// + +#pragma once + +#include "../details/null_mutex.h" +#include "base_sink.h" + +#include + +namespace spdlog { +namespace sinks { + +template +class test_sink : public base_sink +{ +public: + size_t msg_counter() + { + return msg_counter_; + } + +protected: + void _sink_it(const details::log_msg &) override + { + msg_counter_++; + } + + void _flush() override {} + size_t msg_counter_{ 0 }; + +}; + +using test_sink_mt = test_sink; +using test_sink_st = test_sink; + +} // namespace sinks +} // namespace spdlog