diff --git a/include/spdlog/details/mpmc_blocking_q.h b/include/spdlog/details/mpmc_blocking_q.h index 785180c1..101ea8c0 100644 --- a/include/spdlog/details/mpmc_blocking_q.h +++ b/include/spdlog/details/mpmc_blocking_q.h @@ -49,7 +49,7 @@ public: push_cv_.notify_one(); } - // try to dequeue item. if no item found. wait up to timeout and try again + // dequeue with a timeout. // Return true, if succeeded dequeue item, false otherwise bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) { @@ -66,6 +66,18 @@ public: return true; } + // blocking dequeue without a timeout. + void dequeue(T &popped_item) + { + { + std::unique_lock lock(queue_mutex_); + push_cv_.wait(lock, [this] { return !this->q_.empty(); }); + popped_item = std::move(q_.front()); + q_.pop_front(); + } + pop_cv_.notify_one(); + } + #else // apparently mingw deadlocks if the mutex is released before cv.notify_one(), // so release the mutex at the very end each function. @@ -87,7 +99,7 @@ public: push_cv_.notify_one(); } - // try to dequeue item. if no item found. wait up to timeout and try again + // dequeue with a timeout. // Return true, if succeeded dequeue item, false otherwise bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) { @@ -102,6 +114,16 @@ public: return true; } + // blocking dequeue without a timeout. + void dequeue(T &popped_item) + { + std::unique_lock lock(queue_mutex_); + push_cv_.wait(lock, [this] { return !this->q_.empty(); }); + popped_item = std::move(q_.front()); + q_.pop_front(); + pop_cv_.notify_one(); + } + #endif size_t overrun_counter() diff --git a/include/spdlog/details/thread_pool-inl.h b/include/spdlog/details/thread_pool-inl.h index 369f30fe..dbd424ff 100644 --- a/include/spdlog/details/thread_pool-inl.h +++ b/include/spdlog/details/thread_pool-inl.h @@ -108,11 +108,7 @@ void SPDLOG_INLINE thread_pool::worker_loop_() bool SPDLOG_INLINE thread_pool::process_next_msg_() { async_msg incoming_async_msg; - bool dequeued = q_.dequeue_for(incoming_async_msg, std::chrono::seconds(10)); - if (!dequeued) - { - return true; - } + q_.dequeue(incoming_async_msg); switch (incoming_async_msg.msg_type) { diff --git a/tests/test_mpmc_q.cpp b/tests/test_mpmc_q.cpp index 3b8aec36..1540dcc8 100644 --- a/tests/test_mpmc_q.cpp +++ b/tests/test_mpmc_q.cpp @@ -43,6 +43,26 @@ TEST_CASE("dequeue-empty-wait", "[mpmc_blocking_q]") REQUIRE(delta_ms <= wait_ms + tolerance_wait); } +TEST_CASE("dequeue-full-nowait", "[mpmc_blocking_q]") +{ + spdlog::details::mpmc_blocking_queue q(1); + q.enqueue(42); + + int item = 0; + q.dequeue_for(item, milliseconds::zero()); + REQUIRE(item == 42); +} + +TEST_CASE("dequeue-full-wait", "[mpmc_blocking_q]") +{ + spdlog::details::mpmc_blocking_queue q(1); + q.enqueue(42); + + int item = 0; + q.dequeue(item); + REQUIRE(item == 42); +} + TEST_CASE("enqueue_nowait", "[mpmc_blocking_q]") { @@ -95,12 +115,12 @@ TEST_CASE("full_queue", "[mpmc_blocking_q]") for (int i = 1; i < static_cast(q_size); i++) { int item = -1; - q.dequeue_for(item, milliseconds(0)); + q.dequeue(item); REQUIRE(item == i); } // last item pushed has overridden the oldest. int item = -1; - q.dequeue_for(item, milliseconds(0)); + q.dequeue(item); REQUIRE(item == 123456); }