pytorch

Форк
0
/
StorageSharing.cpp 
698 строк · 24.4 Кб
1
#include <torch/csrc/python_headers.h>
2
#ifdef _MSC_VER
3
#include <c10/util/win32-headers.h>
4
#endif
5
#include <structmember.h>
6

7
#include <c10/core/CPUAllocator.h>
8
#include <libshm.h>
9
#include <torch/csrc/CudaIPCTypes.h>
10
#include <torch/csrc/Device.h>
11
#include <torch/csrc/DynamicTypes.h>
12
#include <torch/csrc/THP.h>
13
#include <torch/csrc/autograd/utils/wrap_outputs.h>
14
#include <torch/csrc/copy_utils.h>
15

16
#include <c10/util/intrusive_ptr.h>
17
#include <fmt/format.h>
18

19
#include <torch/csrc/Storage.h>
20
#include <torch/csrc/StorageSharing.h>
21

22
#ifdef USE_CUDA
23
#include <c10/cuda/CUDAGuard.h>
24
#include <cuda.h>
25
#include <cuda_runtime.h>
26
#endif
27

28
#include <ATen/MapAllocator.h>
29
#include <ATen/StorageUtils.h>
30
#include <torch/csrc/utils/python_numbers.h>
31
#include <atomic>
32
#include <string>
33

34
static PyObject* THPStorage_sharedDecref(PyObject* self, PyObject* noargs) {
35
  HANDLE_TH_ERRORS
36
  THPStorage_assertNotNull(self);
37
  const auto& storage = THPStorage_Unpack(self);
38
  c10::DeviceType device_type = storage.device_type();
39
  if (device_type == at::kCPU) {
40
    THManagedMapAllocator* ctx =
41
        THManagedMapAllocator::fromDataPtr(storage.data_ptr());
42
    if (ctx) {
43
      ctx->decref();
44
    }
45
  }
46
  Py_INCREF(self);
47
  return self;
48
  END_HANDLE_TH_ERRORS
49
}
50

51
static PyObject* THPStorage_sharedIncref(PyObject* self, PyObject* noargs) {
52
  HANDLE_TH_ERRORS
53
  THPStorage_assertNotNull(self);
54
  const auto& storage = THPStorage_Unpack(self);
55
  c10::DeviceType device_type = storage.device_type();
56
  if (device_type == at::kCPU) {
57
    THManagedMapAllocator* ctx =
58
        THManagedMapAllocator::fromDataPtr(storage.data_ptr());
59
    if (ctx) {
60
      ctx->incref();
61
    }
62
  }
63
  Py_RETURN_NONE;
64
  END_HANDLE_TH_ERRORS
65
}
66

