1/*
2 * Copyright (C) 2016 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27
28#include <chrono>
29#include <thread>
30
31#include <wtf/ASCIICType.h>
32#include <wtf/SynchronizedFixedQueue.h>
33#include <wtf/WorkQueue.h>
34#include <wtf/text/CString.h>
35#include <wtf/threads/BinarySemaphore.h>
36
37namespace TestWebKitAPI {
38
// Returns the fixture word stored at |index|, or nullptr when |index| is past
// the end of the table.
static char const* textItem(size_t index)
{
    static char const* items[] = { "first", "second", "third", "fourth", "fifth", "sixth" };
    const size_t itemCount = sizeof(items) / sizeof(items[0]);

    if (index >= itemCount)
        return nullptr;

    return items[index];
}
44
45static CString toUpper(const CString& lower)
46{
47 CString upper = lower;
48
49 for (char* buffer = upper.mutableData(); *buffer; ++buffer)
50 *buffer = toASCIIUpper(*buffer);
51
52 return upper;
53}
54
55template <size_t BufferSize>
56class ToUpperConverter {
57public:
58 ToUpperConverter()
59 : m_lowerQueue(SynchronizedFixedQueue<CString, BufferSize>::create())
60 , m_upperQueue(SynchronizedFixedQueue<CString, BufferSize>::create())
61 {
62 }
63
64 WorkQueue* produceQueue()
65 {
66 if (!m_produceQueue)
67 m_produceQueue = WorkQueue::create("org.webkit.Produce");
68 return m_produceQueue.get();
69 }
70
71 WorkQueue* consumeQueue()
72 {
73 if (!m_consumeQueue)
74 m_consumeQueue = WorkQueue::create("org.webkit.Consume");
75 return m_consumeQueue.get();
76 }
77
78 void startProducing()
79 {
80 if (isProducing())
81 return;
82
83 produceQueue()->dispatch([this] {
84 CString lower;
85 while (m_lowerQueue->dequeue(lower)) {
86 m_upperQueue->enqueue(toUpper(lower));
87 EXPECT_TRUE(lower == textItem(m_produceCount++));
88#if PLATFORM(WIN)
89 auto sleepAmount = std::chrono::milliseconds(20);
90#else
91 auto sleepAmount = std::chrono::milliseconds(10);
92#endif
93 std::this_thread::sleep_for(sleepAmount);
94 }
95 m_produceCloseSemaphore.signal();
96 });
97 }
98
99 void startConsuming()
100 {
101 if (isConsuming())
102 return;
103
104 consumeQueue()->dispatch([this] {
105 CString upper;
106 while (m_upperQueue->dequeue(upper)) {
107 EXPECT_TRUE(upper == toUpper(textItem(m_consumeCount++)));
108 std::this_thread::sleep_for(std::chrono::milliseconds(50));
109 }
110 m_consumeCloseSemaphore.signal();
111 });
112 }
113
114 void start()
115 {
116 startProducing();
117 startConsuming();
118 }
119
120 void stopProducing()
121 {
122 if (!isProducing())
123 return;
124
125 m_lowerQueue->close();
126 m_produceCloseSemaphore.wait();
127 m_produceQueue = nullptr;
128 }
129
130 void stopConsuming()
131 {
132 if (!isConsuming())
133 return;
134
135 m_upperQueue->close();
136 m_consumeCloseSemaphore.wait();
137 m_consumeQueue = nullptr;
138 }
139
140 void stop()
141 {
142 stopProducing();
143 stopConsuming();
144 }
145
146 void enqueueLower(const CString& lower)
147 {
148 m_lowerQueue->enqueue(lower);
149 }
150
151 bool isProducing() { return m_produceQueue; }
152 bool isConsuming() { return m_consumeQueue; }
153
154 size_t produceCount() const { return m_produceCount; }
155 size_t consumeCount() const { return m_consumeCount; }
156
157private:
158 Ref<SynchronizedFixedQueue<CString, BufferSize>> m_lowerQueue;
159 Ref<SynchronizedFixedQueue<CString, BufferSize>> m_upperQueue;
160 RefPtr<WorkQueue> m_produceQueue;
161 RefPtr<WorkQueue> m_consumeQueue;
162 BinarySemaphore m_produceCloseSemaphore;
163 BinarySemaphore m_consumeCloseSemaphore;
164 size_t m_produceCount { 0 };
165 size_t m_consumeCount { 0 };
166};
167
168TEST(WTF_SynchronizedFixedQueue, Basic)
169{
170 ToUpperConverter<4U> converter;
171
172 converter.start();
173 EXPECT_TRUE(converter.isProducing() && converter.isConsuming());
174
175 converter.stop();
176 EXPECT_FALSE(converter.isProducing() || converter.isConsuming());
177
178 EXPECT_EQ(converter.produceCount(), 0U);
179 EXPECT_EQ(converter.consumeCount(), 0U);
180}
181
182TEST(WTF_SynchronizedFixedQueue, ProduceOnly)
183{
184 ToUpperConverter<4U> converter;
185
186 converter.startProducing();
187 EXPECT_TRUE(converter.isProducing() && !converter.isConsuming());
188
189 size_t count = 0;
190 while (char const* item = textItem(count)) {
191 converter.enqueueLower(item);
192 ++count;
193
194 std::this_thread::sleep_for(std::chrono::milliseconds(1));
195 }
196
197 converter.stop();
198 EXPECT_FALSE(converter.isProducing() || converter.isConsuming());
199}
200
201TEST(WTF_SynchronizedFixedQueue, ConsumeOnly)
202{
203 ToUpperConverter<4U> converter;
204
205 converter.startConsuming();
206 EXPECT_TRUE(!converter.isProducing() && converter.isConsuming());
207
208 converter.stop();
209 EXPECT_FALSE(converter.isProducing() || converter.isConsuming());
210}
211
212TEST(WTF_SynchronizedFixedQueue, Limits)
213{
214 ToUpperConverter<4U> converter;
215
216 converter.start();
217 EXPECT_TRUE(converter.isProducing() && converter.isConsuming());
218
219 size_t count = 0;
220 while (char const* item = textItem(count)) {
221 converter.enqueueLower(item);
222 ++count;
223
224 std::this_thread::sleep_for(std::chrono::milliseconds(1));
225 }
226
227 std::this_thread::sleep_for(std::chrono::milliseconds(400));
228
229 converter.stop();
230 EXPECT_FALSE(converter.isProducing() || converter.isConsuming());
231
232 EXPECT_EQ(converter.produceCount(), count);
233 EXPECT_EQ(converter.consumeCount(), count);
234}
235
236}
237