1
from __future__ import annotations
6
from copy import deepcopy
7
from random import choice
9
from common.custom_requests import request_triples_wikidata
10
from common.factoid import FACTOID_THRESHOLD
11
from common.combined_classes import combined_classes, TOPIC_GROUPS
12
from common.join_pattern import *
14
from common import food, books, music, news, travel
15
from common import art, science, movies, animals, gaming, sport, gossip
19
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
21
# Report unhandled exceptions to Sentry; DSN is taken from the environment (may be unset).
sentry_sdk.init(getenv("SENTRY_DSN"))
24
"dff_intent_responder_skill",
25
"dff_program_y_dangerous_skill",
27
"christmas_new_year_skill",
30
"valentines_day_skill",
34
"personal_info_skill", # 'short_story_skill',
40
"dff_coronavirus_skill",
43
"game_cooperative_skill",
46
"dff_program_y_skill",
49
"book_tfidf_retrieval",
50
"entertainment_tfidf_retrieval",
51
"fashion_tfidf_retrieval",
52
"movie_tfidf_retrieval",
53
"music_tfidf_retrieval",
54
"politics_tfidf_retrieval",
55
"science_technology_tfidf_retrieval",
56
"sport_tfidf_retrieval",
57
"animals_tfidf_retrieval",
59
"topicalchat_convert_retrieval",
60
"dff_program_y_wide_skill",
61
"knowledge_grounding_skill",
68
"Sounds interesting.",
69
"Sounds interesting!",
74
"I'm glad you think so!",
75
"Sorry, I don't have an answer for that!",
76
"Let's talk about something else.",
98
"weather_forecast_intent",
104
high_priority_intents = {
105
"dff_intent_responder_skill": {
112
"where_are_you_from",
115
"dff_grounding_skill": {"what_are_you_talking_about"},
118
low_priority_intents = {"dont_understand", "what_time", "choose_topic"}
121
"emotion_classification",
122
"toxic_classification",
132
"Celebrities&Events": 0,
133
"Family&Relationships": 0,
134
"Health&Medicine": 0,
137
"Books&Literature": 0.3,
140
"ArtificialIntelligence": 0.3,
145
"deeppavlov_topics": {class_: DP_THRESHOLDS.get(class_, 0.9) for class_ in combined_classes["deeppavlov_topics"]},
146
"toxic_classification": {
147
"identity_hate": 0.5,
152
"sexual_explicit": 0.6,
159
"semantic_request": {
161
"open_question_factual",
162
"open_question_opinion",
163
"open_question_personal",
165
"clarifying_question",
167
"command": ["command", "dev_command"],
168
"opinion": ["appreciation", "opinion", "complaint", "comment"],
169
"statement": ["statement"],
170
"answer": ["other_answers", "pos_answer", "neg_answer"],
172
"functional_request": {
173
"incomplete": ["abandon", "nonsense"],
174
"social_convention": ["opening", "closing", "hold", "back-channeling"],
176
"other": ["uncertain", "non_compliant", "correction"],
179
# Flat lists of all MIDAS labels belonging to each request group, used to split
# classifier outputs into semantic vs. functional intents.
# `sum(iterable_of_lists, [])` concatenates the per-group label lists; the
# redundant `[x for x in ...]` wrapper from the original is dropped.
MIDAS_SEMANTIC_LABELS = sum(midas_classes["semantic_request"].values(), [])
MIDAS_FUNCTIONAL_LABELS = sum(midas_classes["functional_request"].values(), [])
183
def get_skill_outputs_from_dialog(utterances, skill_name, activated=False):
185
Extract list of dictionaries with already formatted outputs of `skill_name` from full dialog.
186
If `activated=True`, skill also should be chosen as `active_skill`;
187
otherwise, empty list.
190
utterances: utterances, the first one is user's reply
191
skill_name: name of target skill
192
activated: if target skill should be chosen by response selector on previous step or not
195
list of dictionaries with formatted outputs of skill
200
for uttr in utterances:
201
if "active_skill" in uttr:
202
final_response = uttr.get("orig_text", None) if uttr.get("orig_text", None) is not None else uttr["text"]
203
for skop in skills_outputs:
204
# need to check text-response for skills with several hypotheses
205
if skop["skill_name"] == skill_name:
206
if activated and skop["text"] in final_response and uttr["active_skill"] == skill_name:
207
# removed one condition as if scop contains skill_name and text, its len is > 0
210
if not activated and skop:
212
elif "hypotheses" in uttr:
213
skills_outputs = uttr["hypotheses"]
220
Transform infinitive form of verb to Ving form.
226
string with required verb form
230
# by Anastasia Kravtsova
233
s1 = re.compile(r"(?<![a-z])be\+VBG")
234
s2 = re.compile(r"(?<![aouiey])([^aouiey][aouiey]([^aouieywr]))\+VBG")
235
s3 = re.compile(r"ie\+VBG")
236
s4 = re.compile(r"(ee)\+VBG")
237
s5 = re.compile(r"e\+VBG")
239
s6 = re.compile(r"\+VBG")
242
s = re.sub(s1, "being", s)
243
s = re.sub(s2, r"\1\2ing", s)
244
s = re.sub(s3, r"ying", s)
245
s = re.sub(s4, r"\1ing", s)
246
s = re.sub(s5, r"ing", s)
248
s = re.sub(s6, "ing", s)
252
def get_list_of_active_skills(utterances):
254
Extract list of active skills names
257
utterances: utterances, the first one is user's reply
260
list of string skill names
264
for uttr in utterances:
265
if "active_skill" in uttr:
266
result.append(uttr["active_skill"])
271
def get_user_replies_to_particular_skill(utterances, skill_name):
273
Return user's responses to particular skill if it was active
279
list of string response
282
for i, uttr in enumerate(utterances):
283
if uttr.get("active_skill", "") == skill_name:
284
result.append(utterances[i - 1]["text"])
288
yes_templates = re.compile(
289
r"(\byes\b|\byup\b|\byep\b|\bsure\b|go ahead|\byeah\b|\bok\b|okay|^(kind of|kinda)\.?$|"
290
r"^why not\.?$|^tell me\.?$|^i (agree|do|did|like|have|had|think so)\.?$)"
294
def is_yes(annotated_phrase):
295
yes_detected = "yes" in get_intents(annotated_phrase, which="intent_catcher", probs=False)
296
midas_yes_detected = "pos_answer" in get_intents(annotated_phrase, which="midas", probs=False)
297
# TODO: intent catcher not catches 'yes thanks!'
298
if yes_detected or midas_yes_detected or re.search(yes_templates, annotated_phrase.get("text", "").lower()):
303
# Regex cues for a negative user reply ("no", "not", "no way", ...).
no_templates = re.compile(r"(\bno\b|\bnot\b|no way|don't|no please|i disagree|^neither.?$)")
304
# Phrasings signalling the user does not know / defers the choice to the bot.
DONOTKNOW_LIKE = [r"(i )?(do not|don't) know", "you (choose|decide|pick up)", "no idea"]
305
# Case-insensitive OR-union of the DONOTKNOW_LIKE alternatives.
DONOTKNOW_LIKE_PATTERN = re.compile(join_sentences_in_or_pattern(DONOTKNOW_LIKE), re.IGNORECASE)
308
def is_donot_know(annotated_phrase):
309
if DONOTKNOW_LIKE_PATTERN.search(annotated_phrase.get("text", "")):
314
def is_no_intent(annotated_phrase):
315
no_detected = "no" in get_intents(annotated_phrase, which="intent_catcher", probs=False)
316
midas_no_detected = False # "neg_answer" in get_intents(annotated_phrase, which='midas', probs=False)
317
is_not_idontknow = not is_donot_know(annotated_phrase)
318
if (no_detected or midas_no_detected) and is_not_idontknow:
324
def is_no(annotated_phrase):
325
no_detected = "no" in get_intents(annotated_phrase, which="intent_catcher", probs=False)
326
midas_no_detected = "neg_answer" in get_intents(annotated_phrase, which="midas", probs=False)
327
# TODO: intent catcher thinks that horrible is no intent'
328
user_phrase = annotated_phrase.get("text", "").lower().strip().replace(".", "")
329
is_not_horrible = "horrible" != user_phrase
330
no_regexp_detected = re.search(no_templates, annotated_phrase.get("text", "").lower())
331
is_not_idontknow = not is_donot_know(annotated_phrase)
332
_yes = is_yes(annotated_phrase)
333
if is_not_horrible and (no_detected or midas_no_detected or no_regexp_detected) and is_not_idontknow and not _yes:
339
def is_question(text):
343
def substitute_nonwords(text):
    """Collapse every run of non-word characters in `text` into a single space and trim the ends."""
    collapsed = re.sub(r"\W+", " ", text)
    return collapsed.strip()
347
def get_intent_name(text):
349
if splitter not in text:
351
intent_name = text.split(splitter)[-1]
352
intent_name = re.sub(r"\W", " ", intent_name.lower()).strip()
356
OPINION_REQUEST_PATTERN = re.compile(
357
r"(don't|do not|not|are not|are|do)?\s?you\s"
358
r"(like|dislike|adore|hate|love|believe|consider|get|know|taste|think|"
359
r"recognize|sure|understand|feel|fond of|care for|fansy|appeal|suppose|"
363
OPINION_EXPRESSION_PATTERN = re.compile(
364
r"\bi (don't|do not|not|am not|'m not|am|do)?\s?"
365
r"(like|dislike|adore|hate|love|believe|consider|get|know|taste|think|"
366
r"recognize|sure|understand|feel|fond of|care for|fansy|appeal|suppose|"
372
def is_opinion_request(annotated_utterance):
373
intents = get_intents(annotated_utterance, which="all", probs=False)
374
intent_detected = any([intent in intents for intent in ["Opinion_RequestIntent", "open_question_opinion"]])
375
uttr_text = annotated_utterance.get("text", "")
376
if intent_detected or (OPINION_REQUEST_PATTERN.search(uttr_text) and "?" in uttr_text):
382
def is_opinion_expression(annotated_utterance):
383
all_intents = get_intents(annotated_utterance, which="all")
384
intent_detected = any([intent in all_intents for intent in ["opinion", "Opinion_ExpressionIntent"]])
385
uttr_text = annotated_utterance.get("text", "")
386
if intent_detected or OPINION_EXPRESSION_PATTERN.search(uttr_text):
392
def get_outputs_with_response_from_dialog(utterances, response, activated=False):
394
Extract list of dictionaries with already formatted outputs of different skills from full dialog
395
which replies containe `response`.
396
If `activated=True`, skill also should be chosen as `active_skill`;
397
otherwise, empty list.
400
utterances: utterances, the first one is user's reply
401
response: target text to search among bot utterances
402
activated: if target skill should be chosen by response selector on previous step or not
405
list of dictionaries with formatted outputs of skill
410
for uttr in utterances:
411
if "active_skill" in uttr:
412
final_response = uttr["text"]
413
for skop in skills_outputs:
414
# need to check text-response for skills with several hypotheses
415
if response in skop["text"]:
416
if activated and skop["text"] in final_response and skop:
419
if not activated and skop:
421
elif "hypotheses" in uttr:
422
skills_outputs = uttr["hypotheses"]
427
def get_not_used_template(used_templates, all_templates, any_if_no_available=True):
429
Choose not used template among all templates
432
used_templates: list of templates already used in the dialog
433
all_templates: list of all available templates
438
available = list(set(all_templates).difference(set(used_templates)))
440
return choice(available)
441
elif any_if_no_available:
442
return choice(all_templates)
447
def get_all_not_used_templates(used_templates, all_templates):
449
Return all not used template among all templates
452
used_templates: list of templates already used in the dialog
453
all_templates: list of all available templates
458
available = list(set(all_templates).difference(set(used_templates)))
462
def _probs_to_labels(answer_probs, max_proba=True, threshold=0.5):
465
if isinstance(threshold, dict):
466
answer_labels = [key for key in answer_probs if answer_probs[key] > threshold.get(key, 0)]
468
answer_labels = [key for key in answer_labels if answer_probs[key] == max(answer_probs.values())]
470
answer_labels = [label for label in answer_probs if answer_probs[label] > threshold]
471
if not answer_labels and max_proba:
472
answer_labels = [key for key in answer_probs if answer_probs[key] == max(answer_probs.values())]
476
def _labels_to_probs(answer_labels, all_labels):
477
answer_probs = dict()
478
for label in all_labels:
479
if label in answer_labels:
480
answer_probs[label] = 1
482
answer_probs[label] = 0
486
def _get_combined_annotations(annotated_utterance, model_name, threshold=0.5):
487
answer_probs, answer_labels = {}, []
489
annotations = annotated_utterance["annotations"]
490
combined_annotations = annotations.get("combined_classification", {})
491
if combined_annotations and isinstance(combined_annotations, list):
492
combined_annotations = combined_annotations[0]
493
if model_name in combined_annotations:
494
answer_probs = combined_annotations[model_name]
496
logger.warning(f"Not found Model name {model_name} in combined annotations {combined_annotations}")
497
old_style_toxic = all(
498
[model_name == "toxic_classification", "factoid_classification" not in combined_annotations]
500
if model_name in MULTILABEL_TASKS or old_style_toxic:
501
answer_labels = _probs_to_labels(answer_probs, max_proba=False, threshold=threshold)
502
elif model_name == "factoid_classification" and answer_probs.get("is_factoid", 0) < threshold:
503
answer_labels = ["is_conversational"]
504
elif model_name == "deeppavlov_topics":
505
answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=THRESHOLDS["deeppavlov_topics"])
506
elif model_name == "toxic_classification":
507
answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=THRESHOLDS["toxic_classification"])
509
answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=threshold)
510
except Exception as e:
511
sentry_sdk.capture_exception(e)
514
return answer_probs, answer_labels
517
def _process_text(answer):
518
if isinstance(answer, dict) and "text" in answer:
519
return answer["text"]
524
def _process_old_sentiment(answer):
525
# Input: all sentiment annotations. Output: probs
526
if isinstance(answer[0], str) and isinstance(answer[1], float):
527
# support old sentiment output
529
for key in combined_classes["sentiment_classification"]:
531
curr_answer[key] = answer[1]
533
curr_answer[key] = 0.5 * (1 - answer[1])
534
answer_probs = curr_answer
537
logger.warning("_process_old_sentiment got file with an output that is not old-style")
541
def _get_plain_annotations(annotated_utterance, model_name, threshold=0.5):
542
answer_probs, answer_labels = {}, []
544
annotations = annotated_utterance["annotations"]
545
answer = annotations[model_name]
547
answer = _process_text(answer)
548
if isinstance(answer, list):
549
if model_name == "sentiment_classification":
550
answer_probs = _process_old_sentiment(answer)
551
answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=threshold)
553
answer_labels = answer
554
answer_probs = _labels_to_probs(answer_labels, combined_classes[model_name])
556
answer_probs = answer
557
if model_name == "toxic_classification":
558
# this function is only for plain annotations (when toxic_classification is a separate annotator)
559
answer_labels = _probs_to_labels(answer_probs, max_proba=False, threshold=threshold)
560
elif model_name == "factoid_classification" and answer_probs.get("is_factoid", 0) < threshold:
561
answer_labels = ["is_conversational"]
563
answer_labels = _probs_to_labels(answer_probs, max_proba=True, threshold=threshold)
564
except Exception as e:
567
return answer_probs, answer_labels
570
def print_combined(combined_output):
    """Log the combined classifier output with every probability rounded to 2 decimals.

    Operates on a deep copy, so the caller's nested structure is left untouched.
    """
    rounded = deepcopy(combined_output)
    for model_scores in rounded:
        for model_name in model_scores:
            for label in model_scores[model_name]:
                model_scores[model_name][label] = round(model_scores[model_name][label], 2)
    logger.info(f"Combined classifier output is {rounded}")
579
def _get_etc_model(annotated_utterance, model_name, probs, default_probs, default_labels, threshold=0.5):
580
"""Function to get emotion classifier annotations from annotated utterance.
583
annotated_utterance: dictionary with annotated utterance, or annotations
584
probs: return probabilities or not
585
default_probs: default probs to return.
586
default_labels: default labels to return.
587
Threshold: threshold for classification
589
dictionary with emotion probablilties, if probs == True, or emotion labels if probs != True
593
if model_name in annotated_utterance.get("annotations", {}):
594
answer_probs, answer_labels = _get_plain_annotations(
595
annotated_utterance, model_name=model_name, threshold=threshold
597
elif "combined_classification" in annotated_utterance.get("annotations", {}):
598
answer_probs, answer_labels = _get_combined_annotations(
599
annotated_utterance, model_name=model_name, threshold=threshold
602
answer_probs, answer_labels = default_probs, default_labels
603
except Exception as e:
604
logger.exception(e, stack_info=True)
605
answer_probs, answer_labels = default_probs, default_labels
606
if probs: # return probs
612
def get_toxic(annotated_utterance, probs=True, default_probs=None, default_labels=None):
613
"""Function to get toxic classifier annotations from annotated utterance.
616
annotated_utterance: dictionary with annotated utterance, or annotations
617
probs: return probabilities or not
618
default: default value to return. If it is None, returns empty dict/list depending on probs argument
620
dictionary with toxic probablilties, if probs == True, or toxic labels if probs != True
622
default_probs = {} if default_probs is None else default_probs
623
default_labels = [] if default_labels is None else default_labels
624
return _get_etc_model(
626
"toxic_classification",
628
default_probs=default_probs,
629
default_labels=default_labels,
633
def get_factoid(annotated_utterance, probs=True, default_probs=None, default_labels=None):
634
"""Function to get factoid classifier annotations from annotated utterance.
637
annotated_utterance: dictionary with annotated utterance, or annotations
638
probs: return probabilities or not
639
default: default value to return. If it is None, returns empty dict/list depending on probs argument
641
dictionary with factoid probablilties, if probs == True, or factoid labels if probs != True
643
default_probs = {"is_conversational": 1} if default_probs is None else default_probs
644
default_labels = ["is_conversational"] if default_labels is None else default_labels
645
return _get_etc_model(
647
"factoid_classification",
649
default_probs=default_probs,
650
default_labels=default_labels,
651
threshold=FACTOID_THRESHOLD,
655
def get_sentiment(annotated_utterance, probs=True, default_probs=None, default_labels=None):
656
"""Function to get sentiment classifier annotations from annotated utterance.
659
annotated_utterance: dictionary with annotated utterance, or annotations
660
probs: return probabilities or not
661
default: default value to return. If it is None, returns empty dict/list depending on probs argument
663
dictionary with sentiment probablilties, if probs == True, or sentiment labels if probs != True
665
default_probs = {"positive": 0, "negative": 0, "neutral": 1} if default_probs is None else default_probs
666
default_labels = ["neutral"] if default_labels is None else default_labels
668
return _get_etc_model(
670
"sentiment_classification",
672
default_probs=default_probs,
673
default_labels=default_labels,
677
def get_emotions(annotated_utterance, probs=True, default_probs=None, default_labels=None):
678
"""Function to get emotion classifier annotations from annotated utterance.
681
annotated_utterance: dictionary with annotated utterance, or annotations
682
probs: return probabilities or not
683
default: default value to return. If it is None, returns empty dict/list depending on probs argument
685
dictionary with emotion probablilties, if probs == True, or emotion labels if probs != True
688
{"anger": 0, "fear": 0, "joy": 0, "love": 0, "sadness": 0, "surprise": 0, "neutral": 1}
689
if default_probs is None
692
default_labels = ["neutral"] if default_labels is None else default_labels
694
return _get_etc_model(
696
"emotion_classification",
698
default_probs=default_probs,
699
default_labels=default_labels,
703
def get_topics(annotated_utterance, probs=False, default_probs=None, default_labels=None, which="all"):
704
"""Function to get topics from particular annotator or all detected.
706
annotated_utterance: dictionary with annotated utterance
707
probs: if False we return labels, otherwise we return probs
708
default_probs: default probabilities to return
709
default_labels: default labels to return
710
which: which topics to return.
711
'all' means topics by `cobot_topics` and `cobot_dialogact_topics`,
712
'cobot_topics' means topics by `cobot_topics`,
713
'cobot_dialogact_topics' means topics by `cobot_dialogact_topics`.
714
'deeppavlov_topics' means topics by `deeppavlov_topics`.
717
list of topic labels, if probs == False,
718
dictionary where all keys are topic labels and values are probabilities, if probs == True
720
default_probs = {} if default_probs is None else default_probs
721
default_labels = [] if default_labels is None else default_labels
722
annotations = annotated_utterance.get("annotations", {})
723
cobot_topics_probs, cobot_topics_labels = {}, []
724
if "cobot_topics" in annotations:
725
cobot_topics_labels = _process_text(annotations.get("cobot_topics", {}))
726
if "combined_classification" in annotations and not cobot_topics_labels:
727
cobot_topics_probs, cobot_topics_labels = _get_combined_annotations(
728
annotated_utterance, model_name="cobot_topics"
730
cobot_topics_labels = _process_text(cobot_topics_labels)
731
if not cobot_topics_probs:
732
cobot_topics_probs = _labels_to_probs(cobot_topics_labels, combined_classes.get("cobot_topics", {}))
734
cobot_da_topics_probs, cobot_da_topics_labels = {}, []
735
if "cobot_dialogact" in annotations and "topics" in annotations["cobot_dialogact"]:
736
cobot_da_topics_labels = annotations["cobot_dialogact"]["topics"]
737
elif "cobot_dialogact_topics" in annotations:
738
cobot_da_topics_labels = annotations["cobot_dialogact_topics"]
740
if "combined_classification" in annotations and not cobot_da_topics_labels:
741
cobot_da_topics_probs, cobot_da_topics_labels = _get_combined_annotations(
742
annotated_utterance, model_name="cobot_dialogact_topics"
744
cobot_da_topics_labels = _process_text(cobot_da_topics_labels)
745
if not cobot_da_topics_probs:
746
cobot_da_topics_probs = _labels_to_probs(cobot_da_topics_labels, combined_classes["cobot_dialogact_topics"])
748
dp_topics_probs, dp_topics_labels = {}, []
749
if "combined_classification" in annotations and not dp_topics_labels:
750
dp_topics_probs, dp_topics_labels = _get_combined_annotations(
751
annotated_utterance, model_name="deeppavlov_topics"
753
topics_ru_probs, topics_ru_labels = {}, []
754
if "topics_ru" in annotations:
755
topics_ru_probs, topics_ru_labels = _get_combined_annotations(annotated_utterance, model_name="topics_ru")
757
answer_labels = cobot_topics_labels + cobot_da_topics_labels + dp_topics_labels + topics_ru_labels
758
answer_probs = {**cobot_topics_probs, **cobot_da_topics_probs, **dp_topics_probs, **topics_ru_probs}
759
elif which == "cobot_topics":
760
answer_probs, answer_labels = cobot_topics_probs, cobot_topics_labels
761
elif which == "cobot_dialogact_topics":
762
answer_probs, answer_labels = cobot_da_topics_probs, cobot_da_topics_labels
763
elif which == "deeppavlov_topics":
764
answer_probs, answer_labels = dp_topics_probs, dp_topics_labels
765
elif which == "topics_ru":
766
answer_probs, answer_labels = topics_ru_probs, topics_ru_labels
768
logger.exception(f"Unknown input type in get_topics: {which}")
769
answer_probs, answer_labels = default_probs, default_labels
777
def get_intents(annotated_utterance, probs=False, default_probs=None, default_labels=None, which="all"):
778
"""Function to get intents from particular annotator or all detected.
780
annotated_utterance: dictionary with annotated utterance
781
probs: if False we return labels, otherwise we return probs
782
default_probs: default probabilities to return
783
default_labels: default labels to return
784
which: which intents to return:
785
'all' means intents detected by `intent_catcher`,
786
`cobot_dialogact_intents` and `midas_classification`.
787
'intent_catcher' means intents detected by `intent_catcher`.
788
'cobot_dialogact_intents' means intents detected by `cobot_dialogact_intents`.
789
'midas' means intents detected by `midas_classification`.
791
list of intent labels, if probs == False,
792
dictionary where all keys are intent labels and values are probabilities, if probs == True
794
default_probs = {} if default_probs is None else default_probs
795
default_labels = [] if default_labels is None else default_labels
796
annotations = annotated_utterance.get("annotations", {})
797
intents = annotations.get("intent_catcher", {})
798
detected_intents = [k for k, v in intents.items() if v.get("detected", 0) == 1]
799
detected_intent_probs = {key: 1 for key in detected_intents}
800
midas_intent_probs = annotations.get("midas_classification", {})
801
if "combined_classification" in annotations and not midas_intent_probs:
802
midas_intent_probs, midas_intent_labels = _get_combined_annotations(
803
annotated_utterance, model_name="midas_classification"
805
if isinstance(midas_intent_probs, dict) and midas_intent_probs:
806
semantic_midas_probs = {k: v for k, v in midas_intent_probs.items() if k in MIDAS_SEMANTIC_LABELS}
807
functional_midas_probs = {k: v for k, v in midas_intent_probs.items() if k in MIDAS_FUNCTIONAL_LABELS}
808
if semantic_midas_probs:
809
max_midas_semantic_prob = max(semantic_midas_probs.values())
811
max_midas_semantic_prob = 0.0
812
if functional_midas_probs:
813
max_midas_functional_prob = max(functional_midas_probs.values())
815
max_midas_functional_prob = 0.0
817
midas_semantic_intent_labels = [k for k, v in semantic_midas_probs.items() if v == max_midas_semantic_prob]
818
midas_functional_intent_labels = [
819
k for k, v in functional_midas_probs.items() if v == max_midas_functional_prob
821
midas_intent_labels = midas_semantic_intent_labels + midas_functional_intent_labels
822
elif isinstance(midas_intent_probs, list):
823
if midas_intent_probs:
824
# now it's a list of dictionaries. length of list is n sentences
825
midas_intent_labels = []
826
for midas_sent_probs in midas_intent_probs:
827
max_midas_sent_prob = max(midas_sent_probs.values())
828
midas_intent_labels += [k for k, v in midas_sent_probs.items() if v == max_midas_sent_prob]
829
_midas_intent_probs = deepcopy(midas_intent_probs)
830
midas_intent_probs = {}
831
class_names = list(set(sum([list(resp.keys()) for resp in _midas_intent_probs], [])))
832
for class_name in class_names:
833
max_proba = max([resp.get(class_name, 0.0) for resp in _midas_intent_probs])
834
midas_intent_probs[class_name] = max_proba
836
midas_intent_probs = {}
837
midas_intent_labels = []
839
midas_intent_labels = []
840
cobot_da_intent_probs, cobot_da_intent_labels = {}, []
842
if "cobot_dialogact" in annotations and "intents" in annotations["cobot_dialogact"]:
843
cobot_da_intent_labels = annotated_utterance["annotations"]["cobot_dialogact"]["intents"]
844
elif "cobot_dialogact_intents" in annotations:
845
cobot_da_intent_labels = annotated_utterance["annotations"]["cobot_dialogact_intents"]
847
if "combined_classification" in annotations and not cobot_da_intent_labels:
848
cobot_da_intent_probs, cobot_da_intent_labels = _get_combined_annotations(
849
annotated_utterance, model_name="cobot_dialogact_intents"
852
cobot_da_intent_labels = _process_text(cobot_da_intent_labels)
853
if not cobot_da_intent_probs:
854
cobot_da_intent_probs = _labels_to_probs(cobot_da_intent_labels, combined_classes["cobot_dialogact_intents"])
857
answer_probs = {**detected_intent_probs, **cobot_da_intent_probs, **midas_intent_probs}
858
answer_labels = detected_intents + cobot_da_intent_labels + midas_intent_labels
859
elif which == "intent_catcher":
860
answer_probs, answer_labels = detected_intent_probs, detected_intents
861
elif which == "cobot_dialogact_intents":
862
answer_probs, answer_labels = cobot_da_intent_probs, cobot_da_intent_labels
863
elif which == "midas":
864
answer_probs, answer_labels = midas_intent_probs, midas_intent_labels
866
logger.warning(f"Unknown type in get_intents {which}")
867
answer_probs, answer_labels = default_probs, default_labels
875
# Entity-detection labels to ignore when collecting entities (not standalone mentions).
COBOT_ENTITIES_SKIP_LABELS = ["anaphor"]
878
def get_entities(annotated_utterance, only_named=False, with_labels=False, return_lemmas=False):
881
if "entity_detection" in annotated_utterance.get("annotations", {}):
882
# for english and russian languages
883
labelled_entities = annotated_utterance["annotations"]["entity_detection"].get("labelled_entities", [])
885
entities = [ent for ent in labelled_entities if ent["label"] not in COBOT_ENTITIES_SKIP_LABELS]
887
entities = [ent["text"] for ent in entities]
888
elif "spacy_nounphrases" in annotated_utterance.get("annotations", {}):
889
# for english language
890
entities = annotated_utterance.get("annotations", {}).get("spacy_nounphrases", [])
892
# actually there are no labels for cobot nounphrases
893
# so, let's make it as for cobot_entities format
894
entities = [{"text": ent, "label": "misc"} for ent in entities]
895
elif "spacy_annotator" in annotated_utterance.get("annotations", {}):
896
# for russian language
897
words = annotated_utterance["annotations"]["spacy_annotator"]
899
if word.get("pos_", "") == "NOUN":
900
entities += [{"text": word["lemma_"] if return_lemmas else word["text"], "label": "misc"}]
902
entities = [ent["text"] for ent in entities]
904
# `ner` contains list of lists of dicts. the length of the list is n-sentences
905
# each entity is {"confidence": 1, "end_pos": 1, "start_pos": 0, "text": "unicorns", "type": "ORG"}
906
entities = annotated_utterance.get("annotations", {}).get("ner", [])
907
entities = sum(entities, []) # flatten list, now it's a list of dicts-entities
909
entities = [ent["text"] for ent in entities]
910
return entities if entities is not None else []
913
def get_named_persons(annotated_utterance):
914
named_entities = get_entities(annotated_utterance, only_named=True, with_labels=True)
915
all_entities = get_entities(annotated_utterance, only_named=False, with_labels=True)
918
if "cobot_entities" in annotated_utterance["annotations"]:
919
for ent in all_entities:
920
if ent["label"] == "person":
921
named_persons.append(ent["text"])
922
if "ner" in annotated_utterance["annotations"]:
923
for ent in named_entities:
924
if ent["type"] == "PER":
925
named_persons.append(ent["text"])
927
named_persons = list(set(named_persons))
932
def get_named_locations(annotated_utterance):
933
named_entities = get_entities(annotated_utterance, only_named=True, with_labels=True)
934
all_entities = get_entities(annotated_utterance, only_named=False, with_labels=True)
937
if "cobot_entities" in annotated_utterance["annotations"]:
938
for ent in all_entities:
939
if ent["label"] == "location":
940
named_locations.append(ent["text"])
941
if len(named_locations) == 0 and "ner" in annotated_utterance["annotations"]:
942
for ent in named_entities:
943
if ent["type"] == "LOC" and ent["text"] != "alexa":
944
_is_part_of_other_entity = False
945
for cobot_ent in all_entities:
946
if ent["text"] in cobot_ent["text"] and cobot_ent["label"] != "location":
947
_is_part_of_other_entity = True
948
if not _is_part_of_other_entity:
949
named_locations.append(ent["text"])
951
named_locations = list(set(named_locations))
952
if re.search(r"\bjapan\b", annotated_utterance["text"], re.IGNORECASE) and "japan" not in named_locations:
953
# NER does not catch this country at all!
954
named_locations.append("japan")
956
return named_locations
959
def get_raw_entity_names_from_annotations(annotations):
963
annotated_utterance: annotated utterance
966
Wikidata entities we received from annotations
968
raw_el_output = annotations.get("entity_linking", [{}])
972
if isinstance(raw_el_output[0], dict):
973
entities = raw_el_output[0].get("entity_ids", [])
974
if isinstance(raw_el_output[0], list):
975
entities = raw_el_output[0][0]
976
except Exception as e:
977
error_message = f"Wrong entity linking output format {raw_el_output} : {e}"
978
sentry_sdk.capture_exception(e)
979
logger.exception(error_message)
983
def get_entity_names_from_annotations(annotated_utterance, stopwords=None, default_entities=None):
987
annotated_utterance: annotated utterance
988
stopwords_file: name of file with stopwords
991
Names of named entities we received from annotations
993
default_entities = [] if default_entities is None else default_entities
994
stopwords = stopwords if stopwords else []
995
full_text = annotated_utterance.get("text", "").lower()
996
named_entities = [full_text] if full_text in default_entities else []
997
annotations = annotated_utterance.get("annotations", {})
998
for tmp in annotations.get("ner", []):
999
if tmp and "text" in tmp[0]:
1000
named_entities.append(tmp[0]["text"])
1001
for nounphrase in annotations.get("spacy_nounphrases", []):
1002
named_entities.append(nounphrase)
1003
for wikiparser_dict in annotations.get("wiki_parser", [{}]):
1004
for wiki_entity_name in wikiparser_dict:
1005
named_entities.append(wiki_entity_name)
1008
for entity in named_entities
1009
if any([len(ent_word) >= 5 or ent_word not in stopwords for ent_word in entity.split(" ")])
1011
named_entities = list(set(named_entities))
1012
# remove entities which are is either too short or stopword
1013
return named_entities
1016
def entity_to_label(entity):
    """Convert a Wikidata entity id to its human-readable label via wikidata triples.

    Args:
        entity: Wikidata entity for which we need to receive the label.
            It should be a string: first letter "Q", the rest digits, like Q5321.

    Returns:
        label: label for this entity.
            If entity is in wrong format we assume that it is already a label,
            return it unchanged and report an exception to Sentry.
    """
    logger.debug(f"Calling entity_to_label for {entity}")
    no_entity = not entity
    wrong_entity_type = not isinstance(entity, str)
    # valid ids are "Q" followed only by digits
    wrong_entity_format = entity and (entity[0] != "Q" or any([j not in "0123456789" for j in entity[1:]]))
    if no_entity or wrong_entity_type or wrong_entity_format:
        warning_text = f"Wrong entity format. We assume {entity} to be label but check the code"
        sentry_sdk.capture_exception(Exception(warning_text))
        logger.exception(warning_text)
        return entity
    label = ""
    labels = request_triples_wikidata("find_label", [(entity, "")])
    try:
        sep = '"'  # NOTE(review): reconstructed — labels appear to arrive quoted, e.g. '"Paris"'; confirm upstream
        if sep in labels[0]:
            label = labels[0].split('"')[1]
        else:
            label = labels[0]
        logger.debug(f"Answer {label}")
    except Exception as e:
        sentry_sdk.capture_exception(e)
        # fix: message previously lacked the f-prefix, so "{labels}" was logged literally
        logger.exception(Exception(e, f"Exception in conversion of labels {labels}"))
    return label
def get_types_from_annotations(annotations, types, tocheck_relation="occupation"):
    """Find an entity in wiki_parser annotations which has the given relation.

    Args:
        annotations: annotations of utterance
        types: types from which we need to find one
        tocheck_relation: relation we want to check (default "occupation")

    Returns:
        Tuple (name of entity, matching type names, mismatching type names),
        or (None, None, None) when the relation is not found or processing fails.
    """
    wp_annotations = annotations.get("wiki_parser", {})
    if isinstance(wp_annotations, list) and wp_annotations:  # support 2 different formats
        wp_annotations = wp_annotations[0]
    try:
        topic_entities = wp_annotations.get("topic_skill_entities_info", {})
        for entity in topic_entities:
            for relation in topic_entities[entity]:
                if relation == tocheck_relation:
                    # pairs are (raw type id, human-readable type name)
                    type_to_typename = {j[0]: j[1] for j in topic_entities[entity][relation]}
                    found_types = type_to_typename.keys()
                    matching_types = [type_to_typename[k] for k in set(found_types) & set(types)]
                    mismatching_types = [type_to_typename[k] for k in found_types if k not in types]
                    return entity, matching_types, mismatching_types
        logger.warning("Relation to check not found")
    except Exception as e:
        sentry_sdk.capture_exception(e)
        logger.exception(Exception(e, f"Exception in processing wp annotations {wp_annotations}"))
    return None, None, None
# Precompiled character-class filters for entity cleaning.
ANYTHING_EXCEPT_OF_LETTERS_AND_SPACE_COMPILED = re.compile(r"[^a-zA-Z ]")
ANYTHING_EXCEPT_OF_LETTERS_SPACE_AND_PUNCT_COMPILED = re.compile(r"[^a-zA-Z\,\.\?\!\- ]")
MULTI_SPACE_COMPILED = re.compile(r"\s+")
def clean_entities(entities):
    """Lowercase entities, strip everything except letters/spaces, split into single-word tokens.

    Args:
        entities: list of entity strings

    Returns:
        Flat list of lowercase word tokens extracted from all entities.
    """
    entities = [entity.lower() for entity in entities]
    entities = [re.sub(ANYTHING_EXCEPT_OF_LETTERS_AND_SPACE_COMPILED, " ", entity) for entity in entities]
    entities = [re.sub(MULTI_SPACE_COMPILED, " ", entity).strip() for entity in entities]
    entities = [entity.split() for entity in entities]  # now it's a list of lists of strings
    entities = sum(entities, [])  # flatten list
    return entities
def get_common_tokens_in_lists_of_strings(list_of_strings_0, list_of_strings_1):
    """
    Clean strings removing anything except of letters and spaces, split every string to tokens by spaces,
    find common tokens for two lists of strings.

    Args:
        list_of_strings_0: first list of strings
        list_of_strings_1: second list of strings

    Returns:
        List of tokens present in both cleaned lists (order unspecified).
    """
    # deepcopy so callers' lists are never mutated by the cleaning steps
    list_of_strings_0 = deepcopy(list_of_strings_0)
    list_of_strings_1 = deepcopy(list_of_strings_1)
    list_of_strings_0 = clean_entities(list_of_strings_0)
    list_of_strings_1 = clean_entities(list_of_strings_1)
    common_substrings = list(set(list_of_strings_0).intersection(set(list_of_strings_1)))
    return common_substrings
# Keeps latin/cyrillic letters, digits, "-", "_" and space; everything else is punctuation to strip.
SYMBOLS_EXCEPT_LETTERS_AND_DIGITS = re.compile(r"[^a-zA-Zа-яА-ЯёЁ0-9\-_ ]")
DOUBLE_SPACES = re.compile(r"\s+")
def replace_symbols_except_letters_and_digits(s):
    """Replace punctuation with spaces, collapse repeated whitespace, strip the result.

    Args:
        s: input string

    Returns:
        Cleaned string containing only letters, digits, "-", "_" and single spaces.
    """
    s = SYMBOLS_EXCEPT_LETTERS_AND_DIGITS.sub(" ", s)
    s = DOUBLE_SPACES.sub(" ", s).strip()
    return s
def remove_punctuation_from_dict_keys(element):
    """Recursively rebuild a dict/list structure with punctuation stripped from all dict keys.

    Args:
        element: arbitrarily nested combination of dicts and lists

    Returns:
        New structure of the same shape; dict keys are cleaned via
        replace_symbols_except_letters_and_digits, values are deep-copied.
        Non-container elements are returned unchanged.
    """
    if isinstance(element, dict):
        new_element = {}
        for dict_key, value in element.items():
            if isinstance(value, dict) or isinstance(value, list):
                # recurse into nested containers before re-keying
                new_value = remove_punctuation_from_dict_keys(value)
                new_element[replace_symbols_except_letters_and_digits(dict_key)] = deepcopy(new_value)
            else:
                new_element[replace_symbols_except_letters_and_digits(dict_key)] = deepcopy(value)
        return new_element
    elif isinstance(element, list):
        new_element = []
        for sub_element in element:
            if isinstance(sub_element, dict) or isinstance(sub_element, list):
                new_sub_element = remove_punctuation_from_dict_keys(sub_element)
                new_element += [new_sub_element]
            else:
                new_element += [sub_element]
        return new_element
    else:
        # scalars pass through untouched
        return element
PERSONAL_PRONOUNS = re.compile(
    r"\b(i|you|he|she|it|we|they|me|my|him|her|us|them|its|mine|your|yours|his|hers|ours|theirs|myself|yourself|himself"
    r"|herself|itself|ourselves|themselves|their)\b",
    re.IGNORECASE,  # NOTE(review): closing flags reconstructed from usage on mixed-case text — confirm
)
def find_first_complete_sentence(sentences):
    """Find first sentence without any personal pronouns.

    Args:
        sentences: iterable of sentence strings

    Returns:
        The first pronoun-free sentence, or None when every sentence has one.
    """
    for sent in sentences:
        if PERSONAL_PRONOUNS.search(sent):
            continue
        return sent
    return None
def is_toxic_utterance(annotated_utterance):
    """Return True when the toxic classifier assigns any toxic label to the utterance."""
    toxic_result = get_toxic(annotated_utterance, probs=False)
    # "not_toxic" label means the utterance is clean, so treat it as no labels
    toxic_result = [] if "not_toxic" in toxic_result else toxic_result
    # now toxic_result is empty if not toxic utterance
    return len(toxic_result) > 0
def is_badlisted_utterance(annotated_utterance):
    """Return True when the badlisted_words annotator flags bad words, inappropriateness or profanity."""
    default_badlist = {"bad_words": False}
    badlist_result = annotated_utterance.get("annotations", {}).get("badlisted_words", default_badlist)
    return any([badlist_result.get(bad, False) for bad in ["bad_words", "inappropriate", "profanity"]])
def is_toxic_or_badlisted_utterance(annotated_utterance):
    """Return True when the utterance is either toxic or contains badlisted words."""
    return is_toxic_utterance(annotated_utterance) or is_badlisted_utterance(annotated_utterance)
# "what/who/which/where is ...", "how to ...", "when ..." openers, optionally prefixed
# with polite forms like "do you know", "could you tell me", "please".
FACTOID_PATTERNS = re.compile(
    r"^(do you know |((can |could )you )tell me )?(please )?"
    r"((what|who|which|where) (is|are|was|were)\b|how to\b|when)",
    re.IGNORECASE,  # NOTE(review): closing flags reconstructed — confirm against upstream
)
# anaphoric questions like "what is that" should NOT be treated as factoid
COUNTER_FACTOID_PATTERNS = re.compile(r"^(what|who|which|where) (is|are|was|were)( that|[\.\?]$)\b", re.IGNORECASE)
def is_special_factoid_question(annotated_utterance):
    """Detect 'what/who/... is X'-style factoid questions whose topic has no personal pronouns.

    Args:
        annotated_utterance: annotated utterance

    Returns:
        True for a factoid-style question about an impersonal topic, else False.
    """
    uttr_text = annotated_utterance.get("text", "")
    found = FACTOID_PATTERNS.search(uttr_text)
    if found and not COUNTER_FACTOID_PATTERNS.search(uttr_text):
        # remove first question like part
        rest_string = uttr_text[uttr_text.find(found[0]) + len(found[0]) :].strip()
        if PERSONAL_PRONOUNS.search(rest_string):
            # if any personal pronouns - not our case
            return False
        return True
    return False
# Boilerplate lead-ins that fact-providing services prepend to the actual fact text.
FACTS_EXTRA_WORDS = re.compile(
    r"(this might answer your question[:\,]? "
    r"|(according to|from) (wikipedia|wikihow)[:\,]? "
    r"|here's (something|what) I found (from|on) [a-zA-Z0-9\-\.]+:"
    r"|here's a fact about [a-zA-Z0-9\- \,]+\.)",
    re.IGNORECASE,  # NOTE(review): closing flags reconstructed — confirm against upstream
)
def get_dialog_breakdown_annotations(annotated_utterance):
    """Return True when the dialog_breakdown annotator's score exceeds the 0.5 threshold."""
    breakdown = annotated_utterance.get("annotations", {}).get("dialog_breakdown", {}).get("breakdown", 0.0) > 0.5
    return breakdown
def get_comet_conceptnet_annotations(annotated_utterance):
    """Return COMeT ConceptNet annotations under either of the two annotator names.

    COMeT ConceptNet annotations look like (entity names are keys):
        {"black hole": {
            'SymbolOf': ['space', 'time', 'justice'],
            'HasProperty': ['number', 'one of six number', 'long'],
            'Causes': ['death', 'death and destruction', 'war'],
            'CausesDesire': ['go to work', 'go to play', 'sleep']}}

    Returns:
        The annotations dict from "comet_conceptnet" or legacy "conceptnet",
        or an empty dict when neither is present.
    """
    if "comet_conceptnet" in annotated_utterance.get("annotations", {}):
        return annotated_utterance.get("annotations", {}).get("comet_conceptnet", {})
    elif "conceptnet" in annotated_utterance.get("annotations", {}):
        return annotated_utterance.get("annotations", {}).get("conceptnet", {})
    else:
        return {}
class Topic:
    # NOTE(review): the `class Topic:` header and parts of `detect`'s control flow were
    # lost in extraction and are reconstructed here — confirm against upstream.
    """A detectable conversation topic: classifier labels plus regexp/function fallbacks."""

    def __init__(self, topic_group=None, detecting_regexp=None, detecting_function=None):
        # topic labels (e.g. from TOPIC_GROUPS) matched against classifier output
        self.topic_group = topic_group
        # optional regexp fallback applied to the raw utterance text
        self.detecting_regexp = detecting_regexp
        # optional callable fallback applied to the annotated utterance
        self.detecting_function = detecting_function

    def detect(self, annotated_utterance, only_one_topic=True, threshold=0.1, which="all"):
        """Return True when the utterance matches this topic by labels, regexp or function."""
        if only_one_topic:
            found_topics = get_topics(annotated_utterance, probs=False, which=which)
        else:
            found_probs = get_topics(annotated_utterance, probs=True, which=which)
            found_topics = [key for key in found_probs if found_probs[key] > threshold]
        if any([target_topic in found_topics for target_topic in self.topic_group]):
            return True
        elif self.detecting_regexp is not None:
            if re.findall(self.detecting_regexp, annotated_utterance["text"]):
                return True
        elif self.detecting_function is not None:  # Support for non-regexp methods
            if self.detecting_function(annotated_utterance):
                return True
        return False
# Registry of detectable topics; each maps a name to classifier labels plus an
# optional regexp/function fallback. NOTE(review): the `TOPICS = {` opener was
# lost in extraction and is reconstructed — confirm against upstream.
TOPICS = {
    "food": Topic(TOPIC_GROUPS["food"], food.FOOD_COMPILED_PATTERN),
    "books": Topic(TOPIC_GROUPS["books"], books.BOOK_PATTERN),
    "music": Topic(TOPIC_GROUPS["music"], music.MUSIC_COMPILED_PATTERN),
    "news": Topic(TOPIC_GROUPS["news"], news.NEWS_COMPILED_PATTERN),
    "politics": Topic(TOPIC_GROUPS["politics"]),
    "sports": Topic(TOPIC_GROUPS["sports"], detecting_function=sport.about_sport),
    "religion": Topic(TOPIC_GROUPS["religion"]),
    "movies": Topic(TOPIC_GROUPS["movies"], movies.MOVIE_COMPILED_PATTERN),
    "fashion": Topic(TOPIC_GROUPS["fashion"]),
    "travel": Topic(TOPIC_GROUPS["travel"], travel.TRAVELLING_TEMPLATE),
    "celebrities": Topic(TOPIC_GROUPS["celebrities"], gossip.GOSSIP_COMPILED_PATTERN),
    "art": Topic(TOPIC_GROUPS["art"], art.ART_PATTERN),
    "science": Topic(TOPIC_GROUPS["science"], science.SCIENCE_COMPILED_PATTERN),
    "entertainment": Topic(TOPIC_GROUPS["entertainment"]),
    "games": Topic(TOPIC_GROUPS["games"], gaming.VIDEO_GAME_WORDS_COMPILED_PATTERN),
    "animals": Topic(TOPIC_GROUPS["animals"], animals.ANIMALS_FIND_TEMPLATE),
    "sex": Topic(TOPIC_GROUPS["sex"]),
    "weather": Topic(TOPIC_GROUPS["weather"]),
}  # The list can be expanded according to the topic list supported
def is_about(topic_name, annotated_utterance, **kwargs):
    """Shortcut: detect whether the utterance is about the named topic from TOPICS.

    Args:
        topic_name: key into the TOPICS registry
        annotated_utterance: annotated utterance
        **kwargs: forwarded to Topic.detect (only_one_topic, threshold, which)

    Returns:
        True when the topic detector matches the utterance.
    """
    return TOPICS[topic_name].detect(annotated_utterance, **kwargs)