67
static PyObject* THPStorage_pyNewFilenameStorage(
68
    PyObject* _unused,
69
    PyObject* args) {
70
  HANDLE_TH_ERRORS
71
  long long size = 0;
72
  if (!PyArg_ParseTuple(args, "L", &size)) {
73
    return nullptr;
74
  }
75
  if (size < 0) {
76
    return nullptr;
77
  }
78

79
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
80
  std::string handle = at::NewProcessWideShmHandle();
81
  return THPStorage_NewWithStorage(
82
      THPStorageClass,
83
      c10::make_intrusive<at::StorageImpl>(
84
          c10::StorageImpl::use_byte_size_t(),
85
          size,
86
          THManagedMapAllocator::makeDataPtr(
87
              "", handle.c_str(), flags, static_cast<size_t>(size)),
88
          /*allocator=*/nullptr,
89
          /*resizable=*/false),
90
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
91
  END_HANDLE_TH_ERRORS
92
}
93

94
static PyObject* THPStorage_shareFilename(PyObject* self, PyObject* noargs) {
95
  HANDLE_TH_ERRORS
96
  THPStorage_assertNotNull(self);
97
  const auto& storage = THPStorage_Unpack(self);
98
  TORCH_CHECK(
99
      storage.device_type() == at::kCPU,
100
      "_share_filename_: only available on CPU");
101
  THManagedMapAllocator* ctx =
102
      THManagedMapAllocator::fromDataPtr(storage.data_ptr());
103
  // Storage is already in shared memory, just return a handle
104
  if (ctx) {
105
    // done
106
  } else {
107
    // TODO: retry on collision
108
    // TODO: free GIL - but remember to reacquire it when an exception is thrown
109
    int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_EXCLUSIVE;
110
    std::string handle = at::NewProcessWideShmHandle();
111
    // Create a new storage in shared memory
112
    at::Storage new_storage(c10::make_intrusive<at::StorageImpl>(
113
        c10::StorageImpl::use_byte_size_t(),
114
        storage.nbytes(),
115
        THManagedMapAllocator::makeDataPtr(
116
            "", handle.c_str(), flags, storage.nbytes()),
117
        /*allocator=*/nullptr,
118
        /*resizable=*/false));
119

120
    {
121
      // Copying into shared memory can be slow, so release the GIL
122
      pybind11::gil_scoped_release no_gil;
123
      // Copy data from old storage into the new one
124
      at::storage_copy(new_storage, storage);
125
    }
126

127
    // Replace the old data_ptr and allocator with the new ones
128
    storage.set_data_ptr(std::move(new_storage.mutable_data_ptr()));
129
    storage.unsafeGetStorageImpl()->set_allocator(new_storage.allocator());
130

131
    ctx = THManagedMapAllocator::fromDataPtr(storage.data_ptr());
132
    AT_ASSERT(ctx);
133
  }
134

135
  THPObjectPtr manager_handle(PyBytes_FromString(ctx->manager_handle()));
136
  if (!manager_handle)
137
    return nullptr;
138
  THPObjectPtr storage_handle(PyBytes_FromString(ctx->filename()));
139
  if (!storage_handle)
140
    return nullptr;
141
  THPObjectPtr size(THPUtils_packUInt64(storage.nbytes()));
142
  if (!size)
143
    return nullptr;
144

145
  THPObjectPtr tuple(PyTuple_New(3));
146
  if (!tuple)
147
    return nullptr;
148
  PyTuple_SET_ITEM(tuple.get(), 0, manager_handle.release());
149
  PyTuple_SET_ITEM(tuple.get(), 1, storage_handle.release());
150
  PyTuple_SET_ITEM(tuple.get(), 2, size.release());
151
  return tuple.release();
152
  END_HANDLE_TH_ERRORS
153
}
154

155
static PyObject* THPStorage_newSharedFilename(
156
    PyObject* _unused,
157
    PyObject* args) {
158
  HANDLE_TH_ERRORS
159
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 3, "tuple of 3 items expected");
160
  PyObject* _manager_handle = PyTuple_GET_ITEM(args, 0);
161
  PyObject* _object_handle = PyTuple_GET_ITEM(args, 1);
162
  PyObject* _size = PyTuple_GET_ITEM(args, 2);
163
  if (!PyBytes_Check(_manager_handle) || !PyBytes_Check(_object_handle) ||
164
      !THPUtils_checkLong(_size)) {
165
    THPUtils_invalidArguments(
166
        args,
167
        nullptr,
168
        "_new_shared in file system mode",
169
        1,
170
        "a handle (string/bytes) and storage size (int)");
171
    return nullptr;
172
  }
173
  const char* manager_handle = PyBytes_AS_STRING(_manager_handle);
174
  const char* object_handle = PyBytes_AS_STRING(_object_handle);
175
  uint64_t size = THPUtils_unpackUInt64(_size);
176
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
177
  return THPStorage_NewWithStorage(
178
      THPStorageClass,
179
      c10::make_intrusive<at::StorageImpl>(
180
          c10::StorageImpl::use_byte_size_t(),
181
          size,
182
          THManagedMapAllocator::makeDataPtr(
183
              manager_handle, object_handle, flags, size),
184
          /*allocator=*/nullptr,
185
          /*resizable=*/false),
186
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
187
  END_HANDLE_TH_ERRORS
188
}
189

190
static PyObject* THPStorage_pyNewFdStorage(PyObject* _unused, PyObject* args) {
191
  HANDLE_TH_ERRORS
192
  long long size = 0;
193
  if (!PyArg_ParseTuple(args, "L", &size)) {
194
    return nullptr;
195
  }
196
  if (size < 0) {
197
    return nullptr;
198
  }
199
  return THPStorage_NewWithStorage(
200
      THPStorageClass,
201
      at::new_shm_fd_storage(size),
202
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
203
  END_HANDLE_TH_ERRORS
204
}
205

206
static PyObject* THPStorage_shareFd(PyObject* self, PyObject* noargs) {
207
  HANDLE_TH_ERRORS
208
  THPStorage_assertNotNull(self);
209
  const auto& storage = THPStorage_Unpack(self);
210
  TORCH_CHECK(
211
      storage.device_type() == at::kCPU, "_share_fd_: only available on CPU");
212
  at::MapAllocator* ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
213
  // Storage is already in shared memory, just return a handle
214
  if (ctx) {
215
    // done
216
  } else {
217
    at::Storage new_storage(at::new_shm_fd_storage(storage.nbytes()));
218
    {
219
      // Copying into shared memory can be slow, so release the GIL
220
      pybind11::gil_scoped_release no_gil;
221
      // Copy data from old storage into the new one
222
      at::storage_copy(new_storage, storage);
223
    }
224

225
    // Replace the old data_ptr and allocator with the new ones
226
    storage.set_data_ptr(std::move(new_storage.mutable_data_ptr()));
227
    storage.unsafeGetStorageImpl()->set_allocator(new_storage.allocator());
228

229
    ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
230
    AT_ASSERT(ctx);
231
  }
232

233
  THPObjectPtr storage_handle(THPUtils_packInt32(ctx->fd()));
234
  if (!storage_handle)
235
    return nullptr;
236
  THPObjectPtr size(THPUtils_packUInt64(storage.nbytes()));
237
  if (!size)
238
    return nullptr;
239

240
  THPObjectPtr tuple(PyTuple_New(2));
241
  if (!tuple)
242
    return nullptr;
243
  PyTuple_SET_ITEM(tuple.get(), 0, storage_handle.release());
244
  PyTuple_SET_ITEM(tuple.get(), 1, size.release());
245
  return tuple.release();
246
  END_HANDLE_TH_ERRORS
247
}
248

249
static PyObject* THPStorage_newSharedFd(PyObject* _unused, PyObject* args) {
250
  HANDLE_TH_ERRORS
251
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
252
  PyObject* _tmp_fd = PyTuple_GET_ITEM(args, 0);
253
  PyObject* _size = PyTuple_GET_ITEM(args, 1);
254
  if (!THPUtils_checkLong(_tmp_fd) || !THPUtils_checkLong(_size)) {
255
    THPUtils_invalidArguments(
256
        args,
257
        nullptr,
258
        "_new_shared in file descriptor mode",
259
        1,
260
        "a file descriptor (int) and storage size (int)");
261
    return nullptr;
262
  }
263
  int tmp_fd = (int)THPUtils_unpackLong(_tmp_fd);
264
  int64_t size = THPUtils_unpackLong(_size);
265
  int fd = dup(tmp_fd);
266
  if (fd == -1) {
267
    THPUtils_setError("could not duplicate a shared memory file descriptor");
268
    return nullptr;
269
  }
270

271
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE |
272
      at::ALLOCATOR_MAPPED_KEEPFD | at::ALLOCATOR_MAPPED_FROMFD;
273
  return THPStorage_NewWithStorage(
274
      THPStorageClass,
275
      c10::make_intrusive<at::StorageImpl>(
276
          c10::StorageImpl::use_byte_size_t(),
277
          size,
278
          at::MapAllocator::makeDataPtr(
279
              at::WITH_FD, "", fd, flags, size, nullptr),
280
          /*allocator=*/nullptr,
281
          /*resizable=*/false),
282
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
283
  END_HANDLE_TH_ERRORS
284
}
285

286
static PyObject* THPStorage_shareCuda(PyObject* self, PyObject* noargs) {
287
  HANDLE_TH_ERRORS
288
  THPStorage_assertNotNull(self);
289
#ifdef USE_CUDA
290
  const auto& storage = THPStorage_Unpack(self);
291
  TORCH_CHECK(
292
      storage.device_type() == at::kCUDA,
293
      "_share_cuda_: only available on CUDA");
294
  c10::StorageImpl* storage_impl = storage.unsafeGetStorageImpl();
295

296
  if (storage_impl->received_cuda()) {
297
    AT_ERROR(
298
        "Attempted to send CUDA tensor received from another process; this is not currently supported. Consider cloning before sending.");
299
  }
300

301
  at::DeviceGuard device_guard(storage.device());
302
  THPObjectPtr tuple(PyTuple_New(8));
303
  THPObjectPtr device(THPUtils_packInt32(storage.device().index()));
304
  THPObjectPtr _handle(Py_None);
305
  Py_INCREF(Py_None);
306
  THPObjectPtr size_bytes(THPUtils_packUInt64(storage.nbytes()));
307
  THPObjectPtr _offset_bytes(THPUtils_packInt32(0));
308
  THPObjectPtr _ref_counter(Py_None);
309
  Py_INCREF(Py_None);
310
  THPObjectPtr _ref_counter_offset(THPUtils_packInt32(0));
311
  THPObjectPtr _event_handle(Py_None);
312
  Py_INCREF(Py_None);
313
  THPObjectPtr _event_sync_required(Py_None);
314
  Py_INCREF(Py_None);
315
  if (storage.data()) {
316
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
317
    size_t base_size;
318
    void* base_ptr = c10::cuda::CUDACachingAllocator::getBaseAllocation(
319
        storage.mutable_data(), &base_size);
320
    ptrdiff_t offset_bytes = (char*)storage.data() - (char*)base_ptr;
321

322
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
323
    cudaIpcMemHandle_t handle;
324
    C10_CUDA_CHECK(cudaIpcGetMemHandle(&handle, base_ptr));
325

326
    _handle = PyBytes_FromStringAndSize((char*)&handle, CUDA_IPC_HANDLE_SIZE);
327
    _offset_bytes = PyLong_FromSsize_t((Py_ssize_t)offset_bytes);
328

329
    // Put Storage Data behind new ref counting context
330
    // See Note [CUDA IPC Refcounting implementation explained]
331
    at::DataPtr sent_data_ptr = torch::GetNewRefCountedSentData(
332
        storage.mutable_data(), storage.device());
333
    auto old_data_ptr = storage.set_data_ptr(std::move(sent_data_ptr));
334
    auto sent_data =
335
        static_cast<torch::CudaIPCSentData*>(storage.data_ptr().get_context());
336
    sent_data->set_original_ptr(std::move(old_data_ptr));
337
    _ref_counter = PyBytes_FromString((sent_data->handle()).c_str());
338
    _ref_counter_offset = THPUtils_packUInt64(sent_data->offset());
339

340
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
341
    cudaIpcEventHandle_t ipc_event_handle;
342

343
    if (sent_data->event_sync_required_) {
344
      C10_CUDA_CHECK(
345
          cudaIpcGetEventHandle(&ipc_event_handle, sent_data->event_));
346
    }
347

348
    _event_handle = PyBytes_FromStringAndSize(
349
        (char*)&ipc_event_handle, CUDA_IPC_HANDLE_SIZE);
350
    _event_sync_required = PyBool_FromLong(sent_data->event_sync_required_);
351
  }
352

353
  if (!tuple || !device || !_handle || !size_bytes || !_offset_bytes ||
354
      !_event_handle) {
355
    return nullptr;
356
  }
357
  PyTuple_SET_ITEM(tuple.get(), 0, device.release());
358
  // cudaIpcMemHandle_t(of basePtr)
359
  PyTuple_SET_ITEM(tuple.get(), 1, _handle.release());
360
  // Size(in bytes) of the real storage, note this is not the size of basePtr
361
  // memory block.
362
  PyTuple_SET_ITEM(tuple.get(), 2, size_bytes.release());
363
  // Offset(in bytes) of the real storage in the basePtr memory block.
364
  // NB: this offset MUST be in bytes instead of numel, since we use
365
  // (storage_handle, offset)
366
  //     as key in shared_cache(multiprocessing/reduction.py).
367
  //     Offset in numel cannot uniquely represent a storage.
368
  PyTuple_SET_ITEM(tuple.get(), 3, _offset_bytes.release());
369
  PyTuple_SET_ITEM(tuple.get(), 4, _ref_counter.release());
370
  PyTuple_SET_ITEM(tuple.get(), 5, _ref_counter_offset.release());
371
  PyTuple_SET_ITEM(tuple.get(), 6, _event_handle.release());
372
  PyTuple_SET_ITEM(tuple.get(), 7, _event_sync_required.release());
373
  return tuple.release();
374
#else
375
  TORCH_CHECK(false, "CUDA is not available");
376
#endif
377
  END_HANDLE_TH_ERRORS
378
}
379

380
static PyObject* THPStorage_releaseIPCCounter(
381
    PyObject* _unused,
382
    PyObject* args) {
383
  HANDLE_TH_ERRORS
384
#ifdef USE_CUDA
385
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 2, "tuple of 2 items expected");
386
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 0);
387
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 1);
388
  if (!(PyBytes_Check(_ref_counter) &&
389
        THPUtils_checkLong(_ref_counter_offset))) {
390
    THPUtils_invalidArguments(
391
        args,
392
        nullptr,
393
        "_release_ipc_counter in CUDA mode",
394
        1,
395
        "(bytes _ref_counter, int _ref_counter_offset)");
396
    return nullptr;
397
  }
398
  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
399
  ptrdiff_t ref_counter_offset =
400
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);
401
  // We don't want to break existing code, so resource deletion is best
