/* libuv-svace-build — viewer metadata (2672 lines, 77.0 KB); not part of the
 * original source file. */
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
21
#include <assert.h>
#include <errno.h>
#include <io.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
27
28#include "handle-inl.h"
29#include "internal.h"
30#include "req-inl.h"
31#include "stream-inl.h"
32#include "uv-common.h"
33#include "uv.h"
34
35#include <aclapi.h>
36#include <accctrl.h>
37
/* A zero-size buffer for use by uv_pipe_read */
static char uv_zero_[] = "";

/* Null uv_buf_t */
static const uv_buf_t uv_null_buf_ = { 0, NULL };

/* The timeout that the pipe will wait for the remote end to write data when
 * the local ends wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */

/* Number of simultaneous accept requests a pipe server keeps outstanding
 * when uv_pipe_pending_instances() was not called. */
static const int default_pending_pipe_instances = 4;

/* Pipe prefix */
static char pipe_prefix[] = "\\\\?\\pipe";
static const size_t pipe_prefix_len = sizeof(pipe_prefix) - 1;

/* IPC incoming xfer queue item. Queued on pipe.conn.ipc_xfer_queue until the
 * user accepts the transferred socket. */
typedef struct {
  uv__ipc_socket_xfer_type_t xfer_type;
  uv__ipc_socket_xfer_info_t xfer_info;  /* Holds socket_info used to rematerialize the socket. */
  struct uv__queue member;               /* Link in the per-handle xfer queue. */
} uv__ipc_xfer_queue_item_t;

/* IPC frame header flags. */
/* clang-format off */
enum {
  UV__IPC_FRAME_HAS_DATA                = 0x01,
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
  /* These are combinations of the flags above. */
  UV__IPC_FRAME_XFER_FLAGS              = 0x06,
  UV__IPC_FRAME_VALID_FLAGS             = 0x07
};
/* clang-format on */

/* IPC frame header. Precedes every frame exchanged over an IPC pipe. */
typedef struct {
  uint32_t flags;        /* Combination of UV__IPC_FRAME_* bits. */
  uint32_t reserved1;    /* Ignored. */
  uint32_t data_length;  /* Must be zero if there is no data. */
  uint32_t reserved2;    /* Must be zero. */
} uv__ipc_frame_header_t;

/* To implement the IPC protocol correctly, these structures must have exactly
 * the right size. */
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);

/* Coalesced write request. Used when multiple user buffers are merged into a
 * single internal write. */
typedef struct {
  uv_write_t req;       /* Internal heap-allocated write request. */
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
} uv__coalesced_write_t;


/* Forward declarations for the EOF-timer helpers defined later in the file. */
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
static void eof_timer_cb(uv_timer_t* timer);
static void eof_timer_destroy(uv_pipe_t* pipe);
static void eof_timer_close_cb(uv_handle_t* handle);
100
101/* Does the file path contain embedded nul bytes? */
/* Report whether the first n bytes of s contain an embedded nul byte.
 * Returns 1 when a nul is present, 0 otherwise (including when n == 0). */
static int includes_nul(const char *s, size_t n) {
  return n > 0 && memchr(s, '\0', n) != NULL;
}
107
108
/* Format a pipe name that is unique on this system: mixes an arbitrary
 * pointer value (the caller's "randomness" seed, never dereferenced) with
 * the current process id. `size` is the capacity of `name` including the
 * terminating nul. */
static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
  snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
}
112
113
/* Initialize a uv_pipe_t as an unconnected pipe with no OS handle attached.
 * `ipc` non-zero enables libuv's IPC framing protocol (handle passing).
 * Always returns 0. */
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);

  handle->reqs_pending = 0;
  handle->handle = INVALID_HANDLE_VALUE;
  handle->name = NULL;
  handle->pipe.conn.ipc_remote_pid = 0;
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
  /* Incoming socket transfers wait here until accepted. */
  uv__queue_init(&handle->pipe.conn.ipc_xfer_queue);
  handle->pipe.conn.ipc_xfer_queue_length = 0;
  handle->ipc = ipc;
  handle->pipe.conn.non_overlapped_writes_tail = NULL;

  return 0;
}
129
130
/* Put a pipe handle into "connection" mode (as opposed to server mode) and
 * reset its per-connection read state. Must not be called on a server. */
static void uv__pipe_connection_init(uv_pipe_t* handle) {
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
  uv__connection_init((uv_stream_t*) handle);
  /* Lets the read request locate its owning handle on completion. */
  handle->read_req.data = handle;
  handle->pipe.conn.eof_timer = NULL;
}
137
138
/* Open an existing named pipe, probing for the widest access the server end
 * grants: duplex first, then read-only, then write-only. On success the
 * matching UV_HANDLE_READABLE / UV_HANDLE_WRITABLE bits are stored in
 * *duplex_flags. Returns INVALID_HANDLE_VALUE on failure; the caller should
 * consult GetLastError() for the reason. */
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
  HANDLE pipeHandle;

  /*
   * Assume that we have a duplex pipe first, so attempt to
   * connect with GENERIC_READ | GENERIC_WRITE.
   */
  pipeHandle = CreateFileW(name,
                           GENERIC_READ | GENERIC_WRITE,
                           0,
                           NULL,
                           OPEN_EXISTING,
                           FILE_FLAG_OVERLAPPED,
                           NULL);
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
    return pipeHandle;
  }

  /*
   * If the pipe is not duplex CreateFileW fails with
   * ERROR_ACCESS_DENIED. In that case try to connect
   * as a read-only or write-only.
   */
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    /* FILE_WRITE_ATTRIBUTES is requested in addition to GENERIC_READ so the
     * pipe's wait/blocking mode can still be adjusted later. */
    pipeHandle = CreateFileW(name,
                             GENERIC_READ | FILE_WRITE_ATTRIBUTES,
                             0,
                             NULL,
                             OPEN_EXISTING,
                             FILE_FLAG_OVERLAPPED,
                             NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_READABLE;
      return pipeHandle;
    }
  }

  /* Read-only open was denied as well (GetLastError reflects the most recent
   * CreateFileW call); last resort is a write-only open. */
  if (GetLastError() == ERROR_ACCESS_DENIED) {
    pipeHandle = CreateFileW(name,
                             GENERIC_WRITE | FILE_READ_ATTRIBUTES,
                             0,
                             NULL,
                             OPEN_EXISTING,
                             FILE_FLAG_OVERLAPPED,
                             NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      *duplex_flags = UV_HANDLE_WRITABLE;
      return pipeHandle;
    }
  }

  return INVALID_HANDLE_VALUE;
}
195
196
197static void close_pipe(uv_pipe_t* pipe) {
198assert(pipe->u.fd == -1 || pipe->u.fd > 2);
199if (pipe->u.fd == -1)
200CloseHandle(pipe->handle);
201else
202_close(pipe->u.fd);
203
204pipe->u.fd = -1;
205pipe->handle = INVALID_HANDLE_VALUE;
206}
207
208
/* Create the server end of a uniquely-named pipe, retrying with a new name
 * on collision. On success, stores the handle in *pipeHandle_ptr and the
 * generated name in `name` (capacity nameSize) and returns 0; otherwise
 * returns a Windows error code. `random` is only a seed for name
 * generation and is never dereferenced. */
static int uv__pipe_server(
    HANDLE* pipeHandle_ptr, DWORD access,
    char* name, size_t nameSize, char* random) {
  HANDLE pipeHandle;
  int err;

  for (;;) {
    uv__unique_pipe_name(random, name, nameSize);

    /* FILE_FLAG_FIRST_PIPE_INSTANCE makes creation fail when the name is
     * already in use, which is how collisions are detected. */
    pipeHandle = CreateNamedPipeA(name,
                                  access | FILE_FLAG_FIRST_PIPE_INSTANCE,
                                  PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
                                  NULL);

    if (pipeHandle != INVALID_HANDLE_VALUE) {
      /* No name collisions. We're done. */
      break;
    }

    err = GetLastError();
    if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
      goto error;
    }

    /* Pipe name collision. Increment the random number and try again. */
    random++;
  }

  *pipeHandle_ptr = pipeHandle;

  return 0;

error:
  /* Unreachable with a valid handle today (we only jump here on failure),
   * but kept as a safety net. */
  if (pipeHandle != INVALID_HANDLE_VALUE)
    CloseHandle(pipeHandle);

  return err;
}
247
248
/* Create a connected named-pipe pair: the server end via uv__pipe_server()
 * and the client end via CreateFileA() on the same name. The flag words map
 * UV_READABLE_PIPE / UV_WRITABLE_PIPE / UV_NONBLOCK_PIPE onto the
 * corresponding Windows access and overlapped flags. `inherit_client` makes
 * the client handle inheritable by child processes; `random` seeds the pipe
 * name. Returns 0 or a Windows error code. */
