-
Notifications
You must be signed in to change notification settings - Fork 49
/
run_lcqmc_similarity.py
139 lines (112 loc) · 4.19 KB
/
run_lcqmc_similarity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from bert4keras.backend import keras, set_gelu
from bert4keras.tokenizers import Tokenizer
from bert4keras.models import build_transformer_model
from bert4keras.optimizers import Adam
from bert4keras.snippets import sequence_padding, DataGenerator
from keras.layers import Dropout, Dense
import tensorflow as tf
import csv
import numpy as np
set_gelu('tanh') # switch the GELU version (here: the 'tanh' variant)
# Maximum length passed to tokenizer.encode when a sentence pair is encoded.
maxlen = 128
# Batch size handed to every data_generator instance created below.
batch_size = 32
# Paths of the pretrained BERT config, checkpoint and vocabulary files.
config_path = 'bert/chinese_L-12_H-768_A-12/bert_config.json'
checkpoint_path = 'bert/chinese_L-12_H-768_A-12/bert_model.ckpt'
dict_path = 'bert/chinese_L-12_H-768_A-12/vocab.txt'
def read_tsv(input_file, quotechar=None):
    """Read a tab-separated file of labelled sentence pairs.

    The first line of the file is treated as a header and skipped. Every
    remaining non-blank row must provide at least three columns: the two
    texts followed by an integer label.

    Args:
        input_file: Path of the TSV file, opened through ``tf.gfile.Open``.
        quotechar: Quote character forwarded to ``csv.reader``.

    Returns:
        A list of ``(text1, text2, label)`` tuples with ``label`` as ``int``.

    Raises:
        IndexError: If a non-blank row has fewer than three columns.
        ValueError: If the label column cannot be converted to ``int``.
    """
    lines = []
    with tf.gfile.Open(input_file, "r") as f:
        # Skip the header; the default keeps an empty file from raising
        # StopIteration.
        next(f, None)
        reader = csv.reader(f, delimiter="\t", quotechar=quotechar)
        for row in reader:
            # csv.reader yields [] for blank lines (e.g. a trailing newline);
            # skip them instead of failing on row[0].
            if not row:
                continue
            text1, text2, label = row[0], row[1], row[2]
            lines.append((text1, text2, int(label)))
    return lines
# Load the datasets (train / dev / test files of LCQMC, parsed by read_tsv)
train_data = read_tsv('data/lcqmc/lcqmc_train.tsv')
valid_data = read_tsv('data/lcqmc/lcqmc_dev.tsv')
test_data = read_tsv('data/lcqmc/lcqmc_test.tsv')
# Build the tokenizer from the vocabulary file at dict_path
tokenizer = Tokenizer(dict_path, do_lower_case=True)
class data_generator(DataGenerator):
    """Data generator.

    Encodes ``(text1, text2, label)`` samples with the module-level
    ``tokenizer`` and yields them as padded batches of the form
    ``([token_ids, segment_ids], labels)``.
    """

    def __iter__(self, random=False):
        # Encoded samples collected for the batch currently being built.
        pending = []
        for is_end, (text1, text2, label) in self.sample(random):
            token_ids, segment_ids = tokenizer.encode(
                text1, text2, maxlen=maxlen
            )
            pending.append((token_ids, segment_ids, [label]))

            # Keep collecting until the batch is full or the data ran out.
            if len(pending) != self.batch_size and not is_end:
                continue

            padded_tokens = sequence_padding([row[0] for row in pending])
            padded_segments = sequence_padding([row[1] for row in pending])
            padded_labels = sequence_padding([row[2] for row in pending])
            yield [padded_tokens, padded_segments], padded_labels
            pending = []
# Load the pretrained model (built with with_pool=True, kept as a wrapper
# object so that bert.model and bert.initializer stay accessible)
bert = build_transformer_model(
    config_path=config_path,
    checkpoint_path=checkpoint_path,
    with_pool=True,
    return_keras_model=False,
)

# Classification head: dropout followed by a 2-unit softmax layer.
dropout = Dropout(rate=0.1)
classifier = Dense(
    units=2,
    activation='softmax',
    kernel_initializer=bert.initializer,
)
output = classifier(dropout(bert.model.output))

model = keras.models.Model(bert.model.input, output)
model.summary()

model.compile(
    optimizer=Adam(2e-5),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Wrap each dataset in a batch generator
train_generator = data_generator(train_data, batch_size)
valid_generator = data_generator(valid_data, batch_size)
test_generator = data_generator(test_data, batch_size)
def evaluate(data):
    """Return the accuracy of the module-level ``model`` over ``data``.

    ``data`` is iterated as ``(inputs, labels)`` batches; the first column of
    ``labels`` is compared against the argmax of the model's predictions.
    """
    n_correct = 0.
    n_seen = 0.
    for inputs, labels in data:
        predicted = model.predict(inputs).argmax(axis=1)
        gold = labels[:, 0]
        n_seen += len(gold)
        n_correct += (gold == predicted).sum()
    return n_correct / n_seen
class Evaluator(keras.callbacks.Callback):
    """Callback that evaluates after every epoch and keeps the best weights.

    At the end of each epoch the validation accuracy is computed with
    ``evaluate(valid_generator)``. Whenever it exceeds the best value seen so
    far, the model weights are saved to ``'best_model.weights'``. The test
    accuracy is computed as well and all three numbers are printed.
    """

    def __init__(self):
        # Run the base-class initialiser so the attributes Keras sets up on
        # every Callback exist on this instance too.
        super(Evaluator, self).__init__()
        # Best validation accuracy observed so far.
        self.best_val_acc = 0.

    def on_epoch_end(self, epoch, logs=None):
        """Evaluate on the validation and test sets and checkpoint on improvement."""
        val_acc = evaluate(valid_generator)
        if val_acc > self.best_val_acc:
            self.best_val_acc = val_acc
            model.save_weights('best_model.weights')
        test_acc = evaluate(test_generator)
        print(
            u'val_acc: %.5f, best_val_acc: %.5f, test_acc: %.5f\n' %
            (val_acc, self.best_val_acc, test_acc)
        )
# Train for 3 epochs; the Evaluator callback saves the best weights.
evaluator = Evaluator()
model.fit_generator(
    train_generator.forfit(),
    steps_per_epoch=len(train_generator),
    epochs=3,
    callbacks=[evaluator]
)

# Reload the weights saved by the callback and report the test accuracy.
model.load_weights('best_model.weights')
# '%.5f' (five decimals) matches the per-epoch report; the previous '%05f'
# only set a minimum width and printed six decimals.
print(u'final test acc: %.5f\n' % (evaluate(test_generator)))
# Score a single sentence pair
student_answer = '需要判断市场震荡幅度'
standard_answer = '市场震荡幅度'
# data_generator expects (text1, text2, label) samples. The label is not
# needed for prediction, so a dummy 0 is used instead of None to keep the
# label batch a plain numeric array.
lines = [(student_answer, standard_answer, 0)]
test_D = data_generator(lines, batch_size)
results = model.predict_generator(test_D.__iter__(), steps=len(test_D), verbose=1)
# Index of the highest-scoring class for the only sample, and its score.
label = np.argmax(results[0])
max_score = results[0][label]
print(max_score)
print(label)