# DetectorLoader.py
import time
import torch
import numpy as np
import torchvision.transforms as transforms
from queue import Queue
from threading import Thread
from Detection.Models import Darknet
from Detection.Utils import non_max_suppression, ResizePadding
import cv2


class TinyYOLOv3_onecls(object):
    """Load trained Tiny-YOLOv3 one-class (person) detection model.

    Args:
        input_size: (int) Size of the input image, must be divisible by 32. Default: 416,
        config_file: (str) Path to the YOLO model structure config file.,
        weight_file: (str) Path to the trained weights file.,
        nms: (float) Non-Maximum Suppression overlap threshold.,
        conf_thres: (float) Minimum confidence threshold below which predicted bboxes are cut off.,
        device: (str) Device to load the model on, 'cpu' or 'cuda'.
    """
    def __init__(self,
                 input_size=416,
                 config_file='Weights/yolo-tiny-onecls/yolov3-tiny-onecls.cfg',
                 weight_file='Weights/yolo-tiny-onecls/best-model.pth',
                 nms=0.5,
                 conf_thres=0.5,
                 device='cuda'):
        self.input_size = input_size
        self.model = Darknet(config_file).to(device)
        self.model.load_state_dict(torch.load(weight_file))
        self.model.eval()
        self.device = device

        self.nms = nms
        self.conf_thres = conf_thres

        self.resize_fn = ResizePadding(input_size, input_size)
        self.transf_fn = transforms.ToTensor()

    def detect(self, image, need_resize=True, expand_bb=5):
        """Feed forward to the model.

        Args:
            image: (numpy array) Single RGB image to detect.,
            need_resize: (bool) Resize to input_size before feeding; returned bboxes
                are scaled back to the original image size.,
            expand_bb: (int) Number of pixels to expand each box boundary by.

        Returns:
            (torch.float32) Tensor with one row per detected object:
                [top, left, bottom, right, bbox_score, class_score, class].
            Returns `None` if nothing is detected.
        """
        image_size = (self.input_size, self.input_size)
        if need_resize:
            image_size = image.shape[:2]
            image = self.resize_fn(image)

        image = self.transf_fn(image)[None, ...]
        scf = torch.min(self.input_size / torch.FloatTensor([image_size]), 1)[0]

        detected = self.model(image.to(self.device))
        detected = non_max_suppression(detected, self.conf_thres, self.nms)[0]
        if detected is not None:
            # Undo the letterbox padding and rescale boxes to the original image size.
            detected[:, [0, 2]] -= (self.input_size - scf * image_size[1]) / 2
            detected[:, [1, 3]] -= (self.input_size - scf * image_size[0]) / 2
            detected[:, 0:4] /= scf

            # Expand each box by `expand_bb` pixels, clipped to the image boundary.
            detected[:, 0:2] = np.maximum(0, detected[:, 0:2] - expand_bb)
            detected[:, 2:4] = np.minimum(image_size[::-1], detected[:, 2:4] + expand_bb)

        return detected
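

# A minimal usage sketch (not part of the original file): it assumes an RGB frame
# loaded with OpenCV and the default weight/config paths above. The image path
# below is a hypothetical placeholder.
def _example_tiny_yolo_usage(frame_path='sample.jpg'):
    """Load the Tiny-YOLOv3 person detector and run it on a single frame."""
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    detector = TinyYOLOv3_onecls(device=device)
    frame = cv2.imread(frame_path)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # detect() expects an RGB image
    bboxes = detector.detect(frame, need_resize=True, expand_bb=5)
    # `bboxes` has one row per detection (see the docstring above), or is None.
    return bboxes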


class YOLOv8_human(object):
    """Load trained YOLOv8 one-class (person) detection model.

    Args:
        input_size: (int) Size of the input image, must be divisible by 32. Default: 416,
        weight_file: (str) Path to the trained weights file.,
        nms: (float) Non-Maximum Suppression overlap threshold.,
        conf_thres: (float) Minimum confidence threshold below which predicted bboxes are cut off.,
        device: (str) Device to load the model on, 'cpu' or 'cuda'.
    """
    def __init__(self,
                 input_size=416,
                 weight_file='Weights/YOLOv8_Human/v8_n.pt',
                 nms=0.5,
                 conf_thres=0.5,
                 device='cuda'):
        self.device = device
        self.input_size = input_size
        self.model = torch.load(weight_file, map_location=self.device)['model'].float()
        self.model.half()
        self.model.eval()

        self.nms = nms
        self.conf_thres = conf_thres

        self.resize_fn = ResizePadding(input_size, input_size)
        self.transf_fn = transforms.ToTensor()

    def detect(self, image, need_resize=True, expand_bb=5):
        """Feed forward to the model. Same interface as TinyYOLOv3_onecls.detect()."""
        image_size = (self.input_size, self.input_size)
        if need_resize:
            image_size = image.shape[:2]
            image = self.resize_fn(image)

        x = image.transpose((2, 0, 1))[::-1]  # HWC to CHW, reverse channel order
        x = np.ascontiguousarray(x)
        x = torch.from_numpy(x)
        x = x.unsqueeze(dim=0)
        x = x.half()
        x = x / 255  # normalize pixel values to [0, 1]

        scf = torch.min(self.input_size / torch.FloatTensor([image_size]), 1)[0]

        detected = self.model(x.to(self.device))
        detected = non_max_suppression(detected, self.conf_thres, self.nms)[0]
        if detected is None:
            return None

        # Move results back to the CPU as numpy arrays for box post-processing.
        detected = detected.cpu().detach().numpy()
        scf = scf.cpu().detach().numpy()

        # Undo the letterbox padding and rescale boxes to the original image size.
        detected[:, [0, 2]] -= (self.input_size - scf * image_size[1]) / 2
        detected[:, [1, 3]] -= (self.input_size - scf * image_size[0]) / 2
        detected[:, 0:4] /= scf

        # Expand each box by `expand_bb` pixels, clipped to the image boundary.
        detected[:, 0:2] = np.maximum(0, detected[:, 0:2] - expand_bb)
        detected[:, 2:4] = np.minimum(image_size[::-1], detected[:, 2:4] + expand_bb)

        return torch.from_numpy(detected)
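

# A minimal usage sketch (not part of the original file): the YOLOv8 weights are
# run in half precision, so a CUDA device is assumed here; `frame` is any RGB
# numpy image supplied by the caller.
def _example_yolov8_usage(frame):
    """Run the YOLOv8 human detector on a single RGB frame (numpy array)."""
    detector = YOLOv8_human(device='cuda')
    bboxes = detector.detect(frame, need_resize=True, expand_bb=5)
    return bboxes  # tensor of boxes, or None when no person is found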


class ThreadDetection(object):
    """Run a detector model on frames from a dataloader in a background thread."""
    def __init__(self,
                 dataloader,
                 model,
                 queue_size=256):
        self.model = model
        self.dataloader = dataloader

        self.stopped = False
        self.Q = Queue(maxsize=queue_size)

    def start(self):
        t = Thread(target=self.update, args=(), daemon=True)
        t.start()
        return self

    def update(self):
        while True:
            if self.stopped:
                return

            images = self.dataloader.getitem()
            outputs = self.model.detect(images)

            # Back off briefly if the consumer is not keeping up, then enqueue.
            if self.Q.full():
                time.sleep(2)
            self.Q.put((images, outputs))

    def getitem(self):
        return self.Q.get()

    def stop(self):
        self.stopped = True

    def __len__(self):
        return self.Q.qsize()
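

# A minimal usage sketch (not part of the original file): `frame_loader` stands in
# for any object exposing a getitem() method that yields frames (an assumption,
# not something this module defines), matching what ThreadDetection.update() calls.
def _example_threaded_detection(frame_loader, num_results=100):
    """Detect in a background thread and consume (frame, detections) pairs."""
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    detector = TinyYOLOv3_onecls(device=device)
    pipeline = ThreadDetection(frame_loader, detector).start()
    results = []
    for _ in range(num_results):
        results.append(pipeline.getitem())  # blocks until the worker queues a result
    pipeline.stop()
    return results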