static int uv__create_pipe_pair(
    HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
    unsigned int server_flags, unsigned int client_flags,
    int inherit_client, char* random) {
  /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
  char pipe_name[64];
  SECURITY_ATTRIBUTES sa;
  DWORD server_access;
  DWORD client_access;
  HANDLE server_pipe;
  HANDLE client_pipe;
  int err;

  server_pipe = INVALID_HANDLE_VALUE;
  client_pipe = INVALID_HANDLE_VALUE;

  /* Map libuv flags onto the server-side access mask. */
  server_access = 0;
  if (server_flags & UV_READABLE_PIPE)
    server_access |= PIPE_ACCESS_INBOUND;
  if (server_flags & UV_WRITABLE_PIPE)
    server_access |= PIPE_ACCESS_OUTBOUND;
  if (server_flags & UV_NONBLOCK_PIPE)
    server_access |= FILE_FLAG_OVERLAPPED;
  server_access |= WRITE_DAC;

  /* The client always gets at least the *_ATTRIBUTES rights so pipe modes
   * can be queried/changed even on a one-directional pipe. */
  client_access = 0;
  if (client_flags & UV_READABLE_PIPE)
    client_access |= GENERIC_READ;
  else
    client_access |= FILE_READ_ATTRIBUTES;
  if (client_flags & UV_WRITABLE_PIPE)
    client_access |= GENERIC_WRITE;
  else
    client_access |= FILE_WRITE_ATTRIBUTES;
  client_access |= WRITE_DAC;

  /* Create server pipe handle. */
  err = uv__pipe_server(&server_pipe,
                        server_access,
                        pipe_name,
                        sizeof(pipe_name),
                        random);
  if (err)
    goto error;

  /* Create client pipe handle. */
  sa.nLength = sizeof sa;
  sa.lpSecurityDescriptor = NULL;
  sa.bInheritHandle = inherit_client;

  client_pipe = CreateFileA(pipe_name,
                            client_access,
                            0,
                            &sa,
                            OPEN_EXISTING,
                            (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
                            NULL);
  if (client_pipe == INVALID_HANDLE_VALUE) {
    err = GetLastError();
    goto error;
  }

#ifndef NDEBUG
  /* Validate that the pipe was opened in the right mode. */
  {
    DWORD mode;
    BOOL r;
    r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
    if (r == TRUE) {
      assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
    } else {
      fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
    }
  }
#endif

  /* Do a blocking ConnectNamedPipe. This should not block because we have
   * both ends of the pipe created. */
  if (!ConnectNamedPipe(server_pipe, NULL)) {
    if (GetLastError() != ERROR_PIPE_CONNECTED) {
      err = GetLastError();
      goto error;
    }
  }

  *client_pipe_ptr = client_pipe;
  *server_pipe_ptr = server_pipe;
  return 0;

error:
  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  return err;
}
347
348
349int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
350uv_file temp[2];
351int err;
352HANDLE readh;
353HANDLE writeh;
354
355/* Make the server side the inbound (read) end, */
356/* so that both ends will have FILE_READ_ATTRIBUTES permission. */
357/* TODO: better source of local randomness than &fds? */
358read_flags |= UV_READABLE_PIPE;
359write_flags |= UV_WRITABLE_PIPE;
360err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
361if (err != 0)
362return err;
363temp[0] = _open_osfhandle((intptr_t) readh, 0);
364if (temp[0] == -1) {
365if (errno == UV_EMFILE)
366err = UV_EMFILE;
367else
368err = UV_UNKNOWN;
369CloseHandle(readh);
370CloseHandle(writeh);
371return err;
372}
373temp[1] = _open_osfhandle((intptr_t) writeh, 0);
374if (temp[1] == -1) {
375if (errno == UV_EMFILE)
376err = UV_EMFILE;
377else
378err = UV_UNKNOWN;
379_close(temp[0]);
380CloseHandle(writeh);
381return err;
382}
383fds[0] = temp[0];
384fds[1] = temp[1];
385return 0;
386}
387
388
/* Create a stdio pipe pair for spawning a child process. The parent_pipe
 * (server end) is kept by libuv and registered with the loop's IOCP; the
 * client end is returned through *child_pipe_ptr for the child. `flags`
 * describe readability/writability from the CHILD's point of view, so the
 * parent-side UV_HANDLE_* flags are set mirrored at the end. Returns 0 or a
 * Windows error code. */
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
    uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
  /* The parent_pipe is always the server_pipe and kept by libuv.
   * The child_pipe is always the client_pipe and is passed to the child.
   * The flags are specified with respect to their usage in the child. */
  HANDLE server_pipe;
  HANDLE client_pipe;
  unsigned int server_flags;
  unsigned int client_flags;
  int err;

  uv__pipe_connection_init(parent_pipe);

  server_pipe = INVALID_HANDLE_VALUE;
  client_pipe = INVALID_HANDLE_VALUE;

  server_flags = 0;
  client_flags = 0;
  if (flags & UV_READABLE_PIPE) {
    /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
     * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
     * the state of the write buffer when we're trying to shutdown the pipe. */
    server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
    client_flags |= UV_READABLE_PIPE;
  }
  if (flags & UV_WRITABLE_PIPE) {
    server_flags |= UV_READABLE_PIPE;
    client_flags |= UV_WRITABLE_PIPE;
  }
  /* The parent end is always overlapped so it can use the IOCP. */
  server_flags |= UV_NONBLOCK_PIPE;
  if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
    client_flags |= UV_NONBLOCK_PIPE;
  }

  /* NOTE(review): server_pipe still holds INVALID_HANDLE_VALUE here and is
   * passed by value purely as a randomness seed for the pipe name — confirm
   * this is intentional (vs. e.g. `&server_pipe`). */
  err = uv__create_pipe_pair(&server_pipe, &client_pipe,
          server_flags, client_flags, 1, (char*) server_pipe);
  if (err)
    goto error;

  if (CreateIoCompletionPort(server_pipe,
                             loop->iocp,
                             (ULONG_PTR) parent_pipe,
                             0) == NULL) {
    err = GetLastError();
    goto error;
  }

  parent_pipe->handle = server_pipe;
  *child_pipe_ptr = client_pipe;

  /* The server end is now readable and/or writable. */
  if (flags & UV_READABLE_PIPE)
    parent_pipe->flags |= UV_HANDLE_WRITABLE;
  if (flags & UV_WRITABLE_PIPE)
    parent_pipe->flags |= UV_HANDLE_READABLE;

  return 0;

error:
  if (server_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(server_pipe);

  if (client_pipe != INVALID_HANDLE_VALUE)
    CloseHandle(client_pipe);

  return err;
}
456
457
458static int uv__set_pipe_handle(uv_loop_t* loop,
459uv_pipe_t* handle,
460HANDLE pipeHandle,
461int fd,
462DWORD duplex_flags) {
463NTSTATUS nt_status;
464IO_STATUS_BLOCK io_status;
465FILE_MODE_INFORMATION mode_info;
466DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
467DWORD current_mode = 0;
468DWORD err = 0;
469
470assert(handle->flags & UV_HANDLE_CONNECTION);
471assert(!(handle->flags & UV_HANDLE_PIPESERVER));
472if (handle->flags & UV_HANDLE_CLOSING)
473return UV_EINVAL;
474if (handle->handle != INVALID_HANDLE_VALUE)
475return UV_EBUSY;
476
477if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
478err = GetLastError();
479if (err == ERROR_ACCESS_DENIED) {
480/*
481* SetNamedPipeHandleState can fail if the handle doesn't have either
482* GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
483* But if the handle already has the desired wait and blocking modes
484* we can continue.
485*/
486if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL,
487NULL, NULL, 0)) {
488return uv_translate_sys_error(GetLastError());
489} else if (current_mode & PIPE_NOWAIT) {
490return UV_EACCES;
491}
492} else {
493/* If this returns ERROR_INVALID_PARAMETER we probably opened
494* something that is not a pipe. */
495if (err == ERROR_INVALID_PARAMETER) {
496return UV_ENOTSOCK;
497}
498return uv_translate_sys_error(err);
499}
500}
501
502/* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
503nt_status = pNtQueryInformationFile(pipeHandle,
504&io_status,
505&mode_info,
506sizeof(mode_info),
507FileModeInformation);
508if (nt_status != STATUS_SUCCESS) {
509return uv_translate_sys_error(err);
510}
511
512if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
513mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
514/* Non-overlapped pipe. */
515handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
516handle->pipe.conn.readfile_thread_handle = NULL;
517InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
518} else {
519/* Overlapped pipe. Try to associate with IOCP. */
520if (CreateIoCompletionPort(pipeHandle,
521loop->iocp,
522(ULONG_PTR) handle,
5230) == NULL) {
524handle->flags |= UV_HANDLE_EMULATE_IOCP;
525}
526}
527
528handle->handle = pipeHandle;
529handle->u.fd = fd;
530handle->flags |= duplex_flags;
531
532return 0;
533}
534
535
/* Allocate a fresh server pipe instance for an accept request and register
 * it with the loop's IOCP. Note the boolean-style return: 1 on success,
 * 0 on failure (caller retrieves the error with GetLastError()). */
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
                             uv_pipe_accept_t* req, BOOL firstInstance) {
  assert(req->pipeHandle == INVALID_HANDLE_VALUE);

  /* Only the very first instance asserts exclusivity of the name. */
  req->pipeHandle =
      CreateNamedPipeW(handle->name,
                       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
                         (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
                       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
                       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);

  if (req->pipeHandle == INVALID_HANDLE_VALUE) {
    return 0;
  }

  /* Associate it with IOCP so we can get events. */
  if (CreateIoCompletionPort(req->pipeHandle,
                             loop->iocp,
                             (ULONG_PTR) handle,
                             0) == NULL) {
    uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
  }

  /* Stash a handle in the server object for use from places such as
   * getsockname and chmod. As we transfer ownership of these to client
   * objects, we'll allocate new ones here. */
  handle->handle = req->pipeHandle;

  return 1;
}
566
567
568static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
569uv_loop_t* loop;
570uv_pipe_t* handle;
571uv_shutdown_t* req;
572
573req = (uv_shutdown_t*) parameter;
574assert(req);
575handle = (uv_pipe_t*) req->handle;
576assert(handle);
577loop = handle->loop;
578assert(loop);
579
580FlushFileBuffers(handle->handle);
581
582/* Post completed */
583POST_COMPLETION_FOR_REQ(loop, req);
584
585return 0;
586}
587
588
/* Begin shutting down the write side of a pipe connection. Completion is
 * always reported asynchronously via the pending-req queue. When the peer
 * has already drained all written data, FlushFileBuffers is skipped;
 * otherwise it runs on a thread-pool thread (it would block here). */
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
  DWORD result;
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_PIPE_LOCAL_INFORMATION pipe_info;

  assert(handle->flags & UV_HANDLE_CONNECTION);
  assert(req != NULL);
  assert(handle->stream.conn.write_reqs_pending == 0);
  SET_REQ_SUCCESS(req);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* Handle is going away anyway; just report success. */
    uv__insert_pending_req(loop, (uv_req_t*) req);
    return;
  }

  /* Try to avoid flushing the pipe buffer in the thread pool. */
  nt_status = pNtQueryInformationFile(handle->handle,
                                      &io_status,
                                      &pipe_info,
                                      sizeof pipe_info,
                                      FilePipeLocalInformation);

  if (nt_status != STATUS_SUCCESS) {
    SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
    uv__insert_pending_req(loop, (uv_req_t*) req);
    return;
  }

  if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
    /* Short-circuit, no need to call FlushFileBuffers:
     * all writes have been read. */
    uv__insert_pending_req(loop, (uv_req_t*) req);
    return;
  }

  /* Run FlushFileBuffers in the thread pool. */
  result = QueueUserWorkItem(pipe_shutdown_thread_proc,
                             req,
                             WT_EXECUTELONGFUNCTION);
  if (!result) {
    SET_REQ_ERROR(req, GetLastError());
    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
    uv__insert_pending_req(loop, (uv_req_t*) req);
    return;
  }
}
637
638
/* Final teardown once a closing pipe has no requests in flight: drains any
 * unaccepted IPC socket transfers (materializing and closing each socket so
 * it is not leaked), releases emulated-IOCP wait/event handles, the
 * non-overlapped read lock, and a server's accept request array. */
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
  uv__ipc_xfer_queue_item_t* xfer_queue_item;

  assert(handle->reqs_pending == 0);
  assert(handle->flags & UV_HANDLE_CLOSING);
  assert(!(handle->flags & UV_HANDLE_CLOSED));

  if (handle->flags & UV_HANDLE_CONNECTION) {
    /* Free pending sockets */
    while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) {
      struct uv__queue* q;
      SOCKET socket;

      q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue);
      uv__queue_remove(q);
      xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);

      /* Materialize socket and close it */
      socket = WSASocketW(FROM_PROTOCOL_INFO,
                          FROM_PROTOCOL_INFO,
                          FROM_PROTOCOL_INFO,
                          &xfer_queue_item->xfer_info.socket_info,
                          0,
                          WSA_FLAG_OVERLAPPED);
      uv__free(xfer_queue_item);

      if (socket != INVALID_SOCKET)
        closesocket(socket);
    }
    handle->pipe.conn.ipc_xfer_queue_length = 0;

    /* Tear down the resources used to emulate IOCP completions. */
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
        UnregisterWait(handle->read_req.wait_handle);
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
      }
      if (handle->read_req.event_handle != NULL) {
        CloseHandle(handle->read_req.event_handle);
        handle->read_req.event_handle = NULL;
      }
    }

    if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
      DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  }

  if (handle->flags & UV_HANDLE_PIPESERVER) {
    assert(handle->pipe.serv.accept_reqs);
    uv__free(handle->pipe.serv.accept_reqs);
    handle->pipe.serv.accept_reqs = NULL;
  }

  uv__handle_close(handle);
}
693
694
695void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
696if (handle->flags & UV_HANDLE_BOUND)
697return;
698handle->pipe.serv.pending_instances = count;
699handle->flags |= UV_HANDLE_PIPESERVER;
700}
701
702
703/* Creates a pipe server. */
/* Creates a pipe server. */
/* Convenience wrapper: bind using the full NUL-terminated name, no flags. */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
  return uv_pipe_bind2(handle, name, strlen(name), 0);
}
707
708
/* Bind a pipe handle as a server for `name` (namelen bytes, no embedded
 * nuls). Allocates the accept request array, converts the name to UTF-16,
 * and creates the first pipe instance with FILE_FLAG_FIRST_PIPE_INSTANCE to
 * detect an already-existing server (UV_EADDRINUSE). Only
 * UV_PIPE_NO_TRUNCATE is accepted in `flags`. Returns 0 or a libuv error
 * code; on failure all allocations are released. */
