forked from scrapinghub/article-extraction-benchmark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevaluate.py
executable file
·196 lines (162 loc) · 6.71 KB
/
evaluate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python3
import argparse
from collections import Counter
import json
from pathlib import Path
import random
import re
import statistics
from typing import Any, Dict, Tuple, List
def main():
""" Perform evaluation for all ``output/*.json`` files,
loading ground truth from ``groud-truth.json``.
Python3.6+ is required.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--n-bootstrap', type=int, default=1000)
parser.add_argument('--bootstrap-differences', action='store_true',
help='run bootstrap for differences')
parser.add_argument('--output', type=Path, help='output results as json')
args = parser.parse_args()
ground_truth = load_json(Path('ground-truth.json'))
metrics_by_name = {}
for path in sorted(Path('output').glob('*.json')):
name = path.stem
metrics = evaluate(ground_truth, load_json(path), args.n_bootstrap)
print('{name:<20} '
'precision={precision:.3f} ± {precision_std:.3f} '
'recall={recall:.3f} ± {recall_std:.3f} '
'F1={f1:.3f} ± {f1_std:.3f} '
'accuracy={accuracy:.3f} ± {accuracy_std:.3f} '
.format(name=name, **metrics))
metrics_by_name[name] = metrics
if args.bootstrap_differences:
# check differences with bootstrap
for name, metrics in sorted(metrics_by_name.items()):
tp_fp_fns = metrics['tp_fp_fns']
for other_name, other_metrics in sorted(metrics_by_name.items()):
if name >= other_name:
continue
print(f'Comparison: {name} minus {other_name}')
other_tp_fp_fns = other_metrics['tp_fp_fns']
print_metrics_diff(tp_fp_fns, other_tp_fp_fns, args.n_bootstrap)
if args.output:
args.output.write_text(
json.dumps(metrics_by_name, indent=4, sort_keys=True))
def evaluate(
ground_truth: Dict[str, Dict],
prediction: Dict[str, Dict],
n_bootstrap: int,
) -> Dict[str, Any]:
if ground_truth.keys() != prediction.keys():
raise ValueError('prediction keys do not match ground truth')
tp_fp_fns = []
accuracies = []
for key in ground_truth.keys():
true = ground_truth[key].get('articleBody', '')
pred = prediction[key].get('articleBody', '')
tp_fp_fns.append(string_shingle_matching(true=true, pred=pred))
accuracies.append(get_accuracy(true=true, pred=pred))
metrics: Dict[str, Any] = metrics_from_tp_fp_fns(tp_fp_fns)
metrics['tp_fp_fns'] = tp_fp_fns
metrics['accuracy'] = statistics.mean(accuracies)
# add bootstrap estimates of condifence intervals
b_values: Dict[str, List[float]] = {}
for _ in range(n_bootstrap):
n = len(tp_fp_fns)
indices = [random.randint(0, n - 1) for _ in range(n)]
b_metrics = metrics_from_tp_fp_fns([tp_fp_fns[i] for i in indices])
for key in b_metrics:
b_values.setdefault(key, []).append(b_metrics[key])
b_values.setdefault('accuracy', []).append(
statistics.mean([accuracies[i] for i in indices]))
for key, values in sorted(b_values.items()):
metrics[f'{key}_std'] = statistics.stdev(values)
return metrics
def print_metrics_diff(tp_fp_fns, other_tp_fp_fns, n_bootstrap):
diffs = {}
for _ in range(n_bootstrap):
n = len(tp_fp_fns)
indices = [random.randint(0, n - 1) for _ in range(n)]
metrics = metrics_from_tp_fp_fns([tp_fp_fns[i] for i in indices])
other_metrics = metrics_from_tp_fp_fns(
[other_tp_fp_fns[i] for i in indices])
for key in metrics:
diffs.setdefault(key, []).append(metrics[key] - other_metrics[key])
for key, values in sorted(diffs.items()):
mean = statistics.mean(values)
std = statistics.stdev(values)
print(f'{key:<10} {mean:.3f} ± {std:.3f}')
TP_FP_FN = Tuple[float, float, float]
def metrics_from_tp_fp_fns(tp_fp_fns: List[TP_FP_FN]) -> Dict[str, float]:
precision = statistics.mean([
precision_score(tp, fp, fn) for tp, fp, fn in tp_fp_fns
if tp + fp > 0])
recall = statistics.mean([
recall_score(tp, fp, fn) for tp, fp, fn in tp_fp_fns
if tp + fn > 0])
f1 = 2 * precision * recall / (precision + recall)
return {
'f1': f1,
'precision': precision,
'recall': recall,
}
def precision_score(tp: float, fp: float, fn: float) -> float:
if fp == fn == 0:
return 1.
if tp == fp == 0:
return 0.
return tp / (tp + fp)
def recall_score(tp: float, fp: float, fn: float) -> float:
if fp == fn == 0:
return 1.
if tp == fn == 0:
return 0.
return tp / (tp + fn)
def get_accuracy(true: str, pred: str) -> float:
return float(_tokenize(true) == _tokenize(pred))
def string_shingle_matching(
true: str, pred: str, ngram_n: int = 4,
) -> TP_FP_FN:
""" Compute TP/FP/FN across shingles (joined ngrams).
Intended to be used for articleBody comparison,
similar to the one used here (with shingles instead of tokens):
https://moz.com/devblog/benchmarking-python-content-extraction-algorithms-dragnet-readability-goose-and-eatiht/
"""
true_shingles = _all_shingles(true, ngram_n)
pred_shingles = _all_shingles(pred, ngram_n)
tp = fp = fn = 0.
for key in (set(true_shingles) | set(pred_shingles)):
true_count = true_shingles.get(key, 0)
pred_count = pred_shingles.get(key, 0)
tp += min(true_count, pred_count)
fp += max(0, pred_count - true_count)
fn += max(0, true_count - pred_count)
tp_fp_fn = [tp, fp, fn]
s = sum(tp_fp_fn)
# Normalize metrics so that longer texts do not have more weight.
if s > 0:
tp_fp_fn = [x / s for x in tp_fp_fn]
return tuple(tp_fp_fn) # type: ignore
def _all_shingles(text: str, ngram_n: int) -> Dict[Tuple[str, ...], int]:
return dict(Counter(_ngrams(text, ngram_n)))
_TOKEN_RE = re.compile(
r'\w+', re.UNICODE | re.MULTILINE | re.IGNORECASE | re.DOTALL)
def _tokenize(text: str) -> List[str]:
# Note that such simple tokenization will work ok for any language,
# even if several words will be clumped together, as we expect
# that extra predicted text will still be separated.
return _TOKEN_RE.findall(text or '')
def _ngrams(text: str, n: int) -> List[Tuple[str, ...]]:
tokens = _tokenize(text)
result = []
for i in range(0, max(1, len(tokens) - n + 1)):
shingle = tuple(tokens[i: i + n])
if shingle:
result.append(shingle)
return result
def load_json(path: Path):
with path.open('rt', encoding='utf8') as f:
return json.load(f)
if __name__ == '__main__':
main()