libuv-svace-build

Форк
0
2672 строки · 77.0 Кб
1
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
2
 *
3
 * Permission is hereby granted, free of charge, to any person obtaining a copy
4
 * of this software and associated documentation files (the "Software"), to
5
 * deal in the Software without restriction, including without limitation the
6
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
7
 * sell copies of the Software, and to permit persons to whom the Software is
8
 * furnished to do so, subject to the following conditions:
9
 *
10
 * The above copyright notice and this permission notice shall be included in
11
 * all copies or substantial portions of the Software.
12
 *
13
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
19
 * IN THE SOFTWARE.
20
 */
21

22
#include <assert.h>
23
#include <io.h>
24
#include <stdio.h>
25
#include <stdlib.h>
26
#include <string.h>
27

28
#include "handle-inl.h"
29
#include "internal.h"
30
#include "req-inl.h"
31
#include "stream-inl.h"
32
#include "uv-common.h"
33
#include "uv.h"
34

35
#include <aclapi.h>
36
#include <accctrl.h>
37

38
/* A zero-size buffer for use by uv_pipe_read */
39
static char uv_zero_[] = "";
40

41
/* Null uv_buf_t */
42
static const uv_buf_t uv_null_buf_ = { 0, NULL };
43

44
/* The timeout that the pipe will wait for the remote end to write data when
45
 * the local ends wants to shut it down. */
46
static const int64_t eof_timeout = 50; /* ms */
47

48
static const int default_pending_pipe_instances = 4;
49

50
/* Pipe prefix */
51
static char pipe_prefix[] = "\\\\?\\pipe";
52
static const size_t pipe_prefix_len = sizeof(pipe_prefix) - 1;
53

54
/* IPC incoming xfer queue item. */
55
typedef struct {
56
  uv__ipc_socket_xfer_type_t xfer_type;
57
  uv__ipc_socket_xfer_info_t xfer_info;
58
  struct uv__queue member;
59
} uv__ipc_xfer_queue_item_t;
60

61
/* IPC frame header flags. */
62
/* clang-format off */
63
enum {
64
  UV__IPC_FRAME_HAS_DATA                = 0x01,
65
  UV__IPC_FRAME_HAS_SOCKET_XFER         = 0x02,
66
  UV__IPC_FRAME_XFER_IS_TCP_CONNECTION  = 0x04,
67
  /* These are combinations of the flags above. */
68
  UV__IPC_FRAME_XFER_FLAGS              = 0x06,
69
  UV__IPC_FRAME_VALID_FLAGS             = 0x07
70
};
71
/* clang-format on */
72

73
/* IPC frame header. */
74
typedef struct {
75
  uint32_t flags;
76
  uint32_t reserved1;   /* Ignored. */
77
  uint32_t data_length; /* Must be zero if there is no data. */
78
  uint32_t reserved2;   /* Must be zero. */
79
} uv__ipc_frame_header_t;
80

81
/* To implement the IPC protocol correctly, these structures must have exactly
82
 * the right size. */
83
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
84
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
85

86
/* Coalesced write request. */
87
typedef struct {
88
  uv_write_t req;       /* Internal heap-allocated write request. */
89
  uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
90
} uv__coalesced_write_t;
91

92

93
static void eof_timer_init(uv_pipe_t* pipe);
94
static void eof_timer_start(uv_pipe_t* pipe);
95
static void eof_timer_stop(uv_pipe_t* pipe);
96
static void eof_timer_cb(uv_timer_t* timer);
97
static void eof_timer_destroy(uv_pipe_t* pipe);
98
static void eof_timer_close_cb(uv_handle_t* handle);
99

100

101
/* Does the file path contain embedded nul bytes? */
102
static int includes_nul(const char *s, size_t n) {
103
  if (n == 0)
104
    return 0;
105
  return NULL != memchr(s, '\0', n);
106
}
107

108

109
static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
110
  snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
111
}
112

113

114
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
115
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
116

117
  handle->reqs_pending = 0;
118
  handle->handle = INVALID_HANDLE_VALUE;
119
  handle->name = NULL;
120
  handle->pipe.conn.ipc_remote_pid = 0;
121
  handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
122
  uv__queue_init(&handle->pipe.conn.ipc_xfer_queue);
123
  handle->pipe.conn.ipc_xfer_queue_length = 0;
124
  handle->ipc = ipc;
125
  handle->pipe.conn.non_overlapped_writes_tail = NULL;
126

127
  return 0;
128
}
129

130

131
static void uv__pipe_connection_init(uv_pipe_t* handle) {
132
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
133
  uv__connection_init((uv_stream_t*) handle);
134
  handle->read_req.data = handle;
135
  handle->pipe.conn.eof_timer = NULL;
136
}
137

138

139
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
140
  HANDLE pipeHandle;
141

142
  /*
143
   * Assume that we have a duplex pipe first, so attempt to
144
   * connect with GENERIC_READ | GENERIC_WRITE.
145
   */
146
  pipeHandle = CreateFileW(name,
147
                           GENERIC_READ | GENERIC_WRITE,
148
                           0,
149
                           NULL,
150
                           OPEN_EXISTING,
151
                           FILE_FLAG_OVERLAPPED,
152
                           NULL);
153
  if (pipeHandle != INVALID_HANDLE_VALUE) {
154
    *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
155
    return pipeHandle;
156
  }
157

158
  /*
159
   * If the pipe is not duplex CreateFileW fails with
160
   * ERROR_ACCESS_DENIED.  In that case try to connect
161
   * as a read-only or write-only.
162
   */
163
  if (GetLastError() == ERROR_ACCESS_DENIED) {
164
    pipeHandle = CreateFileW(name,
165
                             GENERIC_READ | FILE_WRITE_ATTRIBUTES,
166
                             0,
167
                             NULL,
168
                             OPEN_EXISTING,
169
                             FILE_FLAG_OVERLAPPED,
170
                             NULL);
171

172
    if (pipeHandle != INVALID_HANDLE_VALUE) {
173
      *duplex_flags = UV_HANDLE_READABLE;
174
      return pipeHandle;
175
    }
176
  }
177

178
  if (GetLastError() == ERROR_ACCESS_DENIED) {
179
    pipeHandle = CreateFileW(name,
180
                             GENERIC_WRITE | FILE_READ_ATTRIBUTES,
181
                             0,
182
                             NULL,
183
                             OPEN_EXISTING,
184
                             FILE_FLAG_OVERLAPPED,
185
                             NULL);
186

187
    if (pipeHandle != INVALID_HANDLE_VALUE) {
188
      *duplex_flags = UV_HANDLE_WRITABLE;
189
      return pipeHandle;
190
    }
191
  }
192

193
  return INVALID_HANDLE_VALUE;
194
}
195

196

197
static void close_pipe(uv_pipe_t* pipe) {
198
  assert(pipe->u.fd == -1 || pipe->u.fd > 2);
199
  if (pipe->u.fd == -1)
200
    CloseHandle(pipe->handle);
201
  else
202
    _close(pipe->u.fd);
203

204
  pipe->u.fd = -1;
205
  pipe->handle = INVALID_HANDLE_VALUE;
206
}
207

208

209
static int uv__pipe_server(
210
    HANDLE* pipeHandle_ptr, DWORD access,
211
    char* name, size_t nameSize, char* random) {
212
  HANDLE pipeHandle;
213
  int err;
214

215
  for (;;) {
216
    uv__unique_pipe_name(random, name, nameSize);
217

218
    pipeHandle = CreateNamedPipeA(name,
219
      access | FILE_FLAG_FIRST_PIPE_INSTANCE,
220
      PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
221
      NULL);
222

223
    if (pipeHandle != INVALID_HANDLE_VALUE) {
224
      /* No name collisions.  We're done. */
225
      break;
226
    }
227

228
    err = GetLastError();
229
    if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
230
      goto error;
231
    }
232

233
    /* Pipe name collision.  Increment the random number and try again. */
234
    random++;
235
  }
236

237
  *pipeHandle_ptr = pipeHandle;
238

239
  return 0;
240

241
 error:
242
  if (pipeHandle != INVALID_HANDLE_VALUE)
243
    CloseHandle(pipeHandle);
244

245
  return err;
246
}
247

248

249
static int uv__create_pipe_pair(
250
    HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
251
    unsigned int server_flags, unsigned int client_flags,
252
    int inherit_client, char* random) {
253
  /* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
254
  char pipe_name[64];
255
  SECURITY_ATTRIBUTES sa;
256
  DWORD server_access;
257
  DWORD client_access;
258
  HANDLE server_pipe;
259
  HANDLE client_pipe;
260
  int err;
261

262
  server_pipe = INVALID_HANDLE_VALUE;
263
  client_pipe = INVALID_HANDLE_VALUE;
264

265
  server_access = 0;
266
  if (server_flags & UV_READABLE_PIPE)
267
    server_access |= PIPE_ACCESS_INBOUND;
268
  if (server_flags & UV_WRITABLE_PIPE)
269
    server_access |= PIPE_ACCESS_OUTBOUND;
270
  if (server_flags & UV_NONBLOCK_PIPE)
271
    server_access |= FILE_FLAG_OVERLAPPED;
272
  server_access |= WRITE_DAC;
273

274
  client_access = 0;
275
  if (client_flags & UV_READABLE_PIPE)
276
    client_access |= GENERIC_READ;
277
  else
278
    client_access |= FILE_READ_ATTRIBUTES;
279
  if (client_flags & UV_WRITABLE_PIPE)
280
    client_access |= GENERIC_WRITE;
281
  else
282
    client_access |= FILE_WRITE_ATTRIBUTES;
283
  client_access |= WRITE_DAC;
284

285
  /* Create server pipe handle. */
286
  err = uv__pipe_server(&server_pipe,
287
                        server_access,
288
                        pipe_name,
289
                        sizeof(pipe_name),
290
                        random);
291
  if (err)
292
    goto error;
293

294
  /* Create client pipe handle. */
295
  sa.nLength = sizeof sa;
296
  sa.lpSecurityDescriptor = NULL;
297
  sa.bInheritHandle = inherit_client;
298

299
  client_pipe = CreateFileA(pipe_name,
300
                            client_access,
301
                            0,
302
                            &sa,
303
                            OPEN_EXISTING,
304
                            (client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
305
                            NULL);
306
  if (client_pipe == INVALID_HANDLE_VALUE) {
307
    err = GetLastError();
308
    goto error;
309
  }
310

311
#ifndef NDEBUG
312
  /* Validate that the pipe was opened in the right mode. */
313
  {
314
    DWORD mode;
315
    BOOL r;
316
    r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
317
    if (r == TRUE) {
318
      assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
319
    } else {
320
      fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
321
    }
322
  }
323
#endif
324

325
  /* Do a blocking ConnectNamedPipe.  This should not block because we have
326
   * both ends of the pipe created. */
327
  if (!ConnectNamedPipe(server_pipe, NULL)) {
328
    if (GetLastError() != ERROR_PIPE_CONNECTED) {
329
      err = GetLastError();
330
      goto error;
331
    }
332
  }
333

334
  *client_pipe_ptr = client_pipe;
335
  *server_pipe_ptr = server_pipe;
336
  return 0;
337

338
 error:
339
  if (server_pipe != INVALID_HANDLE_VALUE)
340
    CloseHandle(server_pipe);
341

342
  if (client_pipe != INVALID_HANDLE_VALUE)
343
    CloseHandle(client_pipe);
344

345
  return err;
346
}
347

348

349
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
350
  uv_file temp[2];
351
  int err;
352
  HANDLE readh;
353
  HANDLE writeh;
354

355
  /* Make the server side the inbound (read) end, */
356
  /* so that both ends will have FILE_READ_ATTRIBUTES permission. */
357
  /* TODO: better source of local randomness than &fds? */
358
  read_flags |= UV_READABLE_PIPE;
359
  write_flags |= UV_WRITABLE_PIPE;
360
  err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
361
  if (err != 0)
362
    return err;
363
  temp[0] = _open_osfhandle((intptr_t) readh, 0);
364
  if (temp[0] == -1) {
365
    if (errno == UV_EMFILE)
366
      err = UV_EMFILE;
367
    else
368
      err = UV_UNKNOWN;
369
    CloseHandle(readh);
370
    CloseHandle(writeh);
371
    return err;
372
  }
373
  temp[1] = _open_osfhandle((intptr_t) writeh, 0);
374
  if (temp[1] == -1) {
375
    if (errno == UV_EMFILE)
376
      err = UV_EMFILE;
377
    else
378
      err = UV_UNKNOWN;
379
    _close(temp[0]);
380
    CloseHandle(writeh);
381
    return err;
382
  }
383
  fds[0] = temp[0];
384
  fds[1] = temp[1];
385
  return 0;
386
}
387

