diff --git a/include/c11log/details/blocking_queue.h b/include/c11log/details/blocking_queue.h index c49d3096..ab3a2dae 100644 --- a/include/c11log/details/blocking_queue.h +++ b/include/c11log/details/blocking_queue.h @@ -1,5 +1,9 @@ #pragma once +// blocking_queue: +// A blocking multi-consumer/multi-producer thread safe queue. +// Has max capacity and supports timeout on push or pop operations. + #include #include #include @@ -8,57 +12,91 @@ #include namespace c11log { -namespace details { - +namespace details +{ template class blocking_queue { public: - explicit blocking_queue(std::size_t max_size) :_max_size(max_size), _q() + using queue_t = std::queue; + using size_type = typename queue_t::size_type; + using clock = std::chrono::system_clock; + + explicit blocking_queue(size_type max_size) :_max_size(max_size), _q() {} blocking_queue(const blocking_queue&) = delete; blocking_queue& operator=(const blocking_queue&) = delete; + blocking_queue& operator=(const blocking_queue&) volatile = delete; ~blocking_queue() = default; - std::size_t size() + size_type size() { std::lock_guard lock(_mutex); return _q.size(); } - bool push(const T& item, const std::chrono::milliseconds& timeout) + + // Push copy of item into the back of the queue. + // If queue is full, block the calling thread util there is room or timeout have passed. + // Return: false on timeout, true on successful push. + template + bool push(const T& item, const std::chrono::duration& timeout) { std::unique_lock ul(_mutex); - if (_q.size() >= _max_size) { - if (_item_popped_cond.wait_for(ul, timeout) == std::cv_status::timeout || _q.size() >= _max_size) + if (_q.size() >= _max_size) + { + if (!_item_popped_cond.wait_until(ul, clock::now() + timeout, [this]() { return this->_q.size() < this->_max_size; })) return false; } - _q.push(item); - if (_q.size() <= 1) - _item_pushed_cond.notify_all(); - + if (_q.size() == 1) + { + ul.unlock(); + _item_pushed_cond.notify_one(); + } return true; } - bool pop(T& item, const std::chrono::milliseconds& timeout) + + // Push copy of item into the back of the queue. + // If queue is full, block the calling thread until there is room + void push(const T& item) + { + while (!push(item, std::chrono::hours::max())); + } + + // Pop a copy of the front item in the queue into the given item ref + // If queue is empty, block the calling thread util there is item to pop or timeout have passed. + // Return: false on timeout , true on successful pop + template + bool pop(T& item, const std::chrono::duration& timeout) { std::unique_lock ul(_mutex); - if (_q.empty()) { - if (_item_pushed_cond.wait_for(ul, timeout) == std::cv_status::timeout || _q.empty()) + if (_q.empty()) + { + if (!_item_pushed_cond.wait_until(ul, clock::now() + timeout, [this]() { return !this->_q.empty(); })) return false; } item = _q.front(); _q.pop(); if (_q.size() >= _max_size - 1) - _item_popped_cond.notify_all(); - + { + ul.unlock(); + _item_popped_cond.notify_one(); + } return true; } + + // Pop a copy of the front item in the queue into the given item ref + // If queue is empty, block the calling thread util there is item to pop. + void pop(T& item) + { + while (!pop(item, std::chrono::hours::max())); + } + private: - std::size_t _max_size; + size_type _max_size; std::queue _q; std::mutex _mutex; std::condition_variable _item_pushed_cond; std::condition_variable _item_popped_cond; }; - } } \ No newline at end of file