dream
181 строка · 7.1 Кб
1#!/usr/bin/env python
2import logging
3import re
4from os import getenv
5
6import common.dff.integration.context as int_ctx
7from common.robot import check_if_valid_robot_command
8from common.utils import get_entities
9from df_engine.core import Actor, Context
10
11
12LANGUAGE = getenv("LANGUAGE", "EN")
13ROS_FLASK_SERVER = getenv("ROS_FLASK_SERVER")
14
15logging.basicConfig(format="%(asctime)s - %(pathname)s - %(lineno)d - %(levelname)s - %(message)s", level=logging.DEBUG)
16logger = logging.getLogger(__name__)
17
18
19def get_respond_funcs():
20return {
21"test_command": test_command_respond,
22"track_object": track_object_respond,
23"turn_around": turn_around_respond,
24"move_forward": move_forward_respond,
25"move_backward": move_backward_respond,
26"open_door": open_door_respond,
27"move_to_point": move_to_point_respond,
28}
29
30
31def get_human_utterances(ctx: Context, actor: Actor) -> list:
32return {} if ctx.validation else ctx.misc["agent"]["dialog"]["human_utterances"]
33
34
35def test_command_respond(ctx: Context, actor: Actor):
36command = "test_command"
37response = "Success"
38
39if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
40return response, 1.0, {}, {}, {"command_to_perform": command}
41else:
42return ""
43
44
45def track_object_respond(ctx: Context, actor: Actor):
46utt = int_ctx.get_last_human_utterance(ctx, actor)
47entities = get_entities(utt, only_named=False, with_labels=False, return_lemmas=True)
48if len(entities) == 1:
49command = f"track_object_{entities[0]}"
50response = f"Следую за объектом: {entities[0]}." if LANGUAGE == "RU" else f"Tracking object: {entities[0]}."
51else:
52command = "track_object_unknown"
53if LANGUAGE == "RU":
54response = "Не могу извлечь объект для отслеживания. Повторите команду."
55else:
56response = "I did not get tracked object. Please repeat the command."
57
58if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
59return response, 1.0, {}, {}, {"command_to_perform": command}
60else:
61return ""
62
63
64def turn_around_respond(ctx: Context, actor: Actor):
65utt = int_ctx.get_last_human_utterance(ctx, actor)
66degree = re.findall(r"[0-9]+", utt["text"])
67if "против" in utt["text"] or re.search(r"counter[- ]?clock-?wise", utt["text"]):
68if len(degree) == 1:
69command = f"turn_counterclockwise_{degree[0]}"
70if LANGUAGE == "RU":
71response = f"Поворачиваюсь против часовой стрелки на {degree[0]} градусов."
72else:
73response = f"Turning around counterclockwise by {degree[0]} degrees."
74else:
75command = "turn_counterclockwise"
76if LANGUAGE == "RU":
77response = "Поворачиваюсь против часовой стрелки."
78else:
79response = "Turning around counterclockwise."
80else:
81if len(degree) == 1:
82command = f"turn_clockwise_{degree[0]}"
83if LANGUAGE == "RU":
84response = f"Поворачиваюсь по часовой стрелке на {degree[0]} градусов."
85else:
86response = f"Turning around clockwise by {degree[0]} degrees."
87else:
88command = "turn_clockwise"
89if LANGUAGE == "RU":
90response = "Поворачиваюсь по часовой стрелке."
91else:
92response = "Turning around clockwise."
93
94if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
95return response, 1.0, {}, {}, {"command_to_perform": command}
96else:
97return ""
98
99
100def move_forward_respond(ctx: Context, actor: Actor):
101utt = int_ctx.get_last_human_utterance(ctx, actor)
102dist = re.findall(r"[0-9]+", utt["text"])
103if len(dist) == 1:
104command = f"move_forward_{dist[0]}"
105if LANGUAGE == "RU":
106response = f"Двигаюсь вперед на {dist[0]} метров."
107else:
108response = f"Moving forward by {dist[0]} meters."
109else:
110command = "move_forward"
111if LANGUAGE == "RU":
112response = "Двигаюсь вперед."
113else:
114response = "Moving forward."
115
116if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
117return response, 1.0, {}, {}, {"command_to_perform": command}
118else:
119return ""
120
121
122def move_backward_respond(ctx: Context, actor: Actor):
123utt = int_ctx.get_last_human_utterance(ctx, actor)
124dist = re.findall(r"[0-9]+", utt["text"])
125if len(dist) == 1:
126command = f"move_backward_{dist[0]}"
127if LANGUAGE == "RU":
128response = f"Двигаюсь назад на {dist[0]} метров."
129else:
130response = f"Moving backward by {dist[0]} meters."
131else:
132command = "move_backward"
133if LANGUAGE == "RU":
134response = "Двигаюсь назад."
135else:
136response = "Moving backward."
137
138if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
139return response, 1.0, {}, {}, {"command_to_perform": command}
140else:
141return ""
142
143
144def open_door_respond(ctx: Context, actor: Actor):
145command = "open_door"
146if LANGUAGE == "RU":
147response = "Открываю дверь"
148else:
149response = "Opening the door."
150
151if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
152return response, 1.0, {}, {}, {"command_to_perform": command}
153else:
154return ""
155
156
157# covers coords like "5,35", "5, 35", "5 35"
158COMPILED_COORDS_PATTERN = re.compile(r"[-][0-9]+[ ,]+[-][0-9]+", re.IGNORECASE)
159
160
161def move_to_point_respond(ctx: Context, actor: Actor):
162utt = int_ctx.get_last_human_utterance(ctx, actor)
163entities = get_entities(utt, only_named=False, with_labels=False, return_lemmas=True)
164coords = COMPILED_COORDS_PATTERN.search(utt["text"])
165if len(entities) == 1:
166command = f"move_to_point_{entities[0]}"
167response = f"Двигаюсь к объекту: {entities[0]}." if LANGUAGE == "RU" else f"Moving to object: {entities[0]}."
168elif coords:
169command = f"move_to_point_{coords[0]}"
170response = f"Двигаюсь в точку: {coords[0]}." if LANGUAGE == "RU" else f"Moving to point: {coords[0]}."
171else:
172command = "move_to_point_unknown"
173if LANGUAGE == "RU":
174response = "Не могу извлечь объект для цели. Повторите команду."
175else:
176response = "I did not get a target point. Please repeat the command."
177
178if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
179return response, 1.0, {}, {}, {"command_to_perform": command}
180else:
181return ""
182