# dream — utils.py (forked copy; original file: 1286 lines, 49.4 KB)
1
from __future__ import annotations
2

3
import re
4
import logging
5
from os import getenv
6
from copy import deepcopy
7
from random import choice
8

9
from common.custom_requests import request_triples_wikidata
10
from common.factoid import FACTOID_THRESHOLD
11
from common.combined_classes import combined_classes, TOPIC_GROUPS
12
from common.join_pattern import *
13

14
from common import food, books, music, news, travel
15
from common import art, science, movies, animals, gaming, sport, gossip
16

17
import sentry_sdk
18

19
logger = logging.getLogger(__name__)
20

21
sentry_sdk.init(getenv("SENTRY_DSN"))
22

23
# Miscellaneous skills: intent responders, safety, and seasonal/event skills.
other_skills = {
    "dff_intent_responder_skill",
    "dff_program_y_dangerous_skill",
    "misheard_asr",
    "christmas_new_year_skill",
    "superbowl_skill",
    "oscar_skill",
    "valentines_day_skill",
}
# Multi-turn, scenario-driven skills.
scenario_skills = {
    "dff_movie_skill",
    "personal_info_skill",  # 'short_story_skill',
    "dff_book_skill",
    "dff_weather_skill",
    "emotion_skill",
    "dummy_skill_dialog",
    "meta_script_skill",
    "dff_coronavirus_skill",
    "small_talk_skill",
    "news_api_skill",
    "game_cooperative_skill",
}
# Retrieval-based skills (tf-idf / convert retrieval and rule-based responders).
retrieve_skills = {
    "dff_program_y_skill",
    "alice",
    "eliza",
    "book_tfidf_retrieval",
    "entertainment_tfidf_retrieval",
    "fashion_tfidf_retrieval",
    "movie_tfidf_retrieval",
    "music_tfidf_retrieval",
    "politics_tfidf_retrieval",
    "science_technology_tfidf_retrieval",
    "sport_tfidf_retrieval",
    "animals_tfidf_retrieval",
    "convert_reddit",
    "topicalchat_convert_retrieval",
    "dff_program_y_wide_skill",
    "knowledge_grounding_skill",
}
63

64
# Generic acknowledgement / filler bot replies
# (presumably used to detect low-content responses — confirm at call sites).
okay_statements = {
    "Okay.",
    "That's cool!",
    "Interesting.",
    "Sounds interesting.",
    "Sounds interesting!",
    "OK.",
    "Cool!",
    "Thanks!",
    "Okay, thanks.",
    "I'm glad you think so!",
    "Sorry, I don't have an answer for that!",
    "Let's talk about something else.",
    "As you wish.",
    "All right.",
    "Right.",
    "Anyway.",
    "Oh, okay.",
    "Oh, come on.",
    "Really?",
    "Okay. I got it.",
    "Well, okay.",
    "Well, as you wish.",
}
# Dialog-management ("service") intent labels, as produced by the intent annotators.
service_intents = {
    "lets_chat_about",
    "tell_me_more",
    "topic_switching",
    "yes",
    "opinion_request",
    "dont_understand",
    "no",
    "stupid",
    "weather_forecast_intent",
    "doing_well",
    "tell_me_a_story",
    "choose_topic",
}
103

104
# Intents that the named skills handle with top priority
# (NOTE(review): precedence is presumably enforced by the response selector — confirm at call sites).
high_priority_intents = {
    "dff_intent_responder_skill": {
        "cant_do",
        "exit",
        "repeat",
        "what_can_you_do",
        "what_is_your_job",
        "what_is_your_name",
        "where_are_you_from",
        "who_made_you",
    },
    "dff_grounding_skill": {"what_are_you_talking_about"},
}

# Intents that should be given low priority by the selector.
low_priority_intents = {"dont_understand", "what_time", "choose_topic"}

# Classification tasks whose models are multi-label: several labels can pass
# the threshold simultaneously (see `_get_combined_annotations`).
MULTILABEL_TASKS = [
    "emotion_classification",
    "toxic_classification",
]
124

125
# Per-class probability thresholds for the `deeppavlov_topics` classifier;
# classes not listed here fall back to 0.9 (see THRESHOLDS below).
DP_THRESHOLDS = {
    "Food": 0,
    "Movies_TV": 0,
    "Leisure": 0,
    "Beauty": 0,
    "Clothes": 0,
    "Depression": 0,
    "Celebrities&Events": 0,
    "Family&Relationships": 0,
    "Health&Medicine": 0,
    "Education": 0,
    "Sports": 0,
    "Books&Literature": 0.3,
    "Videogames": 0.3,
    "Politics": 0.3,
    "ArtificialIntelligence": 0.3,
    "MassTransit": 0.3,
}

# Per-task, per-label thresholds used when turning probabilities into labels.
THRESHOLDS = {
    "deeppavlov_topics": {class_: DP_THRESHOLDS.get(class_, 0.9) for class_ in combined_classes["deeppavlov_topics"]},
    "toxic_classification": {
        "identity_hate": 0.5,
        "insult": 0.5,
        "not_toxic": 0.5,
        "obscene": 0.5,
        "severe_toxic": 0.5,
        "sexual_explicit": 0.6,
        "threat": 0.5,
        "toxic": 0.5,
    },
}
157

158
# MIDAS dialog-act taxonomy: two top-level groups (semantic / functional),
# each mapping a coarse act type to its fine-grained intent labels.
midas_classes = {
    "semantic_request": {
        "question": [
            "open_question_factual",
            "open_question_opinion",
            "open_question_personal",
            "yes_no_question",
            "clarifying_question",
        ],
        "command": ["command", "dev_command"],
        "opinion": ["appreciation", "opinion", "complaint", "comment"],
        "statement": ["statement"],
        "answer": ["other_answers", "pos_answer", "neg_answer"],
    },
    "functional_request": {
        "incomplete": ["abandon", "nonsense"],
        "social_convention": ["opening", "closing", "hold", "back-channeling"],
        "apology": [],
        "other": ["uncertain", "non_compliant", "correction"],
    },
}
# Flat label lists, preserving taxonomy order. A nested comprehension replaces
# the previous `sum(lists, [])` idiom, which is quadratic in the number of lists.
MIDAS_SEMANTIC_LABELS = [label for labels in midas_classes["semantic_request"].values() for label in labels]
MIDAS_FUNCTIONAL_LABELS = [label for labels in midas_classes["functional_request"].values() for label in labels]
181

182

183
def get_skill_outputs_from_dialog(utterances, skill_name, activated=False):
    """
    Collect formatted outputs of `skill_name` from a full dialog.

    If `activated=True`, the skill must also have been chosen as `active_skill`
    and its text must be part of the final bot response; otherwise all of the
    skill's hypotheses are collected.

    Args:
        utterances: utterances, the first one is user's reply
        skill_name: name of target skill
        activated: if target skill should be chosen by response selector on previous step or not

    Returns:
        list of dictionaries with formatted outputs of skill
    """
    collected = []
    hypotheses = []

    for utterance in utterances:
        if "active_skill" in utterance:
            orig = utterance.get("orig_text", None)
            bot_response = utterance["text"] if orig is None else orig
            for hyp in hypotheses:
                if hyp["skill_name"] != skill_name:
                    continue
                if activated:
                    # text-level check is needed for skills with several hypotheses
                    if hyp["text"] in bot_response and utterance["active_skill"] == skill_name:
                        collected.append(hyp)
                elif hyp:
                    collected.append(hyp)
        elif "hypotheses" in utterance:
            hypotheses = utterance["hypotheses"]

    return collected
