replaced the lockfree queue with bounded, locked queue

This commit is contained in:
gabime 2018-05-19 14:41:53 +03:00
parent 71e93a4f2d
commit a1a71804f4
7 changed files with 408 additions and 483 deletions

View File

@ -7,8 +7,7 @@
// bench.cpp : spdlog benchmarks
//
#include "spdlog/async_logger.h"
#include "spdlog/sinks/file_sinks.h"
#include "spdlog/sinks/null_sink.h"
#include "spdlog/sinks/test_sink.h"
#include "spdlog/spdlog.h"
#include "utils.h"
#include <atomic>
@ -30,11 +29,9 @@ void bench_mt(int howmany, std::shared_ptr<spdlog::logger> log, int thread_count
int main(int argc, char *argv[])
{
int queue_size = 1048576;
int queue_size = 1024*1024;
int howmany = 1000000;
int threads = 10;
int file_size = 30 * 1024 * 1024;
int rotating_files = 5;
try
{
@ -45,7 +42,7 @@ int main(int argc, char *argv[])
threads = atoi(argv[2]);
if (argc > 3)
queue_size = atoi(argv[3]);
/*
cout << "*******************************************************************************\n";
cout << "Single thread, " << format(howmany) << " iterations" << endl;
cout << "*******************************************************************************\n";
@ -67,17 +64,32 @@ int main(int argc, char *argv[])
bench_mt(howmany, daily_mt, threads);
bench(howmany, spdlog::create<null_sink_st>("null_mt"));
*/
cout << "\n*******************************************************************************\n";
cout << "async logging.. " << threads << " threads sharing same logger, " << format(howmany) << " iterations " << endl;
cout << "*******************************************************************************\n";
spdlog::set_async_mode(queue_size);
for (int i = 0; i < 3; ++i)
for (int i = 0; i < 300; ++i)
{
auto as = spdlog::daily_logger_st("as", "logs/daily_async.log");
//auto as = spdlog::daily_logger_mt("as", "logs/daily_async.log");
auto test_sink = std::make_shared<spdlog::sinks::test_sink_mt>();
//auto as = spdlog::basic_logger_mt("as", "logs/async.log", true);
auto as = std::make_shared<spdlog::async_logger>("as", test_sink, queue_size, async_overflow_policy::block_retry, nullptr, std::chrono::milliseconds(2000));
bench_mt(howmany, as, threads);
as.reset();
spdlog::drop("as");
auto msg_counter = test_sink->msg_counter();
cout << "Count:" << msg_counter << endl;
if (msg_counter != howmany)
{
cout << "ERROR! Expected " << howmany;
exit(0);
}
}
}
catch (std::exception &ex)
@ -119,6 +131,7 @@ void bench_mt(int howmany, std::shared_ptr<spdlog::logger> log, int thread_count
if (counter > howmany)
break;
log->info("Hello logger: msg number {}", counter);
//std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}));
}

View File

@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
@ -10,9 +10,6 @@
<Platform>Win32</Platform>
</ProjectConfiguration>
</ItemGroup>
<ItemGroup>
<ClCompile Include="example.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\include\spdlog\async_logger.h" />
<ClInclude Include="..\include\spdlog\common.h" />
@ -21,7 +18,6 @@
<ClInclude Include="..\include\spdlog\details\file_helper.h" />
<ClInclude Include="..\include\spdlog\details\logger_impl.h" />
<ClInclude Include="..\include\spdlog\details\log_msg.h" />
<ClInclude Include="..\include\spdlog\details\mpmc_bounded_q.h" />
<ClInclude Include="..\include\spdlog\details\null_mutex.h" />
<ClInclude Include="..\include\spdlog\details\os.h" />
<ClInclude Include="..\include\spdlog\details\pattern_formatter_impl.h" />
@ -46,23 +42,26 @@
<ClInclude Include="..\include\spdlog\spdlog.h" />
<ClInclude Include="..\include\spdlog\tweakme.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="bench.cpp" />
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{9E5AB93A-0CCE-4BAC-9FCB-0FC9CB5EB8D2}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>.</RootNamespace>
<WindowsTargetPlatformVersion>8.1</WindowsTargetPlatformVersion>
<WindowsTargetPlatformVersion>10.0.16299.0</WindowsTargetPlatformVersion>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<PlatformToolset>v120</PlatformToolset>
<PlatformToolset>v141</PlatformToolset>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>

View File

@ -14,11 +14,12 @@
#include "../common.h"
#include "../details/log_msg.h"
#include "../details/mpmc_bounded_q.h"
#include "../details/mpmc_blocking_q.h"
#include "../details/os.h"
#include "../formatter.h"
#include "../sinks/sink.h"
#include<iostream>
#include <chrono>
#include <exception>
#include <functional>
@ -27,6 +28,7 @@
#include <thread>
#include <utility>
#include <vector>
#include <condition_variable>
namespace spdlog {
namespace details {
@ -136,7 +138,7 @@ public:
void set_formatter(formatter_ptr msg_formatter);
void flush(bool wait_for_q);
void flush();
void set_error_handler(spdlog::log_err_handler err_handler);
@ -149,9 +151,7 @@ private:
log_err_handler _err_handler;
bool _flush_requested;
bool _terminate_requested;
std::chrono::time_point<log_clock> _last_flush;
// overflow policy
const async_overflow_policy _overflow_policy;
@ -165,25 +165,27 @@ private:
// worker thread teardown callback
const std::function<void()> _worker_teardown_cb;
std::mutex null_mutex_;
//null_mutex null_mutex_;
std::condition_variable_any not_empty_cv_;
std::condition_variable_any not_full_cv_;
// worker thread
std::thread _worker_thread;
void push_msg(async_msg &&new_msg);
void enqueue_msg(async_msg &&new_msg, async_overflow_policy policy);
// worker thread main loop
void worker_loop();
// pop next message from the queue and process it. will set the last_pop to the pop time
// dequeue next message from the queue and process it.
// return false if termination of the queue is required
bool process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush);
bool process_next_msg();
void handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush);
void handle_flush_interval();
// sleep,yield or return immediately using the time passed since last message as a hint
static void sleep_or_yield(const spdlog::log_clock::time_point &now, const log_clock::time_point &last_op_time);
void flush_sinks();
// wait until the queue is empty
void wait_empty_q();
};
} // namespace details
} // namespace spdlog
@ -198,8 +200,7 @@ inline spdlog::details::async_log_helper::async_log_helper(formatter_ptr formatt
, _sinks(std::move(sinks))
, _q(queue_size)
, _err_handler(std::move(err_handler))
, _flush_requested(false)
, _terminate_requested(false)
, _last_flush(os::now())
, _overflow_policy(overflow_policy)
, _worker_warmup_cb(std::move(worker_warmup_cb))
, _flush_interval_ms(flush_interval_ms)
@ -214,7 +215,7 @@ inline spdlog::details::async_log_helper::~async_log_helper()
{
try
{
push_msg(async_msg(async_msg_type::terminate));
enqueue_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry);
_worker_thread.join();
}
catch (...) // don't crash in destructor
@ -225,31 +226,28 @@ inline spdlog::details::async_log_helper::~async_log_helper()
// Try to push and block until succeeded (if the policy is not to discard when the queue is full)
inline void spdlog::details::async_log_helper::log(const details::log_msg &msg)
{
push_msg(async_msg(msg));
enqueue_msg(async_msg(msg), _overflow_policy);
}
inline void spdlog::details::async_log_helper::push_msg(details::async_log_helper::async_msg &&new_msg)
inline void spdlog::details::async_log_helper::enqueue_msg(details::async_log_helper::async_msg &&new_msg, async_overflow_policy policy)
{
if (!_q.enqueue(std::move(new_msg)) && _overflow_policy != async_overflow_policy::discard_log_msg)
// block until succeeded pushing to the queue
if (policy == async_overflow_policy::block_retry)
{
auto last_op_time = details::os::now();
auto now = last_op_time;
do
{
now = details::os::now();
sleep_or_yield(now, last_op_time);
} while (!_q.enqueue(std::move(new_msg)));
_q.enqueue(std::move(new_msg));
}
else
{
_q.enqueue_nowait(std::move(new_msg));
}
}
// optionally wait for the queue be empty and request flush from the sinks
inline void spdlog::details::async_log_helper::flush(bool wait_for_q)
inline void spdlog::details::async_log_helper::flush()
{
push_msg(async_msg(async_msg_type::flush));
if (wait_for_q)
{
wait_empty_q(); // return when queue is empty
}
enqueue_msg(async_msg(async_msg_type::flush), _overflow_policy);
}
inline void spdlog::details::async_log_helper::worker_loop()
@ -258,14 +256,12 @@ inline void spdlog::details::async_log_helper::worker_loop()
{
_worker_warmup_cb();
}
auto last_pop = details::os::now();
auto last_flush = last_pop;
auto active = true;
while (active)
{
try
{
active = process_next_msg(last_pop, last_flush);
active = process_next_msg();
}
catch (const std::exception &ex)
{
@ -284,23 +280,27 @@ inline void spdlog::details::async_log_helper::worker_loop()
// process next message in the queue
// return true if this thread should still be active (while no terminate msg was received)
inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_point &last_pop, log_clock::time_point &last_flush)
inline bool spdlog::details::async_log_helper::process_next_msg()
{
async_msg incoming_async_msg;
if (_q.dequeue(incoming_async_msg))
bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::milliseconds(1000));
if (!dequeued)
{
last_pop = details::os::now();
handle_flush_interval();
return true;
}
switch (incoming_async_msg.msg_type)
{
case async_msg_type::flush:
_flush_requested = true;
break;
flush_sinks();
return true;
case async_msg_type::terminate:
_flush_requested = true;
_terminate_requested = true;
break;
//flush_sinks();
return false;
default:
log_msg incoming_log_msg;
@ -313,81 +313,48 @@ inline bool spdlog::details::async_log_helper::process_next_msg(log_clock::time_
s->log(incoming_log_msg);
}
}
}
return true;
}
assert(false);
return true; // should not be reached
// Handle empty queue..
// This is the only place where the queue can terminate or flush to avoid losing messages already in the queue
auto now = details::os::now();
handle_flush_interval(now, last_flush);
sleep_or_yield(now, last_pop);
return !_terminate_requested;
}
// flush all sinks if _flush_interval_ms has expired
inline void spdlog::details::async_log_helper::handle_flush_interval(log_clock::time_point &now, log_clock::time_point &last_flush)
// flush all sinks if _flush_interval_ms has expired. only called if queue is empty
inline void spdlog::details::async_log_helper::handle_flush_interval()
{
auto should_flush =
_flush_requested || (_flush_interval_ms != std::chrono::milliseconds::zero() && now - last_flush >= _flush_interval_ms);
if (should_flush)
if (_flush_interval_ms == std::chrono::milliseconds::zero())
{
for (auto &s : _sinks)
{
s->flush();
return;
}
now = last_flush = details::os::now();
_flush_requested = false;
auto delta = details::os::now() - _last_flush;;
if (delta >= _flush_interval_ms)
{
flush_sinks();
}
}
inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter)
{
_formatter = std::move(msg_formatter);
}
// spin, yield or sleep. use the time passed since last message as a hint
inline void spdlog::details::async_log_helper::sleep_or_yield(
const spdlog::log_clock::time_point &now, const spdlog::log_clock::time_point &last_op_time)
{
using std::chrono::microseconds;
using std::chrono::milliseconds;
auto time_since_op = now - last_op_time;
// spin upto 50 micros
if (time_since_op <= microseconds(50))
{
return;
}
// yield upto 150 micros
if (time_since_op <= microseconds(100))
{
return std::this_thread::yield();
}
// sleep for 20 ms upto 200 ms
if (time_since_op <= milliseconds(200))
{
return details::os::sleep_for_millis(20);
}
// sleep for 500 ms
return details::os::sleep_for_millis(500);
}
// wait for the queue to be empty
inline void spdlog::details::async_log_helper::wait_empty_q()
{
auto last_op = details::os::now();
while (!_q.is_empty())
{
sleep_or_yield(details::os::now(), last_op);
}
}
inline void spdlog::details::async_log_helper::set_error_handler(spdlog::log_err_handler err_handler)
{
_err_handler = std::move(err_handler);
}
// flush all sinks if _flush_interval_ms has expired. only called if queue is empty
inline void spdlog::details::async_log_helper::flush_sinks()
{
printf("FLUSH!\n");
for (auto &s : _sinks)
{
s->flush();
}
_last_flush = os::now();
}

