pytorch

Форк
0
/
parallel_workers_test.py 
114 строк · 3.4 Кб
1

2

3

4

5

6
import unittest
7

8
from caffe2.python import workspace, core
9
import caffe2.python.parallel_workers as parallel_workers
10

11

12
def create_queue():
13
    queue = 'queue'
14

15
    workspace.RunOperatorOnce(
16
        core.CreateOperator(
17
            "CreateBlobsQueue", [], [queue], num_blobs=1, capacity=1000
18
        )
19
    )
20
    # Technically, blob creations aren't thread safe. Since the unittest below
21
    # does RunOperatorOnce instead of CreateNet+RunNet, we have to precreate
22
    # all blobs beforehand
23
    for i in range(100):
24
        workspace.C.Workspace.current.create_blob("blob_" + str(i))
25
        workspace.C.Workspace.current.create_blob("status_blob_" + str(i))
26
    workspace.C.Workspace.current.create_blob("dequeue_blob")
27
    workspace.C.Workspace.current.create_blob("status_blob")
28

29
    return queue
30

31

32
def create_worker(queue, get_blob_data):
33
    def dummy_worker(worker_id):
34
        blob = 'blob_' + str(worker_id)
35

36
        workspace.FeedBlob(blob, get_blob_data(worker_id))
37

38
        workspace.RunOperatorOnce(
39
            core.CreateOperator(
40
                'SafeEnqueueBlobs', [queue, blob], [blob, 'status_blob_' + str(worker_id)]
41
            )
42
        )
43

44
    return dummy_worker
45

46

47
def dequeue_value(queue):
48
    dequeue_blob = 'dequeue_blob'
49
    workspace.RunOperatorOnce(
50
        core.CreateOperator(
51
            "SafeDequeueBlobs", [queue], [dequeue_blob, 'status_blob']
52
        )
53
    )
54

55
    return workspace.FetchBlob(dequeue_blob)
56

57

58
class ParallelWorkersTest(unittest.TestCase):
59
    def testParallelWorkers(self):
60
        workspace.ResetWorkspace()
61

62
        queue = create_queue()
63
        dummy_worker = create_worker(queue, str)
64
        worker_coordinator = parallel_workers.init_workers(dummy_worker)
65
        worker_coordinator.start()
66

67
        for _ in range(10):
68
            value = dequeue_value(queue)
69
            self.assertTrue(
70
                value in [b'0', b'1'], 'Got unexpected value ' + str(value)
71
            )
72

73
        self.assertTrue(worker_coordinator.stop())
74

75
    def testParallelWorkersInitFun(self):
76
        workspace.ResetWorkspace()
77

78
        queue = create_queue()
79
        dummy_worker = create_worker(
80
            queue, lambda worker_id: workspace.FetchBlob('data')
81
        )
82
        workspace.FeedBlob('data', 'not initialized')
83

84
        def init_fun(worker_coordinator, global_coordinator):
85
            workspace.FeedBlob('data', 'initialized')
86

87
        worker_coordinator = parallel_workers.init_workers(
88
            dummy_worker, init_fun=init_fun
89
        )
90
        worker_coordinator.start()
91

92
        for _ in range(10):
93
            value = dequeue_value(queue)
94
            self.assertEqual(
95
                value, b'initialized', 'Got unexpected value ' + str(value)
96
            )
97

98
        # A best effort attempt at a clean shutdown
99
        worker_coordinator.stop()
100

101
    def testParallelWorkersShutdownFun(self):
102
        workspace.ResetWorkspace()
103

104
        queue = create_queue()
105
        dummy_worker = create_worker(queue, str)
106
        workspace.FeedBlob('data', 'not shutdown')
107

108
        def shutdown_fun():
109
            workspace.FeedBlob('data', 'shutdown')
110

111
        worker_coordinator = parallel_workers.init_workers(
112
            dummy_worker, shutdown_fun=shutdown_fun
113
        )
114
        worker_coordinator.start()
115

116
        self.assertTrue(worker_coordinator.stop())
117

118
        data = workspace.FetchBlob('data')
119
        self.assertEqual(data, b'shutdown', 'Got unexpected value ' + str(data))
120

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.