int uv_pipe_bind2(uv_pipe_t* handle,
                  const char* name,
                  size_t namelen,
                  unsigned int flags) {
  uv_loop_t* loop = handle->loop;
  int i, err;
  uv_pipe_accept_t* req;
  char* name_copy;

  if (flags & ~UV_PIPE_NO_TRUNCATE) {
    return UV_EINVAL;
  }

  if (name == NULL) {
    return UV_EINVAL;
  }

  if (namelen == 0) {
    return UV_EINVAL;
  }

  if (includes_nul(name, namelen)) {
    return UV_EINVAL;
  }

  if (handle->flags & UV_HANDLE_BOUND) {
    return UV_EINVAL;
  }

  if (uv__is_closing(handle)) {
    return UV_EINVAL;
  }

  /* The input is not necessarily NUL-terminated; make a terminated copy. */
  name_copy = uv__malloc(namelen + 1);
  if (name_copy == NULL) {
    return UV_ENOMEM;
  }

  memcpy(name_copy, name, namelen);
  name_copy[namelen] = '\0';

  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
    handle->pipe.serv.pending_instances = default_pending_pipe_instances;
  }

  err = UV_ENOMEM;
  handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
    uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
  if (handle->pipe.serv.accept_reqs == NULL) {
    goto error;
  }

  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
    req = &handle->pipe.serv.accept_reqs[i];
    UV_REQ_INIT(req, UV_ACCEPT);
    req->data = handle;
    req->pipeHandle = INVALID_HANDLE_VALUE;
    req->next_pending = NULL;
  }

  /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
  err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
  uv__free(name_copy);
  name_copy = NULL;

  if (err) {
    goto error;
  }

  /*
   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
   * If this fails then there's already a pipe server for the given pipe name.
   */
  if (!pipe_alloc_accept(loop,
                         handle,
                         &handle->pipe.serv.accept_reqs[0],
                         TRUE)) {
    err = GetLastError();
    if (err == ERROR_ACCESS_DENIED) {
      err = UV_EADDRINUSE;
    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
      err = UV_EACCES;
    } else {
      err = uv_translate_sys_error(err);
    }
    goto error;
  }

  handle->pipe.serv.pending_accepts = NULL;
  handle->flags |= UV_HANDLE_PIPESERVER;
  handle->flags |= UV_HANDLE_BOUND;

  return 0;

error:
  /* All of these are NULL or owned by us at this point; uv__free(NULL) is
   * a no-op, so unconditional frees are safe. */
  uv__free(handle->pipe.serv.accept_reqs);
  uv__free(handle->name);
  uv__free(name_copy);
  handle->pipe.serv.accept_reqs = NULL;
  handle->name = NULL;

  return err;
}
812
813
/* Thread-pool worker used when the initial connect hit ERROR_PIPE_BUSY:
 * repeatedly waits for a free server instance and retries the open, then
 * posts the (successful or failed) connect request back through the IOCP. */
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
  uv_loop_t* loop;
  uv_pipe_t* handle;
  uv_connect_t* req;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;

  req = (uv_connect_t*) parameter;
  assert(req);
  handle = (uv_pipe_t*) req->handle;
  assert(handle);
  loop = handle->loop;
  assert(loop);

  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
   * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
  while (WaitNamedPipeW(req->u.connect.name, 30000)) {
    /* The pipe is now available, try to connect. */
    pipeHandle = open_named_pipe(req->u.connect.name, &duplex_flags);
    if (pipeHandle != INVALID_HANDLE_VALUE)
      break;

    /* Another waiter won the race; yield and wait again. */
    SwitchToThread();
  }

  /* The name was heap-copied for this thread; release it now. */
  uv__free(req->u.connect.name);
  req->u.connect.name = NULL;
  if (pipeHandle != INVALID_HANDLE_VALUE) {
    SET_REQ_SUCCESS(req);
    req->u.connect.pipeHandle = pipeHandle;
    req->u.connect.duplex_flags = duplex_flags;
  } else {
    SET_REQ_ERROR(req, GetLastError());
  }

  /* Post completed */
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
854
855
/* Legacy connect API: NUL-terminated name, no flags, no return value.
 * Any synchronous failure from uv_pipe_connect2() is converted into an
 * asynchronously-reported error on the request. */
void uv_pipe_connect(uv_connect_t* req,
                     uv_pipe_t* handle,
                     const char* name,
                     uv_connect_cb cb) {
  uv_loop_t* loop;
  int err;

  err = uv_pipe_connect2(req, handle, name, strlen(name), 0, cb);

  if (err) {
    loop = handle->loop;
    /* Make this req pending reporting an error. */
    SET_REQ_ERROR(req, err);
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
  }
}
874
875
/* Connect a pipe handle to the server named by `name` (namelen bytes, no
 * embedded nuls; only UV_PIPE_NO_TRUNCATE allowed in `flags`). Invalid
 * arguments are reported synchronously via the return value; every other
 * outcome — immediate success, ERROR_PIPE_BUSY retry on a worker thread, or
 * open failure — is reported asynchronously through the request, with the
 * function returning 0. */
int uv_pipe_connect2(uv_connect_t* req,
                     uv_pipe_t* handle,
                     const char* name,
                     size_t namelen,
                     unsigned int flags,
                     uv_connect_cb cb) {
  uv_loop_t* loop;
  int err;
  size_t nameSize;
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
  DWORD duplex_flags;
  char* name_copy;

  loop = handle->loop;
  UV_REQ_INIT(req, UV_CONNECT);
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
  req->u.connect.duplex_flags = 0;
  req->u.connect.name = NULL;

  if (flags & ~UV_PIPE_NO_TRUNCATE) {
    return UV_EINVAL;
  }

  if (name == NULL) {
    return UV_EINVAL;
  }

  if (namelen == 0) {
    return UV_EINVAL;
  }

  if (includes_nul(name, namelen)) {
    return UV_EINVAL;
  }

  /* The input is not necessarily NUL-terminated; make a terminated copy. */
  name_copy = uv__malloc(namelen + 1);
  if (name_copy == NULL) {
    return UV_ENOMEM;
  }

  memcpy(name_copy, name, namelen);
  name_copy[namelen] = '\0';

  /* From here on, failures go through the error label and are reported
   * asynchronously (Windows error codes, translated on delivery). */
  if (handle->flags & UV_HANDLE_PIPESERVER) {
    err = ERROR_INVALID_PARAMETER;
    goto error;
  }
  if (handle->flags & UV_HANDLE_CONNECTION) {
    err = ERROR_PIPE_BUSY;
    goto error;
  }
  uv__pipe_connection_init(handle);

  /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
  err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
  uv__free(name_copy);
  name_copy = NULL;

  if (err) {
    err = ERROR_NO_UNICODE_TRANSLATION;
    goto error;
  }

  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
  if (pipeHandle == INVALID_HANDLE_VALUE) {
    if (GetLastError() == ERROR_PIPE_BUSY) {
      /* All server instances are busy. Copy the name for the worker
       * thread, which will wait and retry. */
      nameSize = (wcslen(handle->name) + 1) * sizeof(WCHAR);
      req->u.connect.name = uv__malloc(nameSize);
      if (!req->u.connect.name) {
        uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
      }

      memcpy(req->u.connect.name, handle->name, nameSize);

      /* Wait for the server to make a pipe instance available. */
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
                             req,
                             WT_EXECUTELONGFUNCTION)) {
        uv__free(req->u.connect.name);
        req->u.connect.name = NULL;
        err = GetLastError();
        goto error;
      }

      REGISTER_HANDLE_REQ(loop, handle, req);
      handle->reqs_pending++;

      return 0;
    }

    err = GetLastError();
    goto error;
  }

  /* Opened synchronously; deliver success through the pending-req queue so
   * the callback still runs asynchronously. */
  req->u.connect.pipeHandle = pipeHandle;
  req->u.connect.duplex_flags = duplex_flags;
  SET_REQ_SUCCESS(req);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return 0;