216

217

218
def transform_vbg(s):
    """
    Transform infinitive form of verb to Ving (gerund) form.

    Args:
        s: verb infinitive

    Returns:
        string with required verb form
    """
    # by Anastasia Kravtsova
    # NOTE: the former function-local `import re` was redundant (re is imported
    # at module level) and has been removed.
    s += "+VBG"
    # irregular cases
    s1 = re.compile(r"(?<![a-z])be\+VBG")  # "be" -> "being"
    s2 = re.compile(r"(?<![aouiey])([^aouiey][aouiey]([^aouieywr]))\+VBG")  # CVC: double the final consonant
    s3 = re.compile(r"ie\+VBG")  # "lie" -> "lying"
    s4 = re.compile(r"(ee)\+VBG")  # keep double e: "see" -> "seeing"
    s5 = re.compile(r"e\+VBG")  # drop silent e: "make" -> "making"
    # regular case
    s6 = re.compile(r"\+VBG")

    # irregular cases
    s = re.sub(s1, "being", s)
    s = re.sub(s2, r"\1\2ing", s)
    s = re.sub(s3, r"ying", s)
    s = re.sub(s4, r"\1ing", s)
    s = re.sub(s5, r"ing", s)
    # regular case
    s = re.sub(s6, "ing", s)
    return s
250

251

252
def get_list_of_active_skills(utterances):
    """
    Extract names of all active skills in the dialog, in order.

    Args:
        utterances: utterances, the first one is user's reply

    Returns:
        list of string skill names
    """
    return [uttr["active_skill"] for uttr in utterances if "active_skill" in uttr]
269

270

271
def get_user_replies_to_particular_skill(utterances, skill_name):
    """
    Return the user's responses to a particular skill whenever it was active.

    Args:
        utterances: dialog utterances, the first one is user's reply
        skill_name: name of the target skill

    Returns:
        list of string responses
    """
    replies = []
    for index, utterance in enumerate(utterances):
        if utterance.get("active_skill", "") == skill_name:
            # the user's phrase immediately precedes the bot's utterance;
            # note index 0 intentionally wraps to the last utterance
            replies.append(utterances[index - 1]["text"])
    return replies
286

287

288
# Regexp fallback for agreement detection, used alongside the intent annotators.
yes_templates = re.compile(
    r"(\byes\b|\byup\b|\byep\b|\bsure\b|go ahead|\byeah\b|\bok\b|okay|^(kind of|kinda)\.?$|"
    r"^why not\.?$|^tell me\.?$|^i (agree|do|did|like|have|had|think so)\.?$)"
)
292

293

294
def is_yes(annotated_phrase):
    """Return True when the user's phrase expresses agreement ("yes")."""
    catcher_yes = "yes" in get_intents(annotated_phrase, which="intent_catcher", probs=False)
    midas_yes = "pos_answer" in get_intents(annotated_phrase, which="midas", probs=False)
    # TODO: intent catcher not catches 'yes thanks!'
    regexp_yes = re.search(yes_templates, annotated_phrase.get("text", "").lower())
    return bool(catcher_yes or midas_yes or regexp_yes)
301

302

303
no_templates = re.compile(r"(\bno\b|\bnot\b|no way|don't|no please|i disagree|^neither.?$)")
304
DONOTKNOW_LIKE = [r"(i )?(do not|don't) know", "you (choose|decide|pick up)", "no idea"]
305
DONOTKNOW_LIKE_PATTERN = re.compile(join_sentences_in_or_pattern(DONOTKNOW_LIKE), re.IGNORECASE)
306

307

308
def is_donot_know(annotated_phrase):
    """Return True when the user's phrase is an "I don't know"-like answer."""
    return bool(DONOTKNOW_LIKE_PATTERN.search(annotated_phrase.get("text", "")))
312

313

314
def is_no_intent(annotated_phrase):
    """Return True for a "no" intent, excluding "I don't know"-like answers."""
    no_detected = "no" in get_intents(annotated_phrase, which="intent_catcher", probs=False)
    midas_no_detected = False  # "neg_answer" in get_intents(annotated_phrase, which='midas', probs=False)
    if is_donot_know(annotated_phrase):
        return False
    return bool(no_detected or midas_no_detected)
322

323

324
def is_no(annotated_phrase):
    """Return True when the user's phrase expresses disagreement ("no")."""
    catcher_no = "no" in get_intents(annotated_phrase, which="intent_catcher", probs=False)
    midas_no = "neg_answer" in get_intents(annotated_phrase, which="midas", probs=False)
    # TODO: intent catcher thinks that horrible is no intent'
    normalized = annotated_phrase.get("text", "").lower().strip().replace(".", "")
    if normalized == "horrible":
        return False
    regexp_no = re.search(no_templates, annotated_phrase.get("text", "").lower())
    if not (catcher_no or midas_no or regexp_no):
        return False
    if is_donot_know(annotated_phrase):
        return False
    return not is_yes(annotated_phrase)
337

338

339
def is_question(text):
    """Return True if the text contains a question mark."""
    return text.find("?") != -1
341

342

343
def substitute_nonwords(text):
    """Collapse every run of non-word characters into a single space and trim."""
    return " ".join(re.split(r"\W+", text)).strip()
345

346

347
def get_intent_name(text):
    """
    Extract the intent name encoded after the '#+#' splitter in a response text.

    Returns None when the splitter is absent; otherwise the last segment,
    lowercased, with non-word characters replaced by spaces and trimmed.
    """
    splitter = "#+#"
    if splitter in text:
        raw_name = text.rsplit(splitter, 1)[-1]
        return re.sub(r"\W", " ", raw_name.lower()).strip()
    return None
354

355

356
# "do you like/think ..." — user asking for the bot's opinion.
# NOTE(review): "fansy" looks like a typo for "fancy" — as written this
# alternative can never match the real word; confirm before changing the pattern.
OPINION_REQUEST_PATTERN = re.compile(
    r"(don't|do not|not|are not|are|do)?\s?you\s"
    r"(like|dislike|adore|hate|love|believe|consider|get|know|taste|think|"
    r"recognize|sure|understand|feel|fond of|care for|fansy|appeal|suppose|"
    r"imagine|guess)",
    re.IGNORECASE,
)
# "i like/think ..." — user expressing their own opinion.
OPINION_EXPRESSION_PATTERN = re.compile(
    r"\bi (don't|do not|not|am not|'m not|am|do)?\s?"
    r"(like|dislike|adore|hate|love|believe|consider|get|know|taste|think|"
    r"recognize|sure|understand|feel|fond of|care for|fansy|appeal|suppose|"
    r"imagine|guess)",
    re.IGNORECASE,
)
370

371