388

389
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
390
    uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
391
  /* The parent_pipe is always the server_pipe and kept by libuv.
392
   * The child_pipe is always the client_pipe and is passed to the child.
393
   * The flags are specified with respect to their usage in the child. */
394
  HANDLE server_pipe;
395
  HANDLE client_pipe;
396
  unsigned int server_flags;
397
  unsigned int client_flags;
398
  int err;
399

400
  uv__pipe_connection_init(parent_pipe);
401

402
  server_pipe = INVALID_HANDLE_VALUE;
403
  client_pipe = INVALID_HANDLE_VALUE;
404

405
  server_flags = 0;
406
  client_flags = 0;
407
  if (flags & UV_READABLE_PIPE) {
408
    /* The server needs inbound (read) access too, otherwise CreateNamedPipe()
409
     * won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
410
     * the state of the write buffer when we're trying to shutdown the pipe. */
411
    server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
412
    client_flags |= UV_READABLE_PIPE;
413
  }
414
  if (flags & UV_WRITABLE_PIPE) {
415
    server_flags |= UV_READABLE_PIPE;
416
    client_flags |= UV_WRITABLE_PIPE;
417
  }
418
  server_flags |= UV_NONBLOCK_PIPE;
419
  if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
420
    client_flags |= UV_NONBLOCK_PIPE;
421
  }
422

423
  err = uv__create_pipe_pair(&server_pipe, &client_pipe,
424
          server_flags, client_flags, 1, (char*) server_pipe);
425
  if (err)
426
    goto error;
427

428
  if (CreateIoCompletionPort(server_pipe,
429
                             loop->iocp,
430
                             (ULONG_PTR) parent_pipe,
431
                             0) == NULL) {
432
    err = GetLastError();
433
    goto error;
434
  }
435

436
  parent_pipe->handle = server_pipe;
437
  *child_pipe_ptr = client_pipe;
438

439
  /* The server end is now readable and/or writable. */
440
  if (flags & UV_READABLE_PIPE)
441
    parent_pipe->flags |= UV_HANDLE_WRITABLE;
442
  if (flags & UV_WRITABLE_PIPE)
443
    parent_pipe->flags |= UV_HANDLE_READABLE;
444

445
  return 0;
446

447
 error:
448
  if (server_pipe != INVALID_HANDLE_VALUE)
449
    CloseHandle(server_pipe);
450

451
  if (client_pipe != INVALID_HANDLE_VALUE)
452
    CloseHandle(client_pipe);
453

454
  return err;
455
}
456

457

458
static int uv__set_pipe_handle(uv_loop_t* loop,
459
                               uv_pipe_t* handle,
460
                               HANDLE pipeHandle,
461
                               int fd,
462
                               DWORD duplex_flags) {
463
  NTSTATUS nt_status;
464
  IO_STATUS_BLOCK io_status;
465
  FILE_MODE_INFORMATION mode_info;
466
  DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
467
  DWORD current_mode = 0;
468
  DWORD err = 0;
469

470
  assert(handle->flags & UV_HANDLE_CONNECTION);
471
  assert(!(handle->flags & UV_HANDLE_PIPESERVER));
472
  if (handle->flags & UV_HANDLE_CLOSING)
473
    return UV_EINVAL;
474
  if (handle->handle != INVALID_HANDLE_VALUE)
475
    return UV_EBUSY;
476

477
  if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
478
    err = GetLastError();
479
    if (err == ERROR_ACCESS_DENIED) {
480
      /*
481
       * SetNamedPipeHandleState can fail if the handle doesn't have either
482
       * GENERIC_WRITE  or FILE_WRITE_ATTRIBUTES.
483
       * But if the handle already has the desired wait and blocking modes
484
       * we can continue.
485
       */
486
      if (!GetNamedPipeHandleState(pipeHandle, &current_mode, NULL, NULL,
487
                                   NULL, NULL, 0)) {
488
        return uv_translate_sys_error(GetLastError());
489
      } else if (current_mode & PIPE_NOWAIT) {
490
        return UV_EACCES;
491
      }
492
    } else {
493
      /* If this returns ERROR_INVALID_PARAMETER we probably opened
494
       * something that is not a pipe. */
495
      if (err == ERROR_INVALID_PARAMETER) {
496
        return UV_ENOTSOCK;
497
      }
498
      return uv_translate_sys_error(err);
499
    }
500
  }
501

502
  /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
503
  nt_status = pNtQueryInformationFile(pipeHandle,
504
                                      &io_status,
505
                                      &mode_info,
506
                                      sizeof(mode_info),
507
                                      FileModeInformation);
508
  if (nt_status != STATUS_SUCCESS) {
509
    return uv_translate_sys_error(err);
510
  }
511

512
  if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
513
      mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
514
    /* Non-overlapped pipe. */
515
    handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
516
    handle->pipe.conn.readfile_thread_handle = NULL;
517
    InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
518
  } else {
519
    /* Overlapped pipe.  Try to associate with IOCP. */
520
    if (CreateIoCompletionPort(pipeHandle,
521
                               loop->iocp,
522
                               (ULONG_PTR) handle,
523
                               0) == NULL) {
524
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
525
    }
526
  }
527

528
  handle->handle = pipeHandle;
529
  handle->u.fd = fd;
530
  handle->flags |= duplex_flags;
531

532
  return 0;
533
}
534

535

536
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
537
                             uv_pipe_accept_t* req, BOOL firstInstance) {
538
  assert(req->pipeHandle == INVALID_HANDLE_VALUE);
539

540
  req->pipeHandle =
541
      CreateNamedPipeW(handle->name,
542
                       PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
543
                         (firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
544
                       PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
545
                       PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
546

547
  if (req->pipeHandle == INVALID_HANDLE_VALUE) {
548
    return 0;
549
  }
550

551
  /* Associate it with IOCP so we can get events. */
552
  if (CreateIoCompletionPort(req->pipeHandle,
553
                             loop->iocp,
554
                             (ULONG_PTR) handle,
555
                             0) == NULL) {
556
    uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
557
  }
558

559
  /* Stash a handle in the server object for use from places such as
560
   * getsockname and chmod. As we transfer ownership of these to client
561
   * objects, we'll allocate new ones here. */
562
  handle->handle = req->pipeHandle;
563

564
  return 1;
565
}
566

567

568
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
569
  uv_loop_t* loop;
570
  uv_pipe_t* handle;
571
  uv_shutdown_t* req;
572

573
  req = (uv_shutdown_t*) parameter;
574
  assert(req);
575
  handle = (uv_pipe_t*) req->handle;
576
  assert(handle);
577
  loop = handle->loop;
578
  assert(loop);
579

580
  FlushFileBuffers(handle->handle);
581

582
  /* Post completed */
583
  POST_COMPLETION_FOR_REQ(loop, req);
584

585
  return 0;
586
}
587

588

589
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
590
  DWORD result;
591
  NTSTATUS nt_status;
592
  IO_STATUS_BLOCK io_status;
593
  FILE_PIPE_LOCAL_INFORMATION pipe_info;
594

595
  assert(handle->flags & UV_HANDLE_CONNECTION);
596
  assert(req != NULL);
597
  assert(handle->stream.conn.write_reqs_pending == 0);
598
  SET_REQ_SUCCESS(req);
599

600
  if (handle->flags & UV_HANDLE_CLOSING) {
601
    uv__insert_pending_req(loop, (uv_req_t*) req);
602
    return;
603
  }
604

605
  /* Try to avoid flushing the pipe buffer in the thread pool. */
606
  nt_status = pNtQueryInformationFile(handle->handle,
607
                                      &io_status,
608
                                      &pipe_info,
609
                                      sizeof pipe_info,
610
                                      FilePipeLocalInformation);
611

612
  if (nt_status != STATUS_SUCCESS) {
613
    SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
614
    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
615
    uv__insert_pending_req(loop, (uv_req_t*) req);
616
    return;
617
  }
618

619
  if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
620
    /* Short-circuit, no need to call FlushFileBuffers:
621
     * all writes have been read. */
622
    uv__insert_pending_req(loop, (uv_req_t*) req);
623
    return;
624
  }
625

626
  /* Run FlushFileBuffers in the thread pool. */
627
  result = QueueUserWorkItem(pipe_shutdown_thread_proc,
628
                             req,
629
                             WT_EXECUTELONGFUNCTION);
630
  if (!result) {
631
    SET_REQ_ERROR(req, GetLastError());
632
    handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
633
    uv__insert_pending_req(loop, (uv_req_t*) req);
634
    return;
635
  }
636
}
637

638

639
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
640
  uv__ipc_xfer_queue_item_t* xfer_queue_item;
641

642
  assert(handle->reqs_pending == 0);
643
  assert(handle->flags & UV_HANDLE_CLOSING);
644
  assert(!(handle->flags & UV_HANDLE_CLOSED));
645

646
  if (handle->flags & UV_HANDLE_CONNECTION) {
647
    /* Free pending sockets */
648
    while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) {
649
      struct uv__queue* q;
650
      SOCKET socket;
651

652
      q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue);
653
      uv__queue_remove(q);
654
      xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);
655

656
      /* Materialize socket and close it */
657
      socket = WSASocketW(FROM_PROTOCOL_INFO,
658
                          FROM_PROTOCOL_INFO,
659
                          FROM_PROTOCOL_INFO,
660
                          &xfer_queue_item->xfer_info.socket_info,
661
                          0,
662
                          WSA_FLAG_OVERLAPPED);
663
      uv__free(xfer_queue_item);
664

665
      if (socket != INVALID_SOCKET)
666
        closesocket(socket);
667
    }
668
    handle->pipe.conn.ipc_xfer_queue_length = 0;
669

670
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
671
      if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
672
        UnregisterWait(handle->read_req.wait_handle);
673
        handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
674
      }
675
      if (handle->read_req.event_handle != NULL) {
676
        CloseHandle(handle->read_req.event_handle);
677
        handle->read_req.event_handle = NULL;
678
      }