error:
  uv__free(name_copy);

  if (handle->name) {
    uv__free(handle->name);
    handle->name = NULL;
  }

  if (pipeHandle != INVALID_HANDLE_VALUE)
    CloseHandle(pipeHandle);

  /* Make this req pending reporting an error. */
  SET_REQ_ERROR(req, err);
  uv__insert_pending_req(loop, (uv_req_t*) req);
  handle->reqs_pending++;
  REGISTER_HANDLE_REQ(loop, handle, req);
  return 0;
}
998
999
/* Cancel an in-flight read on the pipe, whether it is an overlapped read
 * (CancelIoEx) or a blocking ReadFile running on a thread-pool thread
 * (CancelSynchronousIo, coordinated through readfile_thread_lock). Sets
 * UV_HANDLE_CANCELLATION_PENDING so this is done at most once. */
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
  BOOL r;

  if (!(handle->flags & UV_HANDLE_READ_PENDING))
    return; /* No pending reads. */
  if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
    return; /* Already cancelled. */
  if (handle->handle == INVALID_HANDLE_VALUE)
    return; /* Pipe handle closed. */

  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Cancel asynchronous read. */
    r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
    /* ERROR_NOT_FOUND means the read already completed; that's fine. */
    assert(r || GetLastError() == ERROR_NOT_FOUND);
    (void) r;
  } else {
    /* Cancel synchronous read (which is happening in the thread pool). */
    HANDLE thread;
    volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;

    EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);

    thread = *thread_ptr;
    if (thread == NULL) {
      /* The thread pool thread has not yet reached the point of blocking, we
       * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
      *thread_ptr = INVALID_HANDLE_VALUE;

    } else {
      /* Spin until the thread has acknowledged (by setting the thread to
       * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
      while (thread != INVALID_HANDLE_VALUE) {
        r = CancelSynchronousIo(thread);
        assert(r || GetLastError() == ERROR_NOT_FOUND);
        SwitchToThread(); /* Yield thread. */
        thread = *thread_ptr;
      }
    }

    LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
  }

  /* Set flag to indicate that read has been cancelled. */
  handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
}
1045
1046
/* Stop reading: clear the reading flag, drop the loop's active-handle
 * reference, and cancel any read currently in flight. */