View File

@ -44,7 +44,7 @@ inline spdlog::async_logger::async_logger(const std::string &logger_name, sink_p
inline void spdlog::async_logger::flush()
{
_async_log_helper->flush(true);
_async_log_helper->flush();
}
// Error handler
@ -80,7 +80,7 @@ inline void spdlog::async_logger::_sink_it(details::log_msg &msg)
_async_log_helper->log(msg);
if (_should_flush_on(msg))
{
_async_log_helper->flush(false); // do async flush
_async_log_helper->flush(); // do async flush
}
}
catch (const std::exception &ex)

View File

@ -0,0 +1,89 @@
#pragma once
//
// Copyright(c) 2018 Gabi Melman.
// Distributed under the MIT License (http://opensource.org/licenses/MIT)
//
// async log helper :
// multi producer-multi consumer blocking queue
// enqueue(..) - will block until room found to put the new message
// enqueue_nowait(..) - will return immediatly with false if no room left in the queue
// dequeue_for(..) - will block until the queue is not empty or timeout passed
#include <condition_variable>
#include <mutex>
#include <queue>
namespace spdlog {
namespace details {
template<typename T>
class mpmc_bounded_queue
{
public:
using item_type = T;
explicit mpmc_bounded_queue(size_t max_items) : max_items_(max_items) {}
// try to enqueue and block if no room left
void enqueue(T &&item)
{
{
std::unique_lock<std::mutex> lock(queue_mutex_);
pop_cv_.wait(lock, [this] {return this->q_.size() <= this->max_items_; });
q_.push(std::forward<T>(item));
}
push_cv_.notify_one();
}
// try to enqueue and return immdeialty false if no room left
bool enqueue_nowait(T &&item)
{
{
std::unique_lock<std::mutex> lock(queue_mutex_);
if (this->q_.size() >= this->max_items_)
{
return false;
}
q_.push(std::forward<T>(item));
}
push_cv_.notify_one();
return true;
}
// try to dequeue item. if no item found. wait upto timeout and try again
// Return true, if succeeded dequeue item, false otherwise
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
{
{
std::unique_lock<std::mutex> lock(queue_mutex_);
//push_cv_.wait(lock, [this] {return this->q_.size() > 0; });
bool found_msg = push_cv_.wait_for(lock, wait_duration, [this] {return this->q_.size() > 0; });
if (!found_msg)
{
return false;
}
popped_item = std::move(q_.front());
q_.pop();
}
pop_cv_.notify_one();
return true;
}
private:
size_t max_items_;
std::mutex queue_mutex_;
std::condition_variable push_cv_;
std::condition_variable pop_cv_;
std::queue<T> q_;
};
}
}

