pytorch

Форк
0
/
atomic_ops_test.py 
87 строк · 4.0 Кб
1

2

3

4

5
from caffe2.python import core, workspace
6
from caffe2.python.test_util import TestCase
7

8
import unittest
9

10

11
class TestAtomicOps(TestCase):
12
    @unittest.skip("Test is flaky: https://github.com/pytorch/pytorch/issues/28179")
13
    def test_atomic_ops(self):
14
        """
15
        Test that both countdown and checksum are update atomically by having
16
        cowntdown count from 20k to 0 from parallel the workers and updating
17
        the checksum to the value fetched. If operations are trully atomic,
18
        each value from 1 to 20k should be fetched exactly once from the
19
        countdown, and fed exactly once to the checksum, such that at the end
20
        checksum must contain the exact value of sum[i=0..20000](i).
21
        """
22
        init_net = core.Net('init')
23
        mutex_countdown = init_net.CreateMutex([])
24
        mutex_checksum = init_net.CreateMutex([])
25
        countdown = init_net.ConstantFill([], shape=[], value=20000,
26
                                          dtype=core.DataType.INT32)
27
        checksum = init_net.ConstantFill(
28
            [], shape=[], value=0, dtype=core.DataType.INT32)
29
        minus_one = init_net.ConstantFill(
30
            [], shape=[], value=-1, dtype=core.DataType.INT32)
31
        steps = []
32
        for i in range(0, 100):
33
            net = core.Net('net:%d' % i)
34
            _, fetched_count = net.AtomicFetchAdd(
35
                [mutex_countdown, countdown, minus_one],
36
                [countdown, 'fetched_count:%d' % i])
37
            net.AtomicFetchAdd(
38
                [mutex_checksum, checksum, fetched_count],
39
                [checksum, 'not_used'])
40
            steps.append(
41
                core.execution_step('worker:%d' % i, net, num_iter=200))
42
        super_step = core.execution_step(
43
            'parent', steps, concurrent_substeps=True)
44
        plan = core.Plan('plan')
45
        plan.AddStep(core.execution_step('init', init_net))
46
        plan.AddStep(super_step)
47
        workspace.RunPlan(plan)
48
        # checksum = sum[i=1..20000](i) = 20000 * 20001 / 2 = 200010000
49
        self.assertEqual(workspace.FetchBlob(checksum), 200010000)
50

51
    @unittest.skip("Test is flaky: https://github.com/pytorch/pytorch/issues/28179")
52
    def test_atomic64_ops(self):
53
        """
54
        Test that both countdown and checksum are update atomically by having
55
        cowntdown count from 20k to 0 from parallel the workers and updating
56
        the checksum to the value fetched. If operations are trully atomic,
57
        each value from 1 to 20k should be fetched exactly once from the
58
        countdown, and fed exactly once to the checksum, such that at the end
59
        checksum must contain the exact value of sum[i=0..20000](i).
60
        """
61
        init_net = core.Net('init')
62
        mutex_countdown = init_net.CreateMutex([])
63
        mutex_checksum = init_net.CreateMutex([])
64
        countdown = init_net.ConstantFill([], shape=[], value=20000,
65
                                          dtype=core.DataType.INT64)
66
        checksum = init_net.ConstantFill(
67
            [], shape=[], value=0, dtype=core.DataType.INT64)
68
        minus_one = init_net.ConstantFill(
69
            [], shape=[], value=-1, dtype=core.DataType.INT64)
70
        steps = []
71
        for i in range(0, 100):
72
            net = core.Net('net:%d' % i)
73
            _, fetched_count = net.AtomicFetchAdd64(
74
                [mutex_countdown, countdown, minus_one],
75
                [countdown, 'fetched_count:%d' % i])
76
            net.AtomicFetchAdd64(
77
                [mutex_checksum, checksum, fetched_count],
78
                [checksum, 'not_used'])
79
            steps.append(
80
                core.execution_step('worker:%d' % i, net, num_iter=200))
81
        super_step = core.execution_step(
82
            'parent', steps, concurrent_substeps=True)
83
        plan = core.Plan('plan')
84
        plan.AddStep(core.execution_step('init', init_net))
85
        plan.AddStep(super_step)
86
        workspace.RunPlan(plan)
87
        # checksum = sum[i=1..20000](i) = 20000 * 20001 / 2 = 200010000
88
        self.assertEqual(workspace.FetchBlob(checksum), 200010000)
89

90
if __name__ == "__main__":
91
    unittest.main()
92

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.