/*
 * Copyright (C) 2015-2016 Apple Inc. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE INC. ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"
#include <wtf/ParkingLot.h>

#include <condition_variable>
#include <mutex>
#include <thread>
#include <wtf/DataLog.h>
#include <wtf/HashFunctions.h>
#include <wtf/StringPrintStream.h>
#include <wtf/ThreadSpecific.h>
#include <wtf/Threading.h>
#include <wtf/Vector.h>
#include <wtf/WeakRandom.h>
#include <wtf/WordLock.h>

namespace WTF {

namespace {

const bool verbose = false;

struct ThreadData : public ThreadSafeRefCounted<ThreadData> {
    WTF_MAKE_FAST_ALLOCATED;
public:

    ThreadData();
    ~ThreadData();

    Ref<Thread> thread;

    Mutex parkingLock;
    ThreadCondition parkingCondition;

    const void* address { nullptr };

    ThreadData* nextInQueue { nullptr };

    intptr_t token { 0 };
};

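// Result returned by the per-element functor passed to Bucket::genericDequeue(): Ignore keeps the
// element queued and moves on, RemoveAndContinue unlinks it and keeps scanning, and RemoveAndStop
// unlinks it and ends the scan.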
enum class DequeueResult {
    Ignore,
    RemoveAndContinue,
    RemoveAndStop
};

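// A bucket holds the FIFO queue of parked threads whose park addresses hash to it, plus the
// nextFairTime/random state used to occasionally tell dequeuers that it is "time to be fair".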
struct Bucket {
    WTF_MAKE_FAST_ALLOCATED;
public:
    Bucket()
        : random(static_cast<unsigned>(bitwise_cast<intptr_t>(this))) // Cannot use default seed since that recurses into Lock.
    {
    }

    void enqueue(ThreadData* data)
    {
        if (verbose)
            dataLog(toString(Thread::current(), ": enqueueing ", RawPointer(data), " with address = ", RawPointer(data->address), " onto ", RawPointer(this), "\n"));
        ASSERT(data->address);
        ASSERT(!data->nextInQueue);

        if (queueTail) {
            queueTail->nextInQueue = data;
            queueTail = data;
            return;
        }

        queueHead = data;
        queueTail = data;
    }

    template<typename Functor>
    void genericDequeue(const Functor& functor)
    {
        if (verbose)
            dataLog(toString(Thread::current(), ": dequeueing from bucket at ", RawPointer(this), "\n"));

        if (!queueHead) {
            if (verbose)
                dataLog(toString(Thread::current(), ": empty.\n"));
            return;
        }

        // This loop is a very clever abomination. The induction variables are the pointer to the
        // pointer to the current node, and the pointer to the previous node. This gives us everything
        // we need to both proceed forward to the next node, and to remove nodes while maintaining the
        // queueHead/queueTail and all of the nextInQueue links. For example, when we are at the head
        // element, then removal means rewiring queueHead, and if it was also equal to queueTail, then
        // we'd want queueTail to be set to nullptr. This works because:
        //
        //     currentPtr == &queueHead
        //     previous == nullptr
        //
        // We remove by setting *currentPtr = (*currentPtr)->nextInQueue, i.e. changing the pointer
        // that used to point to this node to instead point to this node's successor. Another example:
        // if we were at the second node in the queue, then we'd have:
        //
        //     currentPtr == &queueHead->nextInQueue
        //     previous == queueHead
        //
        // If this node is not equal to queueTail, then removing it simply means making
        // queueHead->nextInQueue point to queueHead->nextInQueue->nextInQueue (which the algorithm
        // achieves by mutating *currentPtr). If this node is equal to queueTail, then we want to set
        // queueTail to previous, which in this case is queueHead - thus making the queue look like a
        // proper one-element queue with queueHead == queueTail.
        bool shouldContinue = true;
        ThreadData** currentPtr = &queueHead;
        ThreadData* previous = nullptr;

        MonotonicTime time = MonotonicTime::now();
        bool timeToBeFair = false;
        if (time > nextFairTime)
            timeToBeFair = true;

        bool didDequeue = false;

        while (shouldContinue) {
            ThreadData* current = *currentPtr;
            if (verbose)
                dataLog(toString(Thread::current(), ": got thread ", RawPointer(current), "\n"));
            if (!current)
                break;
            DequeueResult result = functor(current, timeToBeFair);
            switch (result) {
            case DequeueResult::Ignore:
                if (verbose)
                    dataLog(toString(Thread::current(), ": currentPtr = ", RawPointer(currentPtr), ", *currentPtr = ", RawPointer(*currentPtr), "\n"));
                previous = current;
                currentPtr = &(*currentPtr)->nextInQueue;
                break;
            case DequeueResult::RemoveAndStop:
                shouldContinue = false;
                FALLTHROUGH;
            case DequeueResult::RemoveAndContinue:
                if (verbose)
                    dataLog(toString(Thread::current(), ": dequeueing ", RawPointer(current), " from ", RawPointer(this), "\n"));
                if (current == queueTail)
                    queueTail = previous;
                didDequeue = true;
                *currentPtr = current->nextInQueue;
                current->nextInQueue = nullptr;
                break;
            }
        }

        if (timeToBeFair && didDequeue)
            nextFairTime = time + Seconds::fromMilliseconds(random.get());

        ASSERT(!!queueHead == !!queueTail);
    }

    ThreadData* dequeue()
    {
        ThreadData* result = nullptr;
        genericDequeue(
            [&] (ThreadData* element, bool) -> DequeueResult {
                result = element;
                return DequeueResult::RemoveAndStop;
            });
        return result;
    }

    ThreadData* queueHead { nullptr };
    ThreadData* queueTail { nullptr };

    // This lock protects the entire bucket. Thou shall not make changes to Bucket without holding
    // this lock.
    WordLock lock;

    MonotonicTime nextFairTime;

    WeakRandom random;

    // Put some distance between buckets in memory. This is one of several mitigations against false
    // sharing.
    char padding[64];
};

struct Hashtable;

// We track all allocated hashtables so that hashtable resizing doesn't anger leak detectors.
Vector<Hashtable*>* hashtables;
WordLock hashtablesLock;

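// The hashtable spine. Buckets are created lazily; data[] is a variable-length array that is
// over-allocated in create(), which is why it is declared with a single element here.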
struct Hashtable {
    unsigned size;
    Atomic<Bucket*> data[1];

    static Hashtable* create(unsigned size)
    {
        ASSERT(size >= 1);

        Hashtable* result = static_cast<Hashtable*>(
            fastZeroedMalloc(sizeof(Hashtable) + sizeof(Atomic<Bucket*>) * (size - 1)));
        result->size = size;

        {
            // This is not fast and it's not data-access parallel, but that's fine, because
            // hashtable resizing is guaranteed to be rare and it will never happen in steady
            // state.
            WordLockHolder locker(hashtablesLock);
            if (!hashtables)
                hashtables = new Vector<Hashtable*>();
            hashtables->append(result);
        }

        return result;
    }

    static void destroy(Hashtable* hashtable)
    {
        {
            // This is not fast, but that's OK. See comment in create().
            WordLockHolder locker(hashtablesLock);
            hashtables->removeFirst(hashtable);
        }

        fastFree(hashtable);
    }
};

Atomic<Hashtable*> hashtable;
Atomic<unsigned> numThreads;

// With 64 bytes of padding per bucket, assuming a hashtable is fully populated with buckets, the
// memory usage per thread will still be less than 1KB.
const unsigned maxLoadFactor = 3;

const unsigned growthFactor = 2;

unsigned hashAddress(const void* address)
{
    return WTF::PtrHash<const void*>::hash(address);
}

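// Returns the current hashtable, creating the initial one on first use. Losers of the installation
// race destroy their freshly created table and retry with the winner's table.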
Hashtable* ensureHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = hashtable.load();

        if (currentHashtable)
            return currentHashtable;

        if (!currentHashtable) {
            currentHashtable = Hashtable::create(maxLoadFactor);
            if (hashtable.compareExchangeWeak(nullptr, currentHashtable)) {
                if (verbose)
                    dataLog(toString(Thread::current(), ": created initial hashtable ", RawPointer(currentHashtable), "\n"));
                return currentHashtable;
            }

            Hashtable::destroy(currentHashtable);
        }
    }
}

// Locks the hashtable. This reloops in case of rehashing, so the current hashtable may be different
// after this returns than when you called it. Guarantees that there is a hashtable. This is pretty
// slow and not scalable, so it's only used during thread creation and for debugging/testing.
Vector<Bucket*> lockHashtable()
{
    for (;;) {
        Hashtable* currentHashtable = ensureHashtable();

        ASSERT(currentHashtable);

        // Now find all of the buckets. This makes sure that the hashtable is full of buckets so that
        // we can lock all of the buckets, not just the ones that are materialized.
        Vector<Bucket*> buckets;
        for (unsigned i = currentHashtable->size; i--;) {
            Atomic<Bucket*>& bucketPointer = currentHashtable->data[i];

            for (;;) {
                Bucket* bucket = bucketPointer.load();

                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }

                buckets.append(bucket);
                break;
            }
        }

        // Now lock the buckets in the right order.
        std::sort(buckets.begin(), buckets.end());
        for (Bucket* bucket : buckets)
            bucket->lock.lock();

        // If the hashtable didn't change (wasn't rehashed) while we were locking it, then we own it
        // now.
        if (hashtable.load() == currentHashtable)
            return buckets;

        // The hashtable rehashed. Unlock everything and try again.
        for (Bucket* bucket : buckets)
            bucket->lock.unlock();
    }
}

void unlockHashtable(const Vector<Bucket*>& buckets)
{
    for (Bucket* bucket : buckets)
        bucket->lock.unlock();
}

// Rehash the hashtable to handle numThreads threads.
void ensureHashtableSize(unsigned numThreads)
{
    // We try to ensure that the size of the hashtable used for thread queues is always large enough
    // to avoid collisions. So, since we started a new thread, we may need to increase the size of the
    // hashtable. This does just that. Note that we never free the old spine, since we never lock
    // around spine accesses (i.e. the "hashtable" global variable).

    // First do a fast check to see if rehashing is needed.
    Hashtable* oldHashtable = hashtable.load();
    if (oldHashtable && static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(Thread::current(), ": no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        return;
    }

    // Seems like we *might* have to rehash, so lock the hashtable and try again.
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    // Check again, since the hashtable could have rehashed while we were locking it. Also,
    // lockHashtable() creates an initial hashtable for us.
    oldHashtable = hashtable.load();
    RELEASE_ASSERT(oldHashtable);
    if (static_cast<double>(oldHashtable->size) / static_cast<double>(numThreads) >= maxLoadFactor) {
        if (verbose)
            dataLog(toString(Thread::current(), ": after locking, no need to rehash because ", oldHashtable->size, " / ", numThreads, " >= ", maxLoadFactor, "\n"));
        unlockHashtable(bucketsToUnlock);
        return;
    }

    Vector<Bucket*> reusableBuckets = bucketsToUnlock;

    // OK, now we resize. First we gather all thread datas from the old hashtable. These thread datas
    // are placed into the vector in queue order.
    Vector<ThreadData*> threadDatas;
    for (Bucket* bucket : reusableBuckets) {
        while (ThreadData* threadData = bucket->dequeue())
            threadDatas.append(threadData);
    }

    unsigned newSize = numThreads * growthFactor * maxLoadFactor;
    RELEASE_ASSERT(newSize > oldHashtable->size);

    Hashtable* newHashtable = Hashtable::create(newSize);
    if (verbose)
        dataLog(toString(Thread::current(), ": created new hashtable: ", RawPointer(newHashtable), "\n"));
    for (ThreadData* threadData : threadDatas) {
        if (verbose)
            dataLog(toString(Thread::current(), ": rehashing thread data ", RawPointer(threadData), " with address = ", RawPointer(threadData->address), "\n"));
        unsigned hash = hashAddress(threadData->address);
        unsigned index = hash % newHashtable->size;
        if (verbose)
            dataLog(toString(Thread::current(), ": index = ", index, "\n"));
        Bucket* bucket = newHashtable->data[index].load();
        if (!bucket) {
            if (reusableBuckets.isEmpty())
                bucket = new Bucket();
            else
                bucket = reusableBuckets.takeLast();
            newHashtable->data[index].store(bucket);
        }

        bucket->enqueue(threadData);
    }

    // At this point there may be some buckets left unreused. This could easily happen if the
    // number of enqueued threads right now is low but the high watermark of the number of threads
    // enqueued was high. We place these buckets into the hashtable basically at random, just to
    // make sure we don't leak them.
    for (unsigned i = 0; i < newHashtable->size && !reusableBuckets.isEmpty(); ++i) {
        Atomic<Bucket*>& bucketPtr = newHashtable->data[i];
        if (bucketPtr.load())
            continue;
        bucketPtr.store(reusableBuckets.takeLast());
    }

    // Since we increased the size of the hashtable, we should have exhausted our preallocated
    // buckets by now.
    ASSERT(reusableBuckets.isEmpty());

    // OK, right now the old hashtable is locked up and the new hashtable is ready to rock and
    // roll. After we install the new hashtable, we can release all bucket locks.

    bool result = hashtable.compareExchangeStrong(oldHashtable, newHashtable) == oldHashtable;
    RELEASE_ASSERT(result);

    unlockHashtable(bucketsToUnlock);
}

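// A ThreadData registers itself in the global thread count when created and unregisters when
// destroyed; construction also grows the hashtable if the new thread count calls for it.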
ThreadData::ThreadData()
    : thread(Thread::current())
{
    unsigned currentNumThreads;
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        currentNumThreads = oldNumThreads + 1;
        if (numThreads.compareExchangeWeak(oldNumThreads, currentNumThreads))
            break;
    }

    ensureHashtableSize(currentNumThreads);
}

ThreadData::~ThreadData()
{
    for (;;) {
        unsigned oldNumThreads = numThreads.load();
        if (numThreads.compareExchangeWeak(oldNumThreads, oldNumThreads - 1))
            break;
    }
}

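// Returns the calling thread's ThreadData, creating it on first use. The ThreadSpecific that backs
// it is allocated exactly once and never destroyed.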
ThreadData* myThreadData()
{
    static ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>* threadData;
    static std::once_flag initializeOnce;
    std::call_once(
        initializeOnce,
        [] {
            threadData = new ThreadSpecific<RefPtr<ThreadData>, CanBeGCThread::True>();
        });

    RefPtr<ThreadData>& result = **threadData;

    if (!result)
        result = adoptRef(new ThreadData());

    return result.get();
}

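// Finds (or creates) the bucket for the given address, locks it, and then asks the functor for a
// ThreadData to enqueue. Returns true if the functor returned a thread and it was enqueued, false
// otherwise. Starts over if the hashtable gets replaced underneath us.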
template<typename Functor>
bool enqueue(const void* address, const Functor& functor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket;
        for (;;) {
            bucket = bucketPointer.load();
            if (!bucket) {
                bucket = new Bucket();
                if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                    delete bucket;
                    continue;
                }
            }
            break;
        }
        if (verbose)
            dataLog(toString(Thread::current(), ": enqueueing onto bucket ", RawPointer(bucket), " with index ", index, " for address ", RawPointer(address), " with hash ", hash, "\n"));
        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        ThreadData* threadData = functor();
        bool result;
        if (threadData) {
            if (verbose)
                dataLog(toString(Thread::current(), ": proceeding to enqueue ", RawPointer(threadData), "\n"));
            bucket->enqueue(threadData);
            result = true;
        } else
            result = false;
        bucket->lock.unlock();
        return result;
    }
}

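// Controls what dequeue() does when no bucket has been materialized for the address yet:
// IgnoreEmpty bails out immediately, while EnsureNonEmpty creates the bucket so that the finish
// functor always runs with a bucket lock held.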
enum class BucketMode {
    EnsureNonEmpty,
    IgnoreEmpty
};

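// Locks the bucket for the given address, runs dequeueFunctor over the queued threads via
// genericDequeue(), and then calls finishFunctor (still under the bucket lock) with a bool saying
// whether the queue is still non-empty. Returns that same bool. Starts over if the hashtable gets
// replaced underneath us.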
template<typename DequeueFunctor, typename FinishFunctor>
bool dequeue(
    const void* address, BucketMode bucketMode, const DequeueFunctor& dequeueFunctor,
    const FinishFunctor& finishFunctor)
{
    unsigned hash = hashAddress(address);

    for (;;) {
        Hashtable* myHashtable = ensureHashtable();
        unsigned index = hash % myHashtable->size;
        Atomic<Bucket*>& bucketPointer = myHashtable->data[index];
        Bucket* bucket = bucketPointer.load();
        if (!bucket) {
            if (bucketMode == BucketMode::IgnoreEmpty)
                return false;

            for (;;) {
                bucket = bucketPointer.load();
                if (!bucket) {
                    bucket = new Bucket();
                    if (!bucketPointer.compareExchangeWeak(nullptr, bucket)) {
                        delete bucket;
                        continue;
                    }
                }
                break;
            }
        }

        bucket->lock.lock();

        // At this point the hashtable could have rehashed under us.
        if (hashtable.load() != myHashtable) {
            bucket->lock.unlock();
            continue;
        }

        bucket->genericDequeue(dequeueFunctor);
        bool result = !!bucket->queueHead;
        finishFunctor(result);
        bucket->lock.unlock();
        return result;
    }
}

} // anonymous namespace

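// Parks the current thread on the given address. The protocol is: enqueue ourselves under the
// bucket lock only if validation() still holds, then sleep on our own parkingLock/parkingCondition
// until an unparker clears our address or the timeout fires. On timeout we either remove ourselves
// from the queue or, if an unparker got to us first, wait for it to finish with us.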
NEVER_INLINE ParkingLot::ParkResult ParkingLot::parkConditionallyImpl(
    const void* address,
    const ScopedLambda<bool()>& validation,
    const ScopedLambda<void()>& beforeSleep,
    const TimeWithDynamicClockType& timeout)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": parking.\n"));

    ThreadData* me = myThreadData();
    me->token = 0;

    // Guard against someone calling parkConditionally() recursively from beforeSleep().
    RELEASE_ASSERT(!me->address);

    bool enqueueResult = enqueue(
        address,
        [&] () -> ThreadData* {
            if (!validation())
                return nullptr;

            me->address = address;
            return me;
        });

    if (!enqueueResult)
        return ParkResult();

    beforeSleep();

    bool didGetDequeued;
    {
        MutexLocker locker(me->parkingLock);
        while (me->address && timeout.nowWithSameClock() < timeout) {
            me->parkingCondition.timedWait(
                me->parkingLock, timeout.approximateWallTime());

            // It's possible for the OS to decide not to wait. If it does that then it will also
            // decide not to release the lock. If there's a bug in the time math, then this could
            // result in a deadlock. Flashing the lock means that at worst it's just a CPU-eating
            // spin.
            me->parkingLock.unlock();
            me->parkingLock.lock();
        }
        ASSERT(!me->address || me->address == address);
        didGetDequeued = !me->address;
    }

    if (didGetDequeued) {
        // Great! We actually got dequeued rather than the timeout expiring.
        ParkResult result;
        result.wasUnparked = true;
        result.token = me->token;
        return result;
    }

    // Have to remove ourselves from the queue since we timed out and nobody has dequeued us yet.

    bool didDequeue = false;
    dequeue(
        address, BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (element == me) {
                didDequeue = true;
                return DequeueResult::RemoveAndStop;
            }
            return DequeueResult::Ignore;
        },
        [] (bool) { });

    // If didDequeue is true, then we dequeued ourselves. This means that we were not unparked.
    // If didDequeue is false, then someone unparked us.

    RELEASE_ASSERT(!me->nextInQueue);

    // Make sure that no matter what, me->address is null after this point.
    {
        MutexLocker locker(me->parkingLock);
        if (!didDequeue) {
            // If we did not dequeue ourselves, then someone else did. They will set our address to
            // null. We don't want to proceed until they do this, because otherwise, they may set
            // our address to null in some distant future when we're already trying to wait for
            // other things.
            while (me->address)
                me->parkingCondition.wait(me->parkingLock);
        }
        me->address = nullptr;
    }

    ParkResult result;
    result.wasUnparked = !didDequeue;
    if (!didDequeue) {
        // If we were unparked then there should be a token.
        result.token = me->token;
    }
    return result;
}

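// Unparks at most one thread parked on the given address. The returned UnparkResult reports
// whether a thread was unparked and whether other threads may still be parked on that address.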
NEVER_INLINE ParkingLot::UnparkResult ParkingLot::unparkOne(const void* address)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking one.\n"));

    UnparkResult result;

    RefPtr<ThreadData> threadData;
    result.mayHaveMoreThreads = dequeue(
        address,
        // Why is this here?
        // FIXME: It seems like this could be IgnoreEmpty, but I switched this to EnsureNonEmpty
        // without explanation in r199760. We need it to use EnsureNonEmpty if we need to perform
        // some operation while holding the bucket lock, which usually goes into the finish func.
        // But if that operation is a no-op, then it's not clear why we need this.
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            result.didUnparkThread = true;
            return DequeueResult::RemoveAndStop;
        },
        [] (bool) { });

    if (!threadData) {
        ASSERT(!result.didUnparkThread);
        result.mayHaveMoreThreads = false;
        return result;
    }

    ASSERT(threadData->address);

    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
        threadData->token = 0;
    }
    threadData->parkingCondition.signal();

    return result;
}

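// Like unparkOne(), but invokes the callback with the UnparkResult while the bucket lock is still
// held, so the caller can update its own state atomically with the unpark decision. The callback's
// return value becomes the token handed to the unparked thread.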
NEVER_INLINE void ParkingLot::unparkOneImpl(
    const void* address,
    const ScopedLambda<intptr_t(ParkingLot::UnparkResult)>& callback)
{
    if (verbose)
        dataLog(toString(Thread::current(), ": unparking one the hard way.\n"));

    RefPtr<ThreadData> threadData;
    bool timeToBeFair = false;
    dequeue(
        address,
        BucketMode::EnsureNonEmpty,
        [&] (ThreadData* element, bool passedTimeToBeFair) {
            if (element->address != address)
                return DequeueResult::Ignore;
            threadData = element;
            timeToBeFair = passedTimeToBeFair;
            return DequeueResult::RemoveAndStop;
        },
        [&] (bool mayHaveMoreThreads) {
            UnparkResult result;
            result.didUnparkThread = !!threadData;
            result.mayHaveMoreThreads = result.didUnparkThread && mayHaveMoreThreads;
            if (timeToBeFair)
                RELEASE_ASSERT(threadData);
            result.timeToBeFair = timeToBeFair;
            intptr_t token = callback(result);
            if (threadData)
                threadData->token = token;
        });

    if (!threadData)
        return;

    ASSERT(threadData->address);

    {
        MutexLocker locker(threadData->parkingLock);
        threadData->address = nullptr;
    }
    // At this point, the threadData may die. Good thing we have a RefPtr<> on it.
    threadData->parkingCondition.signal();
}

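// Unparks up to count threads parked on the given address and returns how many were actually
// unparked. unparkAll() is this with count == UINT_MAX.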
NEVER_INLINE unsigned ParkingLot::unparkCount(const void* address, unsigned count)
{
    if (!count)
        return 0;

    if (verbose)
        dataLog(toString(Thread::current(), ": unparking count = ", count, " from ", RawPointer(address), ".\n"));

    Vector<RefPtr<ThreadData>, 8> threadDatas;
    dequeue(
        address,
        // FIXME: It seems like this ought to be EnsureNonEmpty if we follow what unparkOne() does,
        // but that seems wrong.
        BucketMode::IgnoreEmpty,
        [&] (ThreadData* element, bool) {
            if (verbose)
                dataLog(toString(Thread::current(), ": Observing element with address = ", RawPointer(element->address), "\n"));
            if (element->address != address)
                return DequeueResult::Ignore;
            threadDatas.append(element);
            if (threadDatas.size() == count)
                return DequeueResult::RemoveAndStop;
            return DequeueResult::RemoveAndContinue;
        },
        [] (bool) { });

    for (RefPtr<ThreadData>& threadData : threadDatas) {
        if (verbose)
            dataLog(toString(Thread::current(), ": unparking ", RawPointer(threadData.get()), " with address ", RawPointer(threadData->address), "\n"));
        ASSERT(threadData->address);
        {
            MutexLocker locker(threadData->parkingLock);
            threadData->address = nullptr;
        }
        threadData->parkingCondition.signal();
    }

    if (verbose)
        dataLog(toString(Thread::current(), ": done unparking.\n"));

    return threadDatas.size();
}

NEVER_INLINE void ParkingLot::unparkAll(const void* address)
{
    unparkCount(address, UINT_MAX);
}

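// Calls the callback for every parked thread while holding every bucket lock via lockHashtable(),
// so it is slow and not scalable; it is intended for introspection rather than the fast path.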
NEVER_INLINE void ParkingLot::forEachImpl(const ScopedLambda<void(Thread&, const void*)>& callback)
{
    Vector<Bucket*> bucketsToUnlock = lockHashtable();

    Hashtable* currentHashtable = hashtable.load();
    for (unsigned i = currentHashtable->size; i--;) {
        Bucket* bucket = currentHashtable->data[i].load();
        if (!bucket)
            continue;
        for (ThreadData* currentThreadData = bucket->queueHead; currentThreadData; currentThreadData = currentThreadData->nextInQueue)
            callback(currentThreadData->thread.get(), currentThreadData->address);
    }

    unlockHashtable(bucketsToUnlock);
}

} // namespace WTF