Pool de thread avec limite de parallélisation

Je travaillais cette semaine sur un script python qui récupérait des centaines de milliers de données depuis un serveur. Pour gagner en temps de calcul, j'ai décidé de faire des threads. Limitant la taille de mes tâches parallèles à un nombre spécifique, le script bloquait... car j'utilisais des fonctions threadées qui lançaient récursivement d'autres threads. Sauf que la Queue que j'utilisais pour faire le pool se bloquait si trop de threads se lançaient en même temps attendant indéfiniment que quelqu'un veuille bien lire la Queue... J'ai alors trouvé ma solution, et je la partage avec vous. Je vous explique le problème, je vous montre un exemple, et ensuite on voit la solution.

J'ai cherché un peu partout sur le net, et je n'ai pas trouvé de solution à mon problème. Ainsi, pour vous montrer ce qui coince, je vous ai préparé un exemple de script très simple. Le script suivant lance 5 Threads qui eux même vont en créer... Pour limiter le nombre de tâche parallèles je passe par une classe assez connue (depuis python recipes) qui utilise la classe Queue avec une limite de taille.

Chaque fois qu'on ajoute une tâche dans la classe, la Queue est remplie... les workers vont alors lire en boucle dans cette dernière pour lancer la tâche et les arguments stoqués. Tant que la Queue est remplie, il faut attendre que quelqu'un la lise... sinon l'appel à "put" reste bloqué.

Voici la classe de base que j'ai utilisée (et modifié):

Edit: J'ai beaucoup joué avec ma classe et j'ai fait quelques corrections, notamment en ce qui concerne l'ajout d'arguments, vous pouvez suivre les fixes sur le gist suivant: https://gist.github.com/metal3d/507...

# -*- encoding: utf-8 -*-
# file threadingpoolbad.py
 
import logging
from threading import Thread
from Queue import Queue
 
class ThreadPool:
 
    class _ThreadQueue(Thread):
 
        def __init__(self, pool, *args, **kwargs):
            """ Get tasks queue then launch thread """
            super(ThreadPool._ThreadQueue, self).__init__(*args, **kwargs)
            self.tasks = pool.tasks
            self.daemon = True
            self.start()
 
        def run(self):
            """ Read tasks from limited size queue, then launch task """
            while True:
                # read bloking Queue ..
                task,args = self.tasks.get(True)
                try:
                    task(*args)
                except Exception, e:
                    logging.exception(e)
                    print "Error"
                finally:
                    self.tasks.task_done()
 
    def __init__(self, num=10):
        """ Create a limited pool with "num" threads """
        self.tasks = Queue(num)
 
        for _ in range(num):
            self._ThreadQueue(self)
 
    def add_task(self, target, args):
        """ Write in unlimited size queue which will be
        read in "run" method of a thread
        Block if tasks Queue is full !
        """
        self.tasks.put((target, args))
 
    def wait_completion(self):
        """ Wait for tasks to be completed """
        self.tasks.join()

Voilà comment utiliser cette classe:

import time
from threadingpoolbad import ThreadPool
 
def test(pool, num=0):
    num += 1
    print "num is %d" % num
    if num < 5:
        pool.add_task(target=test, args=(q, num))
        time.sleep(0.5)
 
#create a pool of 2 threads, launch test function
pool = ThreadPool(2)
pool.add_task(target=test, args=(pool,))

Jusqu'ici tout va bien... mais admettons que j'ajoute 5 ou 6 appels à add_task:

# ...
#create a pool of 2 threads, launch test function
pool = ThreadPool(2)
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))

Si vous lancez le test... ça coince. Pour arrêter le script faites CTRL+Z puis "kill %%".

Tout le souci se trouve entre deux méthodes:

  • add_tasks qui tente d'écrire dans une queue, l'appel à "put" bloque si la Queue "tasks" est pleine
  • dans "run" on libère un espace au moment où la fonction "task" est terminée

