1/*
2 * Copyright (C) 2010-2016 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "Connection.h"
28
29#include "Logging.h"
30#include <memory>
31#include <wtf/HashSet.h>
32#include <wtf/NeverDestroyed.h>
33#include <wtf/RunLoop.h>
34#include <wtf/text/WTFString.h>
35#include <wtf/threads/BinarySemaphore.h>
36
37#if PLATFORM(COCOA)
38#include "MachMessage.h"
39#endif
40
41#if USE(UNIX_DOMAIN_SOCKETS)
42#include "UnixMessage.h"
43#endif
44
45namespace IPC {
46
47#if PLATFORM(COCOA)
48// The IPC connection gets killed if the incoming message queue reaches 50000 messages before the main thread has a chance to dispatch them.
49const size_t maxPendingIncomingMessagesKillingThreshold { 50000 };
50#endif
51
52struct Connection::ReplyHandler {
53 RefPtr<FunctionDispatcher> dispatcher;
54 Function<void (std::unique_ptr<Decoder>)> handler;
55};
56
57struct Connection::WaitForMessageState {
58 WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, OptionSet<WaitForOption> waitForOptions)
59 : messageReceiverName(messageReceiverName)
60 , messageName(messageName)
61 , destinationID(destinationID)
62 , waitForOptions(waitForOptions)
63 {
64 }
65
66 StringReference messageReceiverName;
67 StringReference messageName;
68 uint64_t destinationID;
69
70 OptionSet<WaitForOption> waitForOptions;
71 bool messageWaitingInterrupted = false;
72
73 std::unique_ptr<Decoder> decoder;
74};
75
76class Connection::SyncMessageState {
77public:
78 static SyncMessageState& singleton();
79
80 SyncMessageState();
81 ~SyncMessageState() = delete;
82
83 void wakeUpClientRunLoop()
84 {
85 m_waitForSyncReplySemaphore.signal();
86 }
87
88 bool wait(TimeWithDynamicClockType absoluteTime)
89 {
90 return m_waitForSyncReplySemaphore.waitUntil(absoluteTime);
91 }
92
93 // Returns true if this message will be handled on a client thread that is currently
94 // waiting for a reply to a synchronous message.
95 bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&);
96
97 // Dispatch pending sync messages. if allowedConnection is not null, will only dispatch messages
98 // from that connection and put the other messages back in the queue.
99 void dispatchMessages(Connection* allowedConnection);
100
101private:
102 void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection&);
103
104 BinarySemaphore m_waitForSyncReplySemaphore;
105
106 // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply.
107 Lock m_mutex;
108
109 // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection.
110 HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet;
111
112 struct ConnectionAndIncomingMessage {
113 Ref<Connection> connection;
114 std::unique_ptr<Decoder> message;
115 };
116 Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
117};
118
119Connection::SyncMessageState& Connection::SyncMessageState::singleton()
120{
121 static std::once_flag onceFlag;
122 static LazyNeverDestroyed<SyncMessageState> syncMessageState;
123
124 std::call_once(onceFlag, [] {
125 syncMessageState.construct();
126 });
127
128 return syncMessageState;
129}
130
131Connection::SyncMessageState::SyncMessageState()
132{
133}
134
135bool Connection::SyncMessageState::processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>& message)
136{
137 if (!message->shouldDispatchMessageWhenWaitingForSyncReply())
138 return false;
139
140 ConnectionAndIncomingMessage connectionAndIncomingMessage { connection, WTFMove(message) };
141
142 {
143 std::lock_guard<Lock> lock(m_mutex);
144
145 if (m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry) {
146 RunLoop::main().dispatch([this, protectedConnection = Ref<Connection>(connection)]() mutable {
147 dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(protectedConnection);
148 });
149 }
150
151 m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(connectionAndIncomingMessage));
152 }
153
154 wakeUpClientRunLoop();
155
156 return true;
157}
158
159void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnection)
160{
161 ASSERT(RunLoop::isMain());
162
163 Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
164
165 {
166 std::lock_guard<Lock> lock(m_mutex);
167 m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
168 }
169
170 Vector<ConnectionAndIncomingMessage> messagesToPutBack;
171
172 for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
173 ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
174
175 if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection.ptr()) {
176 // This incoming message belongs to another connection and we don't want to dispatch it now
177 // so mark it to be put back in the message queue.
178 messagesToPutBack.append(WTFMove(connectionAndIncomingMessage));
179 continue;
180 }
181
182 connectionAndIncomingMessage.connection->dispatchMessage(WTFMove(connectionAndIncomingMessage.message));
183 }
184
185 if (!messagesToPutBack.isEmpty()) {
186 std::lock_guard<Lock> lock(m_mutex);
187
188 for (auto& message : messagesToPutBack)
189 m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(message));
190 }
191}
192
193void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection& connection)
194{
195 {
196 std::lock_guard<Lock> lock(m_mutex);
197 ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(&connection));
198 m_didScheduleDispatchMessagesWorkSet.remove(&connection);
199 }
200
201 dispatchMessages(&connection);
202}
203
204// Represents a sync request for which we're waiting on a reply.
205struct Connection::PendingSyncReply {
206 // The request ID.
207 uint64_t syncRequestID { 0 };
208
209 // The reply decoder, will be null if there was an error processing the sync
210 // message on the other side.
211 std::unique_ptr<Decoder> replyDecoder;
212
213 // Will be set to true once a reply has been received.
214 bool didReceiveReply { false };
215
216 PendingSyncReply() = default;
217
218 explicit PendingSyncReply(uint64_t syncRequestID)
219 : syncRequestID(syncRequestID)
220 {
221 }
222};
223
224Ref<Connection> Connection::createServerConnection(Identifier identifier, Client& client)
225{
226 return adoptRef(*new Connection(identifier, true, client));
227}
228
229Ref<Connection> Connection::createClientConnection(Identifier identifier, Client& client)
230{
231 return adoptRef(*new Connection(identifier, false, client));
232}
233
234static HashMap<IPC::Connection::UniqueID, Connection*>& allConnections()
235{
236 static NeverDestroyed<HashMap<IPC::Connection::UniqueID, Connection*>> map;
237 return map;
238}
239
240static HashMap<uintptr_t, HashMap<uint64_t, CompletionHandler<void(Decoder*)>>>& asyncReplyHandlerMap()
241{
242 static NeverDestroyed<HashMap<uintptr_t, HashMap<uint64_t, CompletionHandler<void(Decoder*)>>>> map;
243 return map.get();
244}
245
246static void clearAsyncReplyHandlers(const Connection&);
247
248Connection::Connection(Identifier identifier, bool isServer, Client& client)
249 : m_client(client)
250 , m_uniqueID(UniqueID::generate())
251 , m_isServer(isServer)
252 , m_syncRequestID(0)
253 , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
254 , m_shouldExitOnSyncMessageSendFailure(false)
255 , m_didCloseOnConnectionWorkQueueCallback(0)
256 , m_isConnected(false)
257 , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue"))
258 , m_inSendSyncCount(0)
259 , m_inDispatchMessageCount(0)
260 , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
261 , m_didReceiveInvalidMessage(false)
262 , m_waitingForMessage(nullptr)
263 , m_shouldWaitForSyncReplies(true)
264{
265 ASSERT(RunLoop::isMain());
266 allConnections().add(m_uniqueID, this);
267
268 platformInitialize(identifier);
269
270#if HAVE(QOS_CLASSES)
271 ASSERT(pthread_main_np());
272 m_mainThread = pthread_self();
273#endif
274}
275
276Connection::~Connection()
277{
278 ASSERT(RunLoop::isMain());
279 ASSERT(!isValid());
280
281 allConnections().remove(m_uniqueID);
282
283 clearAsyncReplyHandlers(*this);
284}
285
286Connection* Connection::connection(UniqueID uniqueID)
287{
288 ASSERT(RunLoop::isMain());
289 return allConnections().get(uniqueID);
290}
291
292void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
293{
294 ASSERT(!m_isConnected);
295
296 m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
297}
298
299void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
300{
301 ASSERT(!m_isConnected);
302
303 m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
304}
305
306void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver)
307{
308 ASSERT(RunLoop::isMain());
309
310 m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), workQueue = &workQueue, workQueueMessageReceiver]() mutable {
311 ASSERT(!protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
312
313 protectedThis->m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver));
314 });
315}
316
317void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName)
318{
319 ASSERT(RunLoop::isMain());
320
321 m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName)]() mutable {
322 ASSERT(protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
323 protectedThis->m_workQueueMessageReceivers.remove(messageReceiverName);
324 });
325}
326
327void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver& workQueueMessageReceiver, Decoder& decoder)
328{
329 if (!decoder.isSyncMessage()) {
330 workQueueMessageReceiver.didReceiveMessage(*this, decoder);
331 return;
332 }
333
334 uint64_t syncRequestID = 0;
335 if (!decoder.decode(syncRequestID) || !syncRequestID) {
336 // We received an invalid sync message.
337 // FIXME: Handle this.
338 decoder.markInvalid();
339 return;
340 }
341
342 auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
343
344 // Hand off both the decoder and encoder to the work queue message receiver.
345 workQueueMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
346
347 // FIXME: If the message was invalid, we should send back a SyncMessageError.
348 ASSERT(!decoder.isInvalid());
349
350 if (replyEncoder)
351 sendSyncReply(WTFMove(replyEncoder));
352}
353
354void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
355{
356 ASSERT(!m_isConnected);
357
358 m_didCloseOnConnectionWorkQueueCallback = callback;
359}
360
361void Connection::invalidate()
362{
363 ASSERT(RunLoop::isMain());
364
365 if (!isValid()) {
366 // Someone already called invalidate().
367 return;
368 }
369
370 m_isValid = false;
371
372 {
373 std::lock_guard<Lock> lock(m_replyHandlersLock);
374 for (auto& replyHandler : m_replyHandlers.values()) {
375 replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
376 handler(nullptr);
377 });
378 }
379
380 m_replyHandlers.clear();
381 }
382
383 m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
384 protectedThis->platformInvalidate();
385 });
386}
387
388void Connection::markCurrentlyDispatchedMessageAsInvalid()
389{
390 // This should only be called while processing a message.
391 ASSERT(m_inDispatchMessageCount > 0);
392
393 m_didReceiveInvalidMessage = true;
394}
395
396std::unique_ptr<Encoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID)
397{
398 auto encoder = std::make_unique<Encoder>(messageReceiverName, messageName, destinationID);
399 encoder->setIsSyncMessage(true);
400
401 // Encode the sync request ID.
402 syncRequestID = ++m_syncRequestID;
403 *encoder << syncRequestID;
404
405 return encoder;
406}
407
408bool Connection::sendMessage(std::unique_ptr<Encoder> encoder, OptionSet<SendOption> sendOptions)
409{
410 if (!isValid())
411 return false;
412
413 if (isMainThread() && m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting && !encoder->isSyncMessage() && !(encoder->messageReceiverName() == "IPC") && !sendOptions.contains(SendOption::IgnoreFullySynchronousMode)) {
414 uint64_t syncRequestID;
415 auto wrappedMessage = createSyncMessageEncoder("IPC", "WrappedAsyncMessageForTesting", encoder->destinationID(), syncRequestID);
416 wrappedMessage->setFullySynchronousModeForTesting();
417 wrappedMessage->wrapForTesting(WTFMove(encoder));
418 return static_cast<bool>(sendSyncMessage(syncRequestID, WTFMove(wrappedMessage), Seconds::infinity(), { }));
419 }
420
421 if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForSyncReply)
422 && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
423 || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
424 encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true);
425
426 {
427 std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
428 m_outgoingMessages.append(WTFMove(encoder));
429 }
430
431 // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
432 m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
433 protectedThis->sendOutgoingMessages();
434 });
435 return true;
436}
437
438void Connection::sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder> encoder, FunctionDispatcher& replyDispatcher, Function<void (std::unique_ptr<Decoder>)>&& replyHandler)
439{
440 {
441 std::lock_guard<Lock> lock(m_replyHandlersLock);
442
443 if (!isValid()) {
444 replyDispatcher.dispatch([replyHandler = WTFMove(replyHandler)] {
445 replyHandler(nullptr);
446 });
447 return;
448 }
449
450 ASSERT(!m_replyHandlers.contains(requestID));
451 m_replyHandlers.set(requestID, ReplyHandler { &replyDispatcher, WTFMove(replyHandler) });
452 }
453
454 sendMessage(WTFMove(encoder), { });
455}
456
457bool Connection::sendSyncReply(std::unique_ptr<Encoder> encoder)
458{
459 return sendMessage(WTFMove(encoder), { });
460}
461
462Seconds Connection::timeoutRespectingIgnoreTimeoutsForTesting(Seconds timeout) const
463{
464 return m_ignoreTimeoutsForTesting ? Seconds::infinity() : timeout;
465}
466
467std::unique_ptr<Decoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions)
468{
469 ASSERT(RunLoop::isMain());
470
471 timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
472
473 bool hasIncomingSynchronousMessage = false;
474
475 // First, check if this message is already in the incoming messages queue.
476 {
477 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
478
479 for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
480 std::unique_ptr<Decoder>& message = *it;
481
482 if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) {
483 std::unique_ptr<Decoder> returnedMessage = WTFMove(message);
484
485 m_incomingMessages.remove(it);
486 return returnedMessage;
487 }
488
489 if (message->isSyncMessage())
490 hasIncomingSynchronousMessage = true;
491 }
492 }
493
494 // Don't even start waiting if we have InterruptWaitingIfSyncMessageArrives and there's a sync message already in the queue.
495 if (hasIncomingSynchronousMessage && waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives)) {
496 m_waitingForMessage = nullptr;
497 return nullptr;
498 }
499
500 WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForOptions);
501
502 {
503 std::lock_guard<Lock> lock(m_waitForMessageMutex);
504
505 // We don't support having multiple clients waiting for messages.
506 ASSERT(!m_waitingForMessage);
507
508 m_waitingForMessage = &waitingForMessage;
509 }
510
511 MonotonicTime absoluteTimeout = MonotonicTime::now() + timeout;
512
513 // Now wait for it to be set.
514 while (true) {
515 // Handle any messages that are blocked on a response from us.
516 SyncMessageState::singleton().dispatchMessages(nullptr);
517
518 std::unique_lock<Lock> lock(m_waitForMessageMutex);
519
520 if (m_waitingForMessage->decoder) {
521 auto decoder = WTFMove(m_waitingForMessage->decoder);
522 m_waitingForMessage = nullptr;
523 return decoder;
524 }
525
526 // Now we wait.
527 bool didTimeout = !m_waitForMessageCondition.waitUntil(lock, absoluteTimeout);
528 // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
529 if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) {
530 m_waitingForMessage = nullptr;
531 break;
532 }
533 }
534
535 return nullptr;
536}
537
538std::unique_ptr<Decoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder> encoder, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
539{
540 ASSERT(RunLoop::isMain());
541
542 if (!isValid()) {
543 didFailToSendSyncMessage();
544 return nullptr;
545 }
546
547 // Push the pending sync reply information on our stack.
548 {
549 LockHolder locker(m_syncReplyStateMutex);
550 if (!m_shouldWaitForSyncReplies) {
551 didFailToSendSyncMessage();
552 return nullptr;
553 }
554
555 m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
556 }
557
558 ++m_inSendSyncCount;
559
560 // First send the message.
561 sendMessage(WTFMove(encoder), IPC::SendOption::DispatchMessageEvenWhenWaitingForSyncReply);
562
563 // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
564 // keep an extra reference to the connection here in case it's invalidated.
565 Ref<Connection> protect(*this);
566 std::unique_ptr<Decoder> reply = waitForSyncReply(syncRequestID, timeout, sendSyncOptions);
567
568 --m_inSendSyncCount;
569
570 // Finally, pop the pending sync reply information.
571 {
572 LockHolder locker(m_syncReplyStateMutex);
573 ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
574 m_pendingSyncReplies.removeLast();
575 }
576
577 if (!reply)
578 didFailToSendSyncMessage();
579
580 return reply;
581}
582
583std::unique_ptr<Decoder> Connection::waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
584{
585 timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
586 WallTime absoluteTime = WallTime::now() + timeout;
587
588 willSendSyncMessage(sendSyncOptions);
589
590 bool timedOut = false;
591 while (!timedOut) {
592 // First, check if we have any messages that we need to process.
593 SyncMessageState::singleton().dispatchMessages(nullptr);
594
595 {
596 LockHolder locker(m_syncReplyStateMutex);
597
598 // Second, check if there is a sync reply at the top of the stack.
599 ASSERT(!m_pendingSyncReplies.isEmpty());
600
601 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
602 ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID);
603
604 // We found the sync reply, or the connection was closed.
605 if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) {
606 didReceiveSyncReply(sendSyncOptions);
607 return WTFMove(pendingSyncReply.replyDecoder);
608 }
609 }
610
611 // Processing a sync message could cause the connection to be invalidated.
612 // (If the handler ends up calling Connection::invalidate).
613 // If that happens, we need to stop waiting, or we'll hang since we won't get
614 // any more incoming messages.
615 if (!isValid()) {
616 RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Connection no longer valid, id = %" PRIu64, syncRequestID);
617 didReceiveSyncReply(sendSyncOptions);
618 return nullptr;
619 }
620
621 // We didn't find a sync reply yet, keep waiting.
622 // This allows the WebProcess to still serve clients while waiting for the message to return.
623 // Notably, it can continue to process accessibility requests, which are on the main thread.
624 timedOut = !SyncMessageState::singleton().wait(absoluteTime);
625 }
626
627 RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply, id = %" PRIu64, syncRequestID);
628 didReceiveSyncReply(sendSyncOptions);
629
630 return nullptr;
631}
632
633void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder)
634{
635 {
636 LockHolder locker(m_syncReplyStateMutex);
637
638 // Go through the stack of sync requests that have pending replies and see which one
639 // this reply is for.
640 for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
641 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
642
643 if (pendingSyncReply.syncRequestID != decoder->destinationID())
644 continue;
645
646 ASSERT(!pendingSyncReply.replyDecoder);
647
648 pendingSyncReply.replyDecoder = WTFMove(decoder);
649 pendingSyncReply.didReceiveReply = true;
650
651 // We got a reply to the last send message, wake up the client run loop so it can be processed.
652 if (i == m_pendingSyncReplies.size())
653 SyncMessageState::singleton().wakeUpClientRunLoop();
654
655 return;
656 }
657 }
658
659 {
660 LockHolder locker(m_replyHandlersLock);
661
662 auto replyHandler = m_replyHandlers.take(decoder->destinationID());
663 if (replyHandler.dispatcher) {
664 replyHandler.dispatcher->dispatch([protectedThis = makeRef(*this), handler = WTFMove(replyHandler.handler), decoder = WTFMove(decoder)] () mutable {
665 if (!protectedThis->isValid()) {
666 handler(nullptr);
667 return;
668 }
669
670 handler(WTFMove(decoder));
671 });
672 }
673 }
674
675 // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
676 // This can happen if the send timed out, so it's fine to ignore.
677}
678
679void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)
680{
681 ASSERT(!message->messageReceiverName().isEmpty());
682 ASSERT(!message->messageName().isEmpty());
683
684 if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") {
685 processIncomingSyncReply(WTFMove(message));
686 return;
687 }
688
689 if (!m_workQueueMessageReceivers.isValidKey(message->messageReceiverName())) {
690 RefPtr<Connection> protectedThis(this);
691 StringReference messageReceiverNameReference = message->messageReceiverName();
692 String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
693 StringReference messageNameReference = message->messageName();
694 String messageName(messageNameReference.isEmpty() ? "<unknown message>" : String(messageNameReference.data(), messageNameReference.size()));
695
696 RunLoop::main().dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), messageName = WTFMove(messageName)]() mutable {
697 protectedThis->dispatchDidReceiveInvalidMessage(messageReceiverName.utf8(), messageName.utf8());
698 });
699 return;
700 }
701
702 auto it = m_workQueueMessageReceivers.find(message->messageReceiverName());
703 if (it != m_workQueueMessageReceivers.end()) {
704 it->value.first->dispatch([protectedThis = makeRef(*this), workQueueMessageReceiver = it->value.second, decoder = WTFMove(message)]() mutable {
705 protectedThis->dispatchWorkQueueMessageReceiverMessage(*workQueueMessageReceiver, *decoder);
706 });
707 return;
708 }
709
710#if HAVE(QOS_CLASSES)
711 if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
712 pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, Thread::adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0);
713 message->setQOSClassOverride(override);
714 }
715#endif
716
717 if (message->isSyncMessage()) {
718 std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
719
720 for (auto& callback : m_incomingSyncMessageCallbacks.values())
721 m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback));
722
723 m_incomingSyncMessageCallbacks.clear();
724 }
725
726 // Check if we're waiting for this message, or if we need to interrupt waiting due to an incoming sync message.
727 {
728 std::lock_guard<Lock> lock(m_waitForMessageMutex);
729
730 if (m_waitingForMessage && !m_waitingForMessage->decoder) {
731 if (m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
732 m_waitingForMessage->decoder = WTFMove(message);
733 ASSERT(m_waitingForMessage->decoder);
734 m_waitForMessageCondition.notifyOne();
735 return;
736 }
737
738 if (m_waitingForMessage->waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
739 m_waitingForMessage->messageWaitingInterrupted = true;
740 m_waitForMessageCondition.notifyOne();
741 enqueueIncomingMessage(WTFMove(message));
742 return;
743 }
744 }
745 }
746
747 // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
748 // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
749 // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
750 if (SyncMessageState::singleton().processIncomingMessage(*this, message))
751 return;
752
753 enqueueIncomingMessage(WTFMove(message));
754}
755
756uint64_t Connection::installIncomingSyncMessageCallback(WTF::Function<void ()>&& callback)
757{
758 std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
759
760 m_nextIncomingSyncMessageCallbackID++;
761
762 if (!m_incomingSyncMessageCallbackQueue)
763 m_incomingSyncMessageCallbackQueue = WorkQueue::create("com.apple.WebKit.IPC.IncomingSyncMessageCallbackQueue");
764
765 m_incomingSyncMessageCallbacks.add(m_nextIncomingSyncMessageCallbackID, WTFMove(callback));
766
767 return m_nextIncomingSyncMessageCallbackID;
768}
769
770void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID)
771{
772 std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
773 m_incomingSyncMessageCallbacks.remove(callbackID);
774}
775
776bool Connection::hasIncomingSyncMessage()
777{
778 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
779
780 for (auto& message : m_incomingMessages) {
781 if (message->isSyncMessage())
782 return true;
783 }
784
785 return false;
786}
787
788void Connection::enableIncomingMessagesThrottling()
789{
790 if (m_incomingMessagesThrottler)
791 return;
792
793 m_incomingMessagesThrottler = std::make_unique<MessagesThrottler>(*this, &Connection::dispatchIncomingMessages);
794}
795
796void Connection::postConnectionDidCloseOnConnectionWorkQueue()
797{
798 m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
799 protectedThis->connectionDidClose();
800 });
801}
802
803void Connection::connectionDidClose()
804{
805 // The connection is now invalid.
806 platformInvalidate();
807
808 {
809 LockHolder locker(m_replyHandlersLock);
810 for (auto& replyHandler : m_replyHandlers.values()) {
811 replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
812 handler(nullptr);
813 });
814 }
815
816 m_replyHandlers.clear();
817 }
818
819 {
820 LockHolder locker(m_syncReplyStateMutex);
821
822 ASSERT(m_shouldWaitForSyncReplies);
823 m_shouldWaitForSyncReplies = false;
824
825 if (!m_pendingSyncReplies.isEmpty())
826 SyncMessageState::singleton().wakeUpClientRunLoop();
827 }
828
829 {
830 std::lock_guard<Lock> lock(m_waitForMessageMutex);
831 if (m_waitingForMessage)
832 m_waitingForMessage->messageWaitingInterrupted = true;
833 }
834 m_waitForMessageCondition.notifyAll();
835
836 if (m_didCloseOnConnectionWorkQueueCallback)
837 m_didCloseOnConnectionWorkQueueCallback(this);
838
839 RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
840 // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
841 // then the connection will be invalid here.
842 if (!protectedThis->isValid())
843 return;
844
845 // Set m_isValid to false before calling didClose, otherwise, sendSync will try to send a message
846 // to the connection and will then wait indefinitely for a reply.
847 protectedThis->m_isValid = false;
848
849 protectedThis->m_client.didClose(protectedThis.get());
850
851 clearAsyncReplyHandlers(protectedThis.get());
852 });
853}
854
855bool Connection::canSendOutgoingMessages() const
856{
857 return m_isConnected && platformCanSendOutgoingMessages();
858}
859
860void Connection::sendOutgoingMessages()
861{
862 if (!canSendOutgoingMessages())
863 return;
864
865 while (true) {
866 std::unique_ptr<Encoder> message;
867
868 {
869 std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
870 if (m_outgoingMessages.isEmpty())
871 break;
872 message = m_outgoingMessages.takeFirst();
873 }
874
875 if (!sendOutgoingMessage(WTFMove(message)))
876 break;
877 }
878}
879
880void Connection::dispatchSyncMessage(Decoder& decoder)
881{
882 ASSERT(decoder.isSyncMessage());
883
884 uint64_t syncRequestID = 0;
885 if (!decoder.decode(syncRequestID) || !syncRequestID) {
886 // We received an invalid sync message.
887 decoder.markInvalid();
888 return;
889 }
890
891 auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
892
893 if (decoder.messageReceiverName() == "IPC" && decoder.messageName() == "WrappedAsyncMessageForTesting") {
894 if (!m_fullySynchronousModeIsAllowedForTesting) {
895 decoder.markInvalid();
896 return;
897 }
898 std::unique_ptr<Decoder> unwrappedDecoder = Decoder::unwrapForTesting(decoder);
899 RELEASE_ASSERT(unwrappedDecoder);
900 processIncomingMessage(WTFMove(unwrappedDecoder));
901
902 SyncMessageState::singleton().dispatchMessages(nullptr);
903 } else {
904 // Hand off both the decoder and encoder to the client.
905 m_client.didReceiveSyncMessage(*this, decoder, replyEncoder);
906 }
907
908 // FIXME: If the message was invalid, we should send back a SyncMessageError.
909 ASSERT(!decoder.isInvalid());
910
911 if (replyEncoder)
912 sendSyncReply(WTFMove(replyEncoder));
913}
914
915void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString)
916{
917 ASSERT(RunLoop::isMain());
918
919 if (!isValid())
920 return;
921
922 m_client.didReceiveInvalidMessage(*this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length()));
923}
924
925void Connection::didFailToSendSyncMessage()
926{
927 if (!m_shouldExitOnSyncMessageSendFailure)
928 return;
929
930 exit(0);
931}
932
933void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage)
934{
935 {
936 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
937
938#if PLATFORM(COCOA)
939 if (m_wasKilled)
940 return;
941
942 if (m_incomingMessages.size() >= maxPendingIncomingMessagesKillingThreshold) {
943 if (kill()) {
944 RELEASE_LOG_ERROR(IPC, "%p - Connection::enqueueIncomingMessage: Over %zu incoming messages have been queued without the main thread processing them, killing the connection as the remote process seems to be misbehaving", this, maxPendingIncomingMessagesKillingThreshold);
945 m_incomingMessages.clear();
946 }
947 return;
948 }
949#endif
950
951 m_incomingMessages.append(WTFMove(incomingMessage));
952
953 if (m_incomingMessagesThrottler && m_incomingMessages.size() != 1)
954 return;
955 }
956
957 RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
958 if (protectedThis->m_incomingMessagesThrottler)
959 protectedThis->dispatchIncomingMessages();
960 else
961 protectedThis->dispatchOneIncomingMessage();
962 });
963}
964
965void Connection::dispatchMessage(Decoder& decoder)
966{
967 RELEASE_ASSERT(isValid());
968 if (decoder.messageReceiverName() == "AsyncReply") {
969 Optional<uint64_t> listenerID;
970 decoder >> listenerID;
971 if (!listenerID) {
972 ASSERT_NOT_REACHED();
973 return;
974 }
975 auto handler = takeAsyncReplyHandler(*this, *listenerID);
976 if (!handler) {
977 ASSERT_NOT_REACHED();
978 return;
979 }
980 handler(&decoder);
981 return;
982 }
983 m_client.didReceiveMessage(*this, decoder);
984}
985
986void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
987{
988 if (!isValid())
989 return;
990
991 if (message->shouldUseFullySynchronousModeForTesting()) {
992 if (!m_fullySynchronousModeIsAllowedForTesting) {
993 m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
994 return;
995 }
996 m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting++;
997 }
998
999 m_inDispatchMessageCount++;
1000
1001 if (message->shouldDispatchMessageWhenWaitingForSyncReply())
1002 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;
1003
1004 bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
1005 m_didReceiveInvalidMessage = false;
1006
1007 if (message->isSyncMessage())
1008 dispatchSyncMessage(*message);
1009 else
1010 dispatchMessage(*message);
1011
1012 m_didReceiveInvalidMessage |= message->isInvalid();
1013 m_inDispatchMessageCount--;
1014
1015 // FIXME: For synchronous messages, we should not decrement the counter until we send a response.
1016 // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function.
1017 if (message->shouldDispatchMessageWhenWaitingForSyncReply())
1018 m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;
1019
1020 if (message->shouldUseFullySynchronousModeForTesting())
1021 m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting--;
1022
1023 if (m_didReceiveInvalidMessage && isValid())
1024 m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
1025
1026 m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
1027}
1028
1029Connection::MessagesThrottler::MessagesThrottler(Connection& connection, DispatchMessagesFunction dispatchMessages)
1030 : m_dispatchMessagesTimer(RunLoop::main(), &connection, dispatchMessages)
1031 , m_connection(connection)
1032 , m_dispatchMessages(dispatchMessages)
1033{
1034 ASSERT(RunLoop::isMain());
1035}
1036
1037void Connection::MessagesThrottler::scheduleMessagesDispatch()
1038{
1039 ASSERT(RunLoop::isMain());
1040
1041 if (m_throttlingLevel) {
1042 m_dispatchMessagesTimer.startOneShot(0_s);
1043 return;
1044 }
1045 RunLoop::main().dispatch([this, protectedConnection = makeRefPtr(&m_connection)]() mutable {
1046 (protectedConnection.get()->*m_dispatchMessages)();
1047 });
1048}
1049
1050size_t Connection::MessagesThrottler::numberOfMessagesToProcess(size_t totalMessages)
1051{
1052 ASSERT(RunLoop::isMain());
1053
1054 // Never dispatch more than 600 messages without returning to the run loop, we can go as low as 60 with maximum throttling level.
1055 static const size_t maxIncomingMessagesDispatchingBatchSize { 600 };
1056 static const unsigned maxThrottlingLevel = 9;
1057
1058 size_t batchSize = maxIncomingMessagesDispatchingBatchSize / (m_throttlingLevel + 1);
1059
1060 if (totalMessages > maxIncomingMessagesDispatchingBatchSize)
1061 m_throttlingLevel = std::min(m_throttlingLevel + 1, maxThrottlingLevel);
1062 else if (m_throttlingLevel)
1063 --m_throttlingLevel;
1064
1065 return std::min(totalMessages, batchSize);
1066}
1067
1068void Connection::dispatchOneIncomingMessage()
1069{
1070 std::unique_ptr<Decoder> message;
1071 {
1072 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
1073 if (m_incomingMessages.isEmpty())
1074 return;
1075
1076 message = m_incomingMessages.takeFirst();
1077 }
1078
1079 dispatchMessage(WTFMove(message));
1080}
1081
1082void Connection::dispatchIncomingMessages()
1083{
1084 ASSERT(RunLoop::isMain());
1085
1086 std::unique_ptr<Decoder> message;
1087
1088 size_t messagesToProcess = 0;
1089 {
1090 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
1091 if (m_incomingMessages.isEmpty())
1092 return;
1093
1094 message = m_incomingMessages.takeFirst();
1095
1096 // Incoming messages may get adding to the queue by the IPC thread while we're dispatching the messages below.
1097 // To make sure dispatchIncomingMessages() yields, we only ever process messages that were in the queue when
1098 // dispatchIncomingMessages() was called. Additionally, the MessageThrottler may further cap the number of
1099 // messages to process to make sure we give the main run loop a chance to process other events.
1100 messagesToProcess = m_incomingMessagesThrottler->numberOfMessagesToProcess(m_incomingMessages.size());
1101 if (messagesToProcess < m_incomingMessages.size()) {
1102 RELEASE_LOG_ERROR(IPC, "%p - Connection::dispatchIncomingMessages: IPC throttling was triggered (has %zu pending incoming messages, will only process %zu before yielding)", this, m_incomingMessages.size(), messagesToProcess);
1103#if PLATFORM(COCOA)
1104 RELEASE_LOG_ERROR(IPC, "%p - Connection::dispatchIncomingMessages: first IPC message in queue is %{public}s::%{public}s", this, message->messageReceiverName().toString().data(), message->messageName().toString().data());
1105#endif
1106 }
1107
1108 // Re-schedule ourselves *before* we dispatch the messages because we want to process follow-up messages if the client
1109 // spins a nested run loop while we're dispatching a message. Note that this means we can re-enter this method.
1110 if (!m_incomingMessages.isEmpty())
1111 m_incomingMessagesThrottler->scheduleMessagesDispatch();
1112 }
1113
1114 dispatchMessage(WTFMove(message));
1115
1116 for (size_t i = 1; i < messagesToProcess; ++i) {
1117 {
1118 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
1119 if (m_incomingMessages.isEmpty())
1120 return;
1121
1122 message = m_incomingMessages.takeFirst();
1123 }
1124 dispatchMessage(WTFMove(message));
1125 }
1126}
1127
1128uint64_t nextAsyncReplyHandlerID()
1129{
1130 static uint64_t identifier { 0 };
1131 return ++identifier;
1132}
1133
1134void addAsyncReplyHandler(Connection& connection, uint64_t identifier, CompletionHandler<void(Decoder*)>&& completionHandler)
1135{
1136 auto result = asyncReplyHandlerMap().ensure(reinterpret_cast<uintptr_t>(&connection), [] {
1137 return HashMap<uint64_t, CompletionHandler<void(Decoder*)>>();
1138 }).iterator->value.add(identifier, WTFMove(completionHandler));
1139 ASSERT_UNUSED(result, result.isNewEntry);
1140}
1141
1142void clearAsyncReplyHandlers(const Connection& connection)
1143{
1144 auto map = asyncReplyHandlerMap().take(reinterpret_cast<uintptr_t>(&connection));
1145 for (auto& handler : map.values()) {
1146 if (handler)
1147 handler(nullptr);
1148 }
1149}
1150
1151CompletionHandler<void(Decoder*)> takeAsyncReplyHandler(Connection& connection, uint64_t identifier)
1152{
1153 auto iterator = asyncReplyHandlerMap().find(reinterpret_cast<uintptr_t>(&connection));
1154 if (iterator != asyncReplyHandlerMap().end()) {
1155 if (!iterator->value.isValidKey(identifier)) {
1156 ASSERT_NOT_REACHED();
1157 connection.markCurrentlyDispatchedMessageAsInvalid();
1158 return nullptr;
1159 }
1160 ASSERT(iterator->value.contains(identifier));
1161 return iterator->value.take(identifier);
1162 }
1163 ASSERT_NOT_REACHED();
1164 return nullptr;
1165}
1166
1167void Connection::wakeUpRunLoop()
1168{
1169 RunLoop::main().wakeUp();
1170}
1171
1172} // namespace IPC
1173