402
  // effort basis. Exception expected if producer process terminated
403
  // before consumer released data.
404
  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
405
  try {
406
    auto sptr = at::RefcountedMapAllocator::makeDataPtr(
407
        ref_counter_handle.c_str(),
408
        flags,
409
        sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
410
        nullptr);
411
    *(static_cast<int64_t*>(sptr.get()) + ref_counter_offset) -= 1;
412
  } catch (c10::Error& err) {
413
    // Already warned inside of producer process
414
  }
415
  Py_RETURN_NONE;
416
#else
417
  TORCH_CHECK(false, "CUDA is not available");
418
#endif
419
  END_HANDLE_TH_ERRORS
420
}
421

422
#ifdef USE_CUDA
423
static std::string THPStorage_bytesAsHandleString(PyObject* handle) {
424
  HANDLE_TH_ERRORS
425
  char* buffer = nullptr;
426
  Py_ssize_t handle_size = 0;
427
  if (PyBytes_AsStringAndSize(handle, &buffer, &handle_size) == -1) {
428
    TORCH_CHECK(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle");
429
  }
430
  TORCH_CHECK(handle_size == CUDA_IPC_HANDLE_SIZE, "incorrect handle size");
431
  return std::string(buffer, handle_size);
432
  END_HANDLE_TH_ERRORS_RET("")
433
}
434
#endif
435

436
static PyObject* THPStorage_newSharedCuda(PyObject* _unused, PyObject* args) {
437
  HANDLE_TH_ERRORS
438
#ifdef USE_CUDA
439
  TORCH_CHECK(PyTuple_GET_SIZE(args) == 8, "tuple of 8 items expected");
440
  PyObject* _device = PyTuple_GET_ITEM(args, 0);
441
  PyObject* _handle = PyTuple_GET_ITEM(args, 1);
442
  PyObject* _size_bytes = PyTuple_GET_ITEM(args, 2);
443
  PyObject* _offset_bytes = PyTuple_GET_ITEM(args, 3);
444
  PyObject* _ref_counter = PyTuple_GET_ITEM(args, 4);
445
  PyObject* _ref_counter_offset = PyTuple_GET_ITEM(args, 5);
446
  PyObject* _event_handle = PyTuple_GET_ITEM(args, 6);
447
  PyObject* _event_sync_required = PyTuple_GET_ITEM(args, 7);
448
  if (!(THPUtils_checkLong(_device) && THPUtils_checkLong(_size_bytes) &&
449
        PyBytes_Check(_handle) && PyBytes_Check(_ref_counter) &&
450
        PyBytes_Check(_event_handle) && THPUtils_checkLong(_offset_bytes) &&
451
        THPUtils_checkLong(_ref_counter_offset) &&
452
        PyBool_Check(_event_sync_required))) {
453
    THPUtils_invalidArguments(
454
        args,
455
        nullptr,
456
        "_new_shared in CUDA mode",
457
        1,
458
        "(int device, bytes handle, int storage_size_bytes, int storage_offset_bytes, bytes _ref_counter, int _ref_counter_offset, bytes event_handle, bool event_sync_required)");
459
    return nullptr;
460
  }
461

462
  size_t storage_size =
463
      (size_t)THPUtils_unpackLong(_size_bytes) / sizeof(uint8_t);
464
  ptrdiff_t storage_offset_bytes =
465
      (ptrdiff_t)THPUtils_unpackLong(_offset_bytes);
466

467
  const auto device = c10::checked_convert<c10::DeviceIndex>(
468
      THPUtils_unpackLong(_device), "c10::DeviceIndex");
469
  at::cuda::CUDAGuard device_guard(device);
470

471
  if (PyObject_IsTrue(_event_sync_required)) {
472
    // Ensure that producer prepared all tensor's data
473
    std::string s_ipc_event_handle =
474
        THPStorage_bytesAsHandleString(_event_handle);
475
    if (s_ipc_event_handle.empty()) {
476
      return nullptr;
477
    }
478
    auto ipc_event_handle = reinterpret_cast<const cudaIpcEventHandle_t*>(
479
        s_ipc_event_handle.c_str());
480
    // NOLINTNEXTLINE(cppcoreguidelines-init-variables)
481
    cudaEvent_t event;
482
    cudaIpcOpenEventHandle(&event, *ipc_event_handle);
483
    C10_CUDA_CHECK(
484
        cudaStreamWaitEvent(c10::cuda::getCurrentCUDAStream(device), event, 0));
485
  }
486

487
  std::string s_handle = THPStorage_bytesAsHandleString(_handle);
488
  if (s_handle.empty()) {
489
    return nullptr;
490
  }
491
  std::shared_ptr<void> basePtr =
492
      c10::cuda::CUDACachingAllocator::getIpcDevPtr(s_handle);
493

494
  // Offset the basePtr to reconstruct the real storage
495
  // devPtr = basePtr + storage_offset
496
  void* devPtr = basePtr.get();
497
  devPtr = (char*)devPtr + storage_offset_bytes;
498

499
  std::string ref_counter_handle = PyBytes_AS_STRING(_ref_counter);
500
  ptrdiff_t ref_counter_offset =
501
      (ptrdiff_t)THPUtils_unpackLong(_ref_counter_offset);
502

503
  struct IpcDeleterContext {
504
    std::string ref_counter_handle;
505
    ptrdiff_t ref_counter_offset{};
506
    c10::DeviceIndex device{-1};
507
    torch::CudaIPCReceivedData received_data;
508
  };
509

510
  auto ctx = std::make_unique<IpcDeleterContext>();
511
  ctx->ref_counter_handle = std::move(ref_counter_handle);
512
  ctx->ref_counter_offset = ref_counter_offset;
513
  ctx->device = device;
514
  ctx->received_data.shared_ptr_ = std::move(basePtr);
515

516
  auto cur_device = at::cuda::current_device();
517
  c10::DataPtr data_ptr(
518
      devPtr,
519
      ctx.release(),
520
      +[](void* ctx_) {
521
        std::unique_ptr<IpcDeleterContext> ctx(
522
            static_cast<IpcDeleterContext*>(ctx_));
523
        ctx->received_data.shared_ptr_.reset();
524

525
        // Sync default stream to make sure all operations related to the
526
        // storage is finished (otherwise another process may reuse memory and
527
        // corrupt data)
528

529
        // Ideally all shared memory reference counting could be replaced by
530
        // sending untriggered CUDA event from the producer to consumer and
531
        // using this event as the criteria of memory release. However, CUDA
532
        // (atm 10.1) does not support the creation of untriggered events and
533
        // performance impact of having thousands of shared events is unknown.
534

535
        // TODO: Instead of cudaStreamSynchronize it is possible to add Stream
536
        // Callback and release counter inside of it (need to check performance
537
        // impact)
538
        at::cuda::stream_synchronize(
539
            c10::cuda::getCurrentCUDAStream(ctx->device));
540

541
        // We don't want to break existing code, so resource deletion is best
542
        // effort basis. Exception expected if producer process terminated
543
        // before consumer released data.
544
        int flags =
545
            at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE;
546
        try {
547
          auto sptr = at::RefcountedMapAllocator::makeDataPtr(
548
              ctx->ref_counter_handle.c_str(),
549
              flags,
550
              sizeof(int64_t) * torch::CUDA_IPC_REF_COUNTER_FILE_SIZE,
551
              nullptr);
552
          *(static_cast<int64_t*>(sptr.get()) + ctx->ref_counter_offset) -= 1;
553
        } catch (c10::Error& err) {
554
          // Already warned inside of producer process
555
        }
556
      },
557
      at::Device(at::DeviceType::CUDA, cur_device));
558

559
  auto base = c10::make_intrusive<at::StorageImpl>(
560
      c10::StorageImpl::use_byte_size_t(),
561
      storage_size,
562
      std::move(data_ptr),
563
      /*allocator=*/nullptr,
564
      /*resizable=*/false);
565

566
  base->set_resizable(false);
567
  base->set_received_cuda(true);
568

569
  return THPStorage_NewWithStorage(
570
      THPStorageClass,
571
      std::move(base),
572
      c10::impl::PyInterpreterStatus::TAGGED_BY_US);
573
#else
574
  TORCH_CHECK(false, "CUDA is not available");
575
#endif
576
  END_HANDLE_TH_ERRORS
577
}
578

579
// Returns an object that holds a "weak" pointer to the c10::StorageImpl.  This
580
// pointer keeps the c10::StorageImpl struct live, but does not retain the data
581
// pointer.
582
//
583
// NB: This does NOT preserve object identity when you call it multiple times
584
static PyObject* THPStorage_weakRef(PyObject* self, PyObject* args) {
585
  HANDLE_TH_ERRORS
586
  c10::StorageImpl* storage = THPStorage_Unpack(self).unsafeGetStorageImpl();
587
  return PyLong_FromVoidPtr(c10::raw::intrusive_ptr::make_weak(storage));
588
  END_HANDLE_TH_ERRORS
589
}
590

591
PyObject* THPStorage_newWithWeakPtr(PyObject* _unused, PyObject* arg) {
592
  HANDLE_TH_ERRORS
593
  TORCH_CHECK(
594
      THPUtils_checkLong(arg), "_new_with_weak_ptr(): arg must be an 'int'");
595
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
596
  if (auto* storage = c10::raw::weak_intrusive_ptr::lock(weak_storage)) {
597
    return THPStorage_Wrap(
598
        c10::intrusive_ptr<c10::StorageImpl>::reclaim(storage));
599
  }
600
  Py_RETURN_NONE;
601
  END_HANDLE_TH_ERRORS
602
}
603

604
PyObject* THPStorage_freeWeakRef(PyObject* _unused, PyObject* arg) {
605
  HANDLE_TH_ERRORS
606
  if (arg == Py_None) {
607
    Py_RETURN_NONE;
608
  }
609
  TORCH_CHECK(
610
      THPUtils_checkLong(arg), "_free_weak_ref(): arg must be an 'int'");
611
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
612
  c10::raw::weak_intrusive_ptr::decref(weak_storage);
613

614
  Py_RETURN_NONE;
615
  END_HANDLE_TH_ERRORS
616
}
617

618
PyObject* THPStorage_expired(PyObject* _unused, PyObject* arg) {
619
  HANDLE_TH_ERRORS
620
  TORCH_CHECK(THPUtils_checkLong(arg), "_expired(): arg must be an 'int'");
621
  c10::StorageImpl* weak_storage = (c10::StorageImpl*)PyLong_AsVoidPtr(arg);
622
  return PyBool_FromLong(
623
      c10::raw::weak_intrusive_ptr::use_count(weak_storage) == 0);
624
  END_HANDLE_TH_ERRORS
625
}
626

627
PyObject* THPStorage_sharedFd(PyObject* self, PyObject* noargs) {
628
  HANDLE_TH_ERRORS
629
  THPStorage_assertNotNull(self);
630
  at::MapAllocator* ctx = nullptr;
631
  const auto& storage = THPStorage_Unpack(self);
632
  if (storage.device_type() == at::kCPU) {
633
    ctx = at::MapAllocator::fromDataPtr(storage.data_ptr());
634
  }
635

636
  TORCH_CHECK(ctx, "couldn't retrieve a shared file descriptor");
637
  return THPUtils_packInt32(ctx->fd());
638
  END_HANDLE_TH_ERRORS
639
}
640

641
PyObject* THPStorage_isShared(PyObject* self, PyObject* noargs) {
642
  const auto& storage = THPStorage_Unpack(self);
643
  if (storage.device_type() == at::kCUDA) {
644
    Py_RETURN_TRUE;
645
  }
646
  if (at::MapAllocator::fromDataPtr(storage.data_ptr()) ||
647
      THManagedMapAllocator::fromDataPtr(storage.data_ptr())) {
648
    Py_RETURN_TRUE;
649
  } else {
650
    Py_RETURN_FALSE;
651
  }
652
}
653

654
// NOLINTNEXTLINE(cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,cppcoreguidelines-avoid-non-const-global-variables)
655
static PyMethodDef THPStorage_sharingMethods[] = {
656
    {"_new_with_weak_ptr",
657
     THPStorage_newWithWeakPtr,
658
     METH_O | METH_CLASS,
659
     nullptr},
660
    {"_share_cuda_", THPStorage_shareCuda, METH_NOARGS, nullptr},
661
    {"_new_shared_cuda",
662
     THPStorage_newSharedCuda,
663
     METH_VARARGS | METH_STATIC,
664
     nullptr},
665
    {"_release_ipc_counter_cuda",
666
     THPStorage_releaseIPCCounter,
667
     METH_VARARGS | METH_STATIC,
668
     nullptr},
669
    {"_share_fd_cpu_", THPStorage_shareFd, METH_NOARGS, nullptr},
670
    {"_new_shared_fd_cpu",
671
     THPStorage_newSharedFd,
672
     METH_VARARGS | METH_STATIC,
673
     nullptr},
674
    {"_new_using_fd_cpu",
675
     THPStorage_pyNewFdStorage,
676
     METH_VARARGS | METH_STATIC,
677
     nullptr},
678
    {"_share_filename_cpu_", THPStorage_shareFilename, METH_NOARGS, nullptr},
679
    {"_new_shared_filename_cpu",
680
     THPStorage_newSharedFilename,
681
     METH_VARARGS | METH_STATIC,
682
     nullptr},
683
    {"_new_using_filename_cpu",
684
     THPStorage_pyNewFilenameStorage,
685
     METH_VARARGS | METH_STATIC,
686
     nullptr},
687
    {"_weak_ref", THPStorage_weakRef, METH_NOARGS, nullptr},
688
    {"_free_weak_ref", THPStorage_freeWeakRef, METH_O | METH_STATIC, nullptr},
689
    {"_expired", THPStorage_expired, METH_O | METH_STATIC, nullptr},
690
    {"_shared_decref", THPStorage_sharedDecref, METH_NOARGS, nullptr},
691
    {"_shared_incref", THPStorage_sharedIncref, METH_NOARGS, nullptr},
692
    {"_get_shared_fd", THPStorage_sharedFd, METH_NOARGS, nullptr},
693
    {"is_shared", THPStorage_isShared, METH_NOARGS, nullptr},
694
    {nullptr}};
695

696
PyMethodDef* THPStorage_getSharingMethods() {
697
  return THPStorage_sharingMethods;
698
}
699

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.