dream

#!/usr/bin/env python

import tensorflow as tf
import numpy as np
import random
import pandas as pd
from itertools import chain
from tqdm import tqdm
from xeger import Xeger
from sklearn.metrics import precision_recall_curve


def tb_accuracy(y_true, y_pred):
    """Categorical accuracy: argmax over the class axis of one-hot labels."""
    y_true = tf.math.argmax(y_true, axis=1)
    y_pred = tf.math.argmax(y_pred, axis=1)
    return tf.keras.metrics.Accuracy()(y_true, y_pred)


def tb_f1(y_true, y_pred):
    """F1 from Keras Precision/Recall; epsilon keeps the division finite."""
    precision = tf.keras.metrics.Precision()(y_true, y_pred)
    recall = tf.keras.metrics.Recall()(y_true, y_pred)
    return 2 * ((precision * recall) / (precision + recall + tf.keras.backend.epsilon()))


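# A minimal sketch of the two metrics above (hypothetical values): with one-hot
# labels [[0, 1], [1, 0]] and predictions [[0.2, 0.8], [0.9, 0.1]], the argmax
# classes match on both rows, so:
#
#   y_true = tf.constant([[0.0, 1.0], [1.0, 0.0]])
#   y_pred = tf.constant([[0.2, 0.8], [0.9, 0.1]])
#   tb_accuracy(y_true, y_pred)  # -> 1.0

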
def multilabel_precision(y_true, y_pred):
    """
    Macro-precision, with per-class thresholds chosen by argmax F1
    """
    values = list()
    for i in range(y_true.shape[1]):
        pr, rec, thresholds = precision_recall_curve(y_true[:, i], y_pred[:, i])
        f1 = 2.0 * pr * rec / (pr + rec + 1e-10)  # epsilon avoids NaN when pr == rec == 0
        values.append(pr[np.argmax(f1)])
    return np.mean(values)


def multilabel_recall(y_true, y_pred):
    """
    Macro-recall, with per-class thresholds chosen by argmax F1
    """
    values = list()
    for i in range(y_true.shape[1]):
        pr, rec, thresholds = precision_recall_curve(y_true[:, i], y_pred[:, i])
        f1 = 2.0 * pr * rec / (pr + rec + 1e-10)  # epsilon avoids NaN when pr == rec == 0
        values.append(rec[np.argmax(f1)])
    return np.mean(values)


def multilabel_f1(y_true, y_pred):
    """
    Macro-F1, with per-class thresholds chosen by argmax F1
    """
    values = list()
    for i in range(y_true.shape[1]):
        pr, rec, thresholds = precision_recall_curve(y_true[:, i], y_pred[:, i])
        f1 = 2.0 * pr * rec / (pr + rec + 1e-10)  # epsilon avoids NaN when pr == rec == 0
        values.append(np.max(f1))
    return np.mean(values)


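# A minimal sketch on hypothetical data: two classes, four samples, each column
# perfectly separable by some threshold, so macro-F1 comes out as 1.0:
#
#   y_true = np.array([[1, 0], [0, 1], [1, 0], [0, 1]])
#   y_pred = np.array([[0.9, 0.2], [0.1, 0.8], [0.7, 0.4], [0.3, 0.6]])
#   multilabel_f1(y_true, y_pred)  # -> 1.0

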
def calculate_metrics(intents_min_pr, y_true, y_pred):
    intent_data = dict()
    for i, intent in enumerate(intents_min_pr):
        pr, rec, thresholds = precision_recall_curve(y_true[:, i], y_pred[:, i])
        f1 = 2.0 * pr * rec / (pr + rec + 1e-10)  # epsilon avoids NaN when pr == rec == 0
        indx = np.argwhere(pr > intents_min_pr[intent]).reshape(-1)
        # Argmax F1(threshold) among points where precision clears the per-intent floor
        indx = indx[np.argmax(f1[indx])]
        # precision_recall_curve returns one more (pr, rec) point than thresholds
        indx = min(indx, len(thresholds) - 1)
        intent_data[intent] = {
            "threshold": thresholds[indx],
            "precision": pr[indx],
            "recall": rec[indx],
            "f1": f1[indx],
        }
    return intent_data


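# A minimal sketch of the expected inputs (hypothetical intent names):
#
#   intents_min_pr = {"yes": 0.9, "no": 0.8}   # per-intent precision floors
#   info = calculate_metrics(intents_min_pr, y_true, y_pred)
#   info["yes"]  # -> {"threshold": ..., "precision": ..., "recall": ..., "f1": ...}
#
# Columns of y_true/y_pred must follow the iteration order of intents_min_pr.

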
def generate_phrases(template_re, punctuation, limit=2500):
    # Draw `limit` random realizations of each regex template (deduplicated),
    # then re-emit every phrase with each punctuation mark appended.
    x = Xeger(limit=limit)
    phrases = []
    for regex in template_re:
        try:
            phrases += list({x.xeger(regex) for _ in range(limit)})
        except Exception as e:
            print(e)
            print(regex)
            raise e
    phrases = [phrases] + [[phrase + punct for phrase in phrases] for punct in punctuation]
    return list(chain.from_iterable(phrases))


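# A minimal sketch (hypothetical template): Xeger draws random strings matching
# each regex, then punctuation variants are appended for every phrase.
#
#   generate_phrases([r"(hi|hello) there"], punctuation=["!", "?"])
#   # -> ["hi there", "hello there", "hi there!", "hello there!",
#   #     "hi there?", "hello there?"] (order may vary)

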
def get_linear_classifier(intents, input_dim=512, dense_layers=1, use_metrics=True, multilabel=False):
    if multilabel:
        units = len(intents)
        activation = "sigmoid"
        metrics = [] if not use_metrics else ["binary_crossentropy"]
    else:
        units = len(intents) + 1
        activation = "softmax"
        metrics = (
            [] if not use_metrics else [tb_accuracy, tf.keras.metrics.Precision(), tf.keras.metrics.Recall(), tb_f1]
        )
    model = [
        tf.keras.layers.Dense(units=256, activation="relu", input_dim=input_dim if i == 0 else 256)
        for i in range(dense_layers)
    ]  # Hidden dense layers
    model += [
        tf.keras.layers.Dense(units=units, activation=activation, input_dim=input_dim if not len(model) else 256)
    ]  # Output layer
    model = tf.keras.Sequential(model)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss="categorical_crossentropy" if not multilabel else "binary_crossentropy",
        metrics=metrics,
    )
    return model


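# A minimal sketch (hypothetical intents), assuming 512-dim sentence embeddings:
#
#   model = get_linear_classifier(["yes", "no"], input_dim=512, dense_layers=1)
#   # Dense(256, relu) -> Dense(3, softmax): one unit per intent plus the
#   # extra "no intent" class used in the single-label setup.

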
def train_test_split(full_length, punct_num, train_size):
    # Number of original phrases (the embeddings also hold punct_num variants of each)
    original_length = full_length // (punct_num + 1)
    train_length = int(original_length * train_size)

    # Getting indices
    train_idx = random.sample(list(range(original_length)), train_length)
    test_idx = list(set(range(original_length)) - set(train_idx))

    # Upsampling
    # train_length = max(train_length, 800)
    # test_length = max(test_length, 800)
    # train_idx = np.random.choice(train_idx, train_length)
    # test_idx = np.random.choice(test_idx, test_length)

    # With punctuation variants: keep every variant of a phrase in the same split
    train_idx = list(chain.from_iterable([[i + original_length * p for i in train_idx] for p in range(punct_num + 1)]))
    test_idx = list(chain.from_iterable([[i + original_length * p for i in test_idx] for p in range(punct_num + 1)]))
    return train_idx, test_idx


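# A worked sketch: with full_length=6 and punct_num=1 the embeddings hold 3
# original phrases followed by their 3 punctuated variants, so index i and
# index i + 3 always refer to the same phrase and land in the same split:
#
#   train_idx, test_idx = train_test_split(6, punct_num=1, train_size=0.67)
#   # e.g. train_idx = [0, 2, 3, 5], test_idx = [1, 4]

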
def get_train_test_data(data, intents, random_phrases_embeddings, multilabel=False, train_size=0.8):
    train_data = {"X": [], "y": []}
    test_data = {"X": [], "y": []}
    num_classes = len(intents) + 1 if not multilabel else len(intents)
    for i, intent in enumerate(intents):
        train_idx, test_idx = train_test_split(
            len(data[intent]["embeddings"]), data[intent]["num_punctuation"], train_size=train_size
        )
        train = np.array(data[intent]["embeddings"])[train_idx]
        test = np.array(data[intent]["embeddings"])[test_idx]
        train_data["X"].append(train)
        train_data["y"].append([[1.0 if j == i else 0.0 for j in range(num_classes)] for _ in range(len(train))])
        test_data["X"].append(test)
        test_data["y"].append([[1.0 if j == i else 0.0 for j in range(num_classes)] for _ in range(len(test))])

    # Random phrases act as the extra "no intent" class (all-zero labels when multilabel)
    train_data["X"].append(random_phrases_embeddings)
    train_data["y"].append(
        [[1.0 if j == len(intents) else 0.0 for j in range(num_classes)] for _ in range(len(random_phrases_embeddings))]
    )

    train_data["X"] = np.concatenate(train_data["X"])
    test_data["X"] = np.concatenate(test_data["X"])
    train_data["y"] = np.concatenate(train_data["y"])
    test_data["y"] = np.concatenate(test_data["y"])
    return train_data, test_data


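# A minimal sketch of the expected `data` layout (hypothetical intent name):
#
#   data = {"yes": {"embeddings": [...],      # one vector per generated phrase
#                   "num_punctuation": 2,     # punctuation variants per phrase
#                   "min_precision": 0.9}}    # precision floor used for scoring
#
# random_phrases_embeddings feeds only the out-of-intent class.

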
def get_train_data(data, intents, random_phrases_embeddings, multilabel=False):
    train_data = {"X": [], "y": []}
    num_classes = len(intents) + 1 if not multilabel else len(intents)
    for i, intent in enumerate(intents):
        train = np.array(data[intent]["embeddings"])
        train_data["X"].append(train)
        train_data["y"].append([[1.0 if j == i else 0.0 for j in range(num_classes)] for _ in range(len(train))])

    train_data["X"].append(random_phrases_embeddings)
    train_data["y"].append(
        [[1.0 if j == len(intents) else 0.0 for j in range(num_classes)] for _ in range(len(random_phrases_embeddings))]
    )

    train_data["X"] = np.concatenate(train_data["X"])
    train_data["y"] = np.concatenate(train_data["y"])
    return train_data


def score_model(
    data, intents, random_phrases_embeddings, samples=20, dense_layers=1, train_size=0.5, epochs=80, multilabel=False
):
    metrics = {intent: {"precision": [], "recall": [], "f1": [], "threshold": []} for intent in intents}
    intents_min_pr = {intent: v["min_precision"] for intent, v in data.items()}
    # Retrain `samples` times on random splits to estimate the spread of each metric
    for _ in tqdm(range(samples)):
        model = get_linear_classifier(intents=intents, dense_layers=dense_layers, multilabel=multilabel)
        train_data, test_data = get_train_test_data(
            data, intents, random_phrases_embeddings, multilabel=multilabel, train_size=train_size
        )
        model.fit(x=train_data["X"], y=train_data["y"], epochs=epochs, verbose=0)

        current_metrics = calculate_metrics(intents_min_pr, test_data["y"], model.predict(test_data["X"]))
        for intent in current_metrics:
            for metric_name in current_metrics[intent]:
                metrics[intent][metric_name].append(current_metrics[intent][metric_name])
    for intent in intents:
        precision = (np.mean(metrics[intent]["precision"]), np.std(metrics[intent]["precision"]))
        recall = (np.mean(metrics[intent]["recall"]), np.std(metrics[intent]["recall"]))
        f1 = (np.mean(metrics[intent]["f1"]), np.std(metrics[intent]["f1"]))
        threshold = (np.mean(metrics[intent]["threshold"]), np.std(metrics[intent]["threshold"]))
        message = (
            f"\nIntent: {intent}\n"
            + f"PRECISION: {precision[0]}±{precision[1]}\n"
            + f"RECALL: {recall[0]}±{recall[1]}\n"
            + f"F1: {f1[0]}±{f1[1]}\n"
            + f"Threshold: {threshold[0]}±{threshold[1]}\n\n"
        )
        print(message)
    # Collapse each metric list to its mean across runs
    metrics = {intent: {metric: np.mean(metrics[intent][metric]) for metric in metrics[intent]} for intent in metrics}
    thresholds = {intent: float(metrics[intent]["threshold"]) for intent in metrics}
    metrics = pd.DataFrame.from_dict(metrics)
    return metrics, thresholds


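# A minimal sketch of a full scoring run (hypothetical inputs):
#
#   metrics_df, thresholds = score_model(
#       data, intents=list(data.keys()),
#       random_phrases_embeddings=random_embs, samples=5, epochs=40,
#   )
#   metrics_df  # pandas DataFrame: one column per intent, rows = mean metrics
#   thresholds  # e.g. {"yes": 0.61, ...}: mean per-intent decision thresholds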