372
def is_opinion_request(annotated_utterance):
    """Return True when the user asks for the bot's opinion."""
    detected = get_intents(annotated_utterance, which="all", probs=False)
    has_intent = any(label in detected for label in ["Opinion_RequestIntent", "open_question_opinion"])
    text = annotated_utterance.get("text", "")
    return bool(has_intent or (OPINION_REQUEST_PATTERN.search(text) and "?" in text))
380

381

382
def is_opinion_expression(annotated_utterance):
    """Return True when the user expresses their own opinion."""
    detected = get_intents(annotated_utterance, which="all")
    has_intent = any(label in detected for label in ["opinion", "Opinion_ExpressionIntent"])
    text = annotated_utterance.get("text", "")
    return bool(has_intent or OPINION_EXPRESSION_PATTERN.search(text))
390

391

392
def get_outputs_with_response_from_dialog(utterances, response, activated=False):
    """
    Collect formatted skill outputs from a full dialog whose replies contain `response`.

    If `activated=True`, the hypothesis text must also be part of the final
    bot response; otherwise all matching hypotheses are collected.

    Args:
        utterances: utterances, the first one is user's reply
        response: target text to search among bot utterances
        activated: if target skill should be chosen by response selector on previous step or not

    Returns:
        list of dictionaries with formatted outputs of skill
    """
    collected = []
    hypotheses = []

    for utterance in utterances:
        if "active_skill" in utterance:
            bot_response = utterance["text"]
            for hyp in hypotheses:
                # need to check text-response for skills with several hypotheses
                if response not in hyp["text"]:
                    continue
                if activated:
                    if hyp["text"] in bot_response and hyp:
                        collected.append(hyp)
                elif hyp:
                    collected.append(hyp)
        elif "hypotheses" in utterance:
            hypotheses = utterance["hypotheses"]

    return collected
425

426

427
def get_not_used_template(used_templates, all_templates, any_if_no_available=True):
    """
    Pick a random template that has not been used in the dialog yet.

    Args:
        used_templates: list of templates already used in the dialog
        all_templates: list of all available templates
        any_if_no_available: fall back to a random template when all are used

    Returns:
        string template ("" when everything is used and no fallback is allowed)
    """
    unused = set(all_templates) - set(used_templates)
    if unused:
        return choice(list(unused))
    if any_if_no_available:
        return choice(all_templates)
    return ""
445

446

447
def get_all_not_used_templates(used_templates, all_templates):
    """
    Return every template that has not been used in the dialog yet.

    Args:
        used_templates: list of templates already used in the dialog
        all_templates: list of all available templates

    Returns:
        list of unused templates (order is not guaranteed)
    """
    return list(set(all_templates) - set(used_templates))
460

461

462
def _probs_to_labels(answer_probs, max_proba=True, threshold=0.5):
463
    if not answer_probs:
464
        return []
465
    if isinstance(threshold, dict):
466
        answer_labels = [key for key in answer_probs if answer_probs[key] > threshold.get(key, 0)]
467
        if max_proba:
468
            answer_labels = [key for key in answer_labels if answer_probs[key] == max(answer_probs.values())]
469
    else:
470
        answer_labels = [label for label in answer_probs if answer_probs[label] > threshold]
471
        if not answer_labels and max_proba:
472
            answer_labels = [key for key in answer_probs if answer_probs[key] == max(answer_probs.values())]
473
    return answer_labels
474

475

476
def _labels_to_probs(answer_labels, all_labels):
477
    answer_probs = dict()
478
    for label in all_labels:
479
        if label in answer_labels:
480
            answer_probs[label] = 1
481
        else:
482
            answer_probs[label] = 0
483
    return answer_probs
484

485

486
def _get_combined_annotations(annotated_utterance, model_name, threshold=0.5):
    """Extract probs and labels for `model_name` from the `combined_classification` annotation.

    Args:
        annotated_utterance: dictionary with annotated utterance
        model_name: name of the classification task inside combined_classification
        threshold: probability threshold used when turning probs into labels

    Returns:
        tuple (answer_probs, answer_labels); empty dict/list if extraction fails
    """
    answer_probs, answer_labels = {}, []
    try:
        annotations = annotated_utterance["annotations"]
        combined_annotations = annotations.get("combined_classification", {})
        # some annotator versions wrap the result into a one-element list
        if combined_annotations and isinstance(combined_annotations, list):
            combined_annotations = combined_annotations[0]
        if model_name in combined_annotations:
            answer_probs = combined_annotations[model_name]
        else:
            logger.warning(f"Not found Model name {model_name} in combined annotations {combined_annotations}")
        # old-style combined classifier: toxic was multilabel and there was no factoid task
        old_style_toxic = all(
            [model_name == "toxic_classification", "factoid_classification" not in combined_annotations]
        )
        if model_name in MULTILABEL_TASKS or old_style_toxic:
            # multilabel task: keep every label above the threshold
            answer_labels = _probs_to_labels(answer_probs, max_proba=False, threshold=threshold)
        elif model_name == "factoid_classification" and answer_probs.get("is_factoid", 0) < threshold:
            answer_labels = ["is_conversational"]
        elif model_name == "deeppavlov_topics":
            # per-class thresholds from the module-level THRESHOLDS table
            answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=THRESHOLDS["deeppavlov_topics"])
        elif model_name == "toxic_classification":
            answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=THRESHOLDS["toxic_classification"])
        else:
            answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=threshold)
    except Exception as e:
        sentry_sdk.capture_exception(e)
        logger.exception(e)

    return answer_probs, answer_labels
515

516

517
def _process_text(answer):
518
    if isinstance(answer, dict) and "text" in answer:
519
        return answer["text"]
520
    else:
521
        return answer
522

523

524
def _process_old_sentiment(answer):
    """Convert old-style sentiment output [label, confidence] into a probs dict."""
    label, confidence = answer[0], answer[1]
    if isinstance(label, str) and isinstance(confidence, float):
        # support old sentiment output: the remaining probability mass is
        # split evenly between the two other classes
        return {
            key: confidence if key == label else 0.5 * (1 - confidence)
            for key in combined_classes["sentiment_classification"]
        }
    logger.warning("_process_old_sentiment got file with an output that is not old-style")
    return answer
539

540

541
def _get_plain_annotations(annotated_utterance, model_name, threshold=0.5):
    """Extract probs and labels for `model_name` when it is a standalone annotator.

    Args:
        annotated_utterance: dictionary with annotated utterance
        model_name: name of the annotator key in `annotations`
        threshold: probability threshold used when turning probs into labels

    Returns:
        tuple (answer_probs, answer_labels); empty dict/list if extraction fails
    """
    answer_probs, answer_labels = {}, []
    try:
        annotations = annotated_utterance["annotations"]
        answer = annotations[model_name]

        answer = _process_text(answer)
        if isinstance(answer, list):
            if model_name == "sentiment_classification":
                # old-style sentiment output: [label, confidence]
                answer_probs = _process_old_sentiment(answer)
                answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=threshold)
            else:
                # a plain list of labels: synthesize one-hot probs
                answer_labels = answer
                answer_probs = _labels_to_probs(answer_labels, combined_classes[model_name])
        else:
            answer_probs = answer
            if model_name == "toxic_classification":
                # this function is only for plain annotations (when toxic_classification is a separate annotator)
                answer_labels = _probs_to_labels(answer_probs, max_proba=False, threshold=threshold)
            elif model_name == "factoid_classification" and answer_probs.get("is_factoid", 0) < threshold:
                answer_labels = ["is_conversational"]
            else:
                answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=threshold)
    except Exception as e:
        # best-effort extraction: missing annotator keys are expected, so only warn
        logger.warning(e)

    return answer_probs, answer_labels
