1/*
2 * Copyright (C) 2015 Igalia S.L.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "NetworkCacheIOChannel.h"
28
29#include "NetworkCacheFileSystem.h"
30#include <wtf/MainThread.h>
31#include <wtf/RunLoop.h>
32#include <wtf/glib/GUniquePtr.h>
33#include <wtf/glib/RunLoopSourcePriority.h>
34
35namespace WebKit {
36namespace NetworkCache {
37
38static const size_t gDefaultReadBufferSize = 4096;
39
40IOChannel::IOChannel(const String& filePath, Type type)
41 : m_path(filePath)
42 , m_type(type)
43{
44 auto path = FileSystem::fileSystemRepresentation(filePath);
45 GRefPtr<GFile> file = adoptGRef(g_file_new_for_path(path.data()));
46 switch (m_type) {
47 case Type::Create: {
48 g_file_delete(file.get(), nullptr, nullptr);
49 m_outputStream = adoptGRef(G_OUTPUT_STREAM(g_file_create(file.get(), static_cast<GFileCreateFlags>(G_FILE_CREATE_PRIVATE), nullptr, nullptr)));
50#if !HAVE(STAT_BIRTHTIME)
51 GUniquePtr<char> birthtimeString(g_strdup_printf("%" G_GUINT64_FORMAT, WallTime::now().secondsSinceEpoch().secondsAs<uint64_t>()));
52 g_file_set_attribute_string(file.get(), "xattr::birthtime", birthtimeString.get(), G_FILE_QUERY_INFO_NONE, nullptr, nullptr);
53#endif
54 break;
55 }
56 case Type::Write: {
57 m_ioStream = adoptGRef(g_file_open_readwrite(file.get(), nullptr, nullptr));
58 break;
59 }
60 case Type::Read:
61 m_inputStream = adoptGRef(G_INPUT_STREAM(g_file_read(file.get(), nullptr, nullptr)));
62 break;
63 }
64}
65
66IOChannel::~IOChannel()
67{
68 RELEASE_ASSERT(!m_wasDeleted.exchange(true));
69}
70
71Ref<IOChannel> IOChannel::open(const String& filePath, IOChannel::Type type)
72{
73 return adoptRef(*new IOChannel(filePath, type));
74}
75
76static inline void runTaskInQueue(Function<void ()>&& task, WorkQueue* queue)
77{
78 if (queue) {
79 queue->dispatch(WTFMove(task));
80 return;
81 }
82
83 // Using nullptr as queue submits the result to the main context.
84 RunLoop::main().dispatch(WTFMove(task));
85}
86
87static void fillDataFromReadBuffer(SoupBuffer* readBuffer, size_t size, Data& data)
88{
89 GRefPtr<SoupBuffer> buffer;
90 if (size != readBuffer->length) {
91 // The subbuffer does not copy the data.
92 buffer = adoptGRef(soup_buffer_new_subbuffer(readBuffer, 0, size));
93 } else
94 buffer = readBuffer;
95
96 if (data.isNull()) {
97 // First chunk, we need to force the data to be copied.
98 data = { reinterpret_cast<const uint8_t*>(buffer->data), size };
99 } else {
100 Data dataRead(WTFMove(buffer));
101 // Concatenate will copy the data.
102 data = concatenate(data, dataRead);
103 }
104}
105
106struct ReadAsyncData {
107 RefPtr<IOChannel> channel;
108 GRefPtr<SoupBuffer> buffer;
109 RefPtr<WorkQueue> queue;
110 size_t bytesToRead;
111 Function<void (Data&, int error)> completionHandler;
112 Data data;
113};
114
115static void inputStreamReadReadyCallback(GInputStream* stream, GAsyncResult* result, gpointer userData)
116{
117 std::unique_ptr<ReadAsyncData> asyncData(static_cast<ReadAsyncData*>(userData));
118 gssize bytesRead = g_input_stream_read_finish(stream, result, nullptr);
119 if (bytesRead == -1) {
120 WorkQueue* queue = asyncData->queue.get();
121 runTaskInQueue([asyncData = WTFMove(asyncData)] {
122 asyncData->completionHandler(asyncData->data, -1);
123 }, queue);
124 return;
125 }
126
127 if (!bytesRead) {
128 WorkQueue* queue = asyncData->queue.get();
129 runTaskInQueue([asyncData = WTFMove(asyncData)] {
130 asyncData->completionHandler(asyncData->data, 0);
131 }, queue);
132 return;
133 }
134
135 ASSERT(bytesRead > 0);
136 fillDataFromReadBuffer(asyncData->buffer.get(), static_cast<size_t>(bytesRead), asyncData->data);
137
138 size_t pendingBytesToRead = asyncData->bytesToRead - asyncData->data.size();
139 if (!pendingBytesToRead) {
140 WorkQueue* queue = asyncData->queue.get();
141 runTaskInQueue([asyncData = WTFMove(asyncData)] {
142 asyncData->completionHandler(asyncData->data, 0);
143 }, queue);
144 return;
145 }
146
147 size_t bytesToRead = std::min(pendingBytesToRead, asyncData->buffer->length);
148 // Use a local variable for the data buffer to pass it to g_input_stream_read_async(), because ReadAsyncData is released.
149 auto data = const_cast<char*>(asyncData->buffer->data);
150 g_input_stream_read_async(stream, data, bytesToRead, RunLoopSourcePriority::DiskCacheRead, nullptr,
151 reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData.release());
152}
153
154void IOChannel::read(size_t offset, size_t size, WorkQueue* queue, Function<void (Data&, int error)>&& completionHandler)
155{
156 RefPtr<IOChannel> channel(this);
157 if (!m_inputStream) {
158 runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
159 Data data;
160 completionHandler(data, -1);
161 }, queue);
162 return;
163 }
164
165 if (!RunLoop::isMain()) {
166 readSyncInThread(offset, size, queue, WTFMove(completionHandler));
167 return;
168 }
169
170 size_t bufferSize = std::min(size, gDefaultReadBufferSize);
171 uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
172 GRefPtr<SoupBuffer> buffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
173 ReadAsyncData* asyncData = new ReadAsyncData { this, buffer.get(), queue, size, WTFMove(completionHandler), { } };
174
175 // FIXME: implement offset.
176 g_input_stream_read_async(m_inputStream.get(), const_cast<char*>(buffer->data), bufferSize, RunLoopSourcePriority::DiskCacheRead, nullptr,
177 reinterpret_cast<GAsyncReadyCallback>(inputStreamReadReadyCallback), asyncData);
178}
179
180void IOChannel::readSyncInThread(size_t offset, size_t size, WorkQueue* queue, Function<void (Data&, int error)>&& completionHandler)
181{
182 ASSERT(!RunLoop::isMain());
183
184 RefPtr<IOChannel> channel(this);
185 Thread::create("IOChannel::readSync", [channel, size, queue, completionHandler = WTFMove(completionHandler)] () mutable {
186 size_t bufferSize = std::min(size, gDefaultReadBufferSize);
187 uint8_t* bufferData = static_cast<uint8_t*>(fastMalloc(bufferSize));
188 GRefPtr<SoupBuffer> readBuffer = adoptGRef(soup_buffer_new_with_owner(bufferData, bufferSize, bufferData, fastFree));
189 Data data;
190 size_t pendingBytesToRead = size;
191 size_t bytesToRead = bufferSize;
192 do {
193 // FIXME: implement offset.
194 gssize bytesRead = g_input_stream_read(channel->m_inputStream.get(), const_cast<char*>(readBuffer->data), bytesToRead, nullptr, nullptr);
195 if (bytesRead == -1) {
196 runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
197 Data data;
198 completionHandler(data, -1);
199 }, queue);
200 return;
201 }
202
203 if (!bytesRead)
204 break;
205
206 ASSERT(bytesRead > 0);
207 fillDataFromReadBuffer(readBuffer.get(), static_cast<size_t>(bytesRead), data);
208
209 pendingBytesToRead = size - data.size();
210 bytesToRead = std::min(pendingBytesToRead, readBuffer->length);
211 } while (pendingBytesToRead);
212
213 GRefPtr<SoupBuffer> bufferCapture = data.soupBuffer();
214 runTaskInQueue([channel, bufferCapture, completionHandler = WTFMove(completionHandler)] {
215 GRefPtr<SoupBuffer> buffer = bufferCapture;
216 Data data = { WTFMove(buffer) };
217 completionHandler(data, 0);
218 }, queue);
219 })->detach();
220}
221
222struct WriteAsyncData {
223 RefPtr<IOChannel> channel;
224 GRefPtr<SoupBuffer> buffer;
225 RefPtr<WorkQueue> queue;
226 Function<void (int error)> completionHandler;
227};
228
229static void outputStreamWriteReadyCallback(GOutputStream* stream, GAsyncResult* result, gpointer userData)
230{
231 std::unique_ptr<WriteAsyncData> asyncData(static_cast<WriteAsyncData*>(userData));
232 gssize bytesWritten = g_output_stream_write_finish(stream, result, nullptr);
233 if (bytesWritten == -1) {
234 WorkQueue* queue = asyncData->queue.get();
235 runTaskInQueue([asyncData = WTFMove(asyncData)] {
236 asyncData->completionHandler(-1);
237 }, queue);
238 return;
239 }
240
241 gssize pendingBytesToWrite = asyncData->buffer->length - bytesWritten;
242 if (!pendingBytesToWrite) {
243 WorkQueue* queue = asyncData->queue.get();
244 runTaskInQueue([asyncData = WTFMove(asyncData)] {
245 asyncData->completionHandler(0);
246 }, queue);
247 return;
248 }
249
250 asyncData->buffer = adoptGRef(soup_buffer_new_subbuffer(asyncData->buffer.get(), bytesWritten, pendingBytesToWrite));
251 // Use a local variable for the data buffer to pass it to g_output_stream_write_async(), because WriteAsyncData is released.
252 auto data = asyncData->buffer->data;
253 g_output_stream_write_async(stream, data, pendingBytesToWrite, RunLoopSourcePriority::DiskCacheWrite, nullptr,
254 reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData.release());
255}
256
257void IOChannel::write(size_t offset, const Data& data, WorkQueue* queue, Function<void (int error)>&& completionHandler)
258{
259 RefPtr<IOChannel> channel(this);
260 if (!m_outputStream && !m_ioStream) {
261 runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
262 completionHandler(-1);
263 }, queue);
264 return;
265 }
266
267 GOutputStream* stream = m_outputStream ? m_outputStream.get() : g_io_stream_get_output_stream(G_IO_STREAM(m_ioStream.get()));
268 if (!stream) {
269 runTaskInQueue([channel, completionHandler = WTFMove(completionHandler)] {
270 completionHandler(-1);
271 }, queue);
272 return;
273 }
274
275 WriteAsyncData* asyncData = new WriteAsyncData { this, data.soupBuffer(), queue, WTFMove(completionHandler) };
276 // FIXME: implement offset.
277 g_output_stream_write_async(stream, asyncData->buffer->data, data.size(), RunLoopSourcePriority::DiskCacheWrite, nullptr,
278 reinterpret_cast<GAsyncReadyCallback>(outputStreamWriteReadyCallback), asyncData);
279}
280
281} // namespace NetworkCache
282} // namespace WebKit
283