dream

Форк
0
181 строка · 7.1 Кб
1
#!/usr/bin/env python
2
import logging
3
import re
4
from os import getenv
5

6
import common.dff.integration.context as int_ctx
7
from common.robot import check_if_valid_robot_command
8
from common.utils import get_entities
9
from df_engine.core import Actor, Context
10

11

12
LANGUAGE = getenv("LANGUAGE", "EN")
13
ROS_FLASK_SERVER = getenv("ROS_FLASK_SERVER")
14

15
logging.basicConfig(format="%(asctime)s - %(pathname)s - %(lineno)d - %(levelname)s - %(message)s", level=logging.DEBUG)
16
logger = logging.getLogger(__name__)
17

18

19
def get_respond_funcs():
20
    return {
21
        "test_command": test_command_respond,
22
        "track_object": track_object_respond,
23
        "turn_around": turn_around_respond,
24
        "move_forward": move_forward_respond,
25
        "move_backward": move_backward_respond,
26
        "open_door": open_door_respond,
27
        "move_to_point": move_to_point_respond,
28
    }
29

30

31
def get_human_utterances(ctx: Context, actor: Actor) -> list:
32
    return {} if ctx.validation else ctx.misc["agent"]["dialog"]["human_utterances"]
33

34

35
def test_command_respond(ctx: Context, actor: Actor):
36
    command = "test_command"
37
    response = "Success"
38

39
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
40
        return response, 1.0, {}, {}, {"command_to_perform": command}
41
    else:
42
        return ""
43

44

45
def track_object_respond(ctx: Context, actor: Actor):
46
    utt = int_ctx.get_last_human_utterance(ctx, actor)
47
    entities = get_entities(utt, only_named=False, with_labels=False, return_lemmas=True)
48
    if len(entities) == 1:
49
        command = f"track_object_{entities[0]}"
50
        response = f"Следую за объектом: {entities[0]}." if LANGUAGE == "RU" else f"Tracking object: {entities[0]}."
51
    else:
52
        command = "track_object_unknown"
53
        if LANGUAGE == "RU":
54
            response = "Не могу извлечь объект для отслеживания. Повторите команду."
55
        else:
56
            response = "I did not get tracked object. Please repeat the command."
57

58
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
59
        return response, 1.0, {}, {}, {"command_to_perform": command}
60
    else:
61
        return ""
62

63

64
def turn_around_respond(ctx: Context, actor: Actor):
65
    utt = int_ctx.get_last_human_utterance(ctx, actor)
66
    degree = re.findall(r"[0-9]+", utt["text"])
67
    if "против" in utt["text"] or re.search(r"counter[- ]?clock-?wise", utt["text"]):
68
        if len(degree) == 1:
69
            command = f"turn_counterclockwise_{degree[0]}"
70
            if LANGUAGE == "RU":
71
                response = f"Поворачиваюсь против часовой стрелки на {degree[0]} градусов."
72
            else:
73
                response = f"Turning around counterclockwise by {degree[0]} degrees."
74
        else:
75
            command = "turn_counterclockwise"
76
            if LANGUAGE == "RU":
77
                response = "Поворачиваюсь против часовой стрелки."
78
            else:
79
                response = "Turning around counterclockwise."
80
    else:
81
        if len(degree) == 1:
82
            command = f"turn_clockwise_{degree[0]}"
83
            if LANGUAGE == "RU":
84
                response = f"Поворачиваюсь по часовой стрелке на {degree[0]} градусов."
85
            else:
86
                response = f"Turning around clockwise by {degree[0]} degrees."
87
        else:
88
            command = "turn_clockwise"
89
            if LANGUAGE == "RU":
90
                response = "Поворачиваюсь по часовой стрелке."
91
            else:
92
                response = "Turning around clockwise."
93

94
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
95
        return response, 1.0, {}, {}, {"command_to_perform": command}
96
    else:
97
        return ""
98

99

100
def move_forward_respond(ctx: Context, actor: Actor):
101
    utt = int_ctx.get_last_human_utterance(ctx, actor)
102
    dist = re.findall(r"[0-9]+", utt["text"])
103
    if len(dist) == 1:
104
        command = f"move_forward_{dist[0]}"
105
        if LANGUAGE == "RU":
106
            response = f"Двигаюсь вперед на {dist[0]} метров."
107
        else:
108
            response = f"Moving forward by {dist[0]} meters."
109
    else:
110
        command = "move_forward"
111
        if LANGUAGE == "RU":
112
            response = "Двигаюсь вперед."
113
        else:
114
            response = "Moving forward."
115

116
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
117
        return response, 1.0, {}, {}, {"command_to_perform": command}
118
    else:
119
        return ""
120

121

122
def move_backward_respond(ctx: Context, actor: Actor):
123
    utt = int_ctx.get_last_human_utterance(ctx, actor)
124
    dist = re.findall(r"[0-9]+", utt["text"])
125
    if len(dist) == 1:
126
        command = f"move_backward_{dist[0]}"
127
        if LANGUAGE == "RU":
128
            response = f"Двигаюсь назад на {dist[0]} метров."
129
        else:
130
            response = f"Moving backward by {dist[0]} meters."
131
    else:
132
        command = "move_backward"
133
        if LANGUAGE == "RU":
134
            response = "Двигаюсь назад."
135
        else:
136
            response = "Moving backward."
137

138
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
139
        return response, 1.0, {}, {}, {"command_to_perform": command}
140
    else:
141
        return ""
142

143

144
def open_door_respond(ctx: Context, actor: Actor):
145
    command = "open_door"
146
    if LANGUAGE == "RU":
147
        response = "Открываю дверь"
148
    else:
149
        response = "Opening the door."
150

151
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
152
        return response, 1.0, {}, {}, {"command_to_perform": command}
153
    else:
154
        return ""
155

156

157
# covers coords like "5,35", "5, 35", "5 35"
158
COMPILED_COORDS_PATTERN = re.compile(r"[-][0-9]+[ ,]+[-][0-9]+", re.IGNORECASE)
159

160

161
def move_to_point_respond(ctx: Context, actor: Actor):
162
    utt = int_ctx.get_last_human_utterance(ctx, actor)
163
    entities = get_entities(utt, only_named=False, with_labels=False, return_lemmas=True)
164
    coords = COMPILED_COORDS_PATTERN.search(utt["text"])
165
    if len(entities) == 1:
166
        command = f"move_to_point_{entities[0]}"
167
        response = f"Двигаюсь к объекту: {entities[0]}." if LANGUAGE == "RU" else f"Moving to object: {entities[0]}."
168
    elif coords:
169
        command = f"move_to_point_{coords[0]}"
170
        response = f"Двигаюсь в точку: {coords[0]}." if LANGUAGE == "RU" else f"Moving to point: {coords[0]}."
171
    else:
172
        command = "move_to_point_unknown"
173
        if LANGUAGE == "RU":
174
            response = "Не могу извлечь объект для цели. Повторите команду."
175
        else:
176
            response = "I did not get a target point. Please repeat the command."
177

178
    if check_if_valid_robot_command(command, ROS_FLASK_SERVER, dialog_id=int_ctx.get_dialog_id(ctx, actor)):
179
        return response, 1.0, {}, {}, {"command_to_perform": command}
180
    else:
181
        return ""
182

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.