Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/boostorg/capy
8 : //
9 :
10 : #include <boost/capy/thread_pool.hpp>
11 : #include "work_allocator.hpp"
12 : #include <condition_variable>
13 : #include <mutex>
14 : #include <thread>
15 : #include <vector>
16 :
17 : namespace boost {
18 : namespace capy {
19 :
20 : class thread_pool::impl
21 : {
22 : // Prepended to each work allocation to track metadata
23 : struct header
24 : {
25 : header* next;
26 : std::size_t size;
27 : std::size_t align;
28 : };
29 :
30 : std::mutex mutex_;
31 : std::condition_variable cv_;
32 : header* head_;
33 : header* tail_;
34 : std::vector<std::thread> threads_;
35 : work_allocator arena_;
36 : bool stop_;
37 :
38 : static header*
39 38 : to_header(void* p) noexcept
40 : {
41 38 : return static_cast<header*>(p) - 1;
42 : }
43 :
44 : static void*
45 76 : from_header(header* h) noexcept
46 : {
47 76 : return h + 1;
48 : }
49 :
50 : public:
51 12 : ~impl()
52 : {
53 : {
54 12 : std::lock_guard<std::mutex> lock(mutex_);
55 12 : stop_ = true;
56 12 : }
57 12 : cv_.notify_all();
58 :
59 36 : for(auto& t : threads_)
60 24 : t.join();
61 :
62 : // Drain remaining work (no lock needed, threads are joined)
63 12 : while(head_)
64 : {
65 0 : header* h = head_;
66 0 : head_ = head_->next;
67 0 : auto* w = static_cast<executor::work*>(from_header(h));
68 0 : w->~work();
69 0 : arena_.deallocate(h, h->size, h->align);
70 : }
71 12 : }
72 :
73 : explicit
74 12 : impl(std::size_t num_threads)
75 24 : : head_(nullptr)
76 12 : , tail_(nullptr)
77 12 : , stop_(false)
78 : {
79 12 : if(num_threads == 0)
80 1 : num_threads = std::thread::hardware_concurrency();
81 12 : if(num_threads == 0)
82 0 : num_threads = 1;
83 :
84 12 : threads_.reserve(num_threads);
85 36 : for(std::size_t i = 0; i < num_threads; ++i)
86 48 : threads_.emplace_back([this]{ run(); });
87 12 : }
88 :
89 : void*
90 38 : allocate(std::size_t size, std::size_t align)
91 : {
92 : // Allocate space for header + work object
93 38 : std::size_t total = sizeof(header) + size;
94 38 : std::lock_guard<std::mutex> lock(mutex_);
95 38 : void* p = arena_.allocate(total, align);
96 38 : auto* h = new(p) header{nullptr, total, align};
97 76 : return from_header(h);
98 38 : }
99 :
100 : void
101 0 : deallocate(void* p, std::size_t, std::size_t) noexcept
102 : {
103 : // Size/align from caller are ignored; we use stored values
104 0 : header* h = to_header(p);
105 0 : std::lock_guard<std::mutex> lock(mutex_);
106 0 : arena_.deallocate(h, h->size, h->align);
107 0 : }
108 :
109 : void
110 38 : submit(executor::work* w)
111 : {
112 38 : header* h = to_header(w);
113 : {
114 38 : std::lock_guard<std::mutex> lock(mutex_);
115 38 : h->next = nullptr;
116 38 : if(tail_)
117 29 : tail_->next = h;
118 : else
119 9 : head_ = h;
120 38 : tail_ = h;
121 38 : }
122 38 : cv_.notify_one();
123 38 : }
124 :
125 : private:
126 : void
127 24 : run()
128 : {
129 : for(;;)
130 : {
131 62 : header* h = nullptr;
132 : {
133 62 : std::unique_lock<std::mutex> lock(mutex_);
134 62 : cv_.wait(lock, [this]{
135 65 : return stop_ || head_ != nullptr;
136 : });
137 :
138 62 : if(stop_ && !head_)
139 48 : return;
140 :
141 38 : h = head_;
142 38 : head_ = head_->next;
143 38 : if(!head_)
144 9 : tail_ = nullptr;
145 62 : }
146 :
147 38 : auto* w = static_cast<executor::work*>(from_header(h));
148 38 : w->invoke();
149 38 : w->~work();
150 :
151 : {
152 38 : std::lock_guard<std::mutex> lock(mutex_);
153 38 : arena_.deallocate(h, h->size, h->align);
154 38 : }
155 38 : }
156 : }
157 : };
158 :
159 : //------------------------------------------------------------------------------
160 :
161 12 : thread_pool::
162 : ~thread_pool()
163 : {
164 12 : delete impl_;
165 12 : }
166 :
167 12 : thread_pool::
168 12 : thread_pool(std::size_t num_threads)
169 12 : : impl_(new impl(num_threads))
170 : {
171 12 : }
172 :
173 : executor
174 14 : thread_pool::
175 : get_executor() noexcept
176 : {
177 14 : return executor(*impl_);
178 : }
179 :
180 : } // capy
181 : } // boost
|