LCOV - code coverage report
Current view: top level - libs/capy/src - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 86.4 % 81 70
Test Date: 2025-12-30 20:31:35 Functions: 92.3 % 13 12

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/boostorg/capy
       8              : //
       9              : 
      10              : #include <boost/capy/thread_pool.hpp>
      11              : #include "work_allocator.hpp"
      12              : #include <condition_variable>
      13              : #include <mutex>
      14              : #include <thread>
      15              : #include <vector>
      16              : 
      17              : namespace boost {
      18              : namespace capy {
      19              : 
      20              : class thread_pool::impl
      21              : {
      22              :     // Prepended to each work allocation to track metadata
      23              :     struct header
      24              :     {
      25              :         header* next;
      26              :         std::size_t size;
      27              :         std::size_t align;
      28              :     };
      29              : 
      30              :     std::mutex mutex_;
      31              :     std::condition_variable cv_;
      32              :     header* head_;
      33              :     header* tail_;
      34              :     std::vector<std::thread> threads_;
      35              :     work_allocator arena_;
      36              :     bool stop_;
      37              : 
      38              :     static header*
      39           38 :     to_header(void* p) noexcept
      40              :     {
      41           38 :         return static_cast<header*>(p) - 1;
      42              :     }
      43              : 
      44              :     static void*
      45           76 :     from_header(header* h) noexcept
      46              :     {
      47           76 :         return h + 1;
      48              :     }
      49              : 
      50              : public:
      51           12 :     ~impl()
      52              :     {
      53              :         {
      54           12 :             std::lock_guard<std::mutex> lock(mutex_);
      55           12 :             stop_ = true;
      56           12 :         }
      57           12 :         cv_.notify_all();
      58              : 
      59           36 :         for(auto& t : threads_)
      60           24 :             t.join();
      61              : 
      62              :         // Drain remaining work (no lock needed, threads are joined)
      63           12 :         while(head_)
      64              :         {
      65            0 :             header* h = head_;
      66            0 :             head_ = head_->next;
      67            0 :             auto* w = static_cast<executor::work*>(from_header(h));
      68            0 :             w->~work();
      69            0 :             arena_.deallocate(h, h->size, h->align);
      70              :         }
      71           12 :     }
      72              : 
      73              :     explicit
      74           12 :     impl(std::size_t num_threads)
      75           24 :         : head_(nullptr)
      76           12 :         , tail_(nullptr)
      77           12 :         , stop_(false)
      78              :     {
      79           12 :         if(num_threads == 0)
      80            1 :             num_threads = std::thread::hardware_concurrency();
      81           12 :         if(num_threads == 0)
      82            0 :             num_threads = 1;
      83              : 
      84           12 :         threads_.reserve(num_threads);
      85           36 :         for(std::size_t i = 0; i < num_threads; ++i)
      86           48 :             threads_.emplace_back([this]{ run(); });
      87           12 :     }
      88              : 
      89              :     void*
      90           38 :     allocate(std::size_t size, std::size_t align)
      91              :     {
      92              :         // Allocate space for header + work object
      93           38 :         std::size_t total = sizeof(header) + size;
      94           38 :         std::lock_guard<std::mutex> lock(mutex_);
      95           38 :         void* p = arena_.allocate(total, align);
      96           38 :         auto* h = new(p) header{nullptr, total, align};
      97           76 :         return from_header(h);
      98           38 :     }
      99              : 
     100              :     void
     101            0 :     deallocate(void* p, std::size_t, std::size_t) noexcept
     102              :     {
     103              :         // Size/align from caller are ignored; we use stored values
     104            0 :         header* h = to_header(p);
     105            0 :         std::lock_guard<std::mutex> lock(mutex_);
     106            0 :         arena_.deallocate(h, h->size, h->align);
     107            0 :     }
     108              : 
     109              :     void
     110           38 :     submit(executor::work* w)
     111              :     {
     112           38 :         header* h = to_header(w);
     113              :         {
     114           38 :             std::lock_guard<std::mutex> lock(mutex_);
     115           38 :             h->next = nullptr;
     116           38 :             if(tail_)
     117           29 :                 tail_->next = h;
     118              :             else
     119            9 :                 head_ = h;
     120           38 :             tail_ = h;
     121           38 :         }
     122           38 :         cv_.notify_one();
     123           38 :     }
     124              : 
     125              : private:
     126              :     void
     127           24 :     run()
     128              :     {
     129              :         for(;;)
     130              :         {
     131           62 :             header* h = nullptr;
     132              :             {
     133           62 :                 std::unique_lock<std::mutex> lock(mutex_);
     134           62 :                 cv_.wait(lock, [this]{
     135           65 :                     return stop_ || head_ != nullptr;
     136              :                 });
     137              : 
     138           62 :                 if(stop_ && !head_)
     139           48 :                     return;
     140              : 
     141           38 :                 h = head_;
     142           38 :                 head_ = head_->next;
     143           38 :                 if(!head_)
     144            9 :                     tail_ = nullptr;
     145           62 :             }
     146              : 
     147           38 :             auto* w = static_cast<executor::work*>(from_header(h));
     148           38 :             w->invoke();
     149           38 :             w->~work();
     150              : 
     151              :             {
     152           38 :                 std::lock_guard<std::mutex> lock(mutex_);
     153           38 :                 arena_.deallocate(h, h->size, h->align);
     154           38 :             }
     155           38 :         }
     156              :     }
     157              : };
     158              : 
     159              : //------------------------------------------------------------------------------
     160              : 
     161           12 : thread_pool::
     162              : ~thread_pool()
     163              : {
     164           12 :     delete impl_;
     165           12 : }
     166              : 
     167           12 : thread_pool::
     168           12 : thread_pool(std::size_t num_threads)
     169           12 :     : impl_(new impl(num_threads))
     170              : {
     171           12 : }
     172              : 
     173              : executor
     174           14 : thread_pool::
     175              : get_executor() noexcept
     176              : {
     177           14 :     return executor(*impl_);
     178              : }
     179              : 
     180              : } // capy
     181              : } // boost
        

Generated by: LCOV version 2.1