mirror of
https://github.com/gabime/spdlog.git
synced 2025-01-26 15:39:03 +08:00
using mpmc bounded q for async and many async optimizations
This commit is contained in:
parent
754cac85ac
commit
52d02af950
@ -33,15 +33,14 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
#include "../sinks/sink.h"
|
||||
#include "../logger.h"
|
||||
#include "../details/mpcs_q.h"
|
||||
#include "../details/log_msg.h"
|
||||
#include "../details/format.h"
|
||||
#include "./mpmc_bounded_q.h"
|
||||
#include "./log_msg.h"
|
||||
#include "./format.h"
|
||||
|
||||
|
||||
namespace spdlog
|
||||
@ -49,42 +48,67 @@ namespace spdlog
|
||||
namespace details
|
||||
{
|
||||
|
||||
|
||||
class async_log_helper
|
||||
{
|
||||
// Async msg to move to/from the queue
|
||||
// Movable only. should never be copied
|
||||
struct async_msg
|
||||
{
|
||||
std::string logger_name;
|
||||
level::level_enum level;
|
||||
log_clock::time_point time;
|
||||
std::string raw_msg_str;
|
||||
log_clock::time_point time;
|
||||
std::string txt;
|
||||
|
||||
async_msg() = default;
|
||||
async_msg() = default;
|
||||
~async_msg() = default;
|
||||
|
||||
async_msg(const async_msg&) = delete;
|
||||
async_msg& operator=(async_msg& other) = delete;
|
||||
|
||||
async_msg(const details::log_msg& m) :
|
||||
logger_name(m.logger_name),
|
||||
level(m.level),
|
||||
time(m.time),
|
||||
raw_msg_str(m.raw.data(), m.raw.size())
|
||||
time(m.time),
|
||||
txt(m.raw.data(), m.raw.size())
|
||||
{}
|
||||
|
||||
log_msg to_log_msg()
|
||||
async_msg(async_msg&& other) :
|
||||
logger_name(std::move(other.logger_name)),
|
||||
level(std::move(other.level)),
|
||||
time(std::move(other.time)),
|
||||
txt(std::move(other.txt))
|
||||
{}
|
||||
|
||||
async_msg& operator=(async_msg&& other)
|
||||
{
|
||||
logger_name = std::move(other.logger_name);
|
||||
level = other.level;
|
||||
time = std::move(other.time);
|
||||
txt = std::move(other.txt);
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
void fill_log_msg(log_msg &msg)
|
||||
{
|
||||
log_msg msg;
|
||||
msg.clear();
|
||||
msg.logger_name = logger_name;
|
||||
msg.level = level;
|
||||
msg.time = time;
|
||||
msg.raw << raw_msg_str;
|
||||
return msg;
|
||||
msg.time = time;
|
||||
msg.raw << txt;
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
|
||||
using q_type = details::mpsc_q < std::unique_ptr<async_msg> >;
|
||||
using item_type = async_msg;
|
||||
using q_type = details::mpmc_bounded_queue<item_type>;
|
||||
|
||||
using clock = std::chrono::steady_clock;
|
||||
|
||||
|
||||
explicit async_log_helper(size_t max_queue_size);
|
||||
explicit async_log_helper(size_t queue_size);
|
||||
void log(const details::log_msg& msg);
|
||||
|
||||
//Stop logging and join the back thread
|
||||
@ -97,8 +121,6 @@ public:
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
private:
|
||||
std::vector<std::shared_ptr<sinks::sink>> _sinks;
|
||||
std::atomic<bool> _active;
|
||||
@ -113,18 +135,19 @@ private:
|
||||
formatter_ptr _formatter;
|
||||
|
||||
|
||||
// will throw last back thread exception or if worker hread no active
|
||||
void _push_sentry();
|
||||
// will throw last worker thread exception or if worker thread no active
|
||||
void throw_if_bad_worker();
|
||||
|
||||
// worker thread loop
|
||||
void _thread_loop();
|
||||
void thread_loop();
|
||||
|
||||
// guess how much to sleep if queue is empty/full using last succesful op time as hint
|
||||
static void _sleep_or_yield(const clock::time_point& last_op_time);
|
||||
static void sleep_or_yield(const clock::time_point& last_op_time);
|
||||
|
||||
|
||||
// clear all remaining messages(if any), stop the _worker_thread and join it
|
||||
void _join();
|
||||
void join_worker();
|
||||
|
||||
|
||||
};
|
||||
}
|
||||
@ -133,57 +156,60 @@ private:
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// async_sink class implementation
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
inline spdlog::details::async_log_helper::async_log_helper(size_t max_queue_size)
|
||||
inline spdlog::details::async_log_helper::async_log_helper(size_t queue_size)
|
||||
:_sinks(),
|
||||
_active(true),
|
||||
_q(max_queue_size),
|
||||
_worker_thread(&async_log_helper::_thread_loop, this)
|
||||
_q(queue_size),
|
||||
_worker_thread(&async_log_helper::thread_loop, this)
|
||||
{}
|
||||
|
||||
inline spdlog::details::async_log_helper::~async_log_helper()
|
||||
{
|
||||
_join();
|
||||
join_worker();
|
||||
}
|
||||
|
||||
|
||||
//Try to push and block until succeeded
|
||||
inline void spdlog::details::async_log_helper::log(const details::log_msg& msg)
|
||||
{
|
||||
_push_sentry();
|
||||
throw_if_bad_worker();
|
||||
|
||||
//Only if queue is full, enter wait loop
|
||||
if (!_q.push(std::unique_ptr < async_msg >(new async_msg(msg))))
|
||||
{
|
||||
//if (!_q.push(std::unique_ptr < async_msg >(new async_msg(msg))))
|
||||
//async_msg* as = new async_msg(msg);
|
||||
//if (!_q.enqueue(std::unique_ptr<async_msg>(new async_msg(msg))))
|
||||
if (!_q.enqueue(std::move(async_msg(msg))))
|
||||
{
|
||||
auto last_op_time = clock::now();
|
||||
do
|
||||
{
|
||||
_sleep_or_yield(last_op_time);
|
||||
sleep_or_yield(last_op_time);
|
||||
}
|
||||
while (!_q.push(std::unique_ptr < async_msg >(new async_msg(msg))));
|
||||
while (!_q.enqueue(std::move(async_msg(msg))));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
inline void spdlog::details::async_log_helper::_thread_loop()
|
||||
inline void spdlog::details::async_log_helper::thread_loop()
|
||||
{
|
||||
|
||||
log_msg popped_log_msg;
|
||||
clock::time_point last_pop = clock::now();
|
||||
size_t counter = 0;
|
||||
while (_active)
|
||||
{
|
||||
q_type::item_type popped_msg;
|
||||
|
||||
if (_q.pop(popped_msg))
|
||||
{
|
||||
|
||||
if (_q.dequeue(popped_msg))
|
||||
{
|
||||
last_pop = clock::now();
|
||||
|
||||
try
|
||||
{
|
||||
details::log_msg log_msg = popped_msg->to_log_msg();
|
||||
|
||||
_formatter->format(log_msg);
|
||||
popped_msg.fill_log_msg(popped_log_msg);
|
||||
_formatter->format(popped_log_msg);
|
||||
for (auto &s : _sinks)
|
||||
s->log(log_msg);
|
||||
|
||||
s->log(popped_log_msg);
|
||||
}
|
||||
catch (const std::exception& ex)
|
||||
{
|
||||
@ -196,8 +222,8 @@ inline void spdlog::details::async_log_helper::_thread_loop()
|
||||
}
|
||||
// sleep or yield if queue is empty.
|
||||
else
|
||||
{
|
||||
_sleep_or_yield(last_pop);
|
||||
{
|
||||
sleep_or_yield(last_pop);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -226,6 +252,7 @@ inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_f
|
||||
|
||||
inline void spdlog::details::async_log_helper::shutdown(const log_clock::duration& timeout)
|
||||
{
|
||||
/*
|
||||
if (timeout > std::chrono::milliseconds::zero())
|
||||
{
|
||||
auto until = log_clock::now() + timeout;
|
||||
@ -234,12 +261,13 @@ inline void spdlog::details::async_log_helper::shutdown(const log_clock::duratio
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5));
|
||||
}
|
||||
}
|
||||
_join();
|
||||
join_worker();
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
// Sleep or yield using the time passed since last message as a hint
|
||||
inline void spdlog::details::async_log_helper::_sleep_or_yield(const clock::time_point& last_op_time)
|
||||
inline void spdlog::details::async_log_helper::sleep_or_yield(const clock::time_point& last_op_time)
|
||||
{
|
||||
using std::chrono::milliseconds;
|
||||
using std::this_thread::sleep_for;
|
||||
@ -255,7 +283,8 @@ inline void spdlog::details::async_log_helper::_sleep_or_yield(const clock::time
|
||||
yield();
|
||||
}
|
||||
|
||||
inline void spdlog::details::async_log_helper::_push_sentry()
|
||||
//throw if the worker thread threw an exception or not active
|
||||
inline void spdlog::details::async_log_helper::throw_if_bad_worker()
|
||||
{
|
||||
if (_last_workerthread_ex)
|
||||
{
|
||||
@ -264,11 +293,11 @@ inline void spdlog::details::async_log_helper::_push_sentry()
|
||||
throw *ex;
|
||||
}
|
||||
if (!_active)
|
||||
throw(spdlog_ex("async_sink not active"));
|
||||
throw(spdlog_ex("async logger is not active"));
|
||||
}
|
||||
|
||||
|
||||
inline void spdlog::details::async_log_helper::_join()
|
||||
inline void spdlog::details::async_log_helper::join_worker()
|
||||
{
|
||||
_active = false;
|
||||
if (_worker_thread.joinable())
|
||||
|
@ -1050,7 +1050,7 @@ public:
|
||||
{
|
||||
default:
|
||||
assert(false);
|
||||
// Fall through.
|
||||
// Fall through.
|
||||
case Arg::INT:
|
||||
return FMT_DISPATCH(visit_int(arg.int_value));
|
||||
case Arg::UINT:
|
||||
@ -2219,7 +2219,7 @@ void BasicWriter<Char>::write_double(
|
||||
// MSVC's printf doesn't support 'F'.
|
||||
type = 'f';
|
||||
#endif
|
||||
// Fall through.
|
||||
// Fall through.
|
||||
case 'E':
|
||||
case 'G':
|
||||
case 'A':
|
||||
|
@ -65,7 +65,6 @@ public:
|
||||
{
|
||||
_log_msg.logger_name = _callback_logger->name();
|
||||
_log_msg.time = log_clock::now();
|
||||
//_log_msg.tm_time = details::os::localtime(log_clock::to_time_t(_log_msg.time));
|
||||
_callback_logger->_log_msg(_log_msg);
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ struct log_msg
|
||||
log_msg(level::level_enum l):
|
||||
logger_name(),
|
||||
level(l),
|
||||
time(),
|
||||
time(),
|
||||
raw(),
|
||||
formatted() {}
|
||||
|
||||
@ -57,7 +57,7 @@ struct log_msg
|
||||
log_msg(log_msg&& other) :
|
||||
logger_name(std::move(other.logger_name)),
|
||||
level(other.level),
|
||||
time(std::move(other.time)),
|
||||
time(std::move(other.time)),
|
||||
raw(std::move(other.raw)),
|
||||
formatted(std::move(other.formatted))
|
||||
{
|
||||
@ -71,15 +71,13 @@ struct log_msg
|
||||
|
||||
logger_name = std::move(other.logger_name);
|
||||
level = other.level;
|
||||
time = std::move(other.time);
|
||||
time = std::move(other.time);
|
||||
raw = std::move(other.raw);
|
||||
formatted = std::move(other.formatted);
|
||||
other.clear();
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void clear()
|
||||
{
|
||||
level = level::OFF;
|
||||
@ -90,7 +88,6 @@ struct log_msg
|
||||
std::string logger_name;
|
||||
level::level_enum level;
|
||||
log_clock::time_point time;
|
||||
//std::tm tm_time;
|
||||
fmt::MemoryWriter raw;
|
||||
fmt::MemoryWriter formatted;
|
||||
};
|
||||
|
175
include/spdlog/details/mpmc_bounded_q.h
Normal file
175
include/spdlog/details/mpmc_bounded_q.h
Normal file
@ -0,0 +1,175 @@
|
||||
/*
|
||||
A modified version of Bounded MPMC queue by Dmitry Vyukov.
|
||||
|
||||
Original code from:
|
||||
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
|
||||
|
||||
licensed by Dmitry Vyukov under the terms below:
|
||||
|
||||
Simplified BSD license
|
||||
|
||||
Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
|
||||
Redistribution and use in source and binary forms, with or without modification,
|
||||
are permitted provided that the following conditions are met:
|
||||
1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
conditions and the following disclaimer.
|
||||
|
||||
2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
of conditions and the following disclaimer in the documentation and/or other materials
|
||||
provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
|
||||
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
|
||||
SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
|
||||
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
|
||||
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
The views and conclusions contained in the software and documentation are those of the authors and
|
||||
should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov.
|
||||
*/
|
||||
|
||||
/*
|
||||
The code in its current form adds the license below:
|
||||
|
||||
spdlog - an extremely fast and easy to use c++11 logging library.
|
||||
Copyright (c) 2014 Gabi Melman.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining
|
||||
a copy of this software and associated documentation files (the
|
||||
"Software"), to deal in the Software without restriction, including
|
||||
without limitation the rights to use, copy, modify, merge, publish,
|
||||
distribute, sublicense, and/or sell copies of the Software, and to
|
||||
permit persons to whom the Software is furnished to do so, subject to
|
||||
the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be
|
||||
included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
|
||||
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
|
||||
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include "../common.h"
|
||||
|
||||
namespace spdlog
|
||||
{
|
||||
namespace details
|
||||
{
|
||||
|
||||
template<typename T>
|
||||
class mpmc_bounded_queue
|
||||
{
|
||||
public:
|
||||
|
||||
using item_type = T;
|
||||
mpmc_bounded_queue(size_t buffer_size)
|
||||
: buffer_(new cell_t [buffer_size]),
|
||||
buffer_mask_(buffer_size - 1)
|
||||
{
|
||||
//queue size must be power of two
|
||||
if(!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)))
|
||||
throw spdlog_ex("async logger queue size must be power of two");
|
||||
|
||||
for (size_t i = 0; i != buffer_size; i += 1)
|
||||
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
|
||||
enqueue_pos_.store(0, std::memory_order_relaxed);
|
||||
dequeue_pos_.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
~mpmc_bounded_queue()
|
||||
{
|
||||
delete [] buffer_;
|
||||
}
|
||||
|
||||
|
||||
bool enqueue(T&& data)
|
||||
{
|
||||
cell_t* cell;
|
||||
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
|
||||
for (;;)
|
||||
{
|
||||
cell = &buffer_[pos & buffer_mask_];
|
||||
size_t seq = cell->sequence_.load(std::memory_order_acquire);
|
||||
intptr_t dif = (intptr_t)seq - (intptr_t)pos;
|
||||
if (dif == 0)
|
||||
{
|
||||
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
||||
break;
|
||||
}
|
||||
else if (dif < 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
pos = enqueue_pos_.load(std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
cell->data_ = std::move(data);
|
||||
cell->sequence_.store(pos + 1, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool dequeue(T& data)
|
||||
{
|
||||
cell_t* cell;
|
||||
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
|
||||
for (;;)
|
||||
{
|
||||
cell = &buffer_[pos & buffer_mask_];
|
||||
size_t seq =
|
||||
cell->sequence_.load(std::memory_order_acquire);
|
||||
intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
|
||||
if (dif == 0)
|
||||
{
|
||||
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
||||
break;
|
||||
}
|
||||
else if (dif < 0)
|
||||
return false;
|
||||
else
|
||||
pos = dequeue_pos_.load(std::memory_order_relaxed);
|
||||
}
|
||||
data = std::move(cell->data_);
|
||||
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
struct cell_t
|
||||
{
|
||||
std::atomic<size_t> sequence_;
|
||||
T data_;
|
||||
};
|
||||
|
||||
static size_t const cacheline_size = 64;
|
||||
typedef char cacheline_pad_t [cacheline_size];
|
||||
|
||||
cacheline_pad_t pad0_;
|
||||
cell_t* const buffer_;
|
||||
size_t const buffer_mask_;
|
||||
cacheline_pad_t pad1_;
|
||||
std::atomic<size_t> enqueue_pos_;
|
||||
cacheline_pad_t pad2_;
|
||||
std::atomic<size_t> dequeue_pos_;
|
||||
cacheline_pad_t pad3_;
|
||||
|
||||
mpmc_bounded_queue(mpmc_bounded_queue const&);
|
||||
void operator = (mpmc_bounded_queue const&);
|
||||
};
|
||||
|
||||
} // ns details
|
||||
} // ns spdlog
|
@ -458,7 +458,7 @@ inline void spdlog::pattern_formatter::handle_flag(char flag)
|
||||
{
|
||||
switch (flag)
|
||||
{
|
||||
// logger name
|
||||
// logger name
|
||||
case 'n':
|
||||
_formatters.push_back(std::unique_ptr<details::flag_formatter>(new details::name_formatter()));
|
||||
break;
|
||||
@ -574,7 +574,7 @@ inline void spdlog::pattern_formatter::format(details::log_msg& msg)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto tm_time = details::os::localtime(log_clock::to_time_t(msg.time));
|
||||
auto tm_time = details::os::localtime(log_clock::to_time_t(msg.time));
|
||||
for (auto &f : _formatters)
|
||||
{
|
||||
f->format(msg, tm_time);
|
||||
|
@ -24,7 +24,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <ostream>
|
||||
#include <ostream>
|
||||
#include <mutex>
|
||||
#include <memory>
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user