-
Notifications
You must be signed in to change notification settings - Fork 0
/
img_processing.py
78 lines (62 loc) · 2.32 KB
/
img_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import numpy as np
import tensorflow as tf
from skimage.morphology import binary_opening, label, disk
from skimage.io import imread
from constants import ROOT_PATH
def rle_encode(img, min_max_threshold=1e-3, max_mean_threshold=None):
'''
img: numpy array, 1 - mask, 0 - background
Returns run length as string formated
'''
if np.max(img) < min_max_threshold:
return '' ## no need to encode if it's all zeros
if max_mean_threshold and np.mean(img) > max_mean_threshold:
return '' ## ignore overfilled mask
pixels = img.T.flatten()
pixels = np.concatenate([[0], pixels, [0]])
runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
runs[1::2] -= runs[::2]
return ' '.join(str(x) for x in runs)
def multi_rle_encode(img, **kwargs):
'''
Encode connected regions as separated masks
'''
labels = label(img)
if img.ndim > 2:
return [rle_encode(np.sum(labels==k, axis=2)) for k in np.unique(labels[labels>0])]
else:
return [rle_encode(labels==k) for k in np.unique(labels[labels>0])]
def rle_decode(mask_rle, shape=(768, 768)):
'''
mask_rle: run-length as string formated (start length)
shape: (height,width) of array to return
Returns numpy array, 1 - mask, 0 - background
'''
s = mask_rle.split()
starts, lengths = [np.asarray(x, dtype=int) for x in (s[0:][::2], s[1:][::2])]
starts -= 1
ends = starts + lengths
img = np.zeros(shape[0]*shape[1], dtype=np.uint8)
for lo, hi in zip(starts, ends):
img[lo:hi] = 1
return img.reshape(shape).T
def masks_as_image(in_mask_list):
all_masks = np.zeros((768, 768), dtype = np.uint8)
for mask in in_mask_list:
if isinstance(mask, str):
all_masks |= rle_decode(mask)
return all_masks
def smooth(cur_seg):
return binary_opening(cur_seg>0.99, np.expand_dims(disk(2), -1))
def image_predict(img, unet):
img = np.expand_dims(img, 0) / 255.0
with tf.device("cpu:0"):
cur_seg = unet.predict(img)[0]
return cur_seg, img[0]
def encode_predict(img_id, unet):
img_path = ROOT_PATH + '/data/test_v2/{}'.format(img_id)
img = imread(img_path)
cur_seg, img = image_predict(img, unet)
cur_seg = smooth(cur_seg)
cur_rles = multi_rle_encode(cur_seg)
return [[img, rle] for rle in cur_rles if rle is not None]