import unittest

import numpy as np
from hypothesis import given, settings
import hypothesis.strategies as st

from caffe2.python import brew, core, model_helper, rnn_cell
import caffe2.python.workspace as ws


class TestObservers(unittest.TestCase):
    def setUp(self):
        core.GlobalInit(["python", "caffe2"])

        self.model = model_helper.ModelHelper()
        # dim_in/dim_out and axis are assumed values chosen to match the
        # 1-D "data" blob of size 4 fed below; the original settings were
        # not preserved in this excerpt.
        brew.fc(self.model, "data", "y",
                dim_in=4, dim_out=2,
                weight_init=('ConstantFill', dict(value=1.0)),
                bias_init=('ConstantFill', dict(value=0.0)),
                axis=0)
        ws.FeedBlob("data", np.zeros([4], dtype='float32'))

        ws.RunNetOnce(self.model.param_init_net)
        ws.CreateNet(self.model.net)

    def testObserver(self):
        ob = self.model.net.AddObserver("TimeObserver")
        ws.RunNet(self.model.net)
        print(ob.average_time())
        num = self.model.net.NumObservers()
        self.model.net.RemoveObserver(ob)
        assert(self.model.net.NumObservers() + 1 == num)

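    # Property-based test: hypothesis draws the layer count and the
    # forward-only flag so that both inference-only and trainable
    # recurrent nets are exercised.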
    @given(
        num_layers=st.integers(1, 4),
        forward_only=st.booleans()
    )
    @settings(deadline=1000)
    def test_observer_rnn_executor(self, num_layers, forward_only):
        '''
        Test that the RNN executor produces the same results as
        the non-executor (i.e. running step nets as a sequence of simple nets).
        '''
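        # The body builds the same LSTM net for several sequence lengths,
        # runs it with the RNN executor enabled, and uses a RunCountObserver
        # to check that the operator run count grows with the sequence length.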
        # Concrete sizes are assumed for this excerpt; the assertions at the
        # end only require Tseq to hold three strictly increasing, equally
        # spaced sequence lengths.
        batch_size = 4
        input_dim = 20
        hidden_dim = 30
        Tseq = [2, 3, 4]

        run_cnt = [0] * len(Tseq)
        avg_time = [0] * len(Tseq)
        for j in range(len(Tseq)):
            T = Tseq[j]

            ws.FeedBlob(
                "seq_lengths",
                np.array([T] * batch_size, dtype=np.int32)
            )
            ws.FeedBlob("target", np.random.rand(
                T, batch_size, hidden_dim).astype(np.float32))
            ws.FeedBlob("hidden_init", np.zeros(
                [1, batch_size, hidden_dim], dtype=np.float32
            ))
            ws.FeedBlob("cell_init", np.zeros(
                [1, batch_size, hidden_dim], dtype=np.float32
            ))

            model = model_helper.ModelHelper(name="lstm")
            model.net.AddExternalInputs(["input"])
            init_blobs = []

            for i in range(num_layers):
                hidden_init, cell_init = model.net.AddExternalInputs(
                    "hidden_init_{}".format(i),
                    "cell_init_{}".format(i)
                )
                init_blobs.extend([hidden_init, cell_init])

            # model, input_blob, dim_in and scope are required LSTM arguments
            # that were dropped from this excerpt; the values here are the
            # natural ones given the blobs created above.
            output, last_hidden, _, last_state = rnn_cell.LSTM(
                model=model,
                input_blob="input",
                seq_lengths="seq_lengths",
                initial_states=init_blobs,
                dim_in=input_dim,
                dim_out=[hidden_dim] * num_layers,
                scope="lstm",
                forward_only=forward_only,
                return_last_layer_only=True,
            )

            loss = model.AveragedLoss(
                model.SquaredL2Distance([output, "target"], "dist"),
                "loss"
            )
            # Gradient operators can only be added to a trainable net.
            if not forward_only:
                model.AddGradientOperators([loss])

            for init_blob in init_blobs:
                ws.FeedBlob(init_blob, np.zeros(
                    [1, batch_size, hidden_dim], dtype=np.float32
                ))
            ws.RunNetOnce(model.param_init_net)

            # Run with the RNN executor enabled.
            self.enable_rnn_executor(model.net, 1, forward_only)

            np.random.seed(10022015)
            input_shape = [T, batch_size, input_dim]
            ws.FeedBlob(
                "input",
                np.random.rand(*input_shape).astype(np.float32)
            )

            ws.CreateNet(model.net, overwrite=True)

            time_ob = model.net.AddObserver("TimeObserver")
            run_cnt_ob = model.net.AddObserver("RunCountObserver")
            # The net has to be run for the observers to record anything.
            ws.RunNet(model.net)
            avg_time[j] = time_ob.average_time()
            run_cnt[j] = int(
                ''.join(x for x in run_cnt_ob.debug_info() if x.isdigit()))
            model.net.RemoveObserver(time_ob)
            model.net.RemoveObserver(run_cnt_ob)

        # Longer sequences run more step nets, and with equally spaced
        # sequence lengths the increase in run count should also be equal.
        self.assertTrue(run_cnt[1] > run_cnt[0] and run_cnt[2] > run_cnt[1])
        self.assertEqual(run_cnt[1] - run_cnt[0], run_cnt[2] - run_cnt[1])

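    # Helper: flip the enable_rnn_executor argument on every
    # RecurrentNetwork / RecurrentNetworkGradient operator in the net.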
    def enable_rnn_executor(self, net, value, forward_only):
        num_found = 0
        for op in net.Proto().op:
            if op.type.startswith("RecurrentNetwork"):
                for arg in op.arg:
                    if arg.name == 'enable_rnn_executor':
                        # Overwriting the int argument toggles the executor.
                        arg.i = value
                        num_found += 1
        # This sanity check is so that if someone changes the
        # enable_rnn_executor parameter name, the test will
        # start failing as this function will become defective.
        self.assertEqual(1 if forward_only else 2, num_found)
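

# Standard unittest entry point; assumed here since the original tail of the
# file is not part of this excerpt.
if __name__ == "__main__":
    unittest.main()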