void uv__pipe_read_stop(uv_pipe_t* handle) {
  handle->flags &= ~UV_HANDLE_READING;
  DECREASE_ACTIVE_COUNT(handle->loop, handle);
  uv__pipe_interrupt_read(handle);
}
1052
1053
1054/* Cleans up uv_pipe_t (server or connection) and all resources associated with
1055* it. */
1056void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
1057int i;
1058HANDLE pipeHandle;
1059
1060if (handle->flags & UV_HANDLE_READING) {
1061handle->flags &= ~UV_HANDLE_READING;
1062DECREASE_ACTIVE_COUNT(loop, handle);
1063}
1064
1065if (handle->flags & UV_HANDLE_LISTENING) {
1066handle->flags &= ~UV_HANDLE_LISTENING;
1067DECREASE_ACTIVE_COUNT(loop, handle);
1068}
1069
1070handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1071
1072uv__handle_closing(handle);
1073
1074uv__pipe_interrupt_read(handle);
1075
1076if (handle->name) {
1077uv__free(handle->name);
1078handle->name = NULL;
1079}
1080
1081if (handle->flags & UV_HANDLE_PIPESERVER) {
1082for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1083pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
1084if (pipeHandle != INVALID_HANDLE_VALUE) {
1085CloseHandle(pipeHandle);
1086handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
1087}
1088}
1089handle->handle = INVALID_HANDLE_VALUE;
1090}
1091
1092if (handle->flags & UV_HANDLE_CONNECTION) {
1093eof_timer_destroy(handle);
1094}
1095
1096if ((handle->flags & UV_HANDLE_CONNECTION)
1097&& handle->handle != INVALID_HANDLE_VALUE) {
1098/* This will eventually destroy the write queue for us too. */
1099close_pipe(handle);
1100}
1101
1102if (handle->reqs_pending == 0)
1103uv__want_endgame(loop, (uv_handle_t*) handle);
1104}
1105
1106
/* Post one asynchronous accept: allocate a pipe instance (unless this is
 * the first instance, already created by uv_pipe_bind2) and start an
 * overlapped ConnectNamedPipe on it. Failures and already-connected clients
 * are reported through the pending-req queue. */
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
    uv_pipe_accept_t* req, BOOL firstInstance) {
  assert(handle->flags & UV_HANDLE_LISTENING);

  if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
    SET_REQ_ERROR(req, GetLastError());
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  assert(req->pipeHandle != INVALID_HANDLE_VALUE);

  /* Prepare the overlapped structure. */
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));

  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
      GetLastError() != ERROR_IO_PENDING) {
    if (GetLastError() == ERROR_PIPE_CONNECTED) {
      /* A client connected between creation and this call; that's success. */
      SET_REQ_SUCCESS(req);
    } else {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
    }
    uv__insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
    return;
  }

  /* Wait for completion via IOCP */
  handle->reqs_pending++;
}
1141
1142
/* Accept either a transferred socket (IPC pipes: pop one item off the xfer
 * queue and import it into the uv_tcp_t client) or a pipe connection
 * (take a completed pending accept and hand its instance to the uv_pipe_t
 * client, then re-queue the accept request). Returns 0 on success or
 * WSAEWOULDBLOCK when nothing is ready. */
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
  uv_loop_t* loop = server->loop;
  uv_pipe_t* pipe_client;
  uv_pipe_accept_t* req;
  struct uv__queue* q;
  uv__ipc_xfer_queue_item_t* item;
  int err;

  if (server->ipc) {
    if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) {
      /* No valid pending sockets. */
      return WSAEWOULDBLOCK;
    }

    q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue);
    uv__queue_remove(q);
    server->pipe.conn.ipc_xfer_queue_length--;
    item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);

    err = uv__tcp_xfer_import(
        (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);

    /* The queue item is consumed regardless of the import outcome. */
    uv__free(item);

    if (err != 0)
      return err;

  } else {
    pipe_client = (uv_pipe_t*) client;
    uv__pipe_connection_init(pipe_client);

    /* Find a connection instance that has been connected, but not yet
     * accepted. */
    req = server->pipe.serv.pending_accepts;

    if (!req) {
      /* No valid connections found, so we error out. */
      return WSAEWOULDBLOCK;
    }

    /* Initialize the client handle and copy the pipeHandle to the client */
    pipe_client->handle = req->pipeHandle;
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;

    /* Prepare the req to pick up a new connection */
    server->pipe.serv.pending_accepts = req->next_pending;
    req->next_pending = NULL;
    req->pipeHandle = INVALID_HANDLE_VALUE;

    /* Ownership of the instance moved to the client; a fresh one will be
     * created when the accept is re-queued. */
    server->handle = INVALID_HANDLE_VALUE;
    if (!(server->flags & UV_HANDLE_CLOSING)) {
      uv__pipe_queue_accept(loop, server, req, FALSE);
    }
  }

  return 0;
}
1200
1201
1202/* Starts listening for connections for the given pipe. */
1203int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1204uv_loop_t* loop = handle->loop;
1205int i;
1206
1207if (handle->flags & UV_HANDLE_LISTENING) {
1208handle->stream.serv.connection_cb = cb;
1209}
1210
1211if (!(handle->flags & UV_HANDLE_BOUND)) {
1212return WSAEINVAL;
1213}
1214
1215if (handle->flags & UV_HANDLE_READING) {
1216return WSAEISCONN;
1217}
1218
1219if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1220return ERROR_NOT_SUPPORTED;
1221}
1222
1223if (handle->ipc) {
1224return WSAEINVAL;
1225}
1226
1227handle->flags |= UV_HANDLE_LISTENING;
1228INCREASE_ACTIVE_COUNT(loop, handle);
1229handle->stream.serv.connection_cb = cb;
1230
1231/* First pipe handle should have already been created in uv_pipe_bind */
1232assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1233
1234for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1235uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1236}
1237
1238return 0;
1239}
1240
1241
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
  /* Worker-thread routine for non-overlapped pipes: perform a blocking
   * zero-byte ReadFile() so we learn when data becomes available, then post
   * the outcome to the loop's IOCP. The thread handle is published in
   * handle->pipe.conn.readfile_thread_handle (under the read lock) so that
   * uv__pipe_interrupt_read() can cancel the blocking read from the main
   * thread. */
  uv_read_t* req = (uv_read_t*) arg;
  uv_pipe_t* handle = (uv_pipe_t*) req->data;
  uv_loop_t* loop = handle->loop;
  volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
  CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
  HANDLE thread;
  DWORD bytes;
  DWORD err;

  assert(req->type == UV_READ);
  assert(handle->type == UV_NAMED_PIPE);

  err = 0;

  /* Create a handle to the current thread. */
  if (!DuplicateHandle(GetCurrentProcess(),
                       GetCurrentThread(),
                       GetCurrentProcess(),
                       &thread,
                       0,
                       FALSE,
                       DUPLICATE_SAME_ACCESS)) {
    err = GetLastError();
    goto out1;
  }

  /* The lock needs to be held when thread handle is modified. */
  EnterCriticalSection(lock);
  if (*thread_ptr == INVALID_HANDLE_VALUE) {
    /* uv__pipe_interrupt_read() cancelled reading before we got here. */
    err = ERROR_OPERATION_ABORTED;
  } else {
    /* Let main thread know which worker thread is doing the blocking read. */
    assert(*thread_ptr == NULL);
    *thread_ptr = thread;
  }
  LeaveCriticalSection(lock);

  if (err)
    goto out2;

  /* Block the thread until data is available on the pipe, or the read is
   * cancelled. */
  if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
    err = GetLastError();

  /* Let the main thread know the worker is past the point of blocking. */
  assert(thread == *thread_ptr);
  *thread_ptr = INVALID_HANDLE_VALUE;

  /* Briefly acquire the mutex. Since the main thread holds the lock while it
   * is spinning trying to cancel this thread's I/O, we will block here until
   * it stops doing that. */
  EnterCriticalSection(lock);
  LeaveCriticalSection(lock);

 out2:
  /* Close the handle to the current thread. */
  CloseHandle(thread);

 out1:
  /* Set request status and post a completion record to the IOCP. */
  if (err)
    SET_REQ_ERROR(req, err);
  else
    SET_REQ_SUCCESS(req);
  POST_COMPLETION_FOR_REQ(loop, req);

  return 0;
}
1313
1314
1315static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1316int result;
1317DWORD bytes;
1318uv_write_t* req = (uv_write_t*) parameter;
1319uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1320uv_loop_t* loop = handle->loop;
1321
1322assert(req != NULL);
1323assert(req->type == UV_WRITE);
1324assert(handle->type == UV_NAMED_PIPE);
1325
1326result = WriteFile(handle->handle,
1327req->write_buffer.base,
1328req->write_buffer.len,
1329&bytes,
1330NULL);
1331
1332if (!result) {
1333SET_REQ_ERROR(req, GetLastError());
1334}
1335
1336POST_COMPLETION_FOR_REQ(loop, req);
1337return 0;
1338}
1339
1340
1341static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1342uv_read_t* req;
1343uv_tcp_t* handle;
1344
1345req = (uv_read_t*) context;
1346assert(req != NULL);
1347handle = (uv_tcp_t*)req->data;
1348assert(handle != NULL);
1349assert(!timed_out);
1350
1351if (!PostQueuedCompletionStatus(handle->loop->iocp,
1352req->u.io.overlapped.InternalHigh,
13530,
1354&req->u.io.overlapped)) {
1355uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1356}
1357}
1358
1359
1360static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1361uv_write_t* req;
1362uv_tcp_t* handle;
1363
1364req = (uv_write_t*) context;
1365assert(req != NULL);
1366handle = (uv_tcp_t*)req->handle;
1367assert(handle != NULL);
1368assert(!timed_out);
1369
1370if (!PostQueuedCompletionStatus(handle->loop->iocp,
1371req->u.io.overlapped.InternalHigh,
13720,
1373&req->u.io.overlapped)) {
1374uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1375}
1376}
1377
1378
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
  /* Queue a zero-byte read on `handle` so the loop is notified when data is
   * available without committing a buffer. Non-overlapped pipes delegate the
   * blocking read to a worker thread; overlapped pipes issue an async
   * ReadFile(), optionally via the event-based IOCP emulation. On failure the
   * read_req is inserted as an already-completed pending req carrying the
   * error, so bookkeeping (READ_PENDING, reqs_pending) is identical on both
   * paths. */
  uv_read_t* req;
  int result;

  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));

  assert(handle->handle != INVALID_HANDLE_VALUE);

  req = &handle->read_req;

  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
                           req,
                           WT_EXECUTELONGFUNCTION)) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }
  } else {
    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      assert(req->event_handle != NULL);
      /* Low bit set tells the kernel not to post to the IOCP; completion is
       * delivered through the registered wait on event_handle instead. */
      req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
    }

    /* Do 0-read */
    result = ReadFile(handle->handle,
                      &uv_zero_,
                      0,
                      NULL,
                      &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      /* Make this req pending reporting an error. */
      SET_REQ_ERROR(req, GetLastError());
      goto error;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      /* Register the wait only once; it stays in place across reads. */
      if (req->wait_handle == INVALID_HANDLE_VALUE) {
        if (!RegisterWaitForSingleObject(&req->wait_handle,
            req->event_handle, post_completion_read_wait, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
          SET_REQ_ERROR(req, GetLastError());
          goto error;
        }
      }
    }
  }

  /* Start the eof timer if there is one */
  eof_timer_start(handle);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
  return;

 error:
  uv__insert_pending_req(loop, (uv_req_t*)req);
  handle->flags |= UV_HANDLE_READ_PENDING;
  handle->reqs_pending++;
}
1442
1443
1444int uv__pipe_read_start(uv_pipe_t* handle,
1445uv_alloc_cb alloc_cb,
1446uv_read_cb read_cb) {
1447uv_loop_t* loop = handle->loop;
1448
1449handle->flags |= UV_HANDLE_READING;
1450INCREASE_ACTIVE_COUNT(loop, handle);
1451handle->read_cb = read_cb;
1452handle->alloc_cb = alloc_cb;
1453
1454/* If reading was stopped and then started again, there could still be a read
1455* request pending. */
1456if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
1457if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
1458handle->read_req.event_handle == NULL) {
1459handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
1460if (handle->read_req.event_handle == NULL) {
1461uv_fatal_error(GetLastError(), "CreateEvent");
1462}
1463}
1464uv__pipe_queue_read(loop, handle);
1465}
1466
1467return 0;
1468}
1469
1470
1471static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
1472uv_write_t* req) {
1473req->next_req = NULL;
1474if (handle->pipe.conn.non_overlapped_writes_tail) {
1475req->next_req =
1476handle->pipe.conn.non_overlapped_writes_tail->next_req;
1477handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1478handle->pipe.conn.non_overlapped_writes_tail = req;
1479} else {
1480req->next_req = (uv_req_t*)req;
1481handle->pipe.conn.non_overlapped_writes_tail = req;
1482}
1483}
1484
1485
1486static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1487uv_write_t* req;
1488
1489if (handle->pipe.conn.non_overlapped_writes_tail) {
1490req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1491
1492if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1493handle->pipe.conn.non_overlapped_writes_tail = NULL;
1494} else {
1495handle->pipe.conn.non_overlapped_writes_tail->next_req =
1496req->next_req;
1497}
1498
1499return req;
1500} else {
1501/* queue empty */
1502return NULL;
1503}
1504}
1505
1506
1507static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
1508uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1509if (req) {
1510if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1511req,
1512WT_EXECUTELONGFUNCTION)) {
1513uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1514}
1515}
1516}
1517
1518
static int uv__build_coalesced_write_req(uv_write_t* user_req,
                                         const uv_buf_t bufs[],
                                         size_t nbufs,
                                         uv_write_t** req_out,
                                         uv_buf_t* write_buf_out) {
  /* Combine a multi-buffer write into a single heap allocation so it can be
   * issued as one WriteFile(). Returns 0 and sets *req_out (the replacement
   * write req, embedded in the allocation) and *write_buf_out (one buffer
   * covering all copied data), or WSAENOBUFS / ERROR_NOT_ENOUGH_MEMORY.
   * The allocation is freed in uv__process_pipe_write_req() after the
   * original user req is recovered via the stored back-pointer. */

  /* Pack into a single heap-allocated buffer:
   * (a) a uv_write_t structure where libuv stores the actual state.
   * (b) a pointer to the original uv_write_t.
   * (c) data from all `bufs` entries.
   */
  char* heap_buffer;
  size_t heap_buffer_length, heap_buffer_offset;
  uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
  char* data_start;                           /* (c) */
  size_t data_length;
  unsigned int i;

  /* Compute combined size of all combined buffers from `bufs`. */
  data_length = 0;
  for (i = 0; i < nbufs; i++)
    data_length += bufs[i].len;

  /* The total combined size of data buffers should not exceed UINT32_MAX,
   * because WriteFile() won't accept buffers larger than that. */
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Compute heap buffer size. */
  heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
                       data_length;                  /* (c) */

  /* Allocate buffer. */
  heap_buffer = uv__malloc(heap_buffer_length);
  if (heap_buffer == NULL)
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */

  /* Copy uv_write_t information to the buffer. */
  coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
  coalesced_write_req->req = *user_req; /* copy (a) */
  coalesced_write_req->req.coalesced = 1;
  coalesced_write_req->user_req = user_req;         /* copy (b) */
  heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */

  /* Copy data buffers to the heap buffer. */
  data_start = &heap_buffer[heap_buffer_offset];
  for (i = 0; i < nbufs; i++) {
    memcpy(&heap_buffer[heap_buffer_offset],
           bufs[i].base,
           bufs[i].len);                 /* copy (c) */
    heap_buffer_offset += bufs[i].len;   /* offset (c) */
  }
  assert(heap_buffer_offset == heap_buffer_length);

  /* Set out arguments and return. */
  *req_out = &coalesced_write_req->req;
  *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
  return 0;
}
1577
1578
static int uv__pipe_write_data(uv_loop_t* loop,
                               uv_write_t* req,
                               uv_pipe_t* handle,
                               const uv_buf_t bufs[],
                               size_t nbufs,
                               uv_write_cb cb,
                               int copy_always) {
  /* Core pipe write. Initializes `req`, picks a write buffer (empty, direct,
   * or coalesced copy when nbufs > 1 or copy_always is set), then issues the
   * write on one of four paths depending on handle flags:
   *   1. blocking + non-overlapped: synchronous WriteFile(), completion
   *      posted to the IOCP immediately;
   *   2. non-overlapped: queued for a worker thread (serialized writes);
   *   3. blocking (overlapped): overlapped WriteFile(), wait on an event
   *      until it completes before returning;
   *   4. default: overlapped WriteFile(), completion via IOCP (or the
   *      event-based emulation when UV_HANDLE_EMULATE_IOCP is set).
   * Returns 0 on success or a system error code; on success the req is
   * registered and reqs_pending / write_reqs_pending are incremented. */
  int err;
  int result;
  uv_buf_t write_buf;

  assert(handle->handle != INVALID_HANDLE_VALUE);

  UV_REQ_INIT(req, UV_WRITE);
  req->handle = (uv_stream_t*) handle;
  req->send_handle = NULL;
  req->cb = cb;
  /* Private fields. */
  req->coalesced = 0;
  req->event_handle = NULL;
  req->wait_handle = INVALID_HANDLE_VALUE;

  /* Prepare the overlapped structure. */
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
    if (req->event_handle == NULL) {
      uv_fatal_error(GetLastError(), "CreateEvent");
    }
    /* Low bit set suppresses the IOCP completion; it is delivered through
     * the event instead. */
    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
  }
  req->write_buffer = uv_null_buf_;

  if (nbufs == 0) {
    /* Write empty buffer. */
    write_buf = uv_null_buf_;
  } else if (nbufs == 1 && !copy_always) {
    /* Write directly from bufs[0]. */
    write_buf = bufs[0];
  } else {
    /* Coalesce all `bufs` into one big buffer. This also creates a new
     * write-request structure that replaces the old one. */
    err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
    if (err != 0)
      return err;
  }

  if ((handle->flags &
       (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
    /* Path 1: fully synchronous write. */
    DWORD bytes;
    result =
        WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);

    if (!result) {
      err = GetLastError();
      return err;
    } else {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    }

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    POST_COMPLETION_FOR_REQ(loop, req);
    return 0;
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    /* Path 2: hand off to a worker thread; only start it when no other
     * write is in flight so writes stay ordered. */
    req->write_buffer = write_buf;
    uv__insert_non_overlapped_write_req(handle, req);
    if (handle->stream.conn.write_reqs_pending == 0) {
      uv__queue_non_overlapped_write(handle);
    }

    /* Request queued by the kernel. */
    req->u.io.queued_bytes = write_buf.len;
    handle->write_queue_size += req->u.io.queued_bytes;
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
    /* Path 3: using overlapped IO, but wait for completion before
     * returning. */
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      err = GetLastError();
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
      return err;
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
      if (WaitForSingleObject(req->event_handle, INFINITE) !=
          WAIT_OBJECT_0) {
        err = GetLastError();
        CloseHandle(req->event_handle);
        req->event_handle = NULL;
        return err;
      }
    }
    CloseHandle(req->event_handle);
    req->event_handle = NULL;

    REGISTER_HANDLE_REQ(loop, handle, req);
    handle->reqs_pending++;
    handle->stream.conn.write_reqs_pending++;
    return 0;
  } else {
    /* Path 4: regular overlapped write; completion arrives via the IOCP or
     * the registered wait. */
    result = WriteFile(handle->handle,
                       write_buf.base,
                       write_buf.len,
                       NULL,
                       &req->u.io.overlapped);

    if (!result && GetLastError() != ERROR_IO_PENDING) {
      return GetLastError();
    }

    if (result) {
      /* Request completed immediately. */
      req->u.io.queued_bytes = 0;
    } else {
      /* Request queued by the kernel. */
      req->u.io.queued_bytes = write_buf.len;
      handle->write_queue_size += req->u.io.queued_bytes;
    }

    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
      if (!RegisterWaitForSingleObject(&req->wait_handle,
          req->event_handle, post_completion_write_wait, (void*) req,
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
        return GetLastError();
      }
    }
  }

  REGISTER_HANDLE_REQ(loop, handle, req);
  handle->reqs_pending++;
  handle->stream.conn.write_reqs_pending++;

  return 0;
}
1728
1729
1730static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1731DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1732
1733/* If the both ends of the IPC pipe are owned by the same process,
1734* the remote end pid may not yet be set. If so, do it here.
1735* TODO: this is weird; it'd probably better to use a handshake. */
1736if (*pid == 0) {
1737GetNamedPipeClientProcessId(handle->handle, pid);
1738if (*pid == GetCurrentProcessId()) {
1739GetNamedPipeServerProcessId(handle->handle, pid);
1740}
1741}
1742
1743return *pid;
1744}
1745
1746
int uv__pipe_write_ipc(uv_loop_t* loop,
                       uv_write_t* req,
                       uv_pipe_t* handle,
                       const uv_buf_t data_bufs[],
                       size_t data_buf_count,
                       uv_stream_t* send_handle,
                       uv_write_cb cb) {
  /* Write one IPC frame: a frame header, optionally a socket-transfer info
   * record (when `send_handle` is given), followed by the user's data
   * buffers. The frame header and xfer info live on this stack frame, so the
   * write is issued with copy_always set. Returns 0 or a system/winsock
   * error code. */
  uv_buf_t stack_bufs[6];
  uv_buf_t* bufs;
  size_t buf_count, buf_index;
  uv__ipc_frame_header_t frame_header;
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
  uv__ipc_socket_xfer_info_t xfer_info;
  uint64_t data_length;
  size_t i;
  int err;

  /* Compute the combined size of data buffers. */
  data_length = 0;
  for (i = 0; i < data_buf_count; i++)
    data_length += data_bufs[i].len;
  if (data_length > UINT32_MAX)
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */

  /* Prepare the frame's socket xfer payload. */
  if (send_handle != NULL) {
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;

    /* Verify that `send_handle` it is indeed a tcp handle. */
    if (send_tcp_handle->type != UV_TCP)
      return ERROR_NOT_SUPPORTED;

    /* Export the tcp handle. */
    err = uv__tcp_xfer_export(send_tcp_handle,
                              uv__pipe_get_ipc_remote_pid(handle),
                              &xfer_type,
                              &xfer_info);
    if (err != 0)
      return err;
  }

  /* Compute the number of uv_buf_t's required. */
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
  if (send_handle != NULL)
    buf_count += 1; /* One extra for the socket xfer information. */

  /* Use the on-stack buffer array if it is big enough; otherwise allocate
   * space for it on the heap. */
  if (buf_count < ARRAY_SIZE(stack_bufs)) {
    /* Use on-stack buffer array. */
    bufs = stack_bufs;
  } else {
    /* Use heap-allocated buffer array. */
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
    if (bufs == NULL)
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
  }
  buf_index = 0;

  /* Initialize frame header and add it to the buffers list. Note: the
   * header's flags/data_length fields are filled in below, after the buffer
   * is listed; that is fine because the data is only copied when the write
   * is issued. */
  memset(&frame_header, 0, sizeof frame_header);
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);

  if (send_handle != NULL) {
    /* Add frame header flags. */
    switch (xfer_type) {
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
        break;
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
        break;
      default:
        assert(0);  /* Unreachable. */
    }
    /* Add xfer info buffer. */
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
  }

  if (data_length > 0) {
    /* Update frame header. */
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
    frame_header.data_length = (uint32_t) data_length;
    /* Add data buffers to buffers list. */
    for (i = 0; i < data_buf_count; i++)
      bufs[buf_index++] = data_bufs[i];
  }

  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
   * some of the written data lives on the stack. */
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);

  /* If we had to heap-allocate the bufs array, free it now. */
  if (bufs != stack_bufs) {
    uv__free(bufs);
  }

  return err;
}
1847
1848
1849int uv__pipe_write(uv_loop_t* loop,
1850uv_write_t* req,
1851uv_pipe_t* handle,
1852const uv_buf_t bufs[],
1853size_t nbufs,
1854uv_stream_t* send_handle,
1855uv_write_cb cb) {
1856if (handle->ipc) {
1857/* IPC pipe write: use framing protocol. */
1858return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
1859} else {
1860/* Non-IPC pipe write: put data on the wire directly. */
1861assert(send_handle == NULL);
1862return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
1863}
1864}
1865
1866
1867static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1868uv_buf_t buf) {
1869/* If there is an eof timer running, we don't need it any more, so discard
1870* it. */
1871eof_timer_destroy(handle);
1872
1873uv_read_stop((uv_stream_t*) handle);
1874
1875handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
1876}
1877
1878
1879static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1880uv_buf_t buf) {
1881/* If there is an eof timer running, we don't need it any more, so discard
1882* it. */
1883eof_timer_destroy(handle);
1884
1885uv_read_stop((uv_stream_t*) handle);
1886
1887handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1888}
1889
1890
1891static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1892int error, uv_buf_t buf) {
1893if (error == ERROR_BROKEN_PIPE) {
1894uv__pipe_read_eof(loop, handle, buf);
1895} else {
1896uv__pipe_read_error(loop, handle, error, buf);
1897}
1898}
1899
1900
1901static void uv__pipe_queue_ipc_xfer_info(
1902uv_pipe_t* handle,
1903uv__ipc_socket_xfer_type_t xfer_type,
1904uv__ipc_socket_xfer_info_t* xfer_info) {
1905uv__ipc_xfer_queue_item_t* item;
1906
1907item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
1908if (item == NULL)
1909uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1910
1911item->xfer_type = xfer_type;
1912item->xfer_info = *xfer_info;
1913
1914uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member);
1915handle->pipe.conn.ipc_xfer_queue_length++;
1916}
1917
1918
1919/* Read an exact number of bytes from a pipe. If an error or end-of-file is
1920* encountered before the requested number of bytes are read, an error is
1921* returned. */
1922static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1923DWORD bytes_read, bytes_read_now;
1924
1925bytes_read = 0;
1926while (bytes_read < count) {
1927if (!ReadFile(h,
1928(char*) buffer + bytes_read,
1929count - bytes_read,
1930&bytes_read_now,
1931NULL)) {
1932return GetLastError();
1933}
1934
1935bytes_read += bytes_read_now;
1936}
1937
1938assert(bytes_read == count);
1939return 0;
1940}
1941
1942
static DWORD uv__pipe_read_data(uv_loop_t* loop,
                                uv_pipe_t* handle,
                                DWORD suggested_bytes,
                                DWORD max_bytes) {
  /* Perform one buffered read: ask the user's alloc_cb for a buffer, read at
   * most max_bytes (clamped to the buffer size) and deliver the result via
   * read_cb. Returns the number of bytes read, or 0 to tell the caller to
   * break out of its read loop (error, EOF or ENOBUFS already reported). */
  DWORD bytes_read;
  uv_buf_t buf;

  /* Ask the user for a buffer to read data into. */
  buf = uv_buf_init(NULL, 0);
  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
  if (buf.base == NULL || buf.len == 0) {
    handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
    return 0; /* Break out of read loop. */
  }

  /* Ensure we read at most the smaller of:
   *   (a) the length of the user-allocated buffer.
   *   (b) the maximum data length as specified by the `max_bytes` argument.
   */
  if (max_bytes > buf.len)
    max_bytes = buf.len;

  /* Read into the user buffer. */
  if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
    uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
    return 0; /* Break out of read loop. */
  }

  /* Call the read callback. */
  handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);

  return bytes_read;
}
1976
1977
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
  /* Read one step of the IPC framing protocol. Either continue draining the
   * data payload of the current frame, or read a new frame header (plus the
   * socket-transfer record, if the header announces one). Returns the number
   * of bytes consumed, or 0 to break out of the caller's read loop (an error
   * or invalid frame has already been reported). */
  uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
  int err;

  if (*data_remaining > 0) {
    /* Read frame data payload. */
    DWORD bytes_read =
        uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
    *data_remaining -= bytes_read;
    return bytes_read;

  } else {
    /* Start of a new IPC frame. */
    uv__ipc_frame_header_t frame_header;
    uint32_t xfer_flags;
    uv__ipc_socket_xfer_type_t xfer_type;
    uv__ipc_socket_xfer_info_t xfer_info;

    /* Read the IPC frame header. */
    err = uv__pipe_read_exactly(
        handle->handle, &frame_header, sizeof frame_header);
    if (err)
      goto error;

    /* Validate that flags are valid. */
    if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
      goto invalid;
    /* Validate that reserved2 is zero. */
    if (frame_header.reserved2 != 0)
      goto invalid;

    /* Parse xfer flags. */
    xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
    if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
      /* Socket coming -- determine the type. */
      xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
                      ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
                      : UV__IPC_SOCKET_XFER_TCP_SERVER;
    } else if (xfer_flags == 0) {
      /* No socket. */
      xfer_type = UV__IPC_SOCKET_XFER_NONE;
    } else {
      /* Invalid flags. */
      goto invalid;
    }

    /* Parse data frame information. */
    if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
      /* Payload is consumed by later invocations of this function. */
      *data_remaining = frame_header.data_length;
    } else if (frame_header.data_length != 0) {
      /* Data length greater than zero but data flag not set -- invalid. */
      goto invalid;
    }

    /* If no socket xfer info follows, return here. Data will be read in a
     * subsequent invocation of uv__pipe_read_ipc(). */
    if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
      return sizeof frame_header; /* Number of bytes read. */

    /* Read transferred socket information. */
    err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
    if (err)
      goto error;

    /* Store the pending socket info. */
    uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);

    /* Return number of bytes read. */
    return sizeof frame_header + sizeof xfer_info;
  }

 invalid:
  /* Invalid frame. */
  err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */

 error:
  uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
  return 0; /* Break out of read loop. */
}
2057
2058
void uv__process_pipe_read_req(uv_loop_t* loop,
                               uv_pipe_t* handle,
                               uv_req_t* req) {
  /* IOCP completion handler for the zero-byte read request. On success,
   * drain whatever data the kernel buffer holds by issuing buffered reads
   * (IPC frames or raw data), then re-arm the next zero-read if the user is
   * still reading. */
  assert(handle->type == UV_NAMED_PIPE);

  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
  DECREASE_PENDING_REQ_COUNT(handle);
  eof_timer_stop(handle);

  /* At this point, we're done with bookkeeping. If the user has stopped
   * reading the pipe in the meantime, there is nothing left to do, since there
   * is no callback that we can call. */
  if (!(handle->flags & UV_HANDLE_READING))
    return;

  if (!REQ_SUCCESS(req)) {
    /* An error occurred doing the zero-read. */
    DWORD err = GET_REQ_ERROR(req);

    /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
     * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
     * the user; we'll start a new zero-read at the end of this function. */
    if (err != ERROR_OPERATION_ABORTED)
      uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);

  } else {
    /* The zero-read completed without error, indicating there is data
     * available in the kernel buffer. */
    DWORD avail;

    /* Get the number of bytes available. */
    avail = 0;
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
      uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);

    /* Read until we've either read all the bytes available, or the 'reading'
     * flag is cleared. */
    while (avail > 0 && handle->flags & UV_HANDLE_READING) {
      /* Depending on the type of pipe, read either IPC frames or raw data. */
      DWORD bytes_read =
          handle->ipc ? uv__pipe_read_ipc(loop, handle)
                      : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);

      /* If no bytes were read, treat this as an indication that an error
       * occurred, and break out of the read loop. */
      if (bytes_read == 0)
        break;

      /* It is possible that more bytes were read than we thought were
       * available. To prevent `avail` from underflowing, break out of the loop
       * if this is the case. */
      if (bytes_read > avail)
        break;

      /* Recompute the number of bytes available. */
      avail -= bytes_read;
    }
  }

  /* Start another zero-read request if necessary. */
  if ((handle->flags & UV_HANDLE_READING) &&
      !(handle->flags & UV_HANDLE_READ_PENDING)) {
    uv__pipe_queue_read(loop, handle);
  }
}
2124
2125
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_write_t* req) {
  /* IOCP completion handler for a pipe write: release emulation resources,
   * unwrap a coalesced write back to the user's original req, invoke the
   * user callback, and kick the next queued non-overlapped write or a
   * pending shutdown when this was the last write in flight. */
  int err;

  assert(handle->type == UV_NAMED_PIPE);

  assert(handle->write_queue_size >= req->u.io.queued_bytes);
  handle->write_queue_size -= req->u.io.queued_bytes;

  UNREGISTER_HANDLE_REQ(loop, handle, req);

  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    /* Tear down the registered wait and event used to emulate the IOCP
     * completion for this request. */
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
      UnregisterWait(req->wait_handle);
      req->wait_handle = INVALID_HANDLE_VALUE;
    }
    if (req->event_handle) {
      CloseHandle(req->event_handle);
      req->event_handle = NULL;
    }
  }

  err = GET_REQ_ERROR(req);

  /* If this was a coalesced write, extract pointer to the user_provided
   * uv_write_t structure so we can pass the expected pointer to the callback,
   * then free the heap-allocated write req. */
  if (req->coalesced) {
    uv__coalesced_write_t* coalesced_write =
        container_of(req, uv__coalesced_write_t, req);
    req = coalesced_write->user_req;
    uv__free(coalesced_write);
  }
  if (req->cb) {
    req->cb(req, uv_translate_sys_error(err));
  }

  handle->stream.conn.write_reqs_pending--;

  /* Non-overlapped writes run one at a time; start the next queued one. */
  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
      handle->pipe.conn.non_overlapped_writes_tail) {
    assert(handle->stream.conn.write_reqs_pending > 0);
    uv__queue_non_overlapped_write(handle);
  }

  /* A shutdown request waits until all writes have drained. */
  if (handle->stream.conn.write_reqs_pending == 0 &&
      uv__is_stream_shutting(handle))
    uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);

  DECREASE_PENDING_REQ_COUNT(handle);
}
2177
2178
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
    uv_req_t* raw_req) {
  /* IOCP completion handler for ConnectNamedPipe(): on success, push the
   * connected instance onto the pending-accepts list and notify the user's
   * connection callback; on failure, discard the instance and re-arm the
   * accept request (unless the server is closing). */
  uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;

  assert(handle->type == UV_NAMED_PIPE);

  if (handle->flags & UV_HANDLE_CLOSING) {
    /* The req->pipeHandle should be freed already in uv__pipe_close(). */
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
    DECREASE_PENDING_REQ_COUNT(handle);
    return;
  }

  if (REQ_SUCCESS(req)) {
    assert(req->pipeHandle != INVALID_HANDLE_VALUE);
    /* LIFO push onto the list consumed by uv__pipe_accept(). */
    req->next_pending = handle->pipe.serv.pending_accepts;
    handle->pipe.serv.pending_accepts = req;

    if (handle->stream.serv.connection_cb) {
      handle->stream.serv.connection_cb((uv_stream_t*) handle, 0);
    }
  } else {
    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
      CloseHandle(req->pipeHandle);
      req->pipeHandle = INVALID_HANDLE_VALUE;
    }
    if (!(handle->flags & UV_HANDLE_CLOSING)) {
      uv__pipe_queue_accept(loop, handle, req, FALSE);
    }
  }

  DECREASE_PENDING_REQ_COUNT(handle);
}
2212
2213
2214void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
2215uv_connect_t* req) {
2216HANDLE pipeHandle;
2217DWORD duplex_flags;
2218int err;
2219
2220assert(handle->type == UV_NAMED_PIPE);
2221
2222UNREGISTER_HANDLE_REQ(loop, handle, req);
2223
2224err = 0;
2225if (REQ_SUCCESS(req)) {
2226pipeHandle = req->u.connect.pipeHandle;
2227duplex_flags = req->u.connect.duplex_flags;
2228if (handle->flags & UV_HANDLE_CLOSING)
2229err = UV_ECANCELED;
2230else
2231err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
2232if (err)
2233CloseHandle(pipeHandle);
2234} else {
2235err = uv_translate_sys_error(GET_REQ_ERROR(req));
2236}
2237
2238if (req->cb)
2239req->cb(req, err);
2240
2241DECREASE_PENDING_REQ_COUNT(handle);
2242}
2243
2244
2245
2246void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
2247uv_shutdown_t* req) {
2248int err;
2249
2250assert(handle->type == UV_NAMED_PIPE);
2251
2252/* Clear the shutdown_req field so we don't go here again. */
2253handle->stream.conn.shutdown_req = NULL;
2254UNREGISTER_HANDLE_REQ(loop, handle, req);
2255
2256if (handle->flags & UV_HANDLE_CLOSING) {
2257/* Already closing. Cancel the shutdown. */
2258err = UV_ECANCELED;
2259} else if (!REQ_SUCCESS(req)) {
2260/* An error occurred in trying to shutdown gracefully. */
2261err = uv_translate_sys_error(GET_REQ_ERROR(req));
2262} else {
2263if (handle->flags & UV_HANDLE_READABLE) {
2264/* Initialize and optionally start the eof timer. Only do this if the pipe
2265* is readable and we haven't seen EOF come in ourselves. */
2266eof_timer_init(handle);
2267
2268/* If reading start the timer right now. Otherwise uv__pipe_queue_read will
2269* start it. */
2270if (handle->flags & UV_HANDLE_READ_PENDING) {
2271eof_timer_start(handle);
2272}
2273
2274} else {
2275/* This pipe is not readable. We can just close it to let the other end
2276* know that we're done writing. */
2277close_pipe(handle);
2278}
2279err = 0;
2280}
2281
2282if (req->cb)
2283req->cb(req, err);
2284
2285DECREASE_PENDING_REQ_COUNT(handle);
2286}
2287
2288
2289static void eof_timer_init(uv_pipe_t* pipe) {
2290int r;
2291
2292assert(pipe->pipe.conn.eof_timer == NULL);
2293assert(pipe->flags & UV_HANDLE_CONNECTION);
2294
2295pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
2296
2297r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
2298assert(r == 0); /* timers can't fail */
2299(void) r;
2300pipe->pipe.conn.eof_timer->data = pipe;
2301uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
2302}
2303
2304
2305static void eof_timer_start(uv_pipe_t* pipe) {
2306assert(pipe->flags & UV_HANDLE_CONNECTION);
2307
2308if (pipe->pipe.conn.eof_timer != NULL) {
2309uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
2310}
2311}
2312
2313
2314static void eof_timer_stop(uv_pipe_t* pipe) {
2315assert(pipe->flags & UV_HANDLE_CONNECTION);
2316
2317if (pipe->pipe.conn.eof_timer != NULL) {
2318uv_timer_stop(pipe->pipe.conn.eof_timer);
2319}
2320}
2321
2322
2323static void eof_timer_cb(uv_timer_t* timer) {
2324uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
2325uv_loop_t* loop = timer->loop;
2326
2327assert(pipe->type == UV_NAMED_PIPE);
2328
2329/* This should always be true, since we start the timer only in
2330* uv__pipe_queue_read after successfully calling ReadFile, or in
2331* uv__process_pipe_shutdown_req if a read is pending, and we always
2332* immediately stop the timer in uv__process_pipe_read_req. */
2333assert(pipe->flags & UV_HANDLE_READ_PENDING);
2334
2335/* If there are many packets coming off the iocp then the timer callback may
2336* be called before the read request is coming off the queue. Therefore we
2337* check here if the read request has completed but will be processed later.
2338*/
2339if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
2340HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
2341return;
2342}
2343
2344/* Force both ends off the pipe. */
2345close_pipe(pipe);
2346
2347/* Stop reading, so the pending read that is going to fail will not be
2348* reported to the user. */
2349uv_read_stop((uv_stream_t*) pipe);
2350
2351/* Report the eof and update flags. This will get reported even if the user
2352* stopped reading in the meantime. TODO: is that okay? */
2353uv__pipe_read_eof(loop, pipe, uv_null_buf_);
2354}
2355
2356
2357static void eof_timer_destroy(uv_pipe_t* pipe) {
2358assert(pipe->flags & UV_HANDLE_CONNECTION);
2359
2360if (pipe->pipe.conn.eof_timer) {
2361uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
2362pipe->pipe.conn.eof_timer = NULL;
2363}
2364}
2365
2366
2367static void eof_timer_close_cb(uv_handle_t* handle) {
2368assert(handle->type == UV_TIMER);
2369uv__free(handle);
2370}
2371
2372
/* Wrap an existing pipe file descriptor in a uv_pipe_t connection handle.
 * Returns 0 on success or a UV_* error code; on success the handle owns the
 * (possibly duplicated) OS handle. */
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
  HANDLE os_handle = uv__get_osfhandle(file);
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_ACCESS_INFORMATION access;
  DWORD duplex_flags = 0;
  int err;

  /* Reject bad fds, pipes already acting as servers, and pipes that are
   * already connected. */
  if (os_handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;
  if (pipe->flags & UV_HANDLE_PIPESERVER)
    return UV_EINVAL;
  if (pipe->flags & UV_HANDLE_CONNECTION)
    return UV_EBUSY;

  uv__pipe_connection_init(pipe);
  uv__once_init();
  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
   * underlying OS handle and forget about the original fd.
   * We could also opt to use the original OS handle and just never close it,
   * but then there would be no reliable way to cancel pending read operations
   * upon close.
   */
  if (file <= 2) {
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
                         os_handle,
                         INVALID_HANDLE_VALUE,
                         &os_handle,
                         0,
                         FALSE,
                         DUPLICATE_SAME_ACCESS))
      return uv_translate_sys_error(GetLastError());
    assert(os_handle != INVALID_HANDLE_VALUE);
    /* file == -1 marks os_handle as owned by us (duplicated), which decides
     * who closes it on the error path below. */
    file = -1;
  }

  /* Determine what kind of permissions we have on this handle.
   * Cygwin opens the pipe in message mode, but we can support it,
   * just query the access flags and set the stream flags accordingly.
   */
  nt_status = pNtQueryInformationFile(os_handle,
                                      &io_status,
                                      &access,
                                      sizeof(access),
                                      FileAccessInformation);
  if (nt_status != STATUS_SUCCESS)
    return UV_EINVAL;

  /* An IPC pipe must be both readable and writable. */
  if (pipe->ipc) {
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
        !(access.AccessFlags & FILE_READ_DATA)) {
      return UV_EINVAL;
    }
  }

  /* Map the NT access flags onto the stream's readable/writable flags. */
  if (access.AccessFlags & FILE_WRITE_DATA)
    duplex_flags |= UV_HANDLE_WRITABLE;
  if (access.AccessFlags & FILE_READ_DATA)
    duplex_flags |= UV_HANDLE_READABLE;

  err = uv__set_pipe_handle(pipe->loop,
                            pipe,
                            os_handle,
                            file,
                            duplex_flags);
  if (err) {
    /* Only close os_handle if we duplicated it ourselves above; otherwise
     * it still belongs to the caller's fd. */
    if (file == -1)
      CloseHandle(os_handle);
    return err;
  }

  if (pipe->ipc) {
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
    /* Record the peer pid: if the client end turns out to be ourselves, the
     * interesting remote end is the server. NOTE(review): the BOOL results
     * of GetNamedPipe{Client,Server}ProcessId are not checked here — on
     * failure ipc_remote_pid may be left unset; confirm intended. */
    GetNamedPipeClientProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
    if (pipe->pipe.conn.ipc_remote_pid == GetCurrentProcessId()) {
      GetNamedPipeServerProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
    }
    assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
  }
  return 0;
}
2454
2455
/* Produce the pipe's name ("\\?\pipe\..." prefix plus the NT file name) as
 * UTF-8 in `buffer`. On input *size is the buffer capacity; on output it is
 * the number of bytes produced. Returns 0 or a UV_* error code. */
