This multi-threaded code takes an array of 3d images and applies the convolution function with padding, stride, pad values .. as parameters and same applies for creating pooling layers. I need suggestions on how to improve performance and maybe get rid of the for loops used(if that is possible).
Use this photo tiger.jpeg
for producing the example in the code.
Code:
from concurrent.futures import ThreadPoolExecutor, as_completed from time import perf_counter import numpy as np import cv2 def pad_image(image, pad_width, values, mode='img_arr'): """ Add a pad(border) of a given width and value to the image. Args: image: Image array or a single image. pad_width: Width of the pad layer. values: Value of the pad layer. mode: A string representation of the input 'img_arr': Array of images. 'img': A single image. Return: numpy array of padded images or a padded image. """ if mode == 'img_arr': return np.array(list(map(lambda x: cv2.copyMakeBorder( x, pad_width, pad_width, pad_width, pad_width, cv2.BORDER_CONSTANT, value=values), image))) if mode == 'img': return cv2.copyMakeBorder( image, pad_width, pad_width, pad_width, pad_width, cv2.BORDER_CONSTANT, value=values) def calculate_size(image_shape, kernel_shape, pad_width, stride): """ Calculate size of the output after one pass of the convolution function. Args: image_shape: Input shape. kernel_shape: Convolution filter shape. pad_width: Width of the pad layer. stride: The number of pixels a kernel moves. Return: height, width, channels. """ height, width, channels = image_shape kernel_size = kernel_shape[1] output_height = int((height - kernel_size + 2 * pad_width) / stride) + 1 output_width = int((width - kernel_size + 2 * pad_width) / stride) + 1 output_channels = kernel_shape[-1] return output_height, output_width, output_channels def partition_image(image, stride, pad_width, pad_values, kernel_size): """ Prepare image to apply the convolution function. Args: image: numpy array containing image data. stride: The number of pixels a kernel moves. pad_width: Width of the pad layer. pad_values: Value of the pad layer. kernel_size: Size of the convolution filter. Return: R, G, B partitioned numpy arrays. """ partitions = [] padded = pad_image(image, pad_width, pad_values, 'img') height, width, channels = padded.shape red = padded[..., 0] green = padded[..., 1] blue = padded[..., 2] for item in red, green, blue: item = [item[ h: h + kernel_size, w: w + kernel_size] for h in range(0, height - kernel_size + 1, stride) for w in range(0, width - kernel_size + 1, stride)] partitions.append(np.array(item)) return partitions def convolve_image(image, kernel, bias, pad_width, stride, pad_values): """ Apply convolution function to an image. Args: image: numpy image nd array. kernel: Convolution filter. bias: A scalar. pad_width: Width of the pad layer. stride: The number of pixels a kernel moves. pad_values: Value of the pad layer. Return: Convolution output, cache. """ channel = 0 target_height, target_width, target_channels = calculate_size( image.shape, kernel.shape, pad_width, stride) output = np.zeros((target_height, target_width, target_channels)) red_part, green_part, blue_part = partition_image( image, stride, pad_width, pad_values, kernel.shape[0]) red_kernel, green_kernel, blue_kernel = kernel[..., 0], kernel[..., 1], kernel[..., 2] for img_col, kl_color in zip([red_part, green_part, blue_part], [red_kernel, green_kernel, blue_kernel]): product = img_col * kl_color addition = np.array(list(map(np.sum, product))) addition += bias addition = addition.reshape(target_height, target_width) output[..., channel] = addition channel += 1 cache = image, kernel, bias return output, cache def convolve_image_arr(img_arr, kernel, bias, pad_width, stride, pad_values, threads): """ Convolve an array of images. Args: img_arr: numpy array of images. kernel: Convolution filter. bias: A scalar. pad_width: Width of the pad layer. stride: The number of pixels a kernel moves. pad_values: Value of the pad layer. threads: Number of parallel threads. Return: Convolved image array, caches. """ z, caches, current_img, total_images = [], [], 0, img_arr.shape[0] with ThreadPoolExecutor(max_workers=threads) as executor: future_output = {executor.submit( convolve_image, img, kernel, bias, pad_width, stride, pad_values): img for img in img_arr} for future_item in as_completed(future_output): convolved_img, cache = future_item.result() print(f'Convolved image {current_img} out of {total_images} ... done') z.append(convolved_img) caches.append(cache) current_img += 1 return z, caches def pool_image(image, window_size, stride, mode): """ Create a pooling layer to the image. Args: image: numpy image nd array. window_size: Size of the sliding window/kernel. stride: The number of pixels a kernel moves. mode: Calculation mode 'max': Window will be maximized. 'avg': Window will be averaged. Return: Output layer, image. """ channel, result = 0, 0 target_height, target_width, target_channels = calculate_size( image.shape, (window_size, window_size), 0, stride) output = np.zeros((target_height, target_width, target_channels)) red_part, green_part, blue_part = partition_image(image, stride, 0, 0, window_size) for img in red_part, green_part, blue_part: if mode == 'max': result = np.array(list(map(np.max, img))) if mode == 'avg': result = np.array(list(map(np.mean, img))) result = result.reshape(target_height, target_width) output[..., channel] = result channel += 1 return output, image def pool_image_arr(img_arr, window_size, stride, mode, threads): """ Create a pooling layer to a numpy array of images. Args: img_arr: numpy array of images. window_size: Size of the sliding window/kernel. stride: The number of pixels a kernel moves. mode: Calculation mode 'max': Window will be maximized. 'avg': Window will be averaged. threads: Number of parallel threads. Return: Pooling layer output, images. """ output, current_img, total_images = [], 1, img_arr.shape[0] with ThreadPoolExecutor(max_workers=threads) as executor: future_output = {executor.submit(pool_image, img, window_size, stride, mode): img for img in img_arr} for future_item in as_completed(future_output): layered_img, img = future_item.result() print(f'Layered image {current_img} out of {total_images} ... done') output.append(layered_img) current_img += 1 return np.array(output), img_arr if __name__ == '__main__': t1 = perf_counter() tiger = cv2.imread('tiger.jpeg') tigers = np.array([tiger for _ in range(50)]) kl = np.random.randn(3, 3, 3) pa, st = 1, 1 b = np.random.randn() p_values = 0 z, cc = convolve_image_arr(tigers, kl, b, pa, st, p_values, 10) t2 = perf_counter() print(f'Time: {t2 - t1} seconds.')
tigers = np.array([tiger for _ in range(50)])
increase 50 to higher values and see for yourself\$\endgroup\$