568

569

570
def print_combined(combined_output):
    """Log the combined classifier output with probabilities rounded to 2 digits."""
    rounded = [
        {model: {label: round(prob, 2) for label, prob in probs.items()} for model, probs in item.items()}
        for item in combined_output
    ]
    logger.info(f"Combined classifier output is {rounded}")
577

578

579
def _get_etc_model(annotated_utterance, model_name, probs, default_probs, default_labels, threshold=0.5):
    """Get annotations of a particular classifier from an annotated utterance.

    Prefers the standalone annotator output; falls back to the shared
    `combined_classification` annotation; otherwise returns the defaults.

    Args:
        annotated_utterance: dictionary with annotated utterance, or annotations
        model_name: name of the classification task
        probs: return probabilities or not
        default_probs: default probs to return.
        default_labels: default labels to return.
        threshold: threshold for classification
    Returns:
        dictionary with class probabilities, if probs == True, or class labels if probs != True
    """

    try:
        if model_name in annotated_utterance.get("annotations", {}):
            answer_probs, answer_labels = _get_plain_annotations(
                annotated_utterance, model_name=model_name, threshold=threshold
            )
        elif "combined_classification" in annotated_utterance.get("annotations", {}):
            answer_probs, answer_labels = _get_combined_annotations(
                annotated_utterance, model_name=model_name, threshold=threshold
            )
        else:
            answer_probs, answer_labels = default_probs, default_labels
    except Exception as e:
        logger.exception(e, stack_info=True)
        answer_probs, answer_labels = default_probs, default_labels
    if probs:  # return probs
        return answer_probs
    else:
        return answer_labels
610

611

612
def get_toxic(annotated_utterance, probs=True, default_probs=None, default_labels=None):
    """Function to get toxic classifier annotations from annotated utterance.

    Args:
        annotated_utterance: dictionary with annotated utterance, or annotations
        probs: return probabilities or not
        default_probs: default probabilities to return (empty dict if None)
        default_labels: default labels to return (empty list if None)
    Returns:
        dictionary with toxic probablilties, if probs == True, or toxic labels if probs != True
    """
    if default_probs is None:
        default_probs = {}
    if default_labels is None:
        default_labels = []
    return _get_etc_model(
        annotated_utterance,
        "toxic_classification",
        probs=probs,
        default_probs=default_probs,
        default_labels=default_labels,
    )
631

632

633
def get_factoid(annotated_utterance, probs=True, default_probs=None, default_labels=None):
    """Function to get factoid classifier annotations from annotated utterance.

    Args:
        annotated_utterance: dictionary with annotated utterance, or annotations
        probs: return probabilities or not
        default_probs: default probabilities to return ({"is_conversational": 1} if None)
        default_labels: default labels to return (["is_conversational"] if None)
    Returns:
        dictionary with factoid probablilties, if probs == True, or factoid labels if probs != True
    """
    if default_probs is None:
        default_probs = {"is_conversational": 1}
    if default_labels is None:
        default_labels = ["is_conversational"]
    return _get_etc_model(
        annotated_utterance,
        "factoid_classification",
        probs=probs,
        default_probs=default_probs,
        default_labels=default_labels,
        threshold=FACTOID_THRESHOLD,
    )
653

654

655
def get_sentiment(annotated_utterance, probs=True, default_probs=None, default_labels=None):
    """Function to get sentiment classifier annotations from annotated utterance.

    Args:
        annotated_utterance: dictionary with annotated utterance, or annotations
        probs: return probabilities or not
        default_probs: default probabilities to return (neutral distribution if None)
        default_labels: default labels to return (["neutral"] if None)
    Returns:
        dictionary with sentiment probablilties, if probs == True, or sentiment labels if probs != True
    """
    if default_probs is None:
        default_probs = {"positive": 0, "negative": 0, "neutral": 1}
    if default_labels is None:
        default_labels = ["neutral"]
    return _get_etc_model(
        annotated_utterance,
        "sentiment_classification",
        probs=probs,
        default_probs=default_probs,
        default_labels=default_labels,
    )
675

676

677
def get_emotions(annotated_utterance, probs=True, default_probs=None, default_labels=None):
    """Function to get emotion classifier annotations from annotated utterance.

    Args:
        annotated_utterance: dictionary with annotated utterance, or annotations
        probs: return probabilities or not
        default_probs: default probabilities to return (neutral distribution if None)
        default_labels: default labels to return (["neutral"] if None)
    Returns:
        dictionary with emotion probablilties, if probs == True, or emotion labels if probs != True
    """
    if default_probs is None:
        default_probs = {"anger": 0, "fear": 0, "joy": 0, "love": 0, "sadness": 0, "surprise": 0, "neutral": 1}
    if default_labels is None:
        default_labels = ["neutral"]
    return _get_etc_model(
        annotated_utterance,
        "emotion_classification",
        probs=probs,
        default_probs=default_probs,
        default_labels=default_labels,
    )
701

702

