mirror of
https://github.com/gabime/spdlog.git
synced 2025-01-23 14:12:06 +08:00
Use blocking queue
This commit is contained in:
parent
cf63bcb808
commit
b9d7c45e40
@ -28,6 +28,8 @@ int main(int, char *[])
|
||||
|
||||
try
|
||||
{
|
||||
async_example();
|
||||
return 0;
|
||||
auto console = spdlog::stdout_color_st("console");
|
||||
console->info("Welcome to spdlog!");
|
||||
|
||||
@ -82,7 +84,7 @@ int main(int, char *[])
|
||||
|
||||
// Asynchronous logging is very fast..
|
||||
// Just call spdlog::set_async_mode(q_size) and all created loggers from now on will be asynchronous..
|
||||
async_example();
|
||||
//async_example();
|
||||
|
||||
// Log user-defined types example
|
||||
user_defined_example();
|
||||
@ -107,15 +109,23 @@ int main(int, char *[])
|
||||
#include "spdlog/async.h"
|
||||
void async_example()
|
||||
{
|
||||
auto async_file = spd::basic_logger_mt<spdlog::create_async>("async_file_logger", "logs/async_log.txt");
|
||||
for (int i = 0; i < 100; ++i)
|
||||
{
|
||||
async_file->info("Async message #{}", i);
|
||||
}
|
||||
//auto async_file = spd::basic_logger_mt<spdlog::create_async>("async_file_logger", "logs/async_log.txt");
|
||||
|
||||
for (int j = 0; j < 1; j++)
|
||||
{
|
||||
spdlog::init_thread_pool(1024, 10);
|
||||
auto async_file = spd::stderr_color_mt<spdlog::create_async>("console");
|
||||
for (int i = 0; i < 1024; ++i)
|
||||
{
|
||||
async_file->info("{} Async message #{}", j, i);
|
||||
}
|
||||
spdlog::drop_all();
|
||||
}
|
||||
//std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
|
||||
// you can also modify thread pool settings *before* creating the logger:
|
||||
// spdlog::init_thread_pool(32768, 4); // queue with 32k of pre allocated items and 4 backing threads.
|
||||
// if not called a defaults are: preallocated 8192 queue items and 1 worker thread.
|
||||
// spdlog::init_thread_pool(32768, 4); // queue with max 32k items 4 backing threads.
|
||||
}
|
||||
|
||||
// syslog example (linux/osx/freebsd)
|
||||
|
@ -8,7 +8,7 @@
|
||||
// async logger implementation
|
||||
// uses a thread pool to perform the actual logging
|
||||
|
||||
#include "../details/thread_pool.h"
|
||||
#include "spdlog/details/thread_pool.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
|
84
include/spdlog/details/mpmc_blocking_q.h
Normal file
84
include/spdlog/details/mpmc_blocking_q.h
Normal file
@ -0,0 +1,84 @@
|
||||
#pragma once
|
||||
|
||||
//
|
||||
// Copyright(c) 2018 Gabi Melman.
|
||||
// Distributed under the MIT License (http://opensource.org/licenses/MIT)
|
||||
//
|
||||
|
||||
// async log helper :
|
||||
// multi producer-multi consumer blocking queue
|
||||
// enqueue(..) - will block until room found to put the new message
|
||||
// enqueue_nowait(..) - will return immediatly with false if no room left in the queue
|
||||
// dequeue_for(..) - will block until the queue is not empty or timeout passed
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <queue>
|
||||
|
||||
namespace spdlog {
|
||||
namespace details {
|
||||
|
||||
template<typename T>
|
||||
class mpmc_bounded_queue
|
||||
{
|
||||
public:
|
||||
using item_type = T;
|
||||
explicit mpmc_bounded_queue(size_t max_items)
|
||||
: max_items_(max_items)
|
||||
{
|
||||
}
|
||||
|
||||
// try to enqueue and block if no room left
|
||||
void enqueue(T &&item)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
pop_cv_.wait(lock, [this] { return this->q_.size() < this->max_items_; });
|
||||
q_.push(std::move(item));
|
||||
}
|
||||
push_cv_.notify_one();
|
||||
}
|
||||
|
||||
// try to enqueue and return immdeialty false if no room left
|
||||
bool enqueue_nowait(T &&item)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
if (q_.size() == this->max_items_)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
q_.push(std::forward<T>(item));
|
||||
}
|
||||
push_cv_.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
// try to dequeue item. if no item found. wait upto timeout and try again
|
||||
// Return true, if succeeded dequeue item, false otherwise
|
||||
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queue_mutex_);
|
||||
if (!push_cv_.wait_for(lock, wait_duration, [this] { return this->q_.size() > 0; }))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
popped_item = std::move(q_.front());
|
||||
q_.pop();
|
||||
}
|
||||
pop_cv_.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
size_t max_items_;
|
||||
std::mutex queue_mutex_;
|
||||
std::condition_variable push_cv_;
|
||||
std::condition_variable pop_cv_;
|
||||
|
||||
std::queue<T> q_;
|
||||
};
|
||||
} // namespace details
|
||||
} // namespace spdlog
|
@ -1,183 +0,0 @@
|
||||
/*
|
||||
A modified version of Bounded MPMC queue by Dmitry Vyukov.
|
||||
|
||||
Original code from:
|
||||
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
|
||||
|
||||
licensed by Dmitry Vyukov under the terms below:
|
||||
|
||||
Simplified BSD license
|
||||
|
||||
Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
|
||||
Redistribution and use in source and binary forms, with or without modification,
|
||||
are permitted provided that the following conditions are met:
|
||||
1. Redistributions of source code must retain the above copyright notice, this list of
|
||||
conditions and the following disclaimer.
|
||||
|
||||
2. Redistributions in binary form must reproduce the above copyright notice, this list
|
||||
of conditions and the following disclaimer in the documentation and/or other materials
|
||||
provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
|
||||
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
|
||||
SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
||||
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
|
||||
OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
|
||||
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
|
||||
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
The views and conclusions contained in the software and documentation are those of the authors and
|
||||
should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov.
|
||||
*/
|
||||
|
||||
/*
|
||||
The code in its current form adds the license below:
|
||||
|
||||
Copyright(c) 2015 Gabi Melman.
|
||||
Distributed under the MIT License (http://opensource.org/licenses/MIT)
|
||||
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "spdlog/common.h"
|
||||
|
||||
#include <atomic>
|
||||
#include <utility>
|
||||
|
||||
namespace spdlog {
|
||||
namespace details {
|
||||
|
||||
template<typename T>
|
||||
class mpmc_bounded_queue
|
||||
{
|
||||
public:
|
||||
using item_type = T;
|
||||
|
||||
explicit mpmc_bounded_queue(size_t buffer_size)
|
||||
: max_size_(buffer_size)
|
||||
, buffer_(new cell_t[buffer_size])
|
||||
, buffer_mask_(buffer_size - 1)
|
||||
{
|
||||
// queue size must be power of two
|
||||
if (!((buffer_size >= 2) && ((buffer_size & (buffer_size - 1)) == 0)))
|
||||
{
|
||||
throw spdlog_ex("async logger queue size must be power of two");
|
||||
}
|
||||
|
||||
for (size_t i = 0; i != buffer_size; i += 1)
|
||||
{
|
||||
buffer_[i].sequence_.store(i, std::memory_order_relaxed);
|
||||
}
|
||||
enqueue_pos_.store(0, std::memory_order_relaxed);
|
||||
dequeue_pos_.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
~mpmc_bounded_queue()
|
||||
{
|
||||
delete[] buffer_;
|
||||
}
|
||||
|
||||
mpmc_bounded_queue(mpmc_bounded_queue const &) = delete;
|
||||
void operator=(mpmc_bounded_queue const &) = delete;
|
||||
|
||||
bool enqueue(T &&data)
|
||||
{
|
||||
cell_t *cell;
|
||||
size_t pos = enqueue_pos_.load(std::memory_order_relaxed);
|
||||
for (;;)
|
||||
{
|
||||
cell = &buffer_[pos & buffer_mask_];
|
||||
size_t seq = cell->sequence_.load(std::memory_order_acquire);
|
||||
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);
|
||||
if (dif == 0)
|
||||
{
|
||||
if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (dif < 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
pos = enqueue_pos_.load(std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
cell->data_ = std::move(data);
|
||||
cell->sequence_.store(pos + 1, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool dequeue(T &data)
|
||||
{
|
||||
cell_t *cell;
|
||||
size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
|
||||
for (;;)
|
||||
{
|
||||
cell = &buffer_[pos & buffer_mask_];
|
||||
size_t seq = cell->sequence_.load(std::memory_order_acquire);
|
||||
intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1);
|
||||
if (dif == 0)
|
||||
{
|
||||
if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (dif < 0)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
pos = dequeue_pos_.load(std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
data = std::move(cell->data_);
|
||||
cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool is_empty()
|
||||
{
|
||||
size_t front, front1, back;
|
||||
// try to take a consistent snapshot of front/tail.
|
||||
do
|
||||
{
|
||||
front = enqueue_pos_.load(std::memory_order_acquire);
|
||||
back = dequeue_pos_.load(std::memory_order_acquire);
|
||||
front1 = enqueue_pos_.load(std::memory_order_relaxed);
|
||||
} while (front != front1);
|
||||
return back == front;
|
||||
}
|
||||
|
||||
private:
|
||||
struct cell_t
|
||||
{
|
||||
std::atomic<size_t> sequence_;
|
||||
T data_;
|
||||
};
|
||||
|
||||
size_t const max_size_;
|
||||
|
||||
static size_t const cacheline_size = 64;
|
||||
using cacheline_pad_t = char[cacheline_size];
|
||||
|
||||
cacheline_pad_t pad0_;
|
||||
cell_t *const buffer_;
|
||||
size_t const buffer_mask_;
|
||||
cacheline_pad_t pad1_;
|
||||
std::atomic<size_t> enqueue_pos_;
|
||||
cacheline_pad_t pad2_;
|
||||
std::atomic<size_t> dequeue_pos_;
|
||||
cacheline_pad_t pad3_;
|
||||
};
|
||||
|
||||
} // namespace details
|
||||
} // namespace spdlog
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "spdlog/details/log_msg.h"
|
||||
#include "spdlog/details/mpmc_bounded_q.h"
|
||||
#include "spdlog/details/mpmc_blocking_q.h"
|
||||
#include "spdlog/details/os.h"
|
||||
|
||||
#include <chrono>
|
||||
@ -10,233 +10,196 @@
|
||||
#include <vector>
|
||||
|
||||
namespace spdlog {
|
||||
namespace details {
|
||||
namespace details {
|
||||
|
||||
using async_logger_ptr = std::shared_ptr<spdlog::async_logger>;
|
||||
using async_logger_ptr = std::shared_ptr<spdlog::async_logger>;
|
||||
|
||||
enum class async_msg_type
|
||||
{
|
||||
log,
|
||||
flush,
|
||||
terminate
|
||||
};
|
||||
enum class async_msg_type
|
||||
{
|
||||
log,
|
||||
flush,
|
||||
terminate
|
||||
};
|
||||
|
||||
// Async msg to move to/from the queue
|
||||
// Movable only. should never be copied
|
||||
struct async_msg
|
||||
{
|
||||
async_msg_type msg_type;
|
||||
level::level_enum level;
|
||||
log_clock::time_point time;
|
||||
size_t thread_id;
|
||||
fmt::MemoryWriter raw;
|
||||
// Async msg to move to/from the queue
|
||||
// Movable only. should never be copied
|
||||
struct async_msg
|
||||
{
|
||||
async_msg_type msg_type;
|
||||
level::level_enum level;
|
||||
log_clock::time_point time;
|
||||
size_t thread_id;
|
||||
fmt::MemoryWriter raw;
|
||||
|
||||
size_t msg_id;
|
||||
async_logger_ptr worker_ptr;
|
||||
size_t msg_id;
|
||||
async_logger_ptr worker_ptr;
|
||||
|
||||
async_msg() = default;
|
||||
~async_msg() = default;
|
||||
async_msg() = default;
|
||||
~async_msg() = default;
|
||||
|
||||
// never copy or assign. should only be move assigned in to the queue..
|
||||
async_msg(const async_msg &) = delete;
|
||||
async_msg &operator=(const async_msg &other) = delete;
|
||||
async_msg(async_msg &&other) = delete;
|
||||
// should only be moved in or out of the queue..
|
||||
async_msg(const async_msg &) = delete;
|
||||
async_msg(async_msg &&other) = default;
|
||||
async_msg &operator=(async_msg &&other) = default;
|
||||
|
||||
// construct from log_msg with given type
|
||||
async_msg(async_logger_ptr &&worker, async_msg_type the_type, details::log_msg &&m)
|
||||
: msg_type(the_type)
|
||||
, level(m.level)
|
||||
, time(m.time)
|
||||
, thread_id(m.thread_id)
|
||||
, raw(std::move(m.raw))
|
||||
, msg_id(m.msg_id)
|
||||
, worker_ptr(std::forward<async_logger_ptr>(worker))
|
||||
{
|
||||
}
|
||||
// construct from log_msg with given type
|
||||
async_msg(async_logger_ptr &&worker, async_msg_type the_type, details::log_msg &&m)
|
||||
: msg_type(the_type)
|
||||
, level(m.level)
|
||||
, time(m.time)
|
||||
, thread_id(m.thread_id)
|
||||
, raw(std::move(m.raw))
|
||||
, msg_id(m.msg_id)
|
||||
, worker_ptr(std::forward<async_logger_ptr>(worker))
|
||||
{
|
||||
}
|
||||
|
||||
async_msg(async_logger_ptr &&worker, async_msg_type the_type)
|
||||
: async_msg(std::forward<async_logger_ptr>(worker), the_type, details::log_msg())
|
||||
{
|
||||
}
|
||||
async_msg(async_logger_ptr &&worker, async_msg_type the_type)
|
||||
: async_msg(std::forward<async_logger_ptr>(worker), the_type, details::log_msg())
|
||||
{
|
||||
}
|
||||
|
||||
async_msg(async_msg_type the_type)
|
||||
: async_msg(nullptr, the_type, details::log_msg())
|
||||
{
|
||||
}
|
||||
async_msg(async_msg_type the_type)
|
||||
: async_msg(nullptr, the_type, details::log_msg())
|
||||
{
|
||||
}
|
||||
|
||||
// used to move to the message queue
|
||||
async_msg &operator=(async_msg &&other) SPDLOG_NOEXCEPT
|
||||
{
|
||||
msg_type = other.msg_type;
|
||||
level = other.level;
|
||||
time = other.time;
|
||||
thread_id = other.thread_id;
|
||||
raw = std::move(other.raw);
|
||||
msg_id = other.msg_id;
|
||||
worker_ptr = std::move(other.worker_ptr);
|
||||
return *this;
|
||||
}
|
||||
// copy into log_msg
|
||||
void to_log_msg(log_msg &&msg)
|
||||
{
|
||||
msg.logger_name = &worker_ptr->name();
|
||||
msg.level = level;
|
||||
msg.time = time;
|
||||
msg.thread_id = thread_id;
|
||||
msg.raw = std::move(raw);
|
||||
msg.formatted.clear();
|
||||
msg.msg_id = msg_id;
|
||||
msg.color_range_start = 0;
|
||||
msg.color_range_end = 0;
|
||||
}
|
||||
};
|
||||
|
||||
// copy into log_msg
|
||||
void to_log_msg(log_msg &&msg)
|
||||
{
|
||||
msg.logger_name = &worker_ptr->name();
|
||||
msg.level = level;
|
||||
msg.time = time;
|
||||
msg.thread_id = thread_id;
|
||||
msg.raw = std::move(raw);
|
||||
msg.formatted.clear();
|
||||
msg.msg_id = msg_id;
|
||||
msg.color_range_start = 0;
|
||||
msg.color_range_end = 0;
|
||||
}
|
||||
};
|
||||
class thread_pool
|
||||
{
|
||||
public:
|
||||
using item_type = async_msg;
|
||||
using q_type = details::mpmc_bounded_queue<item_type>;
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
|
||||
class thread_pool
|
||||
{
|
||||
public:
|
||||
using item_type = async_msg;
|
||||
using q_type = details::mpmc_bounded_queue<item_type>;
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
thread_pool(size_t q_size_bytes, size_t threads_n)
|
||||
: msg_counter_(0)
|
||||
, _q(q_size_bytes)
|
||||
{
|
||||
// std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl;
|
||||
if (threads_n == 0 || threads_n > 1000)
|
||||
{
|
||||
throw spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)");
|
||||
}
|
||||
for (size_t i = 0; i < threads_n; i++)
|
||||
{
|
||||
_threads.emplace_back(std::bind(&thread_pool::worker_loop, this));
|
||||
}
|
||||
}
|
||||
|
||||
thread_pool(size_t q_size_bytes, size_t threads_n)
|
||||
: _msg_counter(0)
|
||||
, _q(q_size_bytes)
|
||||
{
|
||||
// std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl;
|
||||
if (threads_n == 0 || threads_n > 1000)
|
||||
{
|
||||
throw spdlog_ex("spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)");
|
||||
}
|
||||
for (size_t i = 0; i < threads_n; i++)
|
||||
{
|
||||
_threads.emplace_back(std::bind(&thread_pool::_worker_loop, this));
|
||||
}
|
||||
}
|
||||
// message all threads to terminate gracefully join them
|
||||
~thread_pool()
|
||||
{
|
||||
try
|
||||
{
|
||||
for (size_t i = 0; i < _threads.size(); i++)
|
||||
{
|
||||
post_async_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry);
|
||||
}
|
||||
|
||||
// message all threads to terminate gracefully join them
|
||||
~thread_pool()
|
||||
{
|
||||
try
|
||||
{
|
||||
for (size_t i = 0; i < _threads.size(); i++)
|
||||
{
|
||||
_post_async_msg(async_msg(async_msg_type::terminate), async_overflow_policy::block_retry);
|
||||
}
|
||||
for (auto &t : _threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
// std::cout << "~thread_pool() msg_counter_: " << msg_counter_ << std::endl;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
for (auto &t : _threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
// std::cout << "~thread_pool() _msg_counter: " << _msg_counter << std::endl;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
}
|
||||
}
|
||||
void post_log(async_logger_ptr &&worker_ptr, details::log_msg &&msg, async_overflow_policy overflow_policy)
|
||||
{
|
||||
async_msg async_m(std::forward<async_logger_ptr>(worker_ptr), async_msg_type::log, std::forward<log_msg>(msg));
|
||||
post_async_msg(std::move(async_m), overflow_policy);
|
||||
}
|
||||
|
||||
void post_log(async_logger_ptr &&worker_ptr, details::log_msg &&msg, async_overflow_policy overflow_policy)
|
||||
{
|
||||
async_msg as_m(std::forward<async_logger_ptr>(worker_ptr), async_msg_type::log, std::forward<log_msg>(msg));
|
||||
_post_async_msg(std::move(as_m), overflow_policy);
|
||||
}
|
||||
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy)
|
||||
{
|
||||
post_async_msg(async_msg(std::forward<async_logger_ptr>(worker_ptr), async_msg_type::flush), overflow_policy);
|
||||
}
|
||||
|
||||
void post_flush(async_logger_ptr &&worker_ptr, async_overflow_policy overflow_policy)
|
||||
{
|
||||
_post_async_msg(async_msg(std::forward<async_logger_ptr>(worker_ptr), async_msg_type::flush), overflow_policy);
|
||||
}
|
||||
size_t msg_counter()
|
||||
{
|
||||
return msg_counter_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
size_t msg_counter()
|
||||
{
|
||||
return _msg_counter.load(std::memory_order_relaxed);
|
||||
}
|
||||
private:
|
||||
std::atomic<size_t> msg_counter_; // total # of messages processed in this pool
|
||||
q_type _q;
|
||||
|
||||
private:
|
||||
std::atomic<size_t> _msg_counter; // total # of messages processed in this pool
|
||||
q_type _q;
|
||||
std::vector<std::thread> _threads;
|
||||
|
||||
std::vector<std::thread> _threads;
|
||||
void post_async_msg(async_msg &&new_msg, async_overflow_policy overflow_policy)
|
||||
{
|
||||
if (overflow_policy == async_overflow_policy::block_retry)
|
||||
{
|
||||
_q.enqueue(std::move(new_msg));
|
||||
}
|
||||
else
|
||||
{
|
||||
_q.enqueue_nowait(std::move(new_msg));
|
||||
}
|
||||
}
|
||||
|
||||
void _post_async_msg(async_msg &&new_msg, async_overflow_policy overflow_policy)
|
||||
{
|
||||
void worker_loop()
|
||||
{
|
||||
while (process_next_msg())
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
if (!_q.enqueue(std::forward<async_msg>(new_msg)) && overflow_policy == async_overflow_policy::block_retry)
|
||||
{
|
||||
auto last_op_time = clock_type::now();
|
||||
auto now = last_op_time;
|
||||
do
|
||||
{
|
||||
now = clock_type::now();
|
||||
sleep_or_yield(now, last_op_time);
|
||||
} while (!_q.enqueue(std::move(new_msg)));
|
||||
}
|
||||
}
|
||||
// process next message in the queue
|
||||
// return true if this thread should still be active (while no terminate msg was received)
|
||||
bool process_next_msg()
|
||||
{
|
||||
async_msg incoming_async_msg;
|
||||
bool dequeued = _q.dequeue_for(incoming_async_msg, std::chrono::seconds(10));
|
||||
if (!dequeued)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
// pop log messages from the queue and send to the logger worker
|
||||
void _worker_loop()
|
||||
{
|
||||
async_msg popped_async_msg;
|
||||
log_msg msg;
|
||||
bool active = true;
|
||||
auto last_pop_time = clock_type::now();
|
||||
while (active)
|
||||
{
|
||||
if (_q.dequeue(popped_async_msg))
|
||||
{
|
||||
last_pop_time = clock_type::now();
|
||||
switch (popped_async_msg.msg_type)
|
||||
{
|
||||
case async_msg_type::flush:
|
||||
{
|
||||
auto worker = std::move(popped_async_msg.worker_ptr);
|
||||
worker->_backend_flush();
|
||||
break;
|
||||
}
|
||||
switch (incoming_async_msg.msg_type)
|
||||
{
|
||||
case async_msg_type::flush:
|
||||
{
|
||||
incoming_async_msg.worker_ptr->_backend_flush();
|
||||
return true;
|
||||
}
|
||||
|
||||
case async_msg_type::terminate:
|
||||
active = false;
|
||||
break;
|
||||
case async_msg_type::terminate:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
popped_async_msg.to_log_msg(std::move(msg));
|
||||
auto worker = std::move(popped_async_msg.worker_ptr);
|
||||
worker->_backend_log(msg);
|
||||
_msg_counter.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
else // queue is empty - the only place we can terminate the thread if needed.
|
||||
{
|
||||
sleep_or_yield(clock_type::now(), last_pop_time);
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
{
|
||||
log_msg msg;
|
||||
incoming_async_msg.to_log_msg(std::move(msg));
|
||||
incoming_async_msg.worker_ptr->_backend_log(msg);
|
||||
msg_counter_.fetch_add(1, std::memory_order_relaxed);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
assert(false);
|
||||
return true; // should not be reached
|
||||
}
|
||||
};
|
||||
|
||||
// spin, yield or sleep. use the time passed since last message as a hint
|
||||
static void sleep_or_yield(const clock_type::time_point &now, const clock_type::time_point &last_op_time)
|
||||
{
|
||||
using std::chrono::microseconds;
|
||||
using std::chrono::milliseconds;
|
||||
|
||||
auto time_since_op = now - last_op_time;
|
||||
|
||||
// yield upto 150 micros
|
||||
if (time_since_op <= microseconds(150))
|
||||
{
|
||||
return std::this_thread::yield();
|
||||
}
|
||||
|
||||
// sleep for 20 ms upto 200 ms
|
||||
if (time_since_op <= milliseconds(200))
|
||||
{
|
||||
return details::os::sleep_for_millis(20);
|
||||
}
|
||||
|
||||
// sleep for 500 ms
|
||||
return details::os::sleep_for_millis(500);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace details
|
||||
} // namespace details
|
||||
} // namespace spdlog
|
||||
|
@ -7,7 +7,8 @@ set(SPDLOG_UTESTS_SOURCES
|
||||
file_helper.cpp
|
||||
file_log.cpp
|
||||
test_misc.cpp
|
||||
test_pattern_formatter
|
||||
test_pattern_formatter.cpp
|
||||
test_async.cpp
|
||||
includes.h
|
||||
registry.cpp
|
||||
test_macros.cpp
|
||||
|
134
tests/test_async.cpp
Normal file
134
tests/test_async.cpp
Normal file
@ -0,0 +1,134 @@
|
||||
#include "includes.h"
|
||||
#include "test_sink.h"
|
||||
#include "spdlog/async.h"
|
||||
#include "spdlog/sinks/simple_file_sink.h"
|
||||
|
||||
//std::unique_ptr<spdlog::async_logger> create_logger(size_t tp_queue_size, size_t tp_threads)
|
||||
//{
|
||||
// auto tp = std::make_shared<details::thread_pool>(8192, 1);
|
||||
// auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::block_retry);
|
||||
//}
|
||||
|
||||
TEST_CASE("basic async test ", "[async]")
|
||||
{
|
||||
using namespace spdlog;
|
||||
auto test_sink = std::make_shared<sinks::test_sink_mt>();
|
||||
size_t queue_size = 128;
|
||||
size_t messages = 256;
|
||||
{
|
||||
auto tp = std::make_shared<details::thread_pool>(queue_size, 1);
|
||||
auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::block_retry);
|
||||
for (size_t i = 0; i < messages; i++)
|
||||
{
|
||||
logger->info("Hello message #{}", i);
|
||||
}
|
||||
logger->flush();
|
||||
}
|
||||
REQUIRE(test_sink->msg_counter() == messages);
|
||||
REQUIRE(test_sink->flushed_msg_counter() == messages);
|
||||
}
|
||||
|
||||
TEST_CASE("discard policy ", "[async]")
|
||||
{
|
||||
using namespace spdlog;
|
||||
auto test_sink = std::make_shared<sinks::test_sink_mt>();
|
||||
size_t queue_size = 2;
|
||||
size_t messages = 1024;
|
||||
{
|
||||
auto tp = std::make_shared<details::thread_pool>(queue_size, 1);
|
||||
auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::discard_log_msg);
|
||||
for (size_t i = 0; i < messages; i++)
|
||||
{
|
||||
logger->info("Hello message #{}", i);
|
||||
}
|
||||
}
|
||||
|
||||
REQUIRE(test_sink->msg_counter() < messages);
|
||||
REQUIRE(test_sink->flushed_msg_counter() < messages);
|
||||
}
|
||||
|
||||
TEST_CASE("flush", "[async]")
|
||||
{
|
||||
using namespace spdlog;
|
||||
auto test_sink = std::make_shared<sinks::test_sink_mt>();
|
||||
size_t queue_size = 256;
|
||||
size_t messages = 256;
|
||||
{
|
||||
auto tp = std::make_shared<details::thread_pool>(queue_size, 1);
|
||||
auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::block_retry);
|
||||
for (size_t i = 0; i < messages; i++)
|
||||
{
|
||||
logger->info("Hello message #{}", i);
|
||||
}
|
||||
|
||||
logger->flush();
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||
REQUIRE(test_sink->msg_counter() == messages);
|
||||
REQUIRE(test_sink->flushed_msg_counter() == messages);
|
||||
}
|
||||
|
||||
TEST_CASE("multi threads", "[async]")
|
||||
{
|
||||
using namespace spdlog;
|
||||
auto test_sink = std::make_shared<sinks::test_sink_mt>();
|
||||
size_t queue_size = 128;
|
||||
size_t messages = 256;
|
||||
size_t n_threads = 10;
|
||||
{
|
||||
auto tp = std::make_shared<details::thread_pool>(queue_size, 1);
|
||||
auto logger = std::make_shared<async_logger>("as", test_sink, tp, async_overflow_policy::block_retry);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
for (size_t i = 0; i < n_threads; i++)
|
||||
{
|
||||
threads.emplace_back([logger, messages] {
|
||||
for (size_t j = 0; j < messages; j++)
|
||||
{
|
||||
logger->info("Hello message #{}", j);
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
for (auto &t : threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
logger->flush();
|
||||
|
||||
}
|
||||
|
||||
REQUIRE(test_sink->msg_counter() == messages * n_threads);
|
||||
REQUIRE(test_sink->flushed_msg_counter() == messages * n_threads);
|
||||
}
|
||||
|
||||
TEST_CASE("to_file", "[async]")
|
||||
{
|
||||
prepare_logdir();
|
||||
size_t queue_size = 512;
|
||||
size_t messages = 512;
|
||||
size_t n_threads = 4;
|
||||
spdlog::init_thread_pool(queue_size, n_threads);
|
||||
auto logger= spdlog::basic_logger_mt<spdlog::create_async>("as", "logs/async_test.log", true);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
for (size_t i = 0; i < n_threads; i++)
|
||||
{
|
||||
threads.emplace_back([logger, messages] {
|
||||
for (size_t j = 0; j < messages; j++)
|
||||
{
|
||||
logger->info("Hello message #{}", j);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (auto &t : threads)
|
||||
{
|
||||
t.join();
|
||||
}
|
||||
logger.reset();
|
||||
spdlog::drop("as");
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
REQUIRE(count_lines("logs/async_test.log") == messages * n_threads);
|
||||
}
|
48
tests/test_sink.h
Normal file
48
tests/test_sink.h
Normal file
@ -0,0 +1,48 @@
|
||||
//
|
||||
// Copyright(c) 2018 Gabi Melman.
|
||||
// Distributed under the MIT License (http://opensource.org/licenses/MIT)
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "spdlog/details/null_mutex.h"
|
||||
#include "spdlog/sinks/base_sink.h"
|
||||
|
||||
#include <mutex>
|
||||
|
||||
namespace spdlog {
|
||||
namespace sinks {
|
||||
|
||||
template<class Mutex>
|
||||
class test_sink : public base_sink<Mutex>
|
||||
{
|
||||
public:
|
||||
size_t msg_counter()
|
||||
{
|
||||
return msg_counter_;
|
||||
}
|
||||
|
||||
size_t flushed_msg_counter()
|
||||
{
|
||||
return flushed_msg_counter_;
|
||||
}
|
||||
|
||||
protected:
|
||||
void _sink_it(const details::log_msg &) override
|
||||
{
|
||||
msg_counter_++;
|
||||
}
|
||||
|
||||
void _flush() override
|
||||
{
|
||||
flushed_msg_counter_ += msg_counter_;
|
||||
}
|
||||
size_t msg_counter_{0};
|
||||
size_t flushed_msg_counter_{0};
|
||||
};
|
||||
|
||||
using test_sink_mt = test_sink<std::mutex>;
|
||||
using test_sink_st = test_sink<details::null_mutex>;
|
||||
|
||||
} // namespace sinks
|
||||
} // namespace spdlog
|
@ -129,6 +129,7 @@
|
||||
<ClCompile Include="errors.cpp" />
|
||||
<ClCompile Include="file_helper.cpp" />
|
||||
<ClCompile Include="file_log.cpp" />
|
||||
<ClCompile Include="test_async.cpp" />
|
||||
<ClCompile Include="test_misc.cpp" />
|
||||
<ClCompile Include="main.cpp" />
|
||||
<ClCompile Include="registry.cpp" />
|
||||
|
@ -42,6 +42,9 @@
|
||||
<ClCompile Include="test_misc.cpp">
|
||||
<Filter>Source Files</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="test_async.cpp">
|
||||
<Filter>Source Files</Filter>
|
||||
</ClCompile>
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ClInclude Include="includes.h">
|
||||
|
Loading…
Reference in New Issue
Block a user