static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
  NTSTATUS nt_status;
  IO_STATUS_BLOCK io_status;
  FILE_NAME_INFORMATION tmp_name_info;
  FILE_NAME_INFORMATION* name_info;
  WCHAR* name_buf;
  unsigned int name_size;
  unsigned int name_len;
  int err;

  uv__once_init();
  name_info = NULL;

  if (handle->name != NULL) {
    /* The user might try to query the name before we are connected,
     * and this is just easier to return the cached value if we have it. */
    return uv__copy_utf16_to_utf8(handle->name, -1, buffer, size);
  }

  if (handle->handle == INVALID_HANDLE_VALUE) {
    *size = 0;
    return UV_EINVAL;
  }

  /* NtQueryInformationFile will block if another thread is performing a
   * blocking operation on the queried handle. If the pipe handle is
   * synchronous, there may be a worker thread currently calling ReadFile() on
   * the pipe handle, which could cause a deadlock. To avoid this, interrupt
   * the read. */
  if (handle->flags & UV_HANDLE_CONNECTION &&
      handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
    uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
  }

  /* First try with a stack-allocated FILE_NAME_INFORMATION; most names fit. */
  nt_status = pNtQueryInformationFile(handle->handle,
                                      &io_status,
                                      &tmp_name_info,
                                      sizeof tmp_name_info,
                                      FileNameInformation);
  if (nt_status == STATUS_BUFFER_OVERFLOW) {
    /* The name did not fit: retry with a heap buffer sized from the
     * FileNameLength the first call reported. */
    name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
    name_info = uv__malloc(name_size);
    if (!name_info) {
      *size = 0;
      return UV_ENOMEM;
    }

    nt_status = pNtQueryInformationFile(handle->handle,
                                        &io_status,
                                        name_info,
                                        name_size,
                                        FileNameInformation);
  }

  if (nt_status != STATUS_SUCCESS) {
    *size = 0;
    err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
    goto error;
  }

  /* Pick whichever buffer actually received the name. */
  if (!name_info) {
    /* the struct on stack was used */
    name_buf = tmp_name_info.FileName;
    name_len = tmp_name_info.FileNameLength;
  } else {
    name_buf = name_info->FileName;
    name_len = name_info->FileNameLength;
  }

  if (name_len == 0) {
    *size = 0;
    err = 0;
    goto error;
  }

  /* FileNameLength is in bytes; convert to a WCHAR count. */
  name_len /= sizeof(WCHAR);

  /* "\\\\.\\pipe" + name */
  if (*size < pipe_prefix_len) {
    /* Caller's buffer cannot even hold the prefix: report a required size
     * via the conversion call below (with zero capacity). */
    *size = 0;
  }
  else {
    memcpy(buffer, pipe_prefix, pipe_prefix_len);
    *size -= pipe_prefix_len;
  }
  err = uv__copy_utf16_to_utf8(name_buf, name_len, buffer+pipe_prefix_len, size);
  /* Account for the prefix in the reported output length. */
  *size += pipe_prefix_len;