703
def get_topics(annotated_utterance, probs=False, default_probs=None, default_labels=None, which="all"):
    """Function to get topics from particular annotator or all detected.

    Args:
        annotated_utterance: dictionary with annotated utterance
        probs: if False we return labels, otherwise we return probs
        default_probs: default probabilities to return
        default_labels: default labels to return
        which: which topics to return.
            'all' means topics by `cobot_topics`, `cobot_dialogact_topics`,
                `deeppavlov_topics` and `topics_ru` combined,
            'cobot_topics' means topics by `cobot_topics`,
            'cobot_dialogact_topics' means topics by `cobot_dialogact_topics`,
            'deeppavlov_topics' means topics by `deeppavlov_topics`,
            'topics_ru' means topics by `topics_ru`.

    Returns:
        list of topic labels, if probs == False,
        dictionary where all keys are topic labels and values are probabilities, if probs == True
    """
    default_probs = {} if default_probs is None else default_probs
    default_labels = [] if default_labels is None else default_labels
    annotations = annotated_utterance.get("annotations", {})
    cobot_topics_probs, cobot_topics_labels = {}, []
    if "cobot_topics" in annotations:
        cobot_topics_labels = _process_text(annotations.get("cobot_topics", {}))
    # fall back to the combined classifier when the standalone annotator gave nothing
    if "combined_classification" in annotations and not cobot_topics_labels:
        cobot_topics_probs, cobot_topics_labels = _get_combined_annotations(
            annotated_utterance, model_name="cobot_topics"
        )
    cobot_topics_labels = _process_text(cobot_topics_labels)
    if not cobot_topics_probs:
        # synthesize one-hot probs when only labels are available
        cobot_topics_probs = _labels_to_probs(cobot_topics_labels, combined_classes.get("cobot_topics", {}))

    cobot_da_topics_probs, cobot_da_topics_labels = {}, []
    if "cobot_dialogact" in annotations and "topics" in annotations["cobot_dialogact"]:
        cobot_da_topics_labels = annotations["cobot_dialogact"]["topics"]
    elif "cobot_dialogact_topics" in annotations:
        cobot_da_topics_labels = annotations["cobot_dialogact_topics"]

    if "combined_classification" in annotations and not cobot_da_topics_labels:
        cobot_da_topics_probs, cobot_da_topics_labels = _get_combined_annotations(
            annotated_utterance, model_name="cobot_dialogact_topics"
        )
    cobot_da_topics_labels = _process_text(cobot_da_topics_labels)
    if not cobot_da_topics_probs:
        cobot_da_topics_probs = _labels_to_probs(cobot_da_topics_labels, combined_classes["cobot_dialogact_topics"])

    # deeppavlov topics are only available through the combined classifier
    dp_topics_probs, dp_topics_labels = {}, []
    if "combined_classification" in annotations and not dp_topics_labels:
        dp_topics_probs, dp_topics_labels = _get_combined_annotations(
            annotated_utterance, model_name="deeppavlov_topics"
        )
    topics_ru_probs, topics_ru_labels = {}, []
    if "topics_ru" in annotations:
        topics_ru_probs, topics_ru_labels = _get_combined_annotations(annotated_utterance, model_name="topics_ru")
    if which == "all":
        answer_labels = cobot_topics_labels + cobot_da_topics_labels + dp_topics_labels + topics_ru_labels
        answer_probs = {**cobot_topics_probs, **cobot_da_topics_probs, **dp_topics_probs, **topics_ru_probs}
    elif which == "cobot_topics":
        answer_probs, answer_labels = cobot_topics_probs, cobot_topics_labels
    elif which == "cobot_dialogact_topics":
        answer_probs, answer_labels = cobot_da_topics_probs, cobot_da_topics_labels
    elif which == "deeppavlov_topics":
        answer_probs, answer_labels = dp_topics_probs, dp_topics_labels
    elif which == "topics_ru":
        answer_probs, answer_labels = topics_ru_probs, topics_ru_labels
    else:
        logger.exception(f"Unknown input type in get_topics: {which}")
        answer_probs, answer_labels = default_probs, default_labels

    if probs:
        return answer_probs
    else:
        return answer_labels
775

776

777
def get_intents(annotated_utterance, probs=False, default_probs=None, default_labels=None, which="all"):
    """Function to get intents from particular annotator or all detected.

    Args:
        annotated_utterance: dictionary with annotated utterance
        probs: if False we return labels, otherwise we return probs
        default_probs: default probabilities to return
        default_labels: default labels to return
        which: which intents to return:
            'all' means intents detected by `intent_catcher`,
            `cobot_dialogact_intents` and  `midas_classification`.
            'intent_catcher' means intents detected by `intent_catcher`.
            'cobot_dialogact_intents' means intents detected by `cobot_dialogact_intents`.
            'midas' means intents detected by `midas_classification`.

    Returns:
        list of intent labels, if probs == False,
        dictionary where all keys are intent labels and values are probabilities, if probs == True
    """
    # avoid mutable default arguments
    default_probs = {} if default_probs is None else default_probs
    default_labels = [] if default_labels is None else default_labels
    annotations = annotated_utterance.get("annotations", {})
    # intent_catcher: keep only intents explicitly marked as detected
    intents = annotations.get("intent_catcher", {})
    detected_intents = [k for k, v in intents.items() if v.get("detected", 0) == 1]
    # intent_catcher gives no probabilities, so detected intents get probability 1
    detected_intent_probs = {key: 1 for key in detected_intents}
    midas_intent_probs = annotations.get("midas_classification", {})
    # fall back to the combined classifier when the standalone midas annotator is absent
    if "combined_classification" in annotations and not midas_intent_probs:
        midas_intent_probs, midas_intent_labels = _get_combined_annotations(
            annotated_utterance, model_name="midas_classification"
        )
    if isinstance(midas_intent_probs, dict) and midas_intent_probs:
        # single dict of probs: pick the top semantic and the top functional label
        semantic_midas_probs = {k: v for k, v in midas_intent_probs.items() if k in MIDAS_SEMANTIC_LABELS}
        functional_midas_probs = {k: v for k, v in midas_intent_probs.items() if k in MIDAS_FUNCTIONAL_LABELS}
        if semantic_midas_probs:
            max_midas_semantic_prob = max(semantic_midas_probs.values())
        else:
            max_midas_semantic_prob = 0.0
        if functional_midas_probs:
            max_midas_functional_prob = max(functional_midas_probs.values())
        else:
            max_midas_functional_prob = 0.0

        # ties are kept: every label that reaches the maximum is returned
        midas_semantic_intent_labels = [k for k, v in semantic_midas_probs.items() if v == max_midas_semantic_prob]
        midas_functional_intent_labels = [
            k for k, v in functional_midas_probs.items() if v == max_midas_functional_prob
        ]
        midas_intent_labels = midas_semantic_intent_labels + midas_functional_intent_labels
    elif isinstance(midas_intent_probs, list):
        if midas_intent_probs:
            # now it's a list of dictionaries. length of list is n sentences
            midas_intent_labels = []
            for midas_sent_probs in midas_intent_probs:
                max_midas_sent_prob = max(midas_sent_probs.values())
                midas_intent_labels += [k for k, v in midas_sent_probs.items() if v == max_midas_sent_prob]
            # collapse per-sentence probs into one dict, keeping each class's maximum
            _midas_intent_probs = deepcopy(midas_intent_probs)
            midas_intent_probs = {}
            class_names = list(set(sum([list(resp.keys()) for resp in _midas_intent_probs], [])))
            for class_name in class_names:
                max_proba = max([resp.get(class_name, 0.0) for resp in _midas_intent_probs])
                midas_intent_probs[class_name] = max_proba
        else:
            midas_intent_probs = {}
            midas_intent_labels = []
    else:
        midas_intent_labels = []
    cobot_da_intent_probs, cobot_da_intent_labels = {}, []

    # cobot dialog-act intents come in two annotation formats; combined classifier is the fallback
    if "cobot_dialogact" in annotations and "intents" in annotations["cobot_dialogact"]:
        cobot_da_intent_labels = annotated_utterance["annotations"]["cobot_dialogact"]["intents"]
    elif "cobot_dialogact_intents" in annotations:
        cobot_da_intent_labels = annotated_utterance["annotations"]["cobot_dialogact_intents"]

    if "combined_classification" in annotations and not cobot_da_intent_labels:
        cobot_da_intent_probs, cobot_da_intent_labels = _get_combined_annotations(
            annotated_utterance, model_name="cobot_dialogact_intents"
        )

    cobot_da_intent_labels = _process_text(cobot_da_intent_labels)
    # if we only have labels, synthesize a prob dict over the known class set
    if not cobot_da_intent_probs:
        cobot_da_intent_probs = _labels_to_probs(cobot_da_intent_labels, combined_classes["cobot_dialogact_intents"])

    if which == "all":
        answer_probs = {**detected_intent_probs, **cobot_da_intent_probs, **midas_intent_probs}
        answer_labels = detected_intents + cobot_da_intent_labels + midas_intent_labels
    elif which == "intent_catcher":
        answer_probs, answer_labels = detected_intent_probs, detected_intents
    elif which == "cobot_dialogact_intents":
        answer_probs, answer_labels = cobot_da_intent_probs, cobot_da_intent_labels
    elif which == "midas":
        answer_probs, answer_labels = midas_intent_probs, midas_intent_labels
    else:
        logger.warning(f"Unknown type in get_intents {which}")
        answer_probs, answer_labels = default_probs, default_labels

    if probs:
        return answer_probs
    else:
        return answer_labels
