
This multi-threaded code takes an array of 3D images and applies a convolution function, taking padding, stride, pad values, etc. as parameters; the same approach is used for creating pooling layers. I need suggestions on how to improve performance and, if possible, get rid of the for loops.

Use this photo, tiger.jpeg, to reproduce the example in the code.


Code:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import perf_counter

import numpy as np
import cv2


def pad_image(image, pad_width, values, mode='img_arr'):
    """
    Add a pad (border) of a given width and value to the image.

    Args:
        image: Image array or a single image.
        pad_width: Width of the pad layer.
        values: Value of the pad layer.
        mode: A string representation of the input
            'img_arr': Array of images.
            'img': A single image.
    Return:
        numpy array of padded images or a padded image.
    """
    if mode == 'img_arr':
        return np.array(list(map(lambda x: cv2.copyMakeBorder(
            x, pad_width, pad_width, pad_width, pad_width,
            cv2.BORDER_CONSTANT, value=values), image)))
    if mode == 'img':
        return cv2.copyMakeBorder(
            image, pad_width, pad_width, pad_width, pad_width,
            cv2.BORDER_CONSTANT, value=values)


def calculate_size(image_shape, kernel_shape, pad_width, stride):
    """
    Calculate size of the output after one pass of the convolution function.

    Args:
        image_shape: Input shape.
        kernel_shape: Convolution filter shape.
        pad_width: Width of the pad layer.
        stride: The number of pixels a kernel moves.
    Return:
        height, width, channels.
    """
    height, width, channels = image_shape
    kernel_size = kernel_shape[1]
    output_height = int((height - kernel_size + 2 * pad_width) / stride) + 1
    output_width = int((width - kernel_size + 2 * pad_width) / stride) + 1
    output_channels = kernel_shape[-1]
    return output_height, output_width, output_channels


def partition_image(image, stride, pad_width, pad_values, kernel_size):
    """
    Prepare image to apply the convolution function.

    Args:
        image: numpy array containing image data.
        stride: The number of pixels a kernel moves.
        pad_width: Width of the pad layer.
        pad_values: Value of the pad layer.
        kernel_size: Size of the convolution filter.
    Return:
        R, G, B partitioned numpy arrays.
    """
    partitions = []
    padded = pad_image(image, pad_width, pad_values, 'img')
    height, width, channels = padded.shape
    red = padded[..., 0]
    green = padded[..., 1]
    blue = padded[..., 2]
    for item in red, green, blue:
        item = [item[h: h + kernel_size, w: w + kernel_size]
                for h in range(0, height - kernel_size + 1, stride)
                for w in range(0, width - kernel_size + 1, stride)]
        partitions.append(np.array(item))
    return partitions


def convolve_image(image, kernel, bias, pad_width, stride, pad_values):
    """
    Apply convolution function to an image.

    Args:
        image: numpy image nd array.
        kernel: Convolution filter.
        bias: A scalar.
        pad_width: Width of the pad layer.
        stride: The number of pixels a kernel moves.
        pad_values: Value of the pad layer.
    Return:
        Convolution output, cache.
    """
    channel = 0
    target_height, target_width, target_channels = calculate_size(
        image.shape, kernel.shape, pad_width, stride)
    output = np.zeros((target_height, target_width, target_channels))
    red_part, green_part, blue_part = partition_image(
        image, stride, pad_width, pad_values, kernel.shape[0])
    red_kernel, green_kernel, blue_kernel = kernel[..., 0], kernel[..., 1], kernel[..., 2]
    for img_col, kl_color in zip([red_part, green_part, blue_part],
                                 [red_kernel, green_kernel, blue_kernel]):
        product = img_col * kl_color
        addition = np.array(list(map(np.sum, product)))
        addition += bias
        addition = addition.reshape(target_height, target_width)
        output[..., channel] = addition
        channel += 1
    cache = image, kernel, bias
    return output, cache


def convolve_image_arr(img_arr, kernel, bias, pad_width, stride, pad_values, threads):
    """
    Convolve an array of images.

    Args:
        img_arr: numpy array of images.
        kernel: Convolution filter.
        bias: A scalar.
        pad_width: Width of the pad layer.
        stride: The number of pixels a kernel moves.
        pad_values: Value of the pad layer.
        threads: Number of parallel threads.
    Return:
        Convolved image array, caches.
    """
    z, caches, current_img, total_images = [], [], 0, img_arr.shape[0]
    with ThreadPoolExecutor(max_workers=threads) as executor:
        future_output = {executor.submit(
            convolve_image, img, kernel, bias, pad_width, stride, pad_values): img
            for img in img_arr}
        for future_item in as_completed(future_output):
            convolved_img, cache = future_item.result()
            print(f'Convolved image {current_img} out of {total_images} ... done')
            z.append(convolved_img)
            caches.append(cache)
            current_img += 1
    return z, caches


def pool_image(image, window_size, stride, mode):
    """
    Create a pooling layer to the image.

    Args:
        image: numpy image nd array.
        window_size: Size of the sliding window/kernel.
        stride: The number of pixels a kernel moves.
        mode: Calculation mode
            'max': Window will be maximized.
            'avg': Window will be averaged.
    Return:
        Output layer, image.
    """
    channel, result = 0, 0
    target_height, target_width, target_channels = calculate_size(
        image.shape, (window_size, window_size), 0, stride)
    output = np.zeros((target_height, target_width, target_channels))
    red_part, green_part, blue_part = partition_image(image, stride, 0, 0, window_size)
    for img in red_part, green_part, blue_part:
        if mode == 'max':
            result = np.array(list(map(np.max, img)))
        if mode == 'avg':
            result = np.array(list(map(np.mean, img)))
        result = result.reshape(target_height, target_width)
        output[..., channel] = result
        channel += 1
    return output, image


def pool_image_arr(img_arr, window_size, stride, mode, threads):
    """
    Create a pooling layer to a numpy array of images.

    Args:
        img_arr: numpy array of images.
        window_size: Size of the sliding window/kernel.
        stride: The number of pixels a kernel moves.
        mode: Calculation mode
            'max': Window will be maximized.
            'avg': Window will be averaged.
        threads: Number of parallel threads.
    Return:
        Pooling layer output, images.
    """
    output, current_img, total_images = [], 1, img_arr.shape[0]
    with ThreadPoolExecutor(max_workers=threads) as executor:
        future_output = {executor.submit(pool_image, img, window_size, stride, mode): img
                         for img in img_arr}
        for future_item in as_completed(future_output):
            layered_img, img = future_item.result()
            print(f'Layered image {current_img} out of {total_images} ... done')
            output.append(layered_img)
            current_img += 1
    return np.array(output), img_arr


if __name__ == '__main__':
    t1 = perf_counter()
    tiger = cv2.imread('tiger.jpeg')
    tigers = np.array([tiger for _ in range(50)])
    kl = np.random.randn(3, 3, 3)
    pa, st = 1, 1
    b = np.random.randn()
    p_values = 0
    z, cc = convolve_image_arr(tigers, kl, b, pa, st, p_values, 10)
    t2 = perf_counter()
    print(f'Time: {t2 - t1} seconds.')
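For reference, the loop I most want to remove is the window slicing inside partition_image. Below is a minimal sketch (not part of the code above; the helper name partition_channel is made up for illustration) of how that per-channel windowing could be expressed with NumPy's sliding_window_view, assuming NumPy 1.20 or newer. It is only meant to show the kind of loop removal I am asking about, not a tested replacement.

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def partition_channel(channel, kernel_size, stride):
    # Every kernel_size x kernel_size window of the 2D channel,
    # shape: (H - k + 1, W - k + 1, k, k).
    windows = sliding_window_view(channel, (kernel_size, kernel_size))
    # Keep only the windows a stride-step scan would visit.
    windows = windows[::stride, ::stride]
    # Flatten to (num_windows, k, k), the same row-major order the list
    # comprehension in partition_image produces.
    return windows.reshape(-1, kernel_size, kernel_size)

Used per channel (e.g. partition_channel(padded[..., 0], 3, 1)), this should return the same array as one pass of the loop in partition_image, without building a Python list of slices first.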
\$\endgroup\$
  • Is there any particular reason you're using multithreading over multiprocessing here? Can you share some information on the current performance? – AMC, Feb 8, 2020 at 22:33
  • @AMC No, there is not. Do you suggest multiprocessing is a better choice? Why? – Feb 8, 2020 at 22:37
  • @AMC You can check the performance by changing the line tigers = np.array([tiger for _ in range(50)]): increase 50 to higher values and see for yourself. – Feb 8, 2020 at 22:44
  • "Do you suggest multiprocessing is a better choice? Why?" Yes, in Python threads are almost always used for I/O only, for a few different reasons. There are plenty of resources on the subject, which can explain things far better than I can. – AMC, Feb 8, 2020 at 22:57
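Following up on the multiprocessing suggestion in the comments above, here is a minimal, untested sketch of what that swap could look like if dropped into the same module as convolve_image (the name convolve_image_arr_mp is hypothetical, not part of the posted code). Worker processes avoid the GIL serialising the Python-level loops, at the cost of pickling each image to a worker.

from concurrent.futures import ProcessPoolExecutor, as_completed


def convolve_image_arr_mp(img_arr, kernel, bias, pad_width, stride, pad_values, workers):
    # Same fan-out as convolve_image_arr, but with worker processes instead of
    # threads; convolve_image and all of its arguments must be picklable.
    z, caches = [], []
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(convolve_image, img, kernel, bias,
                                   pad_width, stride, pad_values)
                   for img in img_arr]
        for future in as_completed(futures):
            convolved_img, cache = future.result()
            z.append(convolved_img)
            caches.append(cache)
    return z, caches

As in the posted script, the call would still need to sit under the if __name__ == '__main__': guard so the worker processes can import the module cleanly.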
