julio 28, 2021

~ 4 MIN

Python Mutliprocessing

< Blog RSS

Open In Colab

Python Multprocessing

En este post vamos a aprender a ejectuar nuestro código en Python uando todos los cores de nuestra CPU. De esta manera, tareas que normalmente ejecutaríamos de manera secuencial y podrían llevar mucho tiempo las podremos paralelizar consiguiendo una gran mejora.

Empezaremos ilustrando la gran potencia de este método en un ejemplo sencillo que consiste en leer un conjunto de imágenes de satélite de una carpeta, convertirlas a PNG usando las bandas RGB y guardar el resultado en otra carpeta. Para ello usaremos las imágenes del dataset EuroSAT, usado en los posts anteriores.

import glob 

path = './data'
images = glob.glob(f'{path}/*/*.tif')
len(images)
27000
import skimage.io as io
import numpy as np

def read_ms(img):
    ms = io.imread(img)
    # las imágenes originales tienen 13 bandas
    assert ms.shape[2] == 13
    return ms

def get_rgb(ms):
    # nos quedamos con 3 bandas (RGB) y normalizamos
    return (255 * (ms[...,(3,2,1)] / 4000).clip(0,1)).astype(np.uint8)

dest_folder = f'{path}/results'
def save_png(name, img, sep="\\"):
    img_name = name.split(sep)[-1][:-4]
    file_path = f'{dest_folder}/{img_name}.png'
    io.imsave(file_path, img)
    return file_path
from tqdm import tqdm
import warnings 

warnings.simplefilter("ignore") # se queja que hay imágenes con bajo contraste

De la siguiente manera llevamos a cabo nuestro procesado simple de manera secuenciual.

for img in tqdm(images):
    ms = read_ms(img)
    rgb = get_rgb(ms)
    save_png(img, rgb)
100%|██████████| 27000/27000 [01:12<00:00, 370.25it/s]

Como puedes ver llevar a cabo esta tarea tarda un poco más de un minuto, y eso que las imágenes son relativamente pequeñas y pocas. Vamos a acelerar el procesado !

El módulo concurrent.futures

En el módulo concurrent futures encontramos la funcionalidad que ofrece Python para el procesado en paralelo. Básicamente tenemos dos alternativas: usar el objeto ThreadPoolExecutor o ProcessPoolExecutor. En el primer caso, Python intentará ejectuar nuestro código en diferentes threads mientras que en el segundo usará los cores físicos de nuestra CPU. En función de la aplicación, una alternativa puede ser más ventajosa sobre la otra dependiendo de si el cuello de botella se encuentra en el procesado en sí o en el I/O.

def generate_rgb(img):    
    ms = read_ms(img)
    rgb = get_rgb(ms)
    save_png(img, rgb)

El siguiente código es capaz de llevar a cabo el mismo procesado pero de forma paralela. Pudes cambiar el Executor para comparar las diferentes alternativas.

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    executor.map(generate_rgb, images)

El mismo procesado ahora ha terminado en 30 segundos, más de la mitad ! En algunos casos puede ser interesante añadir también una barra de progreso.

import multiprocessing
from concurrent.futures import ProcessPoolExecutor


num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as pool:
    with tqdm(total=len(images)) as progress:
        futures = []
        for img in images:
            future = pool.submit(generate_rgb, img)
            future.add_done_callback(lambda p: progress.update())
            futures.append(future)

        for future in futures:
            future.result()
100%|██████████| 27000/27000 [00:31<00:00, 861.86it/s]

Es posbile recuperar los resultados devueltos por nuestra función de la siguiente manera.

def generate_rgb2(img):    
    ms = read_ms(img)
    rgb = get_rgb(ms)
    # ahora la función devuelve el path de la nueva imágen creada
    return save_png(img, rgb)
with ThreadPoolExecutor(max_workers=num_cores) as pool:
    with tqdm(total=len(images)) as progress:
        futures = []
        for img in images:
            future = pool.submit(generate_rgb2, img)
            future.add_done_callback(lambda p: progress.update())
            futures.append(future)

        # guardamos los resultados
        results = []
        for future in futures:
            result = future.result()
            results.append(result)
100%|██████████| 27000/27000 [00:31<00:00, 867.57it/s]
len(results)
27000
results[:3]
[('./data/results/AnnualCrop_1.png', 'hola'),
 ('./data/results/AnnualCrop_10.png', 'hola'),
 ('./data/results/AnnualCrop_100.png', 'hola')]

Y por último vamos a ver un ejemplo de como podemos enviar varios argumentos a nuestra función. Para ello crearemos una lista en la que cada elemento será una tupla con todos los argumentos necesarios para llevar a cabo la función. En el cuerpo de la función, recuperamos los argumentos individuales de la tupla como puedes ver a continuación.

def generate_rgb3(args):    
    img, a, b, c = args # sacamos los argumentos de la tupla
    ms = read_ms(img)
    rgb = get_rgb(ms)
    return save_png(img, rgb)
args = [(img, 1, 2, 3) for img in images] # lista de tuplas con argumentos

with ThreadPoolExecutor(max_workers=num_cores) as pool:
    with tqdm(total=len(images)) as progress:
        futures = []

        for arg in args:
            future = pool.submit(generate_rgb3, arg) # enviamos la tupla de argumentos
            future.add_done_callback(lambda p: progress.update())
            futures.append(future)

        results = []
        for future in futures:
            result = future.result()
            results.append(result)
100%|██████████| 27000/27000 [00:31<00:00, 867.57it/s]

Resumen

En este post hemos aprendido a usar las herramientas disponibles en Python para acelerar nuestro código ejecutando tareas en paralelo. La próxima vez que te encuentres llevando a cabo operaciones de manera secuencial y ésto te lleve mucho tiempo, si es posible considera paralelizar las tareas para sacar el máximo partido a tu CPU. Casos de uso ideales son el procesado imágenes, comprimir o descomprimir archivos, etc.

< Blog RSS