873

874

875
# entity_detection labels that should never be returned as entities (see get_entities)
COBOT_ENTITIES_SKIP_LABELS = ["anaphor"]
876

877

878
def get_entities(annotated_utterance, only_named=False, with_labels=False, return_lemmas=False):
    """Extract entities from the utterance annotations.

    Args:
        annotated_utterance: dictionary with annotated utterance
        only_named: if True, read only the `ner` annotator; otherwise prefer
            `entity_detection`, then `spacy_nounphrases`, then `spacy_annotator`
        with_labels: if True, return entity dicts (with "text"/"label", or NER's
            original fields) instead of plain strings
        return_lemmas: in the russian `spacy_annotator` branch, return lemmas
            instead of surface forms

    Returns:
        list of entity strings or entity dicts (possibly empty)
    """
    annotations = annotated_utterance.get("annotations", {})
    entities = []
    if only_named:
        # `ner` is a list of per-sentence lists of dicts, e.g.
        # {"confidence": 1, "end_pos": 1, "start_pos": 0, "text": "unicorns", "type": "ORG"}
        entities = sum(annotations.get("ner", []), [])  # flatten into one list of dicts
        if not with_labels:
            entities = [ent["text"] for ent in entities]
    elif "entity_detection" in annotations:
        # for english and russian languages
        labelled = annotations["entity_detection"].get("labelled_entities", [])
        # drop labels we never treat as entities (e.g. anaphors)
        entities = [ent for ent in labelled if ent["label"] not in COBOT_ENTITIES_SKIP_LABELS]
        if not with_labels:
            entities = [ent["text"] for ent in entities]
    elif "spacy_nounphrases" in annotations:
        # for english language; nounphrases carry no labels, so mimic the
        # cobot_entities format with a "misc" label
        entities = annotations.get("spacy_nounphrases", [])
        if with_labels:
            entities = [{"text": ent, "label": "misc"} for ent in entities]
    elif "spacy_annotator" in annotations:
        # for russian language: keep nouns only
        for word in annotations["spacy_annotator"]:
            if word.get("pos_", "") == "NOUN":
                entities.append({"text": word["lemma_"] if return_lemmas else word["text"], "label": "misc"})
        if not with_labels:
            entities = [ent["text"] for ent in entities]
    return entities if entities is not None else []
911

912

913
def get_named_persons(annotated_utterance):
    """Collect person names from `cobot_entities` and/or `ner` annotators, deduplicated."""
    annotations = annotated_utterance["annotations"]
    persons = []
    if "cobot_entities" in annotations:
        persons += [
            ent["text"]
            for ent in get_entities(annotated_utterance, only_named=False, with_labels=True)
            if ent["label"] == "person"
        ]
    if "ner" in annotations:
        persons += [
            ent["text"]
            for ent in get_entities(annotated_utterance, only_named=True, with_labels=True)
            if ent["type"] == "PER"
        ]
    return list(set(persons))
930

931

932
def get_named_locations(annotated_utterance):
    """Collect location names from annotations: cobot_entities first, NER as fallback."""
    named_entities = get_entities(annotated_utterance, only_named=True, with_labels=True)
    all_entities = get_entities(annotated_utterance, only_named=False, with_labels=True)

    locations = []
    if "cobot_entities" in annotated_utterance["annotations"]:
        locations = [ent["text"] for ent in all_entities if ent["label"] == "location"]
    if len(locations) == 0 and "ner" in annotated_utterance["annotations"]:
        for ent in named_entities:
            if ent["type"] != "LOC" or ent["text"] == "alexa":
                continue
            # skip NER locations that are just substrings of a non-location cobot entity
            _is_part_of_other_entity = any(
                ent["text"] in cobot_ent["text"] and cobot_ent["label"] != "location"
                for cobot_ent in all_entities
            )
            if not _is_part_of_other_entity:
                locations.append(ent["text"])

    locations = list(set(locations))
    # NER does not catch this country at all!
    if re.search(r"\bjapan\b", annotated_utterance["text"], re.IGNORECASE) and "japan" not in locations:
        locations.append("japan")

    return locations
957

958

959
def get_raw_entity_names_from_annotations(annotations):
    """Extract Wikidata entity ids from the `entity_linking` annotation.

    Args:
        annotations: the `annotations` dict of an utterance

    Returns:
        list of Wikidata entity ids taken from the first linked entity
    """
    raw_el_output = annotations.get("entity_linking", [{}])
    entities = []
    try:
        if raw_el_output:
            first = raw_el_output[0]
            # two output formats are supported: dict with "entity_ids", or nested lists
            if isinstance(first, dict):
                entities = first.get("entity_ids", [])
            if isinstance(first, list):
                entities = first[0]
    except Exception as e:
        error_message = f"Wrong entity linking output format {raw_el_output} : {e}"
        sentry_sdk.capture_exception(e)
        logger.exception(error_message)
    return entities
981

982

983
def get_entity_names_from_annotations(annotated_utterance, stopwords=None, default_entities=None):
    """Gather entity names from NER, spacy nounphrases and wiki_parser annotations.

    Args:
        annotated_utterance: annotated utterance
        stopwords: words that make a candidate entity droppable (see filter below)
        default_entities: entity names to accept verbatim if the whole utterance
            text equals one of them

    Returns:
        deduplicated list of entity name strings
    """
    if default_entities is None:
        default_entities = []
    if not stopwords:
        stopwords = []
    full_text = annotated_utterance.get("text", "").lower()
    annotations = annotated_utterance.get("annotations", {})

    collected = [full_text] if full_text in default_entities else []
    # take the first NER entity of every sentence
    for sent_entities in annotations.get("ner", []):
        if sent_entities and "text" in sent_entities[0]:
            collected.append(sent_entities[0]["text"])
    collected.extend(annotations.get("spacy_nounphrases", []))
    for wiki_dict in annotations.get("wiki_parser", [{}]):
        collected.extend(wiki_dict)

    # keep an entity if at least one of its words is long enough (>= 5 chars)
    # or is not a stopword; drops entities made only of short stopwords
    def _keep(entity):
        return any(len(word) >= 5 or word not in stopwords for word in entity.split(" "))

    return list(set(filter(_keep, collected)))
1014

1015

