# NOTE(review): removed non-Python scrape residue (GitHub UI text and line-number gutter) that preceded the actual source.
# -*- coding: utf-8 -*-
import json
import random
from collections import defaultdict
import jieba
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from newsscore import post_score
from sklearn.feature_extraction import DictVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
sns.set(style='whitegrid')
# Fixed seed so the train/test shuffle below is reproducible across runs.
RANDOM_SEED = 20170609
# Training stops after this many consecutive rounds without improvement.
MAX_FAIL_ATTEMPT = 100
# Input: JSON list of posts about stock 2330 (TSMC); each post must have a
# 'content' field (see the tokenize loop) plus whatever newsscore.post_score reads.
path = '2330.json'
# Fraction of posts used for training; the rest are held out for evaluation.
training_ratio = 0.7
random.seed(RANDOM_SEED)
with open(path, encoding='utf-8') as f:
    posts = json.load(f)
def isfloat(value):
    """Return True if *value* parses as a float, else False.

    Used to drop purely numeric tokens from the jieba segmentation
    (prices, percentages, etc. carry no sentiment signal).
    """
    try:
        float(value)
    except (TypeError, ValueError):
        # TypeError covers non-string/non-numeric inputs (e.g. None);
        # the original caught only ValueError and would crash on those.
        return False
    return True
# Tokenise every post and record its sentiment label.
# Each post becomes a bag-of-words dict (token -> count); tokens that are
# single characters or that parse as numbers are discarded as uninformative.
post_words = []
post_scores = []
for post in posts:
    counts = defaultdict(int)
    for token in jieba.cut(post['content']):
        if len(token) > 1 and not isfloat(token):
            counts[token] += 1
    post_words.append(counts)
    post_scores.append(post_score(post))
def get_vector(index=None):
    """Collect (word-count dict, score) pairs for the given post indices.

    Parameters
    ----------
    index : sequence of int, optional
        Indices into the module-level ``post_words`` / ``post_scores`` lists.
        ``None`` (default) or an empty sequence selects every post, matching
        the original ``index=[]`` behavior.

    Returns
    -------
    tuple(list, list)
        The selected word-count dicts and their matching scores, in order.
    """
    # Original signature used the mutable default ``index=[]`` — replaced
    # with the None-sentinel idiom; callers are unaffected.
    if index is None or not len(index):
        index = range(len(posts))
    words = [post_words[i] for i in index]
    scores = [post_scores[i] for i in index]
    return words, scores
# Fallback model objects — replaced whenever the training loop below finds
# a better-scoring candidate.
dvec = DictVectorizer()
tfidf = TfidfTransformer()
svc = LinearSVC()
# Size of the training split; the remaining posts are the evaluation set.
training_set = int(len(posts) * training_ratio)
# Shuffled index order (deterministic: random was seeded above), so the
# first `training_set` entries form a random training split.
post_index = list(range(len(posts)))
random.shuffle(post_index)
# Training: random-restart model selection.
# Each round fits a fresh LinearSVC on a random half of the training split
# and validates on the other half; the best (lowest-error) model seen is
# kept in the module-level svc/dvec/tfidf. Stops after MAX_FAIL_ATTEMPT
# consecutive rounds without improvement.
print("[Training] Start Training...")
# Loop-invariant split sizes, hoisted out of the loop (the original
# recomputed them every iteration).
sub_training_ratio = 0.5
sub_training_set = int(training_set * sub_training_ratio)
prev_error = training_set  # upper bound on any validation error
fail_attempt = 0
while fail_attempt < MAX_FAIL_ATTEMPT:
    index = post_index[:training_set]
    random.shuffle(index)
    # Fit fresh vectorizer/transformer/classifier on the sub-training half.
    words, scores = get_vector(index[:sub_training_set])
    dvec_cur = DictVectorizer()
    tfidf_cur = TfidfTransformer()
    X = tfidf_cur.fit_transform(dvec_cur.fit_transform(words))
    svc_cur = LinearSVC()
    svc_cur.fit(X, scores)
    # Validate on the held-out half of the training split.
    words, scores = get_vector(index[sub_training_set:])
    X2 = tfidf_cur.transform(dvec_cur.transform(words))
    preds = svc_cur.predict(X2)
    error = sum(abs(p - s) for p, s in zip(preds, scores))
    if error < prev_error:
        # New best model: keep it and reset the failure counter.
        prev_error = error
        svc = svc_cur
        dvec = dvec_cur
        tfidf = tfidf_cur
        fail_attempt = 0
        # BUG FIX: report errors over the validation set (len(preds)), not
        # over the whole training split (the original printed len(index),
        # which understated the error rate by roughly half).
        print("[Training] Errors: {}/{}".format(error, len(preds)))
    else:
        fail_attempt += 1
print("[Training] Training Finished")
# Final evaluation: score the best model on the untouched test split
# (the posts past the training cut in the shuffled index order).
words, scores = get_vector(post_index[training_set:])
X2 = tfidf.transform(dvec.transform(words))
preds = svc.predict(X2)
error = sum(abs(p - s) for p, s in zip(preds, scores))
print("Errors: {}/{}".format(error, len(posts) - training_set))
def display_top_features(weights, names, top_n, select=abs):
    """Bar-plot the ``top_n`` features ranked by ``select`` of their weight.

    Parameters
    ----------
    weights : sequence of float
        SVM coefficients, one per feature.
    names : sequence of str
        Feature (token) names aligned with ``weights``.
    top_n : int
        How many of the top-ranked features to plot.
    select : callable, optional
        Ranking key applied to each weight. Default ``abs`` (largest
        magnitude); pass ``lambda x: -x`` for most-negative or
        ``lambda x: x`` for most-positive.
    """
    ranked = sorted(zip(weights, names),
                    key=lambda wn: select(wn[0]), reverse=True)
    top_features = ranked[:top_n]
    top_weights = [w for w, _ in top_features]
    top_names = [n for _, n in top_features]
    fig, ax = plt.subplots(figsize=(10, 8))
    # ROBUSTNESS: use the actual count — the original np.arange(top_n)
    # crashed when fewer than top_n features exist.
    ind = np.arange(len(top_features))
    bars = ax.bar(ind, top_weights, color='blue', edgecolor='black')
    for bar, w in zip(bars, top_weights):
        if w < 0:
            bar.set_facecolor('red')  # negative weights drawn in red
    # Bars are centred on ``ind`` (matplotlib default align='center'), so
    # ticks go at ``ind`` — the original's stale +0.30 offset misaligned
    # the labels.
    ax.set_xticks(ind)
    # CJK-capable font so Chinese tokens render; fontsize was given twice
    # in the original (positionally and in fontdict) — kept once.
    ax.set_xticklabels(top_names, rotation=45,
                       fontdict={'fontname': 'Microsoft JhengHei',
                                 'fontsize': 12})
    # BUG FIX: plt.show() takes no figure argument; plt.show(fig) passed
    # the Figure where the ``block`` flag is expected.
    plt.show()
# 30 most-NEGATIVE features (select=-x ranks most-negative weights first).
# NOTE(review): DictVectorizer.get_feature_names() was removed in
# scikit-learn 1.2 in favour of get_feature_names_out() — confirm the
# pinned sklearn version before upgrading.
display_top_features(svc.coef_[0],
                     dvec.get_feature_names(), 30, select=lambda x: -x)
# 30 most-positive features for posts
display_top_features(svc.coef_[0],
                     dvec.get_feature_names(), 30, select=lambda x: x)