679
    }
680

681
    if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
682
      DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
683
  }
684

685
  if (handle->flags & UV_HANDLE_PIPESERVER) {
686
    assert(handle->pipe.serv.accept_reqs);
687
    uv__free(handle->pipe.serv.accept_reqs);
688
    handle->pipe.serv.accept_reqs = NULL;
689
  }
690

691
  uv__handle_close(handle);
692
}
693

694

695
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
696
  if (handle->flags & UV_HANDLE_BOUND)
697
    return;
698
  handle->pipe.serv.pending_instances = count;
699
  handle->flags |= UV_HANDLE_PIPESERVER;
700
}
701

702

703
/* Creates a pipe server. */
704
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
705
  return uv_pipe_bind2(handle, name, strlen(name), 0);
706
}
707

708

709
int uv_pipe_bind2(uv_pipe_t* handle,
710
                  const char* name,
711
                  size_t namelen,
712
                  unsigned int flags) {
713
  uv_loop_t* loop = handle->loop;
714
  int i, err;
715
  uv_pipe_accept_t* req;
716
  char* name_copy;
717

718
  if (flags & ~UV_PIPE_NO_TRUNCATE) {
719
    return UV_EINVAL;
720
  }
721

722
  if (name == NULL) {
723
    return UV_EINVAL;
724
  }
725

726
  if (namelen == 0) {
727
    return UV_EINVAL;
728
  }
729

730
  if (includes_nul(name, namelen)) {
731
    return UV_EINVAL;
732
  }
733

734
  if (handle->flags & UV_HANDLE_BOUND) {
735
    return UV_EINVAL;
736
  }
737

738
  if (uv__is_closing(handle)) {
739
    return UV_EINVAL;
740
  }
741

742
  name_copy = uv__malloc(namelen + 1);
743
  if (name_copy == NULL) {
744
    return UV_ENOMEM;
745
  }
746

747
  memcpy(name_copy, name, namelen);
748
  name_copy[namelen] = '\0';
749

750
  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
751
    handle->pipe.serv.pending_instances = default_pending_pipe_instances;
752
  }
753

754
  err = UV_ENOMEM;
755
  handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
756
    uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
757
  if (handle->pipe.serv.accept_reqs == NULL) {
758
    goto error;
759
  }
760

761
  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
762
    req = &handle->pipe.serv.accept_reqs[i];
763
    UV_REQ_INIT(req, UV_ACCEPT);
764
    req->data = handle;
765
    req->pipeHandle = INVALID_HANDLE_VALUE;
766
    req->next_pending = NULL;
767
  }
768

769
  /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
770
  err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
771
  uv__free(name_copy);
772
  name_copy = NULL;
773

774
  if (err) {
775
    goto error;
776
  }
777

778
  /*
779
   * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
780
   * If this fails then there's already a pipe server for the given pipe name.
781
   */
782
  if (!pipe_alloc_accept(loop,
783
                         handle,
784
                         &handle->pipe.serv.accept_reqs[0],
785
                         TRUE)) {
786
    err = GetLastError();
787
    if (err == ERROR_ACCESS_DENIED) {
788
      err = UV_EADDRINUSE;
789
    } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
790
      err = UV_EACCES;
791
    } else {
792
      err = uv_translate_sys_error(err);
793
    }
794
    goto error;
795
  }
796

797
  handle->pipe.serv.pending_accepts = NULL;
798
  handle->flags |= UV_HANDLE_PIPESERVER;
799
  handle->flags |= UV_HANDLE_BOUND;
800

801
  return 0;
802

803
error:
804
  uv__free(handle->pipe.serv.accept_reqs);
805
  uv__free(handle->name);
806
  uv__free(name_copy);
807
  handle->pipe.serv.accept_reqs = NULL;
808
  handle->name = NULL;
809

810
  return err;
811
}
812

813

814
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
815
  uv_loop_t* loop;
816
  uv_pipe_t* handle;
817
  uv_connect_t* req;
818
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
819
  DWORD duplex_flags;
820

821
  req = (uv_connect_t*) parameter;
822
  assert(req);
823
  handle = (uv_pipe_t*) req->handle;
824
  assert(handle);
825
  loop = handle->loop;
826
  assert(loop);
827

828
  /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
829
   * up to 30 seconds for the pipe to become available with WaitNamedPipe. */
830
  while (WaitNamedPipeW(req->u.connect.name, 30000)) {
831
    /* The pipe is now available, try to connect. */
832
    pipeHandle = open_named_pipe(req->u.connect.name, &duplex_flags);
833
    if (pipeHandle != INVALID_HANDLE_VALUE)
834
      break;
835

836
    SwitchToThread();
837
  }
838

839
  uv__free(req->u.connect.name);
840
  req->u.connect.name = NULL;
841
  if (pipeHandle != INVALID_HANDLE_VALUE) {
842
    SET_REQ_SUCCESS(req);
843
    req->u.connect.pipeHandle = pipeHandle;
844
    req->u.connect.duplex_flags = duplex_flags;
845
  } else {
846
    SET_REQ_ERROR(req, GetLastError());
847
  }
848

849
  /* Post completed */
850
  POST_COMPLETION_FOR_REQ(loop, req);
851

852
  return 0;
853
}
854

855

856
void uv_pipe_connect(uv_connect_t* req,
857
                    uv_pipe_t* handle,
858
                    const char* name,
859
                    uv_connect_cb cb) {
860
  uv_loop_t* loop;
861
  int err;
862

863
  err = uv_pipe_connect2(req, handle, name, strlen(name), 0, cb);
864

865
  if (err) {
866
    loop = handle->loop;
867
    /* Make this req pending reporting an error. */
868
    SET_REQ_ERROR(req, err);
869
    uv__insert_pending_req(loop, (uv_req_t*) req);
870
    handle->reqs_pending++;
871
    REGISTER_HANDLE_REQ(loop, handle, req);
872
  }
873
}
874

875

876
int uv_pipe_connect2(uv_connect_t* req,
877
                     uv_pipe_t* handle,
878
                     const char* name,
879
                     size_t namelen,
880
                     unsigned int flags,
881
                     uv_connect_cb cb) {
882
  uv_loop_t* loop;
883
  int err;
884
  size_t nameSize;
885
  HANDLE pipeHandle = INVALID_HANDLE_VALUE;
886
  DWORD duplex_flags;
887
  char* name_copy;
888

889
  loop = handle->loop;
890
  UV_REQ_INIT(req, UV_CONNECT);
891
  req->handle = (uv_stream_t*) handle;
892
  req->cb = cb;
893
  req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
894
  req->u.connect.duplex_flags = 0;
895
  req->u.connect.name = NULL;
896

897
  if (flags & ~UV_PIPE_NO_TRUNCATE) {
898
    return UV_EINVAL;
899
  }
900

901
  if (name == NULL) {
902
    return UV_EINVAL;
903
  }
904

905
  if (namelen == 0) {
906
    return UV_EINVAL;
907
  }
908

909
  if (includes_nul(name, namelen)) {
910
    return UV_EINVAL;
911
  }
912

913
  name_copy = uv__malloc(namelen + 1);
914
  if (name_copy == NULL) {
915
    return UV_ENOMEM;
916
  }
917

918
  memcpy(name_copy, name, namelen);
919
  name_copy[namelen] = '\0';
920

921
  if (handle->flags & UV_HANDLE_PIPESERVER) {
922
    err = ERROR_INVALID_PARAMETER;
923
    goto error;
924
  }
925
  if (handle->flags & UV_HANDLE_CONNECTION) {
926
    err = ERROR_PIPE_BUSY;
927
    goto error;
928
  }
929
  uv__pipe_connection_init(handle);
930

931
  /* TODO(bnoordhuis) Add converters that take a |length| parameter. */
932
  err = uv__convert_utf8_to_utf16(name_copy, &handle->name);
933
  uv__free(name_copy);
934
  name_copy = NULL;
935

936
  if (err) {
937
    err = ERROR_NO_UNICODE_TRANSLATION;
938
    goto error;
939
  }
940

941
  pipeHandle = open_named_pipe(handle->name, &duplex_flags);
942
  if (pipeHandle == INVALID_HANDLE_VALUE) {
943
    if (GetLastError() == ERROR_PIPE_BUSY) {
944
      nameSize = (wcslen(handle->name) + 1) * sizeof(WCHAR);
945
      req->u.connect.name = uv__malloc(nameSize);
946
      if (!req->u.connect.name) {
947
        uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
948
      }
949

950
      memcpy(req->u.connect.name, handle->name, nameSize);
951

952
      /* Wait for the server to make a pipe instance available. */
953
      if (!QueueUserWorkItem(&pipe_connect_thread_proc,
954
                             req,
955
                             WT_EXECUTELONGFUNCTION)) {
956
        uv__free(req->u.connect.name);
957
        req->u.connect.name = NULL;
958
        err = GetLastError();
959
        goto error;
960
      }
961

962
      REGISTER_HANDLE_REQ(loop, handle, req);
963
      handle->reqs_pending++;
964

965
      return 0;
966
    }
967

968
    err = GetLastError();
969
    goto error;
970
  }
971

972
  req->u.connect.pipeHandle = pipeHandle;
973
  req->u.connect.duplex_flags = duplex_flags;
974
  SET_REQ_SUCCESS(req);
975
  uv__insert_pending_req(loop, (uv_req_t*) req);
976
  handle->reqs_pending++;
977
  REGISTER_HANDLE_REQ(loop, handle, req);
978
  return 0;
979

980
error:
981
  uv__free(name_copy);
982

983
  if (handle->name) {
984
    uv__free(handle->name);
985
    handle->name = NULL;
986
  }
987

988
  if (pipeHandle != INVALID_HANDLE_VALUE)
989
    CloseHandle(pipeHandle);
990

991
  /* Make this req pending reporting an error. */
992
  SET_REQ_ERROR(req, err);
993
  uv__insert_pending_req(loop, (uv_req_t*) req);
994
  handle->reqs_pending++;
995
  REGISTER_HANDLE_REQ(loop, handle, req);
996
  return 0;
997
}
998

999

1000
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
1001
  BOOL r;
1002

1003
  if (!(handle->flags & UV_HANDLE_READ_PENDING))
1004
    return; /* No pending reads. */
1005
  if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
1006
    return; /* Already cancelled. */
1007
  if (handle->handle == INVALID_HANDLE_VALUE)
1008
    return; /* Pipe handle closed. */
1009

1010
  if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1011
    /* Cancel asynchronous read. */
1012
    r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
1013
    assert(r || GetLastError() == ERROR_NOT_FOUND);
1014
    (void) r;
1015
  } else {
1016
    /* Cancel synchronous read (which is happening in the thread pool). */
1017
    HANDLE thread;
1018
    volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1019

1020
    EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
1021

1022
    thread = *thread_ptr;
1023
    if (thread == NULL) {
1024
      /* The thread pool thread has not yet reached the point of blocking, we
1025
       * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
1026
      *thread_ptr = INVALID_HANDLE_VALUE;
1027

1028
    } else {
1029
      /* Spin until the thread has acknowledged (by setting the thread to
1030
       * INVALID_HANDLE_VALUE) that it is past the point of blocking. */
1031
      while (thread != INVALID_HANDLE_VALUE) {
1032
        r = CancelSynchronousIo(thread);
1033
        assert(r || GetLastError() == ERROR_NOT_FOUND);
1034
        SwitchToThread(); /* Yield thread. */
1035
        thread = *thread_ptr;
1036
      }
1037
    }
1038

1039
    LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
1040
  }
