forked from Niketkumardheeryan/ML-CaPsule
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrandom_forest.py
155 lines (114 loc) · 5.03 KB
/
random_forest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import numpy as np
from collections import Counter
def entropy(a):
hist = np.bincount(a)
ps = hist / len(a)
return -np.sum([p * np.log2(p) for p in ps if p > 0])
class Node:
def __init__(self, feature=None, threshold=None, left=None, right=None, *, value=None):
self.feature = feature
self.threshold = threshold
self.left = left
self.right = right
self.value = value
def is_leaf_node(self):
return self.value is not None
class decisionTree:
def __init__(self, min_samples_split=2, max_depth=100, n_feats=None):
self.min_samples_split = min_samples_split
self.max_depth = max_depth
self.n_feats = n_feats
self.root = None
def fit(self, X, a):
self.n_feats = X.shape[1] if not self.n_feats else min(self.n_feats, X.shape[1])
self.root = self._grow_tree(X, a)
def predict(self, X):
return np.array([self._traverse_tree(x, self.root) for x in X])
def _grow_tree(self, X, a, depth=0):
n_samples, n_features = X.shape
n_labels = len(np.unique(a))
# stopping criteria
if (depth >= self.max_depth
or n_labels == 1
or n_samples < self.min_samples_split):
leaf_value = self._most_common_label(a)
return Node(value=leaf_value)
feat_idxs = np.random.choice(n_features, self.n_feats, replace=False)
# greedily select the best split according to information gain
best_feat, best_thresh = self._best_criteria(X, a, feat_idxs)
# grow the children that result from the split
left_idxs, right_idxs = self._split(X[:, best_feat], best_thresh)
left = self._grow_tree(X[left_idxs, :], a[left_idxs], depth+1)
right = self._grow_tree(X[right_idxs, :], a[right_idxs], depth+1)
return Node(best_feat, best_thresh, left, right)
def _best_criteria(self, X, a, feat_idxs):
best_gain = -1
split_idx, split_thresh = None, None
for feat_idx in feat_idxs:
X_column = X[:, feat_idx]
thresholds = np.unique(X_column)
for threshold in thresholds:
gain = self._information_gain(a, X_column, threshold)
if gain > best_gain:
best_gain = gain
split_idx = feat_idx
split_thresh = threshold
return split_idx, split_thresh
def _information_gain(self, a, X_column, split_thresh):
# parent loss
parent_entropy = entropy(a)
# generate split
left_idxs, right_idxs = self._split(X_column, split_thresh)
if len(left_idxs) == 0 or len(right_idxs) == 0:
return 0
# compute the weighted avg. of the loss for the children
n = len(a)
n_l, n_r = len(left_idxs), len(right_idxs)
e_l, e_r = entropy(y[left_idxs]), entropy(a[right_idxs])
child_entropy = (n_l / n) * e_l + (n_r / n) * e_r
# information gain is difference in loss before vs. after split
ig = parent_entropy - child_entropy
return ig
def _split(self, X_column, split_thresh):
left_idxs = np.argwhere(X_column <= split_thresh).flatten()
right_idxs = np.argwhere(X_column > split_thresh).flatten()
return left_idxs, right_idxs
def traverse(self, x, node):
if node.is_leaf_node():
return node.value
if x[node.feature] <= node.threshold:
return self.traverse(x, node.left)
return self.traverse(x, node.right)
def _most_common_label(self, a):
counter = Counter(a)
most_common = counter.most_common(1)[0][0]
return most_common
def bootstrap_sample(X, a):
n_samples = X.shape[0]
idxs = np.random.choice(n_samples, n_samples, replace=True)
return X[idxs], a[idxs]
def most_common_label(a):
counter = Counter(a)
most_common = counter.most_common(1)[0][0]
return most_common
class forest:
def __init__(self, n_trees=10, min_samples_split=2,
max_depth=100, n_feats=None):
self.n_trees = n_trees
self.min_samples_split = min_samples_split
self.max_depth = max_depth
self.n_feats = n_feats
self.trees = []
def fit(self, X, a):
self.trees = []
for _ in range(self.n_trees):
tree = decisionTree(min_samples_split=self.min_samples_split,
max_depth=self.max_depth, n_feats=self.n_feats)
X_samp, a_samp = bootstrap_sample(X, a)
tree.fit(X_samp, a_samp)
self.trees.append(tree)
def predict(self, X):
tree_preds = np.array([tree.predict(X) for tree in self.trees])
tree_preds = np.swapaxes(tree_preds, 0, 1)
y_pred = [most_common_label(tree_pred) for tree_pred in tree_preds]
return np.array(a_pred)