1016
def entity_to_label(entity):
    """Request the human-readable label for a Wikidata entity.

    Args:
        entity: Wikidata entity id as a string: "Q" followed by digits, e.g. "Q5321"

    Returns:
        label string for the entity; if `entity` is malformed we assume it is
        already a label, report the problem to sentry, and return it unchanged
    """
    logger.debug(f"Calling entity_to_label for {entity}")
    no_entity = not entity
    wrong_entity_type = not isinstance(entity, str)
    wrong_entity_format = entity and (entity[0] != "Q" or any([j not in "0123456789" for j in entity[1:]]))
    if no_entity or wrong_entity_type or wrong_entity_format:
        warning_text = f"Wrong entity format. We assume {entity} to be label but check the code"
        sentry_sdk.capture_exception(Exception(warning_text))
        logger.exception(warning_text)
        return entity
    label = ""
    labels = request_triples_wikidata("find_label", [(entity, "")])
    try:
        sep = '"'
        if sep in labels[0]:
            # labels may arrive quoted (e.g. '"Albert Einstein"@en') - take the quoted part
            label = labels[0].split('"')[1]
        else:
            label = labels[0]
        logger.debug(f"Answer {label}")
    except Exception as e:
        sentry_sdk.capture_exception(e)
        # BUG FIX: the message string was missing the f-prefix, so "{labels}"
        # was logged literally instead of the actual wikidata response
        logger.exception(Exception(e, f"Exception in conversion of labels {labels}"))
    return label
1051

1052

1053
def get_types_from_annotations(annotations, types, tocheck_relation="occupation"):
    """Search wiki_parser topic entities for a relation matching the given types.

    Args:
        annotations: annotations of utterance
        types: wikidata type ids we want to match
        tocheck_relation: relation to inspect (e.g. "occupation")

    Returns:
        (entity name, matching type names, mismatching type names) for the first
        entity whose relation matches at least one of `types`, otherwise
        (None, None, None)
    """
    wp_annotations = annotations.get("wiki_parser", {})
    if isinstance(wp_annotations, list) and wp_annotations:  # support 2 different formats
        wp_annotations = wp_annotations[0]
    try:
        topic_entities = wp_annotations.get("topic_skill_entities_info", {})
        for entity in topic_entities:
            for relation in topic_entities[entity]:
                if relation != tocheck_relation:
                    continue
                # each stored pair is (type id, human-readable type name)
                type_to_typename = {pair[0]: pair[1] for pair in topic_entities[entity][relation]}
                found_types = type_to_typename.keys()
                matching_types = [type_to_typename[k] for k in set(found_types) & set(types)]
                mismatching_types = [type_to_typename[k] for k in found_types if k not in types]
                if matching_types:
                    return entity, matching_types, mismatching_types
            logger.warning("Relation to check not found")
    except Exception as e:
        sentry_sdk.capture_exception(e)
        logger.exception(Exception(e, f"Exception in processing wp annotations {wp_annotations}"))
    return None, None, None
1085

1086

1087
ANYTHING_EXCEPT_OF_LETTERS_AND_SPACE_COMPILED = re.compile(r"[^a-zA-Z ]")
1088
ANYTHING_EXCEPT_OF_LETTERS_SPACE_AND_PUNCT_COMPILED = re.compile(r"[^a-zA-Z\,\.\?\!\- ]")
1089
MULTI_SPACE_COMPILED = re.compile(r"\s+")
1090

1091

1092
def clean_entities(entities):
    """Lowercase entities, strip non-letter characters, and split them into single tokens.

    Returns a flat list of lowercase word tokens from all entities.
    """
    tokens = []
    for entity in entities:
        cleaned = ANYTHING_EXCEPT_OF_LETTERS_AND_SPACE_COMPILED.sub(" ", entity.lower())
        cleaned = MULTI_SPACE_COMPILED.sub(" ", cleaned).strip()
        tokens.extend(cleaned.split())
    return tokens
1099

1100

1101
def get_common_tokens_in_lists_of_strings(list_of_strings_0, list_of_strings_1):
    """
    Clean strings removing anything except of letters and spaces, split every string to tokens by spaces,
    find common tokens for two lists of strings.
    """
    # clean_entities builds fresh lists and never mutates its argument, so the
    # previous deepcopy of both inputs was redundant and has been removed
    tokens_0 = clean_entities(list_of_strings_0)
    tokens_1 = clean_entities(list_of_strings_1)

    return list(set(tokens_0).intersection(set(tokens_1)))
1115

1116

1117
SYMBOLS_EXCEPT_LETTERS_AND_DIGITS = re.compile(r"[^a-zA-Zа-яА-ЯёЁ0-9\-_ ]")
1118
DOUBLE_SPACES = re.compile(r"\s+")
1119

1120

1121
def replace_symbols_except_letters_and_digits(s):
    """Replace every char outside [letters, cyrillic, digits, "-", "_", space] with a space,
    then collapse repeated spaces and strip the result."""
    cleaned = SYMBOLS_EXCEPT_LETTERS_AND_DIGITS.sub(" ", s)
    return DOUBLE_SPACES.sub(" ", cleaned).strip()
1125

1126

1127
def remove_punctuation_from_dict_keys(element):
    """Recursively rebuild `element` with punctuation stripped from every dict key.

    Dict values are deep-copied; lists are rebuilt with nested containers
    processed recursively; any other value is returned unchanged.
    """
    if isinstance(element, dict):
        cleaned = {}
        for dict_key, value in element.items():
            new_key = replace_symbols_except_letters_and_digits(dict_key)
            if isinstance(value, (dict, list)):
                cleaned[new_key] = deepcopy(remove_punctuation_from_dict_keys(value))
            else:
                cleaned[new_key] = deepcopy(value)
        return cleaned
    if isinstance(element, list):
        cleaned = []
        for sub_element in element:
            if isinstance(sub_element, (dict, list)):
                cleaned.append(remove_punctuation_from_dict_keys(sub_element))
            else:
                cleaned.append(sub_element)
        return cleaned
    return element
1148

1149

1150
# english personal / possessive / reflexive pronouns as whole words, case-insensitive
PERSONAL_PRONOUNS = re.compile(
    r"\b(i|you|he|she|it|we|they|me|my|him|her|us|them|its|mine|your|yours|his|hers|ours|theirs|myself|yourself|himself"
    r"|herself|itself|ourselves|themselves|their)\b",
    re.IGNORECASE,
)
1155

1156

1157
def find_first_complete_sentence(sentences):
    """Find first sentence without any personal pronouns; return None if there is none."""
    for sentence in sentences:
        if not PERSONAL_PRONOUNS.search(sentence):
            return sentence
    return None
1165

1166

1167
def is_toxic_utterance(annotated_utterance):
    """Return True if the toxicity annotator flagged any class other than `not_toxic`."""
    toxic_result = get_toxic(annotated_utterance, probs=False)
    # "not_toxic" among the labels means the whole utterance is considered non-toxic
    if "not_toxic" in toxic_result:
        return False
    # idiomatic boolean instead of the former `True if len(...) > 0 else False`
    return len(toxic_result) > 0
1172

1173

