Skip to content

Commit

Permalink
Fix MaxPool2D & partially fix CNN
Browse files Browse the repository at this point in the history
  • Loading branch information
Orif Milod committed Mar 11, 2024
1 parent 4f765d3 commit e1fd3bb
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 84 deletions.
118 changes: 54 additions & 64 deletions gigatorch/cnn.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,47 +10,56 @@
from abc import ABC, abstractmethod
from os import listdir
from os.path import join
import numpy as np


class Compute(ABC):
@abstractmethod
def compute(self, data) -> List[List[Tensor]]:
def compute(self, input: Tensor) -> Tensor:
pass


"""
The MaxPool2D layer extracts the maximum value over the window defined by pool_size
for each dimension along the features axis. The window is shifted by strides in each dimension.
MaxPool2D accepts a 4-dimensional tensor as input. The dimensions represent:
Batch size: The number of samples in a batch. We can do parallel processing if it's more than 1 batch.
Channels: The number of input channels. For example, an RGB image would have 3 channels.
Height: The height of the input.
Width: The width of the input.
"""
class MaxPool2D(Compute):
def __init__(self, kernel_size, stride=None):
self.kernel_size = kernel_size
self.stride = stride if stride is not None else kernel_size

def compute(self, data_list) -> List[List[Tensor]]:

def compute(self, input: Tensor) -> Tensor:
assert len(input.shape) == 4, f"can't 2d pool {input.shape}"
(batch_size, channels, height, width) = input.shape
assert (height - self.kernel_size) % self.stride == 0, f"Height does not fit the kernel size {self.kernel_size} and stride {self.stride}"
assert (width - self.kernel_size) % self.stride == 0, f"Width does not fit the kernel size {self.kernel_size} and stride {self.stride}"

print("Computing maxpool")
print("Size of data", data_list.shape[0])
print("Number of input", data_list.shape[0])
output = []
for data in data_list:
if len(data) < self.kernel_size or len(data[0]) < self.kernel_size:
raise Exception("Received data is smaller than the kernel_size")

new_data = []
for row_index in range(0, len(data) - self.kernel_size + 1, self.stride):
row = []
for column_index in range(
0, len(data[row_index]) - self.kernel_size + 1, self.stride
):
current_max = 0
for i in range(self.kernel_size):
for j in range(self.kernel_size):
current_max = max(
current_max, data[row_index + i][column_index + j]
)
row.append(current_max)
new_data.append(row)
output.append(new_data)
print("Size of data", output.shape[0])
print("Number of output", output.shape[0])
print("Input shape: ", input.shape)

pooled_height = (height - self.kernel_size) // self.stride + 1
pooled_width = (width - self.kernel_size) // self.stride + 1
output = np.zeros((batch_size, channels, pooled_height, pooled_width))

for b in range(batch_size):
for c in range(channels):
for i in range(pooled_height):
for j in range(pooled_width):
h_start = i * self.stride
h_end = h_start + self.kernel_size
w_start = j * self.stride
w_end = w_start + self.kernel_size
output[b, c, i, j] = np.max(input.data[b, c, h_start:h_end, w_start:w_end])

print("\n")
return output
return Tensor(output)


class Conv2D(Compute):
Expand Down Expand Up @@ -88,44 +97,25 @@ def __init__(self, in_channels, out_channels, kernel_size, activation_fn, stride
self.activation_fn = activation_fn
self.stride = stride

def compute(self, data_list):
output = Tensor([])
# Iterate for out_channels number of times
for i in range(self.kernels.shape[0]):
new_layer = []
for layer_index in range(data_list.shape[0]):
data = data_list[layer_index]
kernel = self.kernels[layer_index]
print("data", data.shape)
print("kernel", kernel.shape)

if data.shape[0] < self.kernel_size or data.shape[1] < self.kernel_size:
raise Exception("Received data is smaller than the kernel_size")

new_data = []
for row_index in range(
0, len(data) - self.kernel_size + 1, self.stride
):
row = []
for column_index in range(data.shape[0] - self.kernel_size + 1):
sum = 0
for i in range(self.kernel_size):
for j in range(self.kernel_size):
sum += (
data[row_index + i][column_index + j] * kernel[i][j]
)
a = self.activation_fn(sum.item())
print("item", a, type(a))
row.append(a)
print("Adding new row", row)
new_data.append(row)
new_layer.append(new_data)
print("Finishing the layer", new_layer)
output.append(new_layer)

print("Outputshape", output.shape)
return output
def compute(self, input):
(batch_size, _, height, width) = input.shape
output_height = (height - self.kernel_size) // self.stride + 1
output_width = (width - self.kernel_size) // self.stride + 1
output = Tensor(np.zeros((batch_size, self.out_channels, output_height, output_width)))

for b in range(batch_size):
for k in range(self.out_channels):
for i in range(output_height):
for j in range(output_width):
h_start = i * self.stride
h_end = h_start + self.kernel_size
w_start = j * self.stride
w_end = w_start + self.kernel_size
output[b, k, i, j] = self.activation_fn(
np.sum(input[b, :, h_start:h_end, w_start:w_end] * self.kernels[k])
)

return output

class CNN:
def __init__(self, train_data_dir, test_data_dir, categories):
Expand Down
41 changes: 21 additions & 20 deletions tests/cnn_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def test_conv2d_success():
]
)
sample_data = Tensor(
[
[ # input_channel 1
[ # Batch 1
[ # Channel 1
[1, 1, 1],
[1, 1, 1],
[1, 1, 1],
Expand All @@ -25,8 +25,7 @@ def test_conv2d_success():
)

output = conv2d.compute(sample_data)
print("FINAL OUTPUT", output.item())
expected = [[[10, 10], [10, 10]]] # for layer 1
expected = Tensor([[[10, 10], [10, 10]]]) # for layer 1

assert all(output.item() == expected)

Expand Down Expand Up @@ -54,42 +53,44 @@ def test_conv2d_kernel_size_larger_than_input():


def test_maxpool2d_success():
maxpool2d = MaxPool2D(2, 1)
sample_data = Tensor(
[
maxpool2d = MaxPool2D(kernel_size=2, stride=1)
sample_data = Tensor([
[ # Batch 1
[ # channel 1
[1, 2, 3, 4],
[5, 6, 7, -1],
[0, 2, 100, 9],
[0, 0, 0, 0],
]
]
]]
)

expected = Tensor(
[
[
expected = Tensor([
[ # Batch 1
[ # Channel 1
[6, 7, 7],
[6, 100, 100],
[2, 100, 100],
]
]
)
])

output = maxpool2d.compute(sample_data)
print(output)
print(expected)
assert (expected == output).all()

maxpool2d_with_default_stride = MaxPool2D(2)
maxpool2d_with_default_stride = MaxPool2D(kernel_size=2)

expected = [
[6, 7],
[2, 100],
]
expected = Tensor([
[ # Batch 1
[ # Channel 1
[6, 7],
[2, 100]
]
]
])

output = maxpool2d_with_default_stride.compute(sample_data)
assert expected == output
assert (expected == output).all()


def test_maxpool2d_kernel_size_larger_than_input():
Expand Down

0 comments on commit e1fd3bb

Please sign in to comment.