1041

1042
  /* Set flag to indicate that read has been cancelled. */
1043
  handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
1044
}
1045

1046

1047
void uv__pipe_read_stop(uv_pipe_t* handle) {
1048
  handle->flags &= ~UV_HANDLE_READING;
1049
  DECREASE_ACTIVE_COUNT(handle->loop, handle);
1050
  uv__pipe_interrupt_read(handle);
1051
}
1052

1053

1054
/* Cleans up uv_pipe_t (server or connection) and all resources associated with
1055
 * it. */
1056
void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
1057
  int i;
1058
  HANDLE pipeHandle;
1059

1060
  if (handle->flags & UV_HANDLE_READING) {
1061
    handle->flags &= ~UV_HANDLE_READING;
1062
    DECREASE_ACTIVE_COUNT(loop, handle);
1063
  }
1064

1065
  if (handle->flags & UV_HANDLE_LISTENING) {
1066
    handle->flags &= ~UV_HANDLE_LISTENING;
1067
    DECREASE_ACTIVE_COUNT(loop, handle);
1068
  }
1069

1070
  handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
1071

1072
  uv__handle_closing(handle);
1073

1074
  uv__pipe_interrupt_read(handle);
1075

1076
  if (handle->name) {
1077
    uv__free(handle->name);
1078
    handle->name = NULL;
1079
  }
1080

1081
  if (handle->flags & UV_HANDLE_PIPESERVER) {
1082
    for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1083
      pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
1084
      if (pipeHandle != INVALID_HANDLE_VALUE) {
1085
        CloseHandle(pipeHandle);
1086
        handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
1087
      }
1088
    }
1089
    handle->handle = INVALID_HANDLE_VALUE;
1090
  }
1091

1092
  if (handle->flags & UV_HANDLE_CONNECTION) {
1093
    eof_timer_destroy(handle);
1094
  }
1095

1096
  if ((handle->flags & UV_HANDLE_CONNECTION)
1097
      && handle->handle != INVALID_HANDLE_VALUE) {
1098
    /* This will eventually destroy the write queue for us too. */
1099
    close_pipe(handle);
1100
  }
1101

1102
  if (handle->reqs_pending == 0)
1103
    uv__want_endgame(loop, (uv_handle_t*) handle);
1104
}
1105

1106

1107
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
1108
    uv_pipe_accept_t* req, BOOL firstInstance) {
1109
  assert(handle->flags & UV_HANDLE_LISTENING);
1110

1111
  if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
1112
    SET_REQ_ERROR(req, GetLastError());
1113
    uv__insert_pending_req(loop, (uv_req_t*) req);
1114
    handle->reqs_pending++;
1115
    return;
1116
  }
1117

1118
  assert(req->pipeHandle != INVALID_HANDLE_VALUE);
1119

1120
  /* Prepare the overlapped structure. */
1121
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
1122

1123
  if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
1124
      GetLastError() != ERROR_IO_PENDING) {
1125
    if (GetLastError() == ERROR_PIPE_CONNECTED) {
1126
      SET_REQ_SUCCESS(req);
1127
    } else {
1128
      CloseHandle(req->pipeHandle);
1129
      req->pipeHandle = INVALID_HANDLE_VALUE;
1130
      /* Make this req pending reporting an error. */
1131
      SET_REQ_ERROR(req, GetLastError());
1132
    }
1133
    uv__insert_pending_req(loop, (uv_req_t*) req);
1134
    handle->reqs_pending++;
1135
    return;
1136
  }
1137

1138
  /* Wait for completion via IOCP */
1139
  handle->reqs_pending++;
1140
}
1141

1142

1143
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
1144
  uv_loop_t* loop = server->loop;
1145
  uv_pipe_t* pipe_client;
1146
  uv_pipe_accept_t* req;
1147
  struct uv__queue* q;
1148
  uv__ipc_xfer_queue_item_t* item;
1149
  int err;
1150

1151
  if (server->ipc) {
1152
    if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) {
1153
      /* No valid pending sockets. */
1154
      return WSAEWOULDBLOCK;
1155
    }
1156

1157
    q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue);
1158
    uv__queue_remove(q);
1159
    server->pipe.conn.ipc_xfer_queue_length--;
1160
    item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);
1161

1162
    err = uv__tcp_xfer_import(
1163
        (uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
1164
    
1165
    uv__free(item);
1166
    
1167
    if (err != 0)
1168
      return err;
1169

1170
  } else {
1171
    pipe_client = (uv_pipe_t*) client;
1172
    uv__pipe_connection_init(pipe_client);
1173

1174
    /* Find a connection instance that has been connected, but not yet
1175
     * accepted. */
1176
    req = server->pipe.serv.pending_accepts;
1177

1178
    if (!req) {
1179
      /* No valid connections found, so we error out. */
1180
      return WSAEWOULDBLOCK;
1181
    }
1182

1183
    /* Initialize the client handle and copy the pipeHandle to the client */
1184
    pipe_client->handle = req->pipeHandle;
1185
    pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
1186

1187
    /* Prepare the req to pick up a new connection */
1188
    server->pipe.serv.pending_accepts = req->next_pending;
1189
    req->next_pending = NULL;
1190
    req->pipeHandle = INVALID_HANDLE_VALUE;
1191

1192
    server->handle = INVALID_HANDLE_VALUE;
1193
    if (!(server->flags & UV_HANDLE_CLOSING)) {
1194
      uv__pipe_queue_accept(loop, server, req, FALSE);
1195
    }
1196
  }
1197

1198
  return 0;
1199
}
1200

1201

1202
/* Starts listening for connections for the given pipe. */
1203
int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
1204
  uv_loop_t* loop = handle->loop;
1205
  int i;
1206

1207
  if (handle->flags & UV_HANDLE_LISTENING) {
1208
    handle->stream.serv.connection_cb = cb;
1209
  }
1210

1211
  if (!(handle->flags & UV_HANDLE_BOUND)) {
1212
    return WSAEINVAL;
1213
  }
1214

1215
  if (handle->flags & UV_HANDLE_READING) {
1216
    return WSAEISCONN;
1217
  }
1218

1219
  if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
1220
    return ERROR_NOT_SUPPORTED;
1221
  }
1222

1223
  if (handle->ipc) {
1224
    return WSAEINVAL;
1225
  }
1226

1227
  handle->flags |= UV_HANDLE_LISTENING;
1228
  INCREASE_ACTIVE_COUNT(loop, handle);
1229
  handle->stream.serv.connection_cb = cb;
1230

1231
  /* First pipe handle should have already been created in uv_pipe_bind */
1232
  assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
1233

1234
  for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
1235
    uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
1236
  }
1237

1238
  return 0;
1239
}
1240

1241

1242
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
1243
  uv_read_t* req = (uv_read_t*) arg;
1244
  uv_pipe_t* handle = (uv_pipe_t*) req->data;
1245
  uv_loop_t* loop = handle->loop;
1246
  volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
1247
  CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
1248
  HANDLE thread;
1249
  DWORD bytes;
1250
  DWORD err;
1251

1252
  assert(req->type == UV_READ);
1253
  assert(handle->type == UV_NAMED_PIPE);
1254

1255
  err = 0;
1256

1257
  /* Create a handle to the current thread. */
1258
  if (!DuplicateHandle(GetCurrentProcess(),
1259
                       GetCurrentThread(),
1260
                       GetCurrentProcess(),
1261
                       &thread,
1262
                       0,
1263
                       FALSE,
1264
                       DUPLICATE_SAME_ACCESS)) {
1265
    err = GetLastError();
1266
    goto out1;
1267
  }
1268

1269
  /* The lock needs to be held when thread handle is modified. */
1270
  EnterCriticalSection(lock);
1271
  if (*thread_ptr == INVALID_HANDLE_VALUE) {
1272
    /* uv__pipe_interrupt_read() cancelled reading before we got here. */
1273
    err = ERROR_OPERATION_ABORTED;
1274
  } else {
1275
    /* Let main thread know which worker thread is doing the blocking read. */
1276
    assert(*thread_ptr == NULL);
1277
    *thread_ptr = thread;
1278
  }
1279
  LeaveCriticalSection(lock);
1280

1281
  if (err)
1282
    goto out2;
1283

1284
  /* Block the thread until data is available on the pipe, or the read is
1285
   * cancelled. */
1286
  if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
1287
    err = GetLastError();
1288

1289
  /* Let the main thread know the worker is past the point of blocking. */
1290
  assert(thread == *thread_ptr);
1291
  *thread_ptr = INVALID_HANDLE_VALUE;
1292

1293
  /* Briefly acquire the mutex. Since the main thread holds the lock while it
1294
   * is spinning trying to cancel this thread's I/O, we will block here until
1295
   * it stops doing that. */
1296
  EnterCriticalSection(lock);
1297
  LeaveCriticalSection(lock);
1298

1299
out2:
1300
  /* Close the handle to the current thread. */
1301
  CloseHandle(thread);
1302

1303
out1:
1304
  /* Set request status and post a completion record to the IOCP. */
1305
  if (err)
1306
    SET_REQ_ERROR(req, err);
1307
  else
1308
    SET_REQ_SUCCESS(req);
1309
  POST_COMPLETION_FOR_REQ(loop, req);
1310

1311
  return 0;
1312
}
1313

1314

1315
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
1316
  int result;
1317
  DWORD bytes;
1318
  uv_write_t* req = (uv_write_t*) parameter;
1319
  uv_pipe_t* handle = (uv_pipe_t*) req->handle;
1320
  uv_loop_t* loop = handle->loop;
1321

1322
  assert(req != NULL);
1323
  assert(req->type == UV_WRITE);
1324
  assert(handle->type == UV_NAMED_PIPE);
1325

1326
  result = WriteFile(handle->handle,
1327
                     req->write_buffer.base,
1328
                     req->write_buffer.len,
1329
                     &bytes,
1330
                     NULL);
1331

1332
  if (!result) {
1333
    SET_REQ_ERROR(req, GetLastError());
1334
  }
1335

1336
  POST_COMPLETION_FOR_REQ(loop, req);
1337
  return 0;
1338
}
1339

1340

1341
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
1342
  uv_read_t* req;
1343
  uv_tcp_t* handle;
1344

1345
  req = (uv_read_t*) context;
1346
  assert(req != NULL);
1347
  handle = (uv_tcp_t*)req->data;
1348
  assert(handle != NULL);
1349
  assert(!timed_out);
1350

1351
  if (!PostQueuedCompletionStatus(handle->loop->iocp,
1352
                                  req->u.io.overlapped.InternalHigh,
1353
                                  0,
1354
                                  &req->u.io.overlapped)) {
1355
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1356
  }
1357
}
1358

1359

1360
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
1361
  uv_write_t* req;
1362
  uv_tcp_t* handle;
1363

1364
  req = (uv_write_t*) context;
1365
  assert(req != NULL);
1366
  handle = (uv_tcp_t*)req->handle;
1367
  assert(handle != NULL);
1368
  assert(!timed_out);
1369

1370
  if (!PostQueuedCompletionStatus(handle->loop->iocp,
1371
                                  req->u.io.overlapped.InternalHigh,
1372
                                  0,
1373
                                  &req->u.io.overlapped)) {
1374
    uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
1375
  }
