2
from core.workflow_manager import WorkflowManager
7
def __init__(self, id):
12
def __init__(self, name):
16
class TestWorkflowManagerDialog(unittest.TestCase):
18
self.workflow = WorkflowManager()
19
self.dialog_id = uuid4().hex
20
self.workflow.add_workflow_record(TestDialog(self.dialog_id))
22
def test_internal_params(self):
    """Verify that ``workflow_records`` contains ``self.dialog_id`` and holds exactly one entry."""
    records = self.workflow.workflow_records
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn(self.dialog_id, records)
    self.assertEqual(1, len(records))
26
def test_get_record(self):
    """Verify that ``get_dialog_by_id`` returns an object whose ``id`` equals the requested id."""
    dialog = self.workflow.get_dialog_by_id(self.dialog_id)
    self.assertEqual(dialog.id, self.dialog_id, "get_dialog works wrong")
29
def test_add_another_dialog(self):
    """Verify that adding a record under a fresh id leaves both ids in ``workflow_records``."""
    another_dialog_id = uuid4().hex
    self.workflow.add_workflow_record(TestDialog(another_dialog_id))

    records = self.workflow.workflow_records
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn(self.dialog_id, records)
    self.assertIn(another_dialog_id, records)
    self.assertEqual(2, len(records))
36
def test_add_duplicate_dialog(self):
    """Expect ``ValueError`` when a record is added for ``self.dialog_id``.

    NOTE(review): the test relies on that id being registered beforehand;
    the setup code is only partly visible here -- confirm.
    """
    # Build the dialog outside the context manager so that only the
    # add_workflow_record call itself can satisfy the expected ValueError.
    duplicate = TestDialog(self.dialog_id)
    with self.assertRaises(ValueError):
        self.workflow.add_workflow_record(duplicate)
40
def test_flush_record(self):
    """Verify that ``flush_record`` returns a dict whose ``"dialog"`` entry carries the flushed id."""
    workflow_record = self.workflow.flush_record(self.dialog_id)
    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(workflow_record, dict)
    self.assertEqual(workflow_record["dialog"].id, self.dialog_id)
45
def test_add_task(self):
    """Verify that ``add_task`` returns a non-None id that is present in ``workflow.tasks``."""
    # NOTE(review): ``payload`` is not assigned anywhere in the visible source;
    # presumably it is defined on a line missing from this extract -- confirm.
    task_service = TestService("testservice")
    task_id = self.workflow.add_task(self.dialog_id, task_service, payload, 1)

    self.assertIsNotNone(task_id)
    self.assertEqual(1, len(self.workflow.tasks))
    self.assertIn(task_id, self.workflow.tasks)
53
def test_complete_task(self):
    """Verify that ``complete_task`` returns a record dict and a task dict that agree with each other.

    The task must reference the service it was created for, and its
    ``"dialog"`` entry must equal the id of the record's dialog.
    """
    # NOTE(review): ``payload`` and ``response`` are not assigned anywhere in
    # the visible source; presumably they are defined on lines missing from
    # this extract -- confirm.
    task_service = TestService("testservice")
    task_id = self.workflow.add_task(self.dialog_id, task_service, payload, 1)

    workflow_record, task = self.workflow.complete_task(task_id, response)

    # assertIsInstance names the actual type on failure.
    self.assertIsInstance(task, dict)
    self.assertIsInstance(workflow_record, dict)
    self.assertEqual(task["service"].name, task_service.name)
    self.assertEqual(task["dialog"], workflow_record["dialog"].id)
64
def test_double_complete_task(self):
    """Verify that completing the same task a second time yields ``(None, None)``."""
    # NOTE(review): ``payload`` and ``response`` are not assigned anywhere in
    # the visible source; presumably they are defined on lines missing from
    # this extract -- confirm.
    task_service = TestService("testservice")
    task_id = self.workflow.add_task(self.dialog_id, task_service, payload, 1)

    # First completion; its result is deliberately ignored.
    self.workflow.complete_task(task_id, response)

    workflow_record, task = self.workflow.complete_task(task_id, response)
    self.assertIsNone(workflow_record)
    self.assertIsNone(task)
74
def test_next_tasks(self):
    """Verify that ``get_services_status`` sorts services into done, waiting and skipped.

    One service is skipped, one has a task that is added and completed, and
    one has a task that is added but never completed; each service name must
    appear in the matching collection of the returned triple.
    """
    # NOTE(review): ``payload`` and ``response`` are not assigned anywhere in
    # the visible source; presumably they are defined on lines missing from
    # this extract -- confirm.
    done_service = TestService(uuid4().hex)
    waiting_service = TestService(uuid4().hex)
    skipped_service = TestService(uuid4().hex)

    self.workflow.skip_service(self.dialog_id, skipped_service)
    task_id = self.workflow.add_task(self.dialog_id, done_service, payload, 1)
    self.workflow.complete_task(task_id, response)
    self.workflow.add_task(self.dialog_id, waiting_service, payload, 1)

    done, waiting, skipped = self.workflow.get_services_status(self.dialog_id)
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn(done_service.name, done)
    self.assertIn(waiting_service.name, waiting)
    self.assertIn(skipped_service.name, skipped)
94
done_service = TestService(uuid4().hex)
95
waiting_service = TestService(uuid4().hex)
96
skipped_service = TestService(uuid4().hex)
98
self.workflow.skip_service(self.dialog_id, skipped_service)
99
done_task_id = self.workflow.add_task(self.dialog_id, done_service, payload, 1)
100
self.workflow.complete_task(done_task_id, response)
101
waiting_task_id = self.workflow.add_task(self.dialog_id, waiting_service, payload, 1)
103
workflow_record = self.workflow.flush_record(self.dialog_id)
104
self.assertEqual(self.dialog_id, workflow_record["dialog"].id)
106
workflow_record, late_task = self.workflow.complete_task(waiting_task_id, response)
107
self.assertEqual(self.dialog_id, workflow_record["dialog"].id)
108
self.assertTrue("dialog" in late_task)
109
self.assertEqual(self.dialog_id, late_task["dialog"])
112
if __name__ == "__main__":