1/*
2 * Copyright (C) 2015-2017 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
14 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
17 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
18 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
21 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
23 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include <wtf/ParallelHelperPool.h>
28
29#include <wtf/AutomaticThread.h>
30#include <wtf/DataLog.h>
31#include <wtf/StringPrintStream.h>
32
33namespace WTF {
34
35ParallelHelperClient::ParallelHelperClient(RefPtr<ParallelHelperPool>&& pool)
36 : m_pool(WTFMove(pool))
37{
38 LockHolder locker(*m_pool->m_lock);
39 RELEASE_ASSERT(!m_pool->m_isDying);
40 m_pool->m_clients.append(this);
41}
42
43ParallelHelperClient::~ParallelHelperClient()
44{
45 LockHolder locker(*m_pool->m_lock);
46 finish(locker);
47
48 for (size_t i = 0; i < m_pool->m_clients.size(); ++i) {
49 if (m_pool->m_clients[i] == this) {
50 m_pool->m_clients[i] = m_pool->m_clients.last();
51 m_pool->m_clients.removeLast();
52 break;
53 }
54 }
55}
56
57void ParallelHelperClient::setTask(RefPtr<SharedTask<void ()>>&& task)
58{
59 LockHolder locker(*m_pool->m_lock);
60 RELEASE_ASSERT(!m_task);
61 m_task = WTFMove(task);
62 m_pool->didMakeWorkAvailable(locker);
63}
64
65void ParallelHelperClient::finish()
66{
67 LockHolder locker(*m_pool->m_lock);
68 finish(locker);
69}
70
71void ParallelHelperClient::doSomeHelping()
72{
73 RefPtr<SharedTask<void ()>> task;
74 {
75 LockHolder locker(*m_pool->m_lock);
76 task = claimTask(locker);
77 if (!task)
78 return;
79 }
80
81 runTask(task);
82}
83
84void ParallelHelperClient::runTaskInParallel(RefPtr<SharedTask<void ()>>&& task)
85{
86 setTask(WTFMove(task));
87 doSomeHelping();
88 finish();
89}
90
91void ParallelHelperClient::finish(const AbstractLocker&)
92{
93 m_task = nullptr;
94 while (m_numActive)
95 m_pool->m_workCompleteCondition.wait(*m_pool->m_lock);
96}
97
98RefPtr<SharedTask<void ()>> ParallelHelperClient::claimTask(const AbstractLocker&)
99{
100 if (!m_task)
101 return nullptr;
102
103 m_numActive++;
104 return m_task;
105}
106
107void ParallelHelperClient::runTask(const RefPtr<SharedTask<void ()>>& task)
108{
109 RELEASE_ASSERT(m_numActive);
110 RELEASE_ASSERT(task);
111
112 task->run();
113
114 {
115 LockHolder locker(*m_pool->m_lock);
116 RELEASE_ASSERT(m_numActive);
117 // No new task could have been installed, since we were still active.
118 RELEASE_ASSERT(!m_task || m_task == task);
119 m_task = nullptr;
120 m_numActive--;
121 if (!m_numActive)
122 m_pool->m_workCompleteCondition.notifyAll();
123 }
124}
125
126ParallelHelperPool::ParallelHelperPool(CString&& threadName)
127 : m_lock(Box<Lock>::create())
128 , m_workAvailableCondition(AutomaticThreadCondition::create())
129 , m_threadName(WTFMove(threadName))
130{
131}
132
133ParallelHelperPool::~ParallelHelperPool()
134{
135 RELEASE_ASSERT(m_clients.isEmpty());
136
137 {
138 LockHolder locker(*m_lock);
139 m_isDying = true;
140 m_workAvailableCondition->notifyAll(locker);
141 }
142
143 for (RefPtr<AutomaticThread>& thread : m_threads)
144 thread->join();
145}
146
147void ParallelHelperPool::ensureThreads(unsigned numThreads)
148{
149 LockHolder locker(*m_lock);
150 if (numThreads < m_numThreads)
151 return;
152 m_numThreads = numThreads;
153 if (getClientWithTask(locker))
154 didMakeWorkAvailable(locker);
155}
156
157void ParallelHelperPool::doSomeHelping()
158{
159 ParallelHelperClient* client;
160 RefPtr<SharedTask<void ()>> task;
161 {
162 LockHolder locker(*m_lock);
163 client = getClientWithTask(locker);
164 if (!client)
165 return;
166 task = client->claimTask(locker);
167 }
168
169 client->runTask(task);
170}
171
172class ParallelHelperPool::Thread : public AutomaticThread {
173public:
174 Thread(const AbstractLocker& locker, ParallelHelperPool& pool)
175 : AutomaticThread(locker, pool.m_lock, pool.m_workAvailableCondition.copyRef())
176 , m_pool(pool)
177 {
178 }
179
180 const char* name() const override
181 {
182 return m_pool.m_threadName.data();
183 }
184
185protected:
186 PollResult poll(const AbstractLocker& locker) override
187 {
188 if (m_pool.m_isDying)
189 return PollResult::Stop;
190 m_client = m_pool.getClientWithTask(locker);
191 if (m_client) {
192 m_task = m_client->claimTask(locker);
193 return PollResult::Work;
194 }
195 return PollResult::Wait;
196 }
197
198 WorkResult work() override
199 {
200 m_client->runTask(m_task);
201 m_client = nullptr;
202 m_task = nullptr;
203 return WorkResult::Continue;
204 }
205
206private:
207 ParallelHelperPool& m_pool;
208 ParallelHelperClient* m_client { nullptr };
209 RefPtr<SharedTask<void ()>> m_task;
210};
211
212void ParallelHelperPool::didMakeWorkAvailable(const AbstractLocker& locker)
213{
214 while (m_numThreads > m_threads.size())
215 m_threads.append(adoptRef(new Thread(locker, *this)));
216 m_workAvailableCondition->notifyAll(locker);
217}
218
219bool ParallelHelperPool::hasClientWithTask(const AbstractLocker& locker)
220{
221 return !!getClientWithTask(locker);
222}
223
224ParallelHelperClient* ParallelHelperPool::getClientWithTask(const AbstractLocker&)
225{
226 // We load-balance by being random.
227 unsigned startIndex = m_random.getUint32(m_clients.size());
228 for (unsigned index = startIndex; index < m_clients.size(); ++index) {
229 ParallelHelperClient* client = m_clients[index];
230 if (client->m_task)
231 return client;
232 }
233 for (unsigned index = 0; index < startIndex; ++index) {
234 ParallelHelperClient* client = m_clients[index];
235 if (client->m_task)
236 return client;
237 }
238
239 return nullptr;
240}
241
242} // namespace WTF
243
244