1376
}
1377

1378

1379
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
1380
  uv_read_t* req;
1381
  int result;
1382

1383
  assert(handle->flags & UV_HANDLE_READING);
1384
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));
1385

1386
  assert(handle->handle != INVALID_HANDLE_VALUE);
1387

1388
  req = &handle->read_req;
1389

1390
  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1391
    handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
1392
    if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
1393
                           req,
1394
                           WT_EXECUTELONGFUNCTION)) {
1395
      /* Make this req pending reporting an error. */
1396
      SET_REQ_ERROR(req, GetLastError());
1397
      goto error;
1398
    }
1399
  } else {
1400
    memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1401
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1402
      assert(req->event_handle != NULL);
1403
      req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1404
    }
1405

1406
    /* Do 0-read */
1407
    result = ReadFile(handle->handle,
1408
                      &uv_zero_,
1409
                      0,
1410
                      NULL,
1411
                      &req->u.io.overlapped);
1412

1413
    if (!result && GetLastError() != ERROR_IO_PENDING) {
1414
      /* Make this req pending reporting an error. */
1415
      SET_REQ_ERROR(req, GetLastError());
1416
      goto error;
1417
    }
1418

1419
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1420
      if (req->wait_handle == INVALID_HANDLE_VALUE) {
1421
        if (!RegisterWaitForSingleObject(&req->wait_handle,
1422
            req->event_handle, post_completion_read_wait, (void*) req,
1423
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
1424
          SET_REQ_ERROR(req, GetLastError());
1425
          goto error;
1426
        }
1427
      }
1428
    }
1429
  }
1430

1431
  /* Start the eof timer if there is one */
1432
  eof_timer_start(handle);
1433
  handle->flags |= UV_HANDLE_READ_PENDING;
1434
  handle->reqs_pending++;
1435
  return;
1436

1437
error:
1438
  uv__insert_pending_req(loop, (uv_req_t*)req);
1439
  handle->flags |= UV_HANDLE_READ_PENDING;
1440
  handle->reqs_pending++;
1441
}
1442

1443

1444
int uv__pipe_read_start(uv_pipe_t* handle,
1445
                        uv_alloc_cb alloc_cb,
1446
                        uv_read_cb read_cb) {
1447
  uv_loop_t* loop = handle->loop;
1448

1449
  handle->flags |= UV_HANDLE_READING;
1450
  INCREASE_ACTIVE_COUNT(loop, handle);
1451
  handle->read_cb = read_cb;
1452
  handle->alloc_cb = alloc_cb;
1453

1454
  /* If reading was stopped and then started again, there could still be a read
1455
   * request pending. */
1456
  if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
1457
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
1458
        handle->read_req.event_handle == NULL) {
1459
      handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
1460
      if (handle->read_req.event_handle == NULL) {
1461
        uv_fatal_error(GetLastError(), "CreateEvent");
1462
      }
1463
    }
1464
    uv__pipe_queue_read(loop, handle);
1465
  }
1466

1467
  return 0;
1468
}
1469

1470

1471
static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
1472
    uv_write_t* req) {
1473
  req->next_req = NULL;
1474
  if (handle->pipe.conn.non_overlapped_writes_tail) {
1475
    req->next_req =
1476
      handle->pipe.conn.non_overlapped_writes_tail->next_req;
1477
    handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
1478
    handle->pipe.conn.non_overlapped_writes_tail = req;
1479
  } else {
1480
    req->next_req = (uv_req_t*)req;
1481
    handle->pipe.conn.non_overlapped_writes_tail = req;
1482
  }
1483
}
1484

1485

1486
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
1487
  uv_write_t* req;
1488

1489
  if (handle->pipe.conn.non_overlapped_writes_tail) {
1490
    req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
1491

1492
    if (req == handle->pipe.conn.non_overlapped_writes_tail) {
1493
      handle->pipe.conn.non_overlapped_writes_tail = NULL;
1494
    } else {
1495
      handle->pipe.conn.non_overlapped_writes_tail->next_req =
1496
        req->next_req;
1497
    }
1498

1499
    return req;
1500
  } else {
1501
    /* queue empty */
1502
    return NULL;
1503
  }
1504
}
1505

1506

1507
static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
1508
  uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
1509
  if (req) {
1510
    if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
1511
                           req,
1512
                           WT_EXECUTELONGFUNCTION)) {
1513
      uv_fatal_error(GetLastError(), "QueueUserWorkItem");
1514
    }
1515
  }
1516
}
1517

1518

1519
static int uv__build_coalesced_write_req(uv_write_t* user_req,
1520
                                         const uv_buf_t bufs[],
1521
                                         size_t nbufs,
1522
                                         uv_write_t** req_out,
1523
                                         uv_buf_t* write_buf_out) {
1524
  /* Pack into a single heap-allocated buffer:
1525
   *   (a) a uv_write_t structure where libuv stores the actual state.
1526
   *   (b) a pointer to the original uv_write_t.
1527
   *   (c) data from all `bufs` entries.
1528
   */
1529
  char* heap_buffer;
1530
  size_t heap_buffer_length, heap_buffer_offset;
1531
  uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
1532
  char* data_start;                           /* (c) */
1533
  size_t data_length;
1534
  unsigned int i;
1535

1536
  /* Compute combined size of all combined buffers from `bufs`. */
1537
  data_length = 0;
1538
  for (i = 0; i < nbufs; i++)
1539
    data_length += bufs[i].len;
1540

1541
  /* The total combined size of data buffers should not exceed UINT32_MAX,
1542
   * because WriteFile() won't accept buffers larger than that. */
1543
  if (data_length > UINT32_MAX)
1544
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1545

1546
  /* Compute heap buffer size. */
1547
  heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
1548
                       data_length;                  /* (c) */
1549

1550
  /* Allocate buffer. */
1551
  heap_buffer = uv__malloc(heap_buffer_length);
1552
  if (heap_buffer == NULL)
1553
    return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1554

1555
  /* Copy uv_write_t information to the buffer. */
1556
  coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
1557
  coalesced_write_req->req = *user_req; /* copy (a) */
1558
  coalesced_write_req->req.coalesced = 1;
1559
  coalesced_write_req->user_req = user_req;         /* copy (b) */
1560
  heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
1561

1562
  /* Copy data buffers to the heap buffer. */
1563
  data_start = &heap_buffer[heap_buffer_offset];
1564
  for (i = 0; i < nbufs; i++) {
1565
    memcpy(&heap_buffer[heap_buffer_offset],
1566
           bufs[i].base,
1567
           bufs[i].len);               /* copy (c) */
1568
    heap_buffer_offset += bufs[i].len; /* offset (c) */
1569
  }
1570
  assert(heap_buffer_offset == heap_buffer_length);
1571

1572
  /* Set out arguments and return. */
1573
  *req_out = &coalesced_write_req->req;
1574
  *write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
1575
  return 0;
1576
}
1577

1578

1579
static int uv__pipe_write_data(uv_loop_t* loop,
1580
                               uv_write_t* req,
1581
                               uv_pipe_t* handle,
1582
                               const uv_buf_t bufs[],
1583
                               size_t nbufs,
1584
                               uv_write_cb cb,
1585
                               int copy_always) {
1586
  int err;
1587
  int result;
1588
  uv_buf_t write_buf;
1589

1590
  assert(handle->handle != INVALID_HANDLE_VALUE);
1591

1592
  UV_REQ_INIT(req, UV_WRITE);
1593
  req->handle = (uv_stream_t*) handle;
1594
  req->send_handle = NULL;
1595
  req->cb = cb;
1596
  /* Private fields. */
1597
  req->coalesced = 0;
1598
  req->event_handle = NULL;
1599
  req->wait_handle = INVALID_HANDLE_VALUE;
1600

1601
  /* Prepare the overlapped structure. */
1602
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
1603
  if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
1604
    req->event_handle = CreateEvent(NULL, 0, 0, NULL);
1605
    if (req->event_handle == NULL) {
1606
      uv_fatal_error(GetLastError(), "CreateEvent");
1607
    }
1608
    req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
1609
  }
1610
  req->write_buffer = uv_null_buf_;
1611

1612
  if (nbufs == 0) {
1613
    /* Write empty buffer. */
1614
    write_buf = uv_null_buf_;
1615
  } else if (nbufs == 1 && !copy_always) {
1616
    /* Write directly from bufs[0]. */
1617
    write_buf = bufs[0];
1618
  } else {
1619
    /* Coalesce all `bufs` into one big buffer. This also creates a new
1620
     * write-request structure that replaces the old one. */
1621
    err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
1622
    if (err != 0)
1623
      return err;
1624
  }
1625

1626
  if ((handle->flags &
1627
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
1628
      (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
1629
    DWORD bytes;
1630
    result =
1631
        WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
1632

1633
    if (!result) {
1634
      err = GetLastError();
1635
      return err;
1636
    } else {
1637
      /* Request completed immediately. */
1638
      req->u.io.queued_bytes = 0;
1639
    }
1640

1641
    REGISTER_HANDLE_REQ(loop, handle, req);
1642
    handle->reqs_pending++;
1643
    handle->stream.conn.write_reqs_pending++;
1644
    POST_COMPLETION_FOR_REQ(loop, req);
1645
    return 0;
1646
  } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
1647
    req->write_buffer = write_buf;
1648
    uv__insert_non_overlapped_write_req(handle, req);
1649
    if (handle->stream.conn.write_reqs_pending == 0) {
1650
      uv__queue_non_overlapped_write(handle);
1651
    }
1652

1653
    /* Request queued by the kernel. */
1654
    req->u.io.queued_bytes = write_buf.len;
1655
    handle->write_queue_size += req->u.io.queued_bytes;
1656
  } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
1657
    /* Using overlapped IO, but wait for completion before returning */
1658
    result = WriteFile(handle->handle,
1659
                       write_buf.base,
1660
                       write_buf.len,
1661
                       NULL,
1662
                       &req->u.io.overlapped);
1663

1664
    if (!result && GetLastError() != ERROR_IO_PENDING) {
1665
      err = GetLastError();
1666
      CloseHandle(req->event_handle);
1667
      req->event_handle = NULL;
1668
      return err;
1669
    }
1670

1671
    if (result) {
1672
      /* Request completed immediately. */
1673
      req->u.io.queued_bytes = 0;
1674
    } else {
1675
      /* Request queued by the kernel. */
1676
      req->u.io.queued_bytes = write_buf.len;
1677
      handle->write_queue_size += req->u.io.queued_bytes;
1678
      if (WaitForSingleObject(req->event_handle, INFINITE) !=
1679
          WAIT_OBJECT_0) {
1680
        err = GetLastError();
1681
        CloseHandle(req->event_handle);
1682
        req->event_handle = NULL;
1683
        return err;
1684
      }
1685
    }
1686
    CloseHandle(req->event_handle);
1687
    req->event_handle = NULL;
1688

1689
    REGISTER_HANDLE_REQ(loop, handle, req);
1690
    handle->reqs_pending++;
1691
    handle->stream.conn.write_reqs_pending++;
1692
    return 0;
1693
  } else {
1694
    result = WriteFile(handle->handle,
1695
                       write_buf.base,
1696
                       write_buf.len,
1697
                       NULL,
1698
                       &req->u.io.overlapped);
1699

1700
    if (!result && GetLastError() != ERROR_IO_PENDING) {
1701
      return GetLastError();
1702
    }
1703

1704
    if (result) {
1705
      /* Request completed immediately. */
1706
      req->u.io.queued_bytes = 0;
1707
    } else {
1708
      /* Request queued by the kernel. */
1709
      req->u.io.queued_bytes = write_buf.len;
1710
      handle->write_queue_size += req->u.io.queued_bytes;
1711
    }
1712

1713
    if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
1714
      if (!RegisterWaitForSingleObject(&req->wait_handle,
1715
          req->event_handle, post_completion_write_wait, (void*) req,
1716
          INFINITE, WT_EXECUTEINWAITTHREAD)) {
1717
        return GetLastError();
1718
      }
1719
    }