1174
def is_badlisted_utterance(annotated_utterance):
    """Return True if the badlisted_words annotator flagged any bad-word category."""
    default_badlist = {"bad_words": False}
    badlist_result = annotated_utterance.get("annotations", {}).get("badlisted_words", default_badlist)

    for category in ("bad_words", "inappropriate", "profanity"):
        if badlist_result.get(category, False):
            return True
    return False
1179

1180

1181
def is_toxic_or_badlisted_utterance(annotated_utterance):
    """Return True if the utterance is either toxic or contains badlisted words."""
    # keep short-circuit order: toxicity check first, badlist only when needed
    if is_toxic_utterance(annotated_utterance):
        return True
    return is_badlisted_utterance(annotated_utterance)
1183

1184

1185
# leading "what/who/which/where is|are ..." question starters, optionally
# prefixed with a politeness phrase ("do you know", "could you tell me", "please")
FACTOID_PATTERNS = re.compile(
    r"^(do you know |((can |could )you )tell me )?(please )?"
    r"((what|who|which|where) (is|are|was|were)\b|how to\b|when)",
    re.IGNORECASE,
)
# questions like "what is that?" that look factoid but are anaphoric, so excluded
COUNTER_FACTOID_PATTERNS = re.compile(r"^(what|who|which|where) (is|are|was|were)( that|[\.\?]$)\b", re.IGNORECASE)
1191

1192

1193
def is_special_factoid_question(annotated_utterance):
    """Detect factoid-style questions ("what is X") that contain no personal pronouns."""
    uttr_text = annotated_utterance.get("text", "")
    found = FACTOID_PATTERNS.search(uttr_text)
    if not found or COUNTER_FACTOID_PATTERNS.search(uttr_text):
        return False
    # drop the leading question-like part and inspect the remainder
    rest_string = uttr_text[uttr_text.find(found[0]) + len(found[0]) :].strip()
    # any personal pronoun in the remainder makes it a personal, not factoid, question
    return not PERSONAL_PRONOUNS.search(rest_string)
1204

1205

1206
# boilerplate lead-in phrases that precede a retrieved fact, e.g.
# "this might answer your question:" or "according to wikipedia,"
FACTS_EXTRA_WORDS = re.compile(
    r"(this might answer your question[:\,]? "
    r"|(according to|from) (wikipedia|wikihow)[:\,]? "
    r"|here's (something|what) I found (from|on) [a-zA-Z0-9\-\.]+:"
    r"|here's a fact about [a-zA-Z0-9\- \,]+\.)",
    re.IGNORECASE,
)
1213

1214

1215
def get_dialog_breakdown_annotations(annotated_utterance):
    """Return True if the dialog_breakdown annotator score exceeds 0.5."""
    annotations = annotated_utterance.get("annotations", {})
    score = annotations.get("dialog_breakdown", {}).get("breakdown", 0.0)
    return score > 0.5
1218

1219

1220
def get_comet_conceptnet_annotations(annotated_utterance):
    """Return COMeT ConceptNet annotations, checking `comet_conceptnet` then `conceptnet`.

    The annotations map entity names to relation dicts:
    {
        '123ss': {
            'SymbolOf': ['space', 'time', 'justice'],
            'HasProperty': ['number', 'one of six number', 'long'],
            'Causes': ['death', 'death and destruction', 'war'],
            'CausesDesire': ['go to work', 'go to play', 'sleep']
        }
    }
    """
    annotations = annotated_utterance.get("annotations", {})
    for annotator_name in ("comet_conceptnet", "conceptnet"):
        if annotator_name in annotations:
            return annotations.get(annotator_name, {})
    return {}
1238

1239

1240
class Topic:
    """Topic detector combining classifier labels with an optional regexp or function."""

    def __init__(self, topic_group=None, detecting_regexp=None, detecting_function=None):
        # topic labels (from the combined topic classifiers) indicating this topic
        self.topic_group = topic_group
        # optional regexp matched against the raw utterance text
        self.detecting_regexp = detecting_regexp
        # optional callable(annotated_utterance) -> bool for non-regexp methods
        self.detecting_function = detecting_function

    def detect(self, annotated_utterance, only_one_topic=True, threshold=0.1, which="all"):
        """Return True if the utterance matches this topic by labels, regexp or function."""
        if only_one_topic:
            found_topics = get_topics(annotated_utterance, probs=False, which=which)
        else:
            topic_probs = get_topics(annotated_utterance, probs=True, which=which)
            found_topics = [topic for topic, prob in topic_probs.items() if prob > threshold]
        if any(target_topic in found_topics for target_topic in self.topic_group):
            return True
        # NOTE: a configured regexp takes precedence - the detecting function is
        # only consulted when no regexp was configured at all
        if self.detecting_regexp is not None:
            return bool(re.findall(self.detecting_regexp, annotated_utterance["text"]))
        if self.detecting_function is not None:
            return bool(self.detecting_function(annotated_utterance))
        return False
1261

1262

1263
# Topic detectors keyed by canonical topic name. Each Topic combines the label
# set from TOPIC_GROUPS with, optionally, a regexp or a detecting function from
# the corresponding `common.*` module.
TOPICS = {
    "food": Topic(TOPIC_GROUPS["food"], food.FOOD_COMPILED_PATTERN),
    "books": Topic(TOPIC_GROUPS["books"], books.BOOK_PATTERN),
    "music": Topic(TOPIC_GROUPS["music"], music.MUSIC_COMPILED_PATTERN),
    "news": Topic(TOPIC_GROUPS["news"], news.NEWS_COMPILED_PATTERN),
    "politics": Topic(TOPIC_GROUPS["politics"]),
    "sports": Topic(TOPIC_GROUPS["sports"], detecting_function=sport.about_sport),
    "religion": Topic(TOPIC_GROUPS["religion"]),
    "movies": Topic(TOPIC_GROUPS["movies"], movies.MOVIE_COMPILED_PATTERN),
    "fashion": Topic(TOPIC_GROUPS["fashion"]),
    "travel": Topic(TOPIC_GROUPS["travel"], travel.TRAVELLING_TEMPLATE),
    "celebrities": Topic(TOPIC_GROUPS["celebrities"], gossip.GOSSIP_COMPILED_PATTERN),
    "art": Topic(TOPIC_GROUPS["art"], art.ART_PATTERN),
    "science": Topic(TOPIC_GROUPS["science"], science.SCIENCE_COMPILED_PATTERN),
    "entertainment": Topic(TOPIC_GROUPS["entertainment"]),
    "games": Topic(TOPIC_GROUPS["games"], gaming.VIDEO_GAME_WORDS_COMPILED_PATTERN),
    "animals": Topic(TOPIC_GROUPS["animals"], animals.ANIMALS_FIND_TEMPLATE),
    "sex": Topic(TOPIC_GROUPS["sex"]),
    "weather": Topic(TOPIC_GROUPS["weather"]),
}  # The list can be expanded according to the topic list supported
1283

1284

1285
def is_about(topic_name, annotated_utterance, **kwargs):
    """Return True if the utterance is about `topic_name` (a key of TOPICS).

    Extra keyword arguments are forwarded to Topic.detect.
    """
    detector = TOPICS[topic_name]
    return detector.detect(annotated_utterance, **kwargs)
1287

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.