error:
  /* uv__free(NULL) is a no-op, so this is safe on the stack-buffer path. */
  uv__free(name_info);
  return err;
}
2548
2549
2550int uv_pipe_pending_count(uv_pipe_t* handle) {
2551if (!handle->ipc)
2552return 0;
2553return handle->pipe.conn.ipc_xfer_queue_length;
2554}
2555
2556
2557int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2558if (handle->flags & UV_HANDLE_BOUND)
2559return uv__pipe_getname(handle, buffer, size);
2560
2561if (handle->flags & UV_HANDLE_CONNECTION ||
2562handle->handle != INVALID_HANDLE_VALUE) {
2563*size = 0;
2564return 0;
2565}
2566
2567return UV_EBADF;
2568}
2569
2570
2571int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2572/* emulate unix behaviour */
2573if (handle->flags & UV_HANDLE_BOUND)
2574return UV_ENOTCONN;
2575
2576if (handle->handle != INVALID_HANDLE_VALUE)
2577return uv__pipe_getname(handle, buffer, size);
2578
2579if (handle->flags & UV_HANDLE_CONNECTION) {
2580if (handle->name != NULL)
2581return uv__pipe_getname(handle, buffer, size);
2582}
2583
2584return UV_EBADF;
2585}
2586
2587
2588uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2589if (!handle->ipc)
2590return UV_UNKNOWN_HANDLE;
2591if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2592return UV_UNKNOWN_HANDLE;
2593else
2594return UV_TCP;
2595}
2596
/* Grant the Everyone SID read and/or write access to the pipe's kernel
 * object by appending an ACE to its existing DACL. `mode` must be
 * UV_READABLE, UV_WRITABLE, or their OR. Returns 0 or a UV_* error code. */
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
  SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
  PACL old_dacl, new_dacl;
  PSECURITY_DESCRIPTOR sd;
  EXPLICIT_ACCESS ea;
  PSID everyone;
  int error;

  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
    return UV_EBADF;

  if (mode != UV_READABLE &&
      mode != UV_WRITABLE &&
      mode != (UV_WRITABLE | UV_READABLE))
    return UV_EINVAL;

  /* Build the well-known "Everyone" (World) SID. */
  if (!AllocateAndInitializeSid(&sid_world,
                                1,
                                SECURITY_WORLD_RID,
                                0, 0, 0, 0, 0, 0, 0,
                                &everyone)) {
    error = GetLastError();
    goto done;
  }

  /* Fetch the pipe's current DACL (and the descriptor that owns it, which
   * must stay alive while old_dacl is in use). */
  if (GetSecurityInfo(handle->handle,
                      SE_KERNEL_OBJECT,
                      DACL_SECURITY_INFORMATION,
                      NULL,
                      NULL,
                      &old_dacl,
                      NULL,
                      &sd)) {
    error = GetLastError();
    goto clean_sid;
  }

  /* Describe the access to grant. The cross-grants of FILE_*_ATTRIBUTES
   * pair read access with attribute-write and vice versa; SYNCHRONIZE is
   * needed so waits on the handle work. */
  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
  if (mode & UV_READABLE)
    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
  if (mode & UV_WRITABLE)
    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
  ea.grfAccessPermissions |= SYNCHRONIZE;
  ea.grfAccessMode = SET_ACCESS;
  ea.grfInheritance = NO_INHERITANCE;
  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
  ea.Trustee.ptstrName = (LPTSTR)everyone;

  /* Merge the new entry with the existing DACL into new_dacl. */
  if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
    error = GetLastError();
    goto clean_sd;
  }

  /* Install the merged DACL on the pipe object. */
  if (SetSecurityInfo(handle->handle,
                      SE_KERNEL_OBJECT,
                      DACL_SECURITY_INFORMATION,
                      NULL,
                      NULL,
                      new_dacl,
                      NULL)) {
    error = GetLastError();
    goto clean_dacl;
  }

  error = 0;

  /* Cleanup chain: each label releases only what was acquired before the
   * corresponding failure point. uv_translate_sys_error(0) yields 0. */
clean_dacl:
  LocalFree((HLOCAL) new_dacl);
clean_sd:
  LocalFree((HLOCAL) sd);
clean_sid:
  FreeSid(everyone);
done:
  return uv_translate_sys_error(error);
}
2673