pytorch
87 строк · 4.0 Кб
1
2
3
4
5from caffe2.python import core, workspace
6from caffe2.python.test_util import TestCase
7
8import unittest
9
10
class TestAtomicOps(TestCase):
    """Concurrency tests for the AtomicFetchAdd / AtomicFetchAdd64 operators.

    Each operator takes a mutex blob as its first input; these tests drive
    many concurrent workers through the same blobs and check that no update
    is lost or duplicated.
    """

    # Number of worker execution steps run concurrently under one parent step.
    NUM_WORKERS = 100
    # Iterations executed by each worker. The countdown starts at
    # NUM_WORKERS * ITERS_PER_WORKER, so it is decremented exactly that many
    # times in total.
    ITERS_PER_WORKER = 200

    def _check_countdown_checksum(self, op_type, dtype):
        """Run the shared countdown/checksum plan using operator `op_type`.

        Both a countdown and a checksum are updated atomically by many
        parallel workers: each worker iteration fetches-and-decrements the
        countdown, then adds the fetched value to the checksum. If the
        operations are truly atomic, each value from 1 to N (the initial
        countdown) is fetched exactly once from the countdown and fed
        exactly once to the checksum, so at the end the checksum must equal
        sum[i=1..N](i) = N * (N + 1) / 2.

        Args:
            op_type: name of the caffe2 operator under test, e.g.
                'AtomicFetchAdd' or 'AtomicFetchAdd64'.
            dtype: core.DataType used for the countdown, checksum and
                step blobs; must match what `op_type` operates on.
        """
        total = self.NUM_WORKERS * self.ITERS_PER_WORKER
        init_net = core.Net('init')
        mutex_countdown = init_net.CreateMutex([])
        mutex_checksum = init_net.CreateMutex([])
        countdown = init_net.ConstantFill(
            [], shape=[], value=total, dtype=dtype)
        checksum = init_net.ConstantFill(
            [], shape=[], value=0, dtype=dtype)
        minus_one = init_net.ConstantFill(
            [], shape=[], value=-1, dtype=dtype)
        steps = []
        for i in range(self.NUM_WORKERS):
            net = core.Net('net:%d' % i)
            # Same lookup as writing net.AtomicFetchAdd(...) directly.
            fetch_add = getattr(net, op_type)
            # Decrement the countdown; the second output receives the
            # fetched value in a per-worker blob.
            _, fetched_count = fetch_add(
                [mutex_countdown, countdown, minus_one],
                [countdown, 'fetched_count:%d' % i])
            # Accumulate the fetched value into the checksum.
            fetch_add(
                [mutex_checksum, checksum, fetched_count],
                [checksum, 'not_used'])
            steps.append(core.execution_step(
                'worker:%d' % i, net, num_iter=self.ITERS_PER_WORKER))
        super_step = core.execution_step(
            'parent', steps, concurrent_substeps=True)
        plan = core.Plan('plan')
        plan.AddStep(core.execution_step('init', init_net))
        plan.AddStep(super_step)
        workspace.RunPlan(plan)
        # checksum = sum[i=1..N](i) = N * (N + 1) / 2 (200010000 for N=20000)
        self.assertEqual(
            workspace.FetchBlob(checksum), total * (total + 1) // 2)

    @unittest.skip("Test is flaky: https://github.com/pytorch/pytorch/issues/28179")
    def test_atomic_ops(self):
        """Check that AtomicFetchAdd is atomic on INT32 blobs."""
        self._check_countdown_checksum('AtomicFetchAdd', core.DataType.INT32)

    @unittest.skip("Test is flaky: https://github.com/pytorch/pytorch/issues/28179")
    def test_atomic64_ops(self):
        """Check that AtomicFetchAdd64 is atomic on INT64 blobs."""
        self._check_countdown_checksum('AtomicFetchAdd64', core.DataType.INT64)
89
if __name__ == "__main__":
    # Only when executed as a script (not on import): run this module's tests.
    unittest.main()
92