1720
  }
1721

1722
  REGISTER_HANDLE_REQ(loop, handle, req);
1723
  handle->reqs_pending++;
1724
  handle->stream.conn.write_reqs_pending++;
1725

1726
  return 0;
1727
}
1728

1729

1730
static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
1731
  DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
1732

1733
  /* If the both ends of the IPC pipe are owned by the same process,
1734
   * the remote end pid may not yet be set. If so, do it here.
1735
   * TODO: this is weird; it'd probably better to use a handshake. */
1736
  if (*pid == 0) {
1737
    GetNamedPipeClientProcessId(handle->handle, pid);
1738
    if (*pid == GetCurrentProcessId()) {
1739
      GetNamedPipeServerProcessId(handle->handle, pid);
1740
    }
1741
  }
1742
  
1743
  return *pid;
1744
}
1745

1746

1747
int uv__pipe_write_ipc(uv_loop_t* loop,
1748
                       uv_write_t* req,
1749
                       uv_pipe_t* handle,
1750
                       const uv_buf_t data_bufs[],
1751
                       size_t data_buf_count,
1752
                       uv_stream_t* send_handle,
1753
                       uv_write_cb cb) {
1754
  uv_buf_t stack_bufs[6];
1755
  uv_buf_t* bufs;
1756
  size_t buf_count, buf_index;
1757
  uv__ipc_frame_header_t frame_header;
1758
  uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
1759
  uv__ipc_socket_xfer_info_t xfer_info;
1760
  uint64_t data_length;
1761
  size_t i;
1762
  int err;
1763

1764
  /* Compute the combined size of data buffers. */
1765
  data_length = 0;
1766
  for (i = 0; i < data_buf_count; i++)
1767
    data_length += data_bufs[i].len;
1768
  if (data_length > UINT32_MAX)
1769
    return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
1770

1771
  /* Prepare the frame's socket xfer payload. */
1772
  if (send_handle != NULL) {
1773
    uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
1774

1775
    /* Verify that `send_handle` it is indeed a tcp handle. */
1776
    if (send_tcp_handle->type != UV_TCP)
1777
      return ERROR_NOT_SUPPORTED;
1778

1779
    /* Export the tcp handle. */
1780
    err = uv__tcp_xfer_export(send_tcp_handle,
1781
                              uv__pipe_get_ipc_remote_pid(handle),
1782
                              &xfer_type,
1783
                              &xfer_info);
1784
    if (err != 0)
1785
      return err;
1786
  }
1787

1788
  /* Compute the number of uv_buf_t's required. */
1789
  buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
1790
  if (send_handle != NULL)
1791
    buf_count += 1; /* One extra for the socket xfer information. */
1792

1793
  /* Use the on-stack buffer array if it is big enough; otherwise allocate
1794
   * space for it on the heap. */
1795
  if (buf_count < ARRAY_SIZE(stack_bufs)) {
1796
    /* Use on-stack buffer array. */
1797
    bufs = stack_bufs;
1798
  } else {
1799
    /* Use heap-allocated buffer array. */
1800
    bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
1801
    if (bufs == NULL)
1802
      return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
1803
  }
1804
  buf_index = 0;
1805

1806
  /* Initialize frame header and add it to the buffers list. */
1807
  memset(&frame_header, 0, sizeof frame_header);
1808
  bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
1809

1810
  if (send_handle != NULL) {
1811
    /* Add frame header flags. */
1812
    switch (xfer_type) {
1813
      case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
1814
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
1815
                              UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
1816
        break;
1817
      case UV__IPC_SOCKET_XFER_TCP_SERVER:
1818
        frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
1819
        break;
1820
      default:
1821
        assert(0);  /* Unreachable. */
1822
    }
1823
    /* Add xfer info buffer. */
1824
    bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
1825
  }
1826

1827
  if (data_length > 0) {
1828
    /* Update frame header. */
1829
    frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
1830
    frame_header.data_length = (uint32_t) data_length;
1831
    /* Add data buffers to buffers list. */
1832
    for (i = 0; i < data_buf_count; i++)
1833
      bufs[buf_index++] = data_bufs[i];
1834
  }
1835

1836
  /* Write buffers. We set the `always_copy` flag, so it is not a problem that
1837
   * some of the written data lives on the stack. */
1838
  err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
1839

1840
  /* If we had to heap-allocate the bufs array, free it now. */
1841
  if (bufs != stack_bufs) {
1842
    uv__free(bufs);
1843
  }
1844

1845
  return err;
1846
}
1847

1848

1849
int uv__pipe_write(uv_loop_t* loop,
1850
                   uv_write_t* req,
1851
                   uv_pipe_t* handle,
1852
                   const uv_buf_t bufs[],
1853
                   size_t nbufs,
1854
                   uv_stream_t* send_handle,
1855
                   uv_write_cb cb) {
1856
  if (handle->ipc) {
1857
    /* IPC pipe write: use framing protocol. */
1858
    return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
1859
  } else {
1860
    /* Non-IPC pipe write: put data on the wire directly. */
1861
    assert(send_handle == NULL);
1862
    return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
1863
  }
1864
}
1865

1866

1867
static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
1868
    uv_buf_t buf) {
1869
  /* If there is an eof timer running, we don't need it any more, so discard
1870
   * it. */
1871
  eof_timer_destroy(handle);
1872

1873
  uv_read_stop((uv_stream_t*) handle);
1874

1875
  handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
1876
}
1877

1878

1879
static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
1880
    uv_buf_t buf) {
1881
  /* If there is an eof timer running, we don't need it any more, so discard
1882
   * it. */
1883
  eof_timer_destroy(handle);
1884

1885
  uv_read_stop((uv_stream_t*) handle);
1886

1887
  handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
1888
}
1889

1890

1891
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
1892
    int error, uv_buf_t buf) {
1893
  if (error == ERROR_BROKEN_PIPE) {
1894
    uv__pipe_read_eof(loop, handle, buf);
1895
  } else {
1896
    uv__pipe_read_error(loop, handle, error, buf);
1897
  }
1898
}
1899

1900

1901
static void uv__pipe_queue_ipc_xfer_info(
1902
    uv_pipe_t* handle,
1903
    uv__ipc_socket_xfer_type_t xfer_type,
1904
    uv__ipc_socket_xfer_info_t* xfer_info) {
1905
  uv__ipc_xfer_queue_item_t* item;
1906

1907
  item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
1908
  if (item == NULL)
1909
    uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
1910

1911
  item->xfer_type = xfer_type;
1912
  item->xfer_info = *xfer_info;
1913

1914
  uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member);
1915
  handle->pipe.conn.ipc_xfer_queue_length++;
1916
}
1917

1918

1919
/* Read an exact number of bytes from a pipe. If an error or end-of-file is
1920
 * encountered before the requested number of bytes are read, an error is
1921
 * returned. */
1922
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
1923
  DWORD bytes_read, bytes_read_now;
1924

1925
  bytes_read = 0;
1926
  while (bytes_read < count) {
1927
    if (!ReadFile(h,
1928
                  (char*) buffer + bytes_read,
1929
                  count - bytes_read,
1930
                  &bytes_read_now,
1931
                  NULL)) {
1932
      return GetLastError();
1933
    }
1934

1935
    bytes_read += bytes_read_now;
1936
  }
1937

1938
  assert(bytes_read == count);
1939
  return 0;
1940
}
1941

1942

1943
static DWORD uv__pipe_read_data(uv_loop_t* loop,
1944
                                uv_pipe_t* handle,
1945
                                DWORD suggested_bytes,
1946
                                DWORD max_bytes) {
1947
  DWORD bytes_read;
1948
  uv_buf_t buf;
1949

1950
  /* Ask the user for a buffer to read data into. */
1951
  buf = uv_buf_init(NULL, 0);
1952
  handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
1953
  if (buf.base == NULL || buf.len == 0) {
1954
    handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
1955
    return 0; /* Break out of read loop. */
1956
  }
1957

1958
  /* Ensure we read at most the smaller of:
1959
   *   (a) the length of the user-allocated buffer.
1960
   *   (b) the maximum data length as specified by the `max_bytes` argument.
1961
   */
1962
  if (max_bytes > buf.len)
1963
    max_bytes = buf.len;
1964

1965
  /* Read into the user buffer. */
1966
  if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
1967
    uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
1968
    return 0; /* Break out of read loop. */
1969
  }
1970

1971
  /* Call the read callback. */
1972
  handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
1973

1974
  return bytes_read;
1975
}
1976

1977

1978
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
1979
  uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
1980
  int err;
1981

1982
  if (*data_remaining > 0) {
1983
    /* Read frame data payload. */
1984
    DWORD bytes_read =
1985
        uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
1986
    *data_remaining -= bytes_read;
1987
    return bytes_read;
1988

1989
  } else {
1990
    /* Start of a new IPC frame. */
1991
    uv__ipc_frame_header_t frame_header;
1992
    uint32_t xfer_flags;
1993
    uv__ipc_socket_xfer_type_t xfer_type;
1994
    uv__ipc_socket_xfer_info_t xfer_info;
1995

1996
    /* Read the IPC frame header. */
1997
    err = uv__pipe_read_exactly(
1998
        handle->handle, &frame_header, sizeof frame_header);
1999
    if (err)
2000
      goto error;
2001

2002
    /* Validate that flags are valid. */
2003
    if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
2004
      goto invalid;
2005
    /* Validate that reserved2 is zero. */
2006
    if (frame_header.reserved2 != 0)
2007
      goto invalid;
2008

2009
    /* Parse xfer flags. */
2010
    xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
2011
    if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
2012
      /* Socket coming -- determine the type. */
2013
      xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
2014
                      ? UV__IPC_SOCKET_XFER_TCP_CONNECTION
2015
                      : UV__IPC_SOCKET_XFER_TCP_SERVER;
2016
    } else if (xfer_flags == 0) {
2017
      /* No socket. */
2018
      xfer_type = UV__IPC_SOCKET_XFER_NONE;
2019
    } else {
2020
      /* Invalid flags. */
2021
      goto invalid;
2022
    }
2023

2024
    /* Parse data frame information. */
2025
    if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
2026
      *data_remaining = frame_header.data_length;
2027
    } else if (frame_header.data_length != 0) {
2028
      /* Data length greater than zero but data flag not set -- invalid. */
2029
      goto invalid;
2030
    }
2031

2032
    /* If no socket xfer info follows, return here. Data will be read in a
2033
     * subsequent invocation of uv__pipe_read_ipc(). */
2034
    if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
2035
      return sizeof frame_header; /* Number of bytes read. */
