1/*
2 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
14 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
15 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
17 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
18 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
19 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
21 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
22 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
23 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include <wtf/ParkingLot.h>
28
29#include <condition_variable>
30#include <mutex>
31#include <thread>
32#include <wtf/DataLog.h>
33#include <wtf/HashFunctions.h>
34#include <wtf/StringPrintStream.h>
35#include <wtf/ThreadSpecific.h>
36#include <wtf/Threading.h>
37#include <wtf/Vector.h>
38#include <wtf/WeakRandom.h>
39#include <wtf/WordLock.h>
40
41namespace WTF {
42
43namespace {
44
// Compile-time switch for the dataLog() tracing used throughout this file. It is off by default;
// flip it to true only when debugging the parking lot itself.
constexpr bool verbose = false;
46
47struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
48 WTF_MAKE_FAST_ALLOCATED;
49public:
50
51 ThreadData();
52 ~ThreadData();
53
54 Ref<Thread> thread;
55
56 Mutex parkingLock;
57 ThreadCondition parkingCondition;
58
59 const void* address { nullptr };
60
61 ThreadData* nextInQueue { nullptr };
62
63 intptr_t token { 0 };
64};
65
// Verdict returned, for each queued element, by the functor given to Bucket::genericDequeue().
enum class DequeueResult {
    // Leave the element where it is and move on to the next one.
    Ignore,
    // Unlink the element and keep scanning the queue.
    RemoveAndContinue,
    // Unlink the element and end the scan.
    RemoveAndStop,
};
71
72struct Bucket {
73 WTF_MAKE_FAST_ALLOCATED;
74public:
75 Bucket()
76 : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock.
77 {
78 }
79
80 void enqueue(ThreadData* data)
81 {
82 if (verbose)
83 dataLog(toString(Thread::current(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
84 ASSERT(data->address);
85 ASSERT(!data->nextInQueue);
86
87 if (queueTail) {
88 queueTail->nextInQueue = data;
89 queueTail = data;
90 return;
91 }
92
93 queueHead = data;
94 queueTail = data;
95 }
96
97 template<typename Functor>
98 void genericDequeue(const Functor& functor)
99 {
100 if (verbose)
101 dataLog(toString(Thread::current(), ": dequeueing from bucket at ", RawPointer(this), "\n"));
102
103 if (!queueHead) {
104 if (verbose)
105 dataLog(toString(Thread::current(), ": empty.\n"));
106 return;
107 }
108
109 // This loop is a very clever abomination. The induction variables are the pointer to the
110 // pointer to the current node, and the pointer to the previous node. This gives us everything
111 // we need to both proceed forward to the next node, and to remove nodes while maintaining the
112 // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
113 // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
114 // we'd want queueTail to be set to nullptr. This works because:
115 //
116 // currentPtr == &queueHead
117 // previous == nullptr
118 //
119 // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
120 // that used to point to this node to instead point to this node's successor. Another example:
121 // if we were at the second node in the queue, then we'd have:
122 //
123 // currentPtr == &queueHead->nextInQueue
124 // previous == queueHead
125 //
126 // If this node is not equal to queueTail, then removing it simply means making
127 // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
128 // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
129 // queueTail to previous, which in this case is queueHead - thus making the queue look like a
130 // proper one-element queue with queueHead == queueTail.
131 bool shouldContinue = true;
132 ThreadData** currentPtr = &queueHead;
133 ThreadData* previous = nullptr;
134
135 MonotonicTime time = MonotonicTime::now();
136 bool timeToBeFair = false;
137 if (time > nextFairTime)
138 timeToBeFair = true;
139
140 bool didDequeue = false;
141
142 while (shouldContinue) {
143 ThreadData* current = *currentPtr;
144 if (verbose)
145 dataLog(toString(Thread::current(), ": got thread ", RawPointer(current), "\n"));
146 if (!current)
147 break;
148 DequeueResult result = functor(current, timeToBeFair);
149 switch (result) {
150 case DequeueResult::Ignore:
151 if (verbose)
152 dataLog(toString(Thread::current(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
153 previous = current;
154 currentPtr = &(*currentPtr)->nextInQueue;
155 break;
156 case DequeueResult::RemoveAndStop:
157 shouldContinue = false;
158 FALLTHROUGH;
159 case DequeueResult::RemoveAndContinue:
160 if (verbose)
161 dataLog(toString(Thread::current(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
162 if (current == queueTail)
163 queueTail = previous;
164 didDequeue = true;
165 *currentPtr = current->nextInQueue;
166 current->nextInQueue = nullptr;
167 break;
168 }
169 }
170
171 if (timeToBeFair && didDequeue)
172 nextFairTime = time + Seconds::fromMilliseconds(random.get());
173
174 ASSERT(!!queueHead == !!queueTail);
175 }
176
177 ThreadData* dequeue()
178 {
179 ThreadData* result = nullptr;
180 genericDequeue(
181 [&] (ThreadData* element, bool) -> DequeueResult {
182 result = element;
183 return DequeueResult::RemoveAndStop;
184 });
185 return result;
186 }
187
188 ThreadData* queueHead { nullptr };
189 ThreadData* queueTail { nullptr };
190
191 // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
192 // this lock.
193 WordLock lock;
194
195 MonotonicTime nextFairTime;
196
197 WeakRandom random;
198
199 // Put some distane between buckets in memory. This is one of several mitigations against false
200 // sharing.
201 char padding[64];
202};
203
204struct Hashtable;
205
206// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
207Vector<Hashtable*>* hashtables;
208WordLock hashtablesLock;
209
210struct Hashtable {
211 unsigned size;
212 Atomic<Bucket*> data[1];
213
214 static Hashtable* create(unsigned size)
215 {
216 ASSERT(size >= 1);
217
218 Hashtable* result = static_cast<Hashtable*>(
219 fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
220 result->size = size;
221
222 {
223 // This is not fast and it's not data-access parallel, but that's fine, because
224 // hashtable resizing is guaranteed to be rare and it will never happen in steady
225 // state.
226 WordLockHolder locker(hashtablesLock);
227 if (!hashtables)
228 hashtables = new Vector<Hashtable*>();
229 hashtables->append(result);
230 }
231
232 return result;
233 }
234
235 static void destroy(Hashtable* hashtable)
236 {
237 {
238 // This is not fast, but that's OK. See comment in create().
239 WordLockHolder locker(hashtablesLock);
240 hashtables->removeFirst(hashtable);
241 }
242
243 fastFree(hashtable);
244 }
245};
246
247Atomic<Hashtable*> hashtable;
248Atomic<unsigned> numThreads;
249
250// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
251// memory usage per thread will still be less than 1KB.
252const unsigned maxLoadFactor = 3;
253
254const unsigned growthFactor = 2;
255
256unsigned hashAddress(const void* address)
257{
258 return WTF::PtrHash<const void*>::hash(address);
259}
260
261Hashtable* ensureHashtable()
262{
263 for (;;) {
264 Hashtable* currentHashtable = hashtable.load();
265
266 if (currentHashtable)
267 return currentHashtable;
268
269 if (!currentHashtable) {
270 currentHashtable = Hashtable::create(maxLoadFactor);
271 if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
272 if (verbose)
273 dataLog(toString(Thread::current(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
274 return currentHashtable;
275 }
276
277 Hashtable::destroy(currentHashtable);
278 }
279 }
280}
281
282// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
283// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
284// slow and not scalable, so it's only used during thread creation and for debugging/testing.
285Vector<Bucket*> lockHashtable()
286{
287 for (;;) {
288 Hashtable* currentHashtable = ensureHashtable();
289
290 ASSERT(currentHashtable);
291
292 // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
293 // we can lock all of the buckets, not just the ones that are materialized.
294 Vector<Bucket*> buckets;
295 for (unsigned i = currentHashtable->size; i--;) {
296 Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];
297
298 for (;;) {
299 Bucket* bucket = bucketPointer.load();
300
301 if (!bucket) {
302 bucket = new Bucket();
303 if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
304 delete bucket;
305 continue;
306 }
307 }
308
309 buckets.append(bucket);
310 break;
311 }
312 }
313
314 // Now lock the buckets in the right order.
315 std::sort(buckets.begin(), buckets.end());
316 for (Bucket* bucket : buckets)
317 bucket->lock.lock();
318
319 // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
320 // now.
321 if (hashtable.load() == currentHashtable)
322 return buckets;
323
324 // The hashtable rehashed. Unlock everything and try again.
325 for (Bucket* bucket : buckets)
326 bucket->lock.unlock();
327 }
328}
329
330void unlockHashtable(const Vector<Bucket*>& buckets)
331{
332 for (Bucket* bucket : buckets)
333 bucket->lock.unlock();
334}
335
336// Rehash the hashtable to handle numThreads threads.
337void ensureHashtableSize(unsigned numThreads)
338{
339 // We try to ensure that the size of the hashtable used for thread queues is always large enough
340 // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
341 // hashtable. This does just that. Note that we never free the old spine, since we never lock
342 // around spine accesses (i.e. the "hashtable" global variable).
343
344 // First do a fast check to see if rehashing is needed.
345 Hashtable* oldHashtable = hashtable.load();
346 if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
347 if (verbose)
348 dataLog(toString(Thread::current(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
349 return;
350 }
351
352 // Seems like we *might* have to rehash, so lock the hashtable and try again.
353 Vector<Bucket*> bucketsToUnlock = lockHashtable();
354
355 // Check again, since the hashtable could have rehashed while we were locking it. Also,
356 // lockHashtable() creates an initial hashtable for us.
357 oldHashtable = hashtable.load();
358 if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
359 if (verbose)
360 dataLog(toString(Thread::current(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
361 unlockHashtable(bucketsToUnlock);
362 return;
363 }
364
365 Vector<Bucket*> reusableBuckets = bucketsToUnlock;
366
367 // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
368 // are placed into the vector in queue order.
369 Vector<ThreadData*> threadDatas;
370 for (Bucket* bucket : reusableBuckets) {
371 while (ThreadData* threadData = bucket->dequeue())
372 threadDatas.append(threadData);
373 }
374
375 unsigned newSize = numThreads * growthFactor * maxLoadFactor;
376 RELEASE_ASSERT(newSize > oldHashtable->size);
377
378 Hashtable* newHashtable = Hashtable::create(newSize);
379 if (verbose)
380 dataLog(toString(Thread::current(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
381 for (ThreadData* threadData : threadDatas) {
382 if (verbose)
383 dataLog(toString(Thread::current(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
384 unsigned hash = hashAddress(threadData->address);
385 unsigned index = hash % newHashtable->size;
386 if (verbose)
387 dataLog(toString(Thread::current(), ": index = ", index, "\n"));
388 Bucket* bucket = newHashtable->data[index].load();
389 if (!bucket) {
390 if (reusableBuckets.isEmpty())
391 bucket = new Bucket();
392 else
393 bucket = reusableBuckets.takeLast();
394 newHashtable->data[index].store(bucket);
395 }
396
397 bucket->enqueue(threadData);
398 }
399
400 // At this point there may be some buckets left unreused. This could easily happen if the
401 // number of enqueued threads right now is low but the high watermark of the number of threads
402 // enqueued was high. We place these buckets into the hashtable basically at random, just to
403 // make sure we don't leak them.
404 for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
405 Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
406 if (bucketPtr.load())
407 continue;
408 bucketPtr.store(reusableBuckets.takeLast());
409 }
410
411 // Since we increased the size of the hashtable, we should have exhausted our preallocated
412 // buckets by now.
413 ASSERT(reusableBuckets.isEmpty());
414
415 // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
416 // roll. After we install the new hashtable, we can release all bucket locks.
417
418 bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
419 RELEASE_ASSERT(result);
420
421 unlockHashtable(bucketsToUnlock);
422}
423
424ThreadData::ThreadData()
425 : thread(Thread::current())
426{
427 unsigned currentNumThreads;
428 for (;;) {
429 unsigned oldNumThreads = numThreads.load();
430 currentNumThreads = oldNumThreads + 1;
431 if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
432 break;
433 }
434
435 ensureHashtableSize(currentNumThreads);
436}
437
438ThreadData::~ThreadData()
439{
440 for (;;) {
441 unsigned oldNumThreads = numThreads.load();
442 if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
443 break;
444 }
445}
446
447ThreadData* myThreadData()
448{
449 static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
450 static std::once_flag initializeOnce;
451 std::call_once(
452 initializeOnce,
453 [] {
454 threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
455 });
456
457 RefPtr<ThreadData>& result = **threadData;
458
459 if (!result)
460 result = adoptRef(new ThreadData());
461
462 return result.get();
463}
464
465template<typename Functor>
466bool enqueue(const void* address, const Functor& functor)
467{
468 unsigned hash = hashAddress(address);
469
470 for (;;) {
471 Hashtable* myHashtable = ensureHashtable();
472 unsigned index = hash % myHashtable->size;
473 Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
474 Bucket* bucket;
475 for (;;) {
476 bucket = bucketPointer.load();
477 if (!bucket) {
478 bucket = new Bucket();
479 if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
480 delete bucket;
481 continue;
482 }
483 }
484 break;
485 }
486 if (verbose)
487 dataLog(toString(Thread::current(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
488 bucket->lock.lock();
489
490 // At this point the hashtable could have rehashed under us.
491 if (hashtable.load() != myHashtable) {
492 bucket->lock.unlock();
493 continue;
494 }
495
496 ThreadData* threadData = functor();
497 bool result;
498 if (threadData) {
499 if (verbose)
500 dataLog(toString(Thread::current(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
501 bucket->enqueue(threadData);
502 result = true;
503 } else
504 result = false;
505 bucket->lock.unlock();
506 return result;
507 }
508}
509
// Tells dequeue() what to do when the hashtable slot for the address has no bucket yet.
enum class BucketMode {
    // Create the bucket, so that the functors still run with a bucket lock held.
    EnsureNonEmpty,
    // Return false right away without calling either functor.
    IgnoreEmpty,
};
514
515template<typename DequeueFunctor, typename FinishFunctor>
516bool dequeue(
517 const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
518 const FinishFunctor& finishFunctor)
519{
520 unsigned hash = hashAddress(address);
521
522 for (;;) {
523 Hashtable* myHashtable = ensureHashtable();
524 unsigned index = hash % myHashtable->size;
525 Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
526 Bucket* bucket = bucketPointer.load();
527 if (!bucket) {
528 if (bucketMode == BucketMode::IgnoreEmpty)
529 return false;
530
531 for (;;) {
532 bucket = bucketPointer.load();
533 if (!bucket) {
534 bucket = new Bucket();
535 if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
536 delete bucket;
537 continue;
538 }
539 }
540 break;
541 }
542 }
543
544 bucket->lock.lock();
545
546 // At this point the hashtable could have rehashed under us.
547 if (hashtable.load() != myHashtable) {
548 bucket->lock.unlock();
549 continue;
550 }
551
552 bucket->genericDequeue(dequeueFunctor);
553 bool result = !!bucket->queueHead;
554 finishFunctor(result);
555 bucket->lock.unlock();
556 return result;
557 }
558}
559
560} // anonymous namespace
561
562NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
563 const void* address,
564 const ScopedLambda<bool()>& validation,
565 const ScopedLambda<void()>& beforeSleep,
566 const TimeWithDynamicClockType& timeout)
567{
568 if (verbose)
569 dataLog(toString(Thread::current(), ": parking.\n"));
570
571 ThreadData* me = myThreadData();
572 me->token = 0;
573
574 // Guard against someone calling parkConditionally() recursively from beforeSleep().
575 RELEASE_ASSERT(!me->address);
576
577 bool enqueueResult = enqueue(
578 address,
579 [&] () -> ThreadData* {
580 if (!validation())
581 return nullptr;
582
583 me->address = address;
584 return me;
585 });
586
587 if (!enqueueResult)
588 return ParkResult();
589
590 beforeSleep();
591
592 bool didGetDequeued;
593 {
594 MutexLocker locker(me->parkingLock);
595 while (me->address && timeout.nowWithSameClock() < timeout) {
596 me->parkingCondition.timedWait(
597 me->parkingLock, timeout.approximateWallTime());
598
599 // It's possible for the OS to decide not to wait. If it does that then it will also
600 // decide not to release the lock. If there's a bug in the time math, then this could
601 // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
602 // spin.
603 me->parkingLock.unlock();
604 me->parkingLock.lock();
605 }
606 ASSERT(!me->address || me->address == address);
607 didGetDequeued = !me->address;
608 }
609
610 if (didGetDequeued) {
611 // Great! We actually got dequeued rather than the timeout expiring.
612 ParkResult result;
613 result.wasUnparked = true;
614 result.token = me->token;
615 return result;
616 }
617
618 // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.
619
620 bool didDequeue = false;
621 dequeue(
622 address, BucketMode::IgnoreEmpty,
623 [&] (ThreadData* element, bool) {
624 if (element == me) {
625 didDequeue = true;
626 return DequeueResult::RemoveAndStop;
627 }
628 return DequeueResult::Ignore;
629 },
630 [] (bool) { });
631
632 // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
633 // If didDequeue is false, then someone unparked us.
634
635 RELEASE_ASSERT(!me->nextInQueue);
636
637 // Make sure that no matter what, me->address is null after this point.
638 {
639 MutexLocker locker(me->parkingLock);
640 if (!didDequeue) {
641 // If we did not dequeue ourselves, then someone else did. They will set our address to
642 // null. We don't want to proceed until they do this, because otherwise, they may set
643 // our address to null in some distant future when we're already trying to wait for
644 // other things.
645 while (me->address)
646 me->parkingCondition.wait(me->parkingLock);
647 }
648 me->address = nullptr;
649 }
650
651 ParkResult result;
652 result.wasUnparked = !didDequeue;
653 if (!didDequeue) {
654 // If we were unparked then there should be a token.
655 result.token = me->token;
656 }
657 return result;
658}
659
660NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
661{
662 if (verbose)
663 dataLog(toString(Thread::current(), ": unparking one.\n"));
664
665 UnparkResult result;
666
667 RefPtr<ThreadData> threadData;
668 result.mayHaveMoreThreads = dequeue(
669 address,
670 // Why is this here?
671 // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
672 // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
673 // some operation while holding the bucket lock, which usually goes into the finish func.
674 // But if that operation is a no-op, then it's not clear why we need this.
675 BucketMode::EnsureNonEmpty,
676 [&] (ThreadData* element, bool) {
677 if (element->address != address)
678 return DequeueResult::Ignore;
679 threadData = element;
680 result.didUnparkThread = true;
681 return DequeueResult::RemoveAndStop;
682 },
683 [] (bool) { });
684
685 if (!threadData) {
686 ASSERT(!result.didUnparkThread);
687 result.mayHaveMoreThreads = false;
688 return result;
689 }
690
691 ASSERT(threadData->address);
692
693 {
694 MutexLocker locker(threadData->parkingLock);
695 threadData->address = nullptr;
696 threadData->token = 0;
697 }
698 threadData->parkingCondition.signal();
699
700 return result;
701}
702
703NEVER_INLINE void ParkingLot::unparkOneImpl(
704 const void* address,
705 const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
706{
707 if (verbose)
708 dataLog(toString(Thread::current(), ": unparking one the hard way.\n"));
709
710 RefPtr<ThreadData> threadData;
711 bool timeToBeFair = false;
712 dequeue(
713 address,
714 BucketMode::EnsureNonEmpty,
715 [&] (ThreadData* element, bool passedTimeToBeFair) {
716 if (element->address != address)
717 return DequeueResult::Ignore;
718 threadData = element;
719 timeToBeFair = passedTimeToBeFair;
720 return DequeueResult::RemoveAndStop;
721 },
722 [&] (bool mayHaveMoreThreads) {
723 UnparkResult result;
724 result.didUnparkThread = !!threadData;
725 result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
726 if (timeToBeFair)
727 RELEASE_ASSERT(threadData);
728 result.timeToBeFair = timeToBeFair;
729 intptr_t token = callback(result);
730 if (threadData)
731 threadData->token = token;
732 });
733
734 if (!threadData)
735 return;
736
737 ASSERT(threadData->address);
738
739 {
740 MutexLocker locker(threadData->parkingLock);
741 threadData->address = nullptr;
742 }
743 // At this point, the threadData may die. Good thing we have a RefPtr<> on it.
744 threadData->parkingCondition.signal();
745}
746
747NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
748{
749 if (!count)
750 return 0;
751
752 if (verbose)
753 dataLog(toString(Thread::current(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));
754
755 Vector<RefPtr<ThreadData>, 8> threadDatas;
756 dequeue(
757 address,
758 // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
759 // but that seems wrong.
760 BucketMode::IgnoreEmpty,
761 [&] (ThreadData* element, bool) {
762 if (verbose)
763 dataLog(toString(Thread::current(), ": Observing element with address = ", RawPointer(element->address), "\n"));
764 if (element->address != address)
765 return DequeueResult::Ignore;
766 threadDatas.append(element);
767 if (threadDatas.size() == count)
768 return DequeueResult::RemoveAndStop;
769 return DequeueResult::RemoveAndContinue;
770 },
771 [] (bool) { });
772
773 for (RefPtr<ThreadData>& threadData : threadDatas) {
774 if (verbose)
775 dataLog(toString(Thread::current(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
776 ASSERT(threadData->address);
777 {
778 MutexLocker locker(threadData->parkingLock);
779 threadData->address = nullptr;
780 }
781 threadData->parkingCondition.signal();
782 }
783
784 if (verbose)
785 dataLog(toString(Thread::current(), ": done unparking.\n"));
786
787 return threadDatas.size();
788}
789
790NEVER_INLINE void ParkingLot::unparkAll(const void* address)
791{
792 unparkCount(address, UINT_MAX);
793}
794
795NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
796{
797 Vector<Bucket*> bucketsToUnlock = lockHashtable();
798
799 Hashtable* currentHashtable = hashtable.load();
800 for (unsigned i = currentHashtable->size; i--;) {
801 Bucket* bucket = currentHashtable->data[i].load();
802 if (!bucket)
803 continue;
804 for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
805 callback(currentThreadData->thread.get(), currentThreadData->address);
806 }
807
808 unlockHashtable(bucketsToUnlock);
809}
810
811} // namespace WTF
812
813