Or, dans notre exemple, la fonction "test" ne libère pas la queue de suite, car elle écrit dans la Queue via add_task... mais comme d'autres threads sont aussi en cours, la Queue est pleine et aucun espace n'est libre. On se retrouve en attente indéfinie...

Pour la plupart des scripts, vous n'allez pas vous retrouver dans cette situation, mais voilà... pour moi c'est arrivé.

Comment corriger le souci ? revoir l'algo ? c'est dommage... tout ce qui nous manque c'est de pouvoir écire dans la Queue sans limite, mais limiter quand même le nombre de threads.

La classe Queue peut tout à fait ne pas bloquer, et avoir une taille indéfinie. Si on lui passe un entier en argument de constructeur elle se limite à bloquer sa taille max à ce nombre. Mais si on ne lui donne pas de paramètres, ou "0" ou un nombre négatif, alors sa taille est illimitée.

Or, je veux tout de même limiter le nombre de process simultanés. Tout ce dont j'ai besoin c'est de ne pas bloquer l'insert de tâches... vous me suivez ?

C'est alors très simple ! il suffit de gérer tout ça avec 2 Queue:

  • une qui n'est pas limitée, elle gardera toutes les taches à lancer, quelque soit le nombre
  • une qui va limiter le nombre de thread, celle-ci sera limité à nombre qui correspond au nombre de threads. C'est quasiement le même fonctionnement que la classe de threadingpoolbad.py

On y va:

# -*- encoding: utf-8 -*-
#file threadingpool.py
import logging
from threading import Thread
from Queue import Queue
 
 
class ThreadPool:
 
    class _ThreadQueue(Thread):
 
        def __init__(self, pool, *args, **kwargs):
            """ Get task and pool Queue. Then launch thread.
            """
            super(ThreadPool._ThreadQueue, self).__init__(*args, **kwargs)
            self.pool = pool.pool
            self.tasks = pool.tasks
            self.daemon = True
            self.start()
 
        def run(self):
            """ Run unlimited while Queues are not joined """
            while True:
                # reinsert the nonblocking queue 
                # in blocking queue, that should block
                # if "tasks" queue is full
                self.tasks.put(self.pool.get(True))
 
                #and read this queue...
                task,args = self.tasks.get(True)
                try:
                    task(*args)
                except Exception, e:
                    logging.exception(e)
                finally:
                    self.tasks.task_done()
                    self.pool.task_done()
 
 
    def __init__(self, num=10):
        """ Create the thread queue with "num" thread in parallel"""
        self.tasks = Queue(num)
        self.pool = Queue()
 
        for _ in range(num):
            self._ThreadQueue(self)
 
    def add_task(self, target, args):
        """ Write in unlimited size queue which will be
        read in "run" method of a thread
        That should not block !
        """
        self.pool.put((target, args))
 
 
    def wait_completion(self):
        """ Wait for the all threads to be completed """
        self.pool.join()
        self.tasks.join()

Vous voyez ici les deux queues, "tasks" et "pool". La première est limitée, l'autre non.

Et cette fois ci, ça marche: j'ai bien deux threads qui tournent mais je ne bloque plus lors de l'appel à "add_task":

import time
from threadingpool import ThreadPool
 
def test(pool, num=0):
    num += 1
    print "num is %d" % num
    if num < 5:
        pool.add_task(target=test, args=(q, num))
        time.sleep(0.5)
 
#create a pool of 2 threads, launch test function
pool = ThreadPool(2)
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
pool.add_task(target=test, args=(pool,))
#...

Et voilà ! Désormais, mes 120 000 tâches que j'ai à lancer ne se bloquent plus, elles se placent en file l'attente et j'ai bien un nombre limité de tâches en parallèle.

Je ne vous montre pas mon script d'import, mais croyez moi, la récursion (bien contrôlée) est énorme, et avoir résolut mon problème a été un soulagement énorme.