2036

2037
    /* Read transferred socket information. */
2038
    err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
2039
    if (err)
2040
      goto error;
2041

2042
    /* Store the pending socket info. */
2043
    uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
2044

2045
    /* Return number of bytes read. */
2046
    return sizeof frame_header + sizeof xfer_info;
2047
  }
2048

2049
invalid:
2050
  /* Invalid frame. */
2051
  err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
2052

2053
error:
2054
  uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
2055
  return 0; /* Break out of read loop. */
2056
}
2057

2058

2059
void uv__process_pipe_read_req(uv_loop_t* loop,
2060
                               uv_pipe_t* handle,
2061
                               uv_req_t* req) {
2062
  assert(handle->type == UV_NAMED_PIPE);
2063

2064
  handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
2065
  DECREASE_PENDING_REQ_COUNT(handle);
2066
  eof_timer_stop(handle);
2067

2068
  /* At this point, we're done with bookkeeping. If the user has stopped
2069
   * reading the pipe in the meantime, there is nothing left to do, since there
2070
   * is no callback that we can call. */
2071
  if (!(handle->flags & UV_HANDLE_READING))
2072
    return;
2073

2074
  if (!REQ_SUCCESS(req)) {
2075
    /* An error occurred doing the zero-read. */
2076
    DWORD err = GET_REQ_ERROR(req);
2077

2078
    /* If the read was cancelled by uv__pipe_interrupt_read(), the request may
2079
     * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
2080
     * the user; we'll start a new zero-read at the end of this function. */
2081
    if (err != ERROR_OPERATION_ABORTED)
2082
      uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
2083

2084
  } else {
2085
    /* The zero-read completed without error, indicating there is data
2086
     * available in the kernel buffer. */
2087
    DWORD avail;
2088

2089
    /* Get the number of bytes available. */
2090
    avail = 0;
2091
    if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
2092
      uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
2093

2094
    /* Read until we've either read all the bytes available, or the 'reading'
2095
     * flag is cleared. */
2096
    while (avail > 0 && handle->flags & UV_HANDLE_READING) {
2097
      /* Depending on the type of pipe, read either IPC frames or raw data. */
2098
      DWORD bytes_read =
2099
          handle->ipc ? uv__pipe_read_ipc(loop, handle)
2100
                      : uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
2101

2102
      /* If no bytes were read, treat this as an indication that an error
2103
       * occurred, and break out of the read loop. */
2104
      if (bytes_read == 0)
2105
        break;
2106

2107
      /* It is possible that more bytes were read than we thought were
2108
       * available. To prevent `avail` from underflowing, break out of the loop
2109
       * if this is the case. */
2110
      if (bytes_read > avail)
2111
        break;
2112

2113
      /* Recompute the number of bytes available. */
2114
      avail -= bytes_read;
2115
    }
2116
  }
2117

2118
  /* Start another zero-read request if necessary. */
2119
  if ((handle->flags & UV_HANDLE_READING) &&
2120
      !(handle->flags & UV_HANDLE_READ_PENDING)) {
2121
    uv__pipe_queue_read(loop, handle);
2122
  }
2123
}
2124

2125

2126
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
2127
    uv_write_t* req) {
2128
  int err;
2129

2130
  assert(handle->type == UV_NAMED_PIPE);
2131

2132
  assert(handle->write_queue_size >= req->u.io.queued_bytes);
2133
  handle->write_queue_size -= req->u.io.queued_bytes;
2134

2135
  UNREGISTER_HANDLE_REQ(loop, handle, req);
2136

2137
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
2138
    if (req->wait_handle != INVALID_HANDLE_VALUE) {
2139
      UnregisterWait(req->wait_handle);
2140
      req->wait_handle = INVALID_HANDLE_VALUE;
2141
    }
2142
    if (req->event_handle) {
2143
      CloseHandle(req->event_handle);
2144
      req->event_handle = NULL;
2145
    }
2146
  }
2147

2148
  err = GET_REQ_ERROR(req);
2149

2150
  /* If this was a coalesced write, extract pointer to the user_provided
2151
   * uv_write_t structure so we can pass the expected pointer to the callback,
2152
   * then free the heap-allocated write req. */
2153
  if (req->coalesced) {
2154
    uv__coalesced_write_t* coalesced_write =
2155
        container_of(req, uv__coalesced_write_t, req);
2156
    req = coalesced_write->user_req;
2157
    uv__free(coalesced_write);
2158
  }
2159
  if (req->cb) {
2160
    req->cb(req, uv_translate_sys_error(err));
2161
  }
2162

2163
  handle->stream.conn.write_reqs_pending--;
2164

2165
  if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
2166
      handle->pipe.conn.non_overlapped_writes_tail) {
2167
    assert(handle->stream.conn.write_reqs_pending > 0);
2168
    uv__queue_non_overlapped_write(handle);
2169
  }
2170

2171
  if (handle->stream.conn.write_reqs_pending == 0 &&
2172
      uv__is_stream_shutting(handle))
2173
    uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
2174

2175
  DECREASE_PENDING_REQ_COUNT(handle);
2176
}
2177

2178

2179
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
2180
    uv_req_t* raw_req) {
2181
  uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
2182

2183
  assert(handle->type == UV_NAMED_PIPE);
2184

2185
  if (handle->flags & UV_HANDLE_CLOSING) {
2186
    /* The req->pipeHandle should be freed already in uv__pipe_close(). */
2187
    assert(req->pipeHandle == INVALID_HANDLE_VALUE);
2188
    DECREASE_PENDING_REQ_COUNT(handle);
2189
    return;
2190
  }
2191

2192
  if (REQ_SUCCESS(req)) {
2193
    assert(req->pipeHandle != INVALID_HANDLE_VALUE);
2194
    req->next_pending = handle->pipe.serv.pending_accepts;
2195
    handle->pipe.serv.pending_accepts = req;
2196

2197
    if (handle->stream.serv.connection_cb) {
2198
      handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
2199
    }
2200
  } else {
2201
    if (req->pipeHandle != INVALID_HANDLE_VALUE) {
2202
      CloseHandle(req->pipeHandle);
2203
      req->pipeHandle = INVALID_HANDLE_VALUE;
2204
    }
2205
    if (!(handle->flags & UV_HANDLE_CLOSING)) {
2206
      uv__pipe_queue_accept(loop, handle, req, FALSE);
2207
    }
2208
  }
2209

2210
  DECREASE_PENDING_REQ_COUNT(handle);
2211
}
2212

2213

2214
void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
2215
    uv_connect_t* req) {
2216
  HANDLE pipeHandle;
2217
  DWORD duplex_flags;
2218
  int err;
2219

2220
  assert(handle->type == UV_NAMED_PIPE);
2221

2222
  UNREGISTER_HANDLE_REQ(loop, handle, req);
2223

2224
  err = 0;
2225
  if (REQ_SUCCESS(req)) {
2226
    pipeHandle = req->u.connect.pipeHandle;
2227
    duplex_flags = req->u.connect.duplex_flags;
2228
    if (handle->flags & UV_HANDLE_CLOSING)
2229
      err = UV_ECANCELED;
2230
    else
2231
      err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
2232
    if (err)
2233
      CloseHandle(pipeHandle);
2234
  } else {
2235
    err = uv_translate_sys_error(GET_REQ_ERROR(req));
2236
  }
2237

2238
  if (req->cb)
2239
    req->cb(req, err);
2240

2241
  DECREASE_PENDING_REQ_COUNT(handle);
2242
}
2243

2244

2245

2246
void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
2247
    uv_shutdown_t* req) {
2248
  int err;
2249

2250
  assert(handle->type == UV_NAMED_PIPE);
2251

2252
  /* Clear the shutdown_req field so we don't go here again. */
2253
  handle->stream.conn.shutdown_req = NULL;
2254
  UNREGISTER_HANDLE_REQ(loop, handle, req);
2255

2256
  if (handle->flags & UV_HANDLE_CLOSING) {
2257
    /* Already closing. Cancel the shutdown. */
2258
    err = UV_ECANCELED;
2259
  } else if (!REQ_SUCCESS(req)) {
2260
    /* An error occurred in trying to shutdown gracefully. */
2261
    err = uv_translate_sys_error(GET_REQ_ERROR(req));
2262
  } else {
2263
    if (handle->flags & UV_HANDLE_READABLE) {
2264
      /* Initialize and optionally start the eof timer. Only do this if the pipe
2265
       * is readable and we haven't seen EOF come in ourselves. */
2266
      eof_timer_init(handle);
2267

2268
      /* If reading start the timer right now. Otherwise uv__pipe_queue_read will
2269
       * start it. */
2270
      if (handle->flags & UV_HANDLE_READ_PENDING) {
2271
        eof_timer_start(handle);
2272
      }
2273

2274
    } else {
2275
      /* This pipe is not readable. We can just close it to let the other end
2276
       * know that we're done writing. */
2277
      close_pipe(handle);
2278
    }
2279
    err = 0;
2280
  }
2281

2282
  if (req->cb)
2283
    req->cb(req, err);
2284

2285
  DECREASE_PENDING_REQ_COUNT(handle);
2286
}
2287

2288

2289
static void eof_timer_init(uv_pipe_t* pipe) {
2290
  int r;
2291

2292
  assert(pipe->pipe.conn.eof_timer == NULL);
2293
  assert(pipe->flags & UV_HANDLE_CONNECTION);
2294

2295
  pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
2296

2297
  r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
2298
  assert(r == 0);  /* timers can't fail */
2299
  (void) r;
2300
  pipe->pipe.conn.eof_timer->data = pipe;
2301
  uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
2302
}
2303

2304

2305
static void eof_timer_start(uv_pipe_t* pipe) {
2306
  assert(pipe->flags & UV_HANDLE_CONNECTION);
2307

2308
  if (pipe->pipe.conn.eof_timer != NULL) {
2309
    uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
2310
  }
2311
}
2312

2313

2314
static void eof_timer_stop(uv_pipe_t* pipe) {
2315
  assert(pipe->flags & UV_HANDLE_CONNECTION);
2316

2317
  if (pipe->pipe.conn.eof_timer != NULL) {
2318
    uv_timer_stop(pipe->pipe.conn.eof_timer);
2319
  }
2320
}
2321

2322

2323
static void eof_timer_cb(uv_timer_t* timer) {
2324
  uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
2325
  uv_loop_t* loop = timer->loop;
2326

2327
  assert(pipe->type == UV_NAMED_PIPE);
2328

2329
  /* This should always be true, since we start the timer only in
2330
   * uv__pipe_queue_read after successfully calling ReadFile, or in
2331
   * uv__process_pipe_shutdown_req if a read is pending, and we always
2332
   * immediately stop the timer in uv__process_pipe_read_req. */
2333
  assert(pipe->flags & UV_HANDLE_READ_PENDING);
2334

2335
  /* If there are many packets coming off the iocp then the timer callback may
2336
   * be called before the read request is coming off the queue. Therefore we
2337
   * check here if the read request has completed but will be processed later.
2338
   */
2339
  if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
2340
      HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
2341
    return;
2342
  }
2343

2344
  /* Force both ends off the pipe. */
2345
  close_pipe(pipe);
2346

2347
  /* Stop reading, so the pending read that is going to fail will not be
2348
   * reported to the user. */
2349
  uv_read_stop((uv_stream_t*) pipe);