View File

@ -1,183 +0,0 @@
/*
A modified version of Bounded MPMC queue by Dmitry Vyukov.
Original code from:
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
licensed by Dmitry Vyukov under the terms below:
Simplified BSD license
Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list
of conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.
THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The views and conclusions contained in the software and documentation are those of the authors and
should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov.
*/
/*
The code in its current form adds the license below:
Copyright(c) 2015 Gabi Melman.
Distributed under the MIT License (http://opensource.org/licenses/MIT)
*/
#pragma once
#include "../common.h"
#include <atomic>
#include <utility>
namespace spdlog {
namespace details {
template<typename T>
class mpmc_bounded_queue
{
public:
using item_type = T;
explicit mpmc_bounded_queue(size_t buffer_size)
: max_size_(buffer_size)
, buffer_(new cell_t[buffer_size])
, buffer_mask_(buffer_size - 1)
{
// queue size must be power of two
if (!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)))
{
throw spdlog_ex("async logger queue size must be power of two");
}
for (size_t i = 0; i != buffer_size; i += 1)
{
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
}
enqueue_pos_.store(0, std::memory_order_relaxed);
dequeue_pos_.store(0, std::memory_order_relaxed);
}
~mpmc_bounded_queue()
{
delete[] buffer_;
}
mpmc_bounded_queue(mpmc_bounded_queue const &) = delete;
void operator=(mpmc_bounded_queue const &) = delete;
bool enqueue(T &&data)
{
cell_t *cell;
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
if (dif == 0)
{
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
{
break;
}
}
else if (dif < 0)
{
return false;
}
else
{
pos = enqueue_pos_.load(std::memory_order_relaxed);
}
}
cell->data_ = std::move(data);
cell->sequence_.store(pos + 1, std::memory_order_release);
return true;
}
bool dequeue(T &data)
{
cell_t *cell;
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
for (;;)
{
cell = &buffer_[pos & buffer_mask_];
size_t seq = cell->sequence_.load(std::memory_order_acquire);
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
if (dif == 0)
{
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
{
break;
}
}
else if (dif < 0)
{
return false;
}
else
{
pos = dequeue_pos_.load(std::memory_order_relaxed);
}
}
data = std::move(cell->data_);
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
return true;
}
bool is_empty()
{
size_t front, front1, back;
// try to take a consistent snapshot of front/tail.
do
{
front = enqueue_pos_.load(std::memory_order_acquire);
back = dequeue_pos_.load(std::memory_order_acquire);
front1 = enqueue_pos_.load(std::memory_order_relaxed);
} while (front != front1);
return back == front;
}
private:
struct cell_t
{
std::atomic<size_t> sequence_;
T data_;
};
size_t const max_size_;
static size_t const cacheline_size = 64;
using cacheline_pad_t = char[cacheline_size];
cacheline_pad_t pad0_;
cell_t *const buffer_;
size_t const buffer_mask_;
cacheline_pad_t pad1_;
std::atomic<size_t> enqueue_pos_;
cacheline_pad_t pad2_;
std::atomic<size_t> dequeue_pos_;
cacheline_pad_t pad3_;
};
} // namespace details
} // namespace spdlog

View File

@ -0,0 +1,40 @@
//
// Copyright(c) 2015 Gabi Melman.
// Distributed under the MIT License (http://opensource.org/licenses/MIT)
//
#pragma once
#include "../details/null_mutex.h"
#include "base_sink.h"
#include <mutex>
namespace spdlog {
namespace sinks {
template<class Mutex>
class test_sink : public base_sink<Mutex>
{
public:
size_t msg_counter()
{
return msg_counter_;
}
protected:
void _sink_it(const details::log_msg &) override
{
msg_counter_++;
}
void _flush() override {}
size_t msg_counter_{ 0 };
};
using test_sink_mt = test_sink<std::mutex>;
using test_sink_st = test_sink<details::null_mutex>;
} // namespace sinks
} // namespace spdlog