2350

2351
  /* Report the eof and update flags. This will get reported even if the user
2352
   * stopped reading in the meantime. TODO: is that okay? */
2353
  uv__pipe_read_eof(loop, pipe, uv_null_buf_);
2354
}
2355

2356

2357
static void eof_timer_destroy(uv_pipe_t* pipe) {
2358
  assert(pipe->flags & UV_HANDLE_CONNECTION);
2359

2360
  if (pipe->pipe.conn.eof_timer) {
2361
    uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
2362
    pipe->pipe.conn.eof_timer = NULL;
2363
  }
2364
}
2365

2366

2367
static void eof_timer_close_cb(uv_handle_t* handle) {
2368
  assert(handle->type == UV_TIMER);
2369
  uv__free(handle);
2370
}
2371

2372

2373
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
2374
  HANDLE os_handle = uv__get_osfhandle(file);
2375
  NTSTATUS nt_status;
2376
  IO_STATUS_BLOCK io_status;
2377
  FILE_ACCESS_INFORMATION access;
2378
  DWORD duplex_flags = 0;
2379
  int err;
2380

2381
  if (os_handle == INVALID_HANDLE_VALUE)
2382
    return UV_EBADF;
2383
  if (pipe->flags & UV_HANDLE_PIPESERVER)
2384
    return UV_EINVAL;
2385
  if (pipe->flags & UV_HANDLE_CONNECTION)
2386
    return UV_EBUSY;
2387

2388
  uv__pipe_connection_init(pipe);
2389
  uv__once_init();
2390
  /* In order to avoid closing a stdio file descriptor 0-2, duplicate the
2391
   * underlying OS handle and forget about the original fd.
2392
   * We could also opt to use the original OS handle and just never close it,
2393
   * but then there would be no reliable way to cancel pending read operations
2394
   * upon close.
2395
   */
2396
  if (file <= 2) {
2397
    if (!DuplicateHandle(INVALID_HANDLE_VALUE,
2398
                         os_handle,
2399
                         INVALID_HANDLE_VALUE,
2400
                         &os_handle,
2401
                         0,
2402
                         FALSE,
2403
                         DUPLICATE_SAME_ACCESS))
2404
      return uv_translate_sys_error(GetLastError());
2405
    assert(os_handle != INVALID_HANDLE_VALUE);
2406
    file = -1;
2407
  }
2408

2409
  /* Determine what kind of permissions we have on this handle.
2410
   * Cygwin opens the pipe in message mode, but we can support it,
2411
   * just query the access flags and set the stream flags accordingly.
2412
   */
2413
  nt_status = pNtQueryInformationFile(os_handle,
2414
                                      &io_status,
2415
                                      &access,
2416
                                      sizeof(access),
2417
                                      FileAccessInformation);
2418
  if (nt_status != STATUS_SUCCESS)
2419
    return UV_EINVAL;
2420

2421
  if (pipe->ipc) {
2422
    if (!(access.AccessFlags & FILE_WRITE_DATA) ||
2423
        !(access.AccessFlags & FILE_READ_DATA)) {
2424
      return UV_EINVAL;
2425
    }
2426
  }
2427

2428
  if (access.AccessFlags & FILE_WRITE_DATA)
2429
    duplex_flags |= UV_HANDLE_WRITABLE;
2430
  if (access.AccessFlags & FILE_READ_DATA)
2431
    duplex_flags |= UV_HANDLE_READABLE;
2432

2433
  err = uv__set_pipe_handle(pipe->loop,
2434
                            pipe,
2435
                            os_handle,
2436
                            file,
2437
                            duplex_flags);
2438
  if (err) {
2439
    if (file == -1)
2440
      CloseHandle(os_handle);
2441
    return err;
2442
  }
2443

2444
  if (pipe->ipc) {
2445
    assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
2446
    GetNamedPipeClientProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
2447
    if (pipe->pipe.conn.ipc_remote_pid == GetCurrentProcessId()) {
2448
      GetNamedPipeServerProcessId(os_handle, &pipe->pipe.conn.ipc_remote_pid);
2449
    }
2450
    assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
2451
  }
2452
  return 0;
2453
}
2454

2455

2456
static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2457
  NTSTATUS nt_status;
2458
  IO_STATUS_BLOCK io_status;
2459
  FILE_NAME_INFORMATION tmp_name_info;
2460
  FILE_NAME_INFORMATION* name_info;
2461
  WCHAR* name_buf;
2462
  unsigned int name_size;
2463
  unsigned int name_len;
2464
  int err;
2465

2466
  uv__once_init();
2467
  name_info = NULL;
2468

2469
  if (handle->name != NULL) {
2470
    /* The user might try to query the name before we are connected,
2471
     * and this is just easier to return the cached value if we have it. */
2472
    return uv__copy_utf16_to_utf8(handle->name, -1, buffer, size);
2473
  }
2474

2475
  if (handle->handle == INVALID_HANDLE_VALUE) {
2476
    *size = 0;
2477
    return UV_EINVAL;
2478
  }
2479

2480
  /* NtQueryInformationFile will block if another thread is performing a
2481
   * blocking operation on the queried handle. If the pipe handle is
2482
   * synchronous, there may be a worker thread currently calling ReadFile() on
2483
   * the pipe handle, which could cause a deadlock. To avoid this, interrupt
2484
   * the read. */
2485
  if (handle->flags & UV_HANDLE_CONNECTION &&
2486
      handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
2487
    uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
2488
  }
2489

2490
  nt_status = pNtQueryInformationFile(handle->handle,
2491
                                      &io_status,
2492
                                      &tmp_name_info,
2493
                                      sizeof tmp_name_info,
2494
                                      FileNameInformation);
2495
  if (nt_status == STATUS_BUFFER_OVERFLOW) {
2496
    name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
2497
    name_info = uv__malloc(name_size);
2498
    if (!name_info) {
2499
      *size = 0;
2500
      return UV_ENOMEM;
2501
    }
2502

2503
    nt_status = pNtQueryInformationFile(handle->handle,
2504
                                        &io_status,
2505
                                        name_info,
2506
                                        name_size,
2507
                                        FileNameInformation);
2508
  }
2509

2510
  if (nt_status != STATUS_SUCCESS) {
2511
    *size = 0;
2512
    err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
2513
    goto error;
2514
  }
2515

2516
  if (!name_info) {
2517
    /* the struct on stack was used */
2518
    name_buf = tmp_name_info.FileName;
2519
    name_len = tmp_name_info.FileNameLength;
2520
  } else {
2521
    name_buf = name_info->FileName;
2522
    name_len = name_info->FileNameLength;
2523
  }
2524

2525
  if (name_len == 0) {
2526
    *size = 0;
2527
    err = 0;
2528
    goto error;
2529
  }
2530

2531
  name_len /= sizeof(WCHAR);
2532

2533
  /* "\\\\.\\pipe" + name */
2534
  if (*size < pipe_prefix_len) {
2535
    *size = 0;
2536
  }
2537
  else {
2538
    memcpy(buffer, pipe_prefix, pipe_prefix_len);
2539
    *size -= pipe_prefix_len;
2540
  }
2541
  err = uv__copy_utf16_to_utf8(name_buf, name_len, buffer+pipe_prefix_len, size);
2542
  *size += pipe_prefix_len;
2543

2544
error:
2545
  uv__free(name_info);
2546
  return err;
2547
}
2548

2549

2550
int uv_pipe_pending_count(uv_pipe_t* handle) {
2551
  if (!handle->ipc)
2552
    return 0;
2553
  return handle->pipe.conn.ipc_xfer_queue_length;
2554
}
2555

2556

2557
int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
2558
  if (handle->flags & UV_HANDLE_BOUND)
2559
    return uv__pipe_getname(handle, buffer, size);
2560

2561
  if (handle->flags & UV_HANDLE_CONNECTION ||
2562
      handle->handle != INVALID_HANDLE_VALUE) {
2563
    *size = 0;
2564
    return 0;
2565
  }
2566

2567
  return UV_EBADF;
2568
}
2569

2570

2571
int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
2572
  /* emulate unix behaviour */
2573
  if (handle->flags & UV_HANDLE_BOUND)
2574
    return UV_ENOTCONN;
2575

2576
  if (handle->handle != INVALID_HANDLE_VALUE)
2577
    return uv__pipe_getname(handle, buffer, size);
2578

2579
  if (handle->flags & UV_HANDLE_CONNECTION) {
2580
    if (handle->name != NULL)
2581
      return uv__pipe_getname(handle, buffer, size);
2582
  }
2583

2584
  return UV_EBADF;
2585
}
2586

2587

2588
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
2589
  if (!handle->ipc)
2590
    return UV_UNKNOWN_HANDLE;
2591
  if (handle->pipe.conn.ipc_xfer_queue_length == 0)
2592
    return UV_UNKNOWN_HANDLE;
2593
  else
2594
    return UV_TCP;
2595
}
2596

2597
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
2598
  SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
2599
  PACL old_dacl, new_dacl;
2600
  PSECURITY_DESCRIPTOR sd;
2601
  EXPLICIT_ACCESS ea;
2602
  PSID everyone;
2603
  int error;
2604

2605
  if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
2606
    return UV_EBADF;
2607

2608
  if (mode != UV_READABLE &&
2609
      mode != UV_WRITABLE &&
2610
      mode != (UV_WRITABLE | UV_READABLE))
2611
    return UV_EINVAL;
2612

2613
  if (!AllocateAndInitializeSid(&sid_world,
2614
                                1,
2615
                                SECURITY_WORLD_RID,
2616
                                0, 0, 0, 0, 0, 0, 0,
2617
                                &everyone)) {
2618
    error = GetLastError();
2619
    goto done;
2620
  }
2621

2622
  if (GetSecurityInfo(handle->handle,
2623
                      SE_KERNEL_OBJECT,
2624
                      DACL_SECURITY_INFORMATION,
2625
                      NULL,
2626
                      NULL,
2627
                      &old_dacl,
2628
                      NULL,
2629
                      &sd)) {
2630
    error = GetLastError();
2631
    goto clean_sid;
2632
  }
2633

2634
  memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
2635
  if (mode & UV_READABLE)
2636
    ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
2637
  if (mode & UV_WRITABLE)
2638
    ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
2639
  ea.grfAccessPermissions |= SYNCHRONIZE;
2640
  ea.grfAccessMode = SET_ACCESS;
2641
  ea.grfInheritance = NO_INHERITANCE;
2642
  ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
2643
  ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
2644
  ea.Trustee.ptstrName = (LPTSTR)everyone;
2645

2646
  if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
2647
    error = GetLastError();
2648
    goto clean_sd;
2649
  }
2650

2651
  if (SetSecurityInfo(handle->handle,
2652
                      SE_KERNEL_OBJECT,
2653
                      DACL_SECURITY_INFORMATION,
2654
                      NULL,
2655
                      NULL,
2656
                      new_dacl,
2657
                      NULL)) {
2658
    error = GetLastError();
2659
    goto clean_dacl;
2660
  }
2661

2662
  error = 0;
2663

2664
clean_dacl:
2665
  LocalFree((HLOCAL) new_dacl);
2666
clean_sd:
2667
  LocalFree((HLOCAL) sd);
2668
clean_sid:
2669
  FreeSid(everyone);
2670
done:
2671
  return uv_translate_sys_error(error);
2672
}
2673

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.