Multi-threading with Perl

Multi-threading, forking... At first it seems complicated, but it can actually be quite simple! First, one thing you should know: your Perl installation may not support threads (this option is set when Perl is compiled), so check it:

perl -V
Summary of my perl5 (revision 5 version 10 subversion 0) configuration:
  Platform:
    osname=linux, osvers=2.6.30.5-dsa-amd64, archname=x86_64-linux-gnu-thread-multi
    uname='linux brahms 2.6.30.5-dsa-amd64 #1 smp mon aug 17 02:18:43 cest 2009 x86_64 gnulinux '
    config_args='-Dusethreads -Duselargefiles -Dccflags=-DDEBIAN -Dcccdlflags=-fPIC -Darchname=x86_64-linux-gnu -Dprefix=/usr -Dprivlib=/usr/share/perl/5.10 -Darchlib=/usr/lib/perl/5.10 -Dvendorprefix=/usr -Dvendorlib=/usr/share/perl5 -Dvendorarch=/usr/lib/perl5 -Dsiteprefix=/usr/local -Dsitelib=/usr/local/share/perl/5.10.0 -Dsitearch=/usr/local/lib/perl/5.10.0 -Dman1dir=/usr/share/man/man1 -Dman3dir=/usr/share/man/man3 -Dsiteman1dir=/usr/local/man/man1 -Dsiteman3dir=/usr/local/man/man3 -Dman1ext=1 -Dman3ext=3perl -Dpager=/usr/bin/sensible-pager -Uafs -Ud_csh -Ud_ualarm -Uusesfio -Uusenm -DDEBUGGING=-g -Doptimize=-O2 -Duseshrplib -Dlibperl=libperl.so.5.10.0 -Dd_dosuid -des'
    hint=recommended, useposix=true, d_sigaction=define
    useithreads=define, usemultiplicity=define
    useperlio=define, d_sfio=undef, uselargefiles=define, usesocks=undef
    use64bitint=define, use64bitall=define, uselongdouble=undef
    usemymalloc=n, bincompat5005=undef

Here, look for the following option: useithreads=define. If your Perl install doesn't have it, I recommend installing the forks module from CPAN. During installation, the CPAN installer will ask you:

It appears your perl was not built with native ithreads.

Would you like to create references to forks, such that
using 'use threads' and 'use threads::shared' will quietly
load forks and forks::shared? [no]

I recommend answering yes at this point: it lets you write programs with the use threads directive whether or not this option was set when Perl was compiled (note: be aware that the forks module is not as fast as native threads).
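
As a side note, the same check can be done from inside a script instead of reading the perl -V output by eye. Here is a minimal sketch, assuming the core Config module; the has_native_threads name is made up for the example:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Config;    # core module exposing the build-time settings shown by perl -V

# Hypothetical helper: returns 1 if this perl was built with ithreads, 0 otherwise.
sub has_native_threads {
    return ( defined $Config{useithreads} && $Config{useithreads} eq 'define' ) ? 1 : 0;
}

if ( has_native_threads() ) {
    print "Native ithreads available: 'use threads' will work.\n";
}
else {
    print "No native ithreads: consider installing the forks module from CPAN.\n";
}
```

On the command line, perl -V:useithreads prints just that one setting.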

Then, how do we write these threads? You can find a lot of tutorials with Google, like this one, for instance. But there is an issue with these examples: they are all based on loops with a fixed number of iterations (do something 10 times and thread it). Okay, that's nice for beginners. But in the real world, that's not how things happen! Most of the time, you have a lot of basic operations to perform (let's say about 500) and you don't want to run them all at the same time (your system may go down). How do you write a system-friendly program that launches only a limited number of threads at a time and still performs all the tasks? That's the point! And, trust me, I didn't find any tutorial for this "real-world" case.

Here I propose a program that may help you achieve this. The algorithm is based on a while loop that compares the number of tasks to perform with the number of threads started so far (running or done), and starts a new thread only when fewer than the maximum are running. It can easily be adapted to other cases. Let's say that you have 100 nucleotide sequences to analyze, stored in a hash table. You get the number of entries in your hash (your $nb_compute) and enter the loop. Now, let's stop talking and have a look at the code:

#!/opt/local/bin/perl -w
use threads;
use strict;
use warnings;

my @a = ();
my @b = ();

sub sleeping_sub ( $ $ $ );

print "Starting main program\n";

my $nb_process = 10;
my $nb_compute = 20;
my $i=0;
my @running = ();
my @Threads;
while (scalar @Threads < $nb_compute) {
 	@running = threads->list(threads::running);
	print "LOOP $i\n";
	print "  - BEGIN LOOP >> NB running threads = ".(scalar @running)."\n";

	if (scalar @running < $nb_process) {
 		my $thread = threads->new( sub { sleeping_sub($i, \@a, \@b) });
		push (@Threads, $thread);
		my $tid = $thread->tid;
		print "  - starting thread $tid\n";
	}
	@running = threads->list(threads::running);
	print "  - AFTER STARTING >> NB running Threads = ".(scalar @running)."\n";
	foreach my $thr (@Threads) {
		if ($thr->is_running()) {
			my $tid = $thr->tid;
			print "  - Thread $tid running\n";
		}
		elsif ($thr->is_joinable()) {
			my $tid = $thr->tid;
			$thr->join;
			print "  - Results for thread $tid:\n";
			print "  - Thread $tid has been joined\n";
		}
	}

	@running = threads->list(threads::running);
	print "  - END LOOP >> NB Threads = ".(scalar @running)."\n";
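	$i++;
	# All slots busy: pause briefly so the loop does not spin at full CPU.
	sleep(1) if scalar @running >= $nb_process;
}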
print "\nJOINING pending threads\n";
# join() blocks until the thread has finished, so joining every thread that is
# still listed (running, or finished but not yet joined) cannot miss one.
foreach my $thr (threads->list()) {
	$thr->join;
}

print "NB started threads = ".(scalar @Threads)."\n";
print "End of main program\n";

sub sleeping_sub ( $ $ $ ) {
	sleep(4);
}

During the main loop, the program starts a new thread whenever the number of running threads is lower than the maximum ($nb_process). Still in this loop, it joins the threads that have finished. At the end of the loop, some threads may still be running, so a second loop joins the remaining ones. Note that the parameters of the sub are not used (it's just for the example), but you can pass parameters to your favorite sub and get results back, too. For more details about shared data, I recommend reading the threads perldoc. I hope it helps.
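
To illustrate passing parameters and getting results back, here is a minimal sketch of the same limited-threads idea applied to the nucleotide sequences case. The sequences, the gc_content and collect_finished subs, and the limit of 2 threads are all made up for the example; it is one possible way to do it, not the only one:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;

# Hypothetical input: sequence name => nucleotide sequence.
my %sequences = (
    seq1 => 'ATGCGC',
    seq2 => 'AATT',
    seq3 => 'GGCC',
    seq4 => 'ATGCATGC',
    seq5 => 'TTTTGGGG',
);

my $max_threads = 2;    # assumed limit on simultaneously running threads

# Hypothetical worker: returns the sequence name and its GC fraction.
sub gc_content {
    my ( $name, $seq ) = @_;
    my $gc = ( $seq =~ tr/GCgc// );    # tr returns the number of matching characters
    return ( $name, length($seq) ? $gc / length($seq) : 0 );
}

# Join every thread that has already finished and store its result.
sub collect_finished {
    my ($results) = @_;
    foreach my $thr ( threads->list(threads::joinable) ) {
        my ( $name, $gc ) = $thr->join;
        $results->{$name} = $gc;
    }
}

my %results;
my @todo = sort keys %sequences;

while (@todo) {
    collect_finished( \%results );
    my @running = threads->list(threads::running);
    if ( scalar @running < $max_threads ) {
        my $name = shift @todo;
        # Explicit list context, so that join() returns both values.
        threads->create( { 'context' => 'list' },
            \&gc_content, $name, $sequences{$name} );
    }
    else {
        select( undef, undef, undef, 0.05 );    # short pause instead of busy-waiting
    }
}

# join() blocks until completion, so no thread can be missed here.
foreach my $thr ( threads->list() ) {
    my ( $name, $gc ) = $thr->join;
    $results{$name} = $gc;
}

printf "%s\t%.2f\n", $_, $results{$_} for sort keys %results;
```

The thread is created with an explicit list context because, otherwise, the context of the value returned by join follows the context in which the thread was created.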

Comments

  1. Tanuj said:

    Thanks for the explanation.

  2. Tomki said:

    Thanks!
    this example helped me understand the problem I was trying to solve in ways that truly the perl thread tutorial did not.

    I adapted it a bit:

    my $nb_process = 20; # this is how many threads to use at a time
    my $nb_compute = scalar(@domains); # this is how many threads I'll eventually make use of
    my $i = 0;
    my @running = ();
    my @Threads;
    while (scalar @Threads < $nb_compute) {
        @running = threads->list(threads::running);
        print STDERR " - BEGIN LOOP $i >> NB running threads = ".(scalar @running)."\n" if $optctl{'debugthreads'};

        if (scalar @running < $nb_process) {
            # Part of this line was lost when the comment was posted;
            # how $domain is assigned is not recoverable.
            my $thread = threads->new(\&dnschecks, $domain);
            push (@Threads, $thread);
            my $tid = $thread->tid;
            print STDERR " - thread $tid\n" if $optctl{'debugthreads'};
        }
        @running = threads->list(threads::running);
        print STDERR " - AFTER STARTING >> NB running Threads = ".(scalar @running)."\n" if $optctl{'debugthreads'};
        foreach my $thr (@Threads) { # check every thread to see if we can rejoin it
            if ($thr->is_running()) { # no
                my $tid = $thr->tid;
                # if $optctl{'debugthreads'} print STDERR " - Thread $tid running\n";
            }
            elsif ($thr->is_joinable()) { # yes
                my $tid = $thr->tid;
                my ($result_href) = $thr->join;
                $results{$$result_href{'Domain'}} = $result_href;
                print STDERR " - Results for thread $tid: $$result_href{'Domain'} - Thread has been joined\n" if $optctl{'debugthreads'};
            }
        }

        @running = threads->list(threads::running);
        print STDERR " - END LOOP >> NB Threads = ".(scalar @running)."\n" if $optctl{'debugthreads'};
        $i++;
    }
    print STDERR "\nJOINING pending threads\n" if $optctl{'debugthreads'};
    while (scalar @running != 0) {
        foreach my $thr (@Threads) {
            if ($thr->is_joinable()) {
                my $tid = $thr->tid;
                my ($result_href) = $thr->join;
                $results{$$result_href{'Domain'}} = $result_href;
                if ($optctl{'debugthreads'}) {
                    print STDERR " - Results for thread $tid: $$result_href{'Domain'} - Thread has been joined\n";
                    print STDERR " - Threads left = ".(scalar @running)."\n";
                }
            }
        }
        @running = threads->list(threads::running);
    }
    if ($optctl{'debugthreads'}) {
        print STDERR "NB started threads = ".(scalar @Threads)."\n";
        print STDERR "End of threads\n";
    }

  3. Vel said:

    Thanks very much! I’ve applied this to my bioinformatics pipeline and it works well.

  4. Gawcio said:

    There’s a typo in your article (k and s characters are exchanged):
    "I recommend you to install the forsk.pm module"

  5. Daniel said:

    There is some misbehaviour when, in the while loop from line 49, the last thread finishes after the "is_joinable" check. Then the @running array is empty and the while loop exits without joining the last thread.

    The bugfix is to do "@running = threads->list(threads::running);" first of all:

    while (scalar @running != 0) {
        @running = threads->list(threads::running);
        foreach my $thr (@Threads) {
            $thr->join if ($thr->is_joinable());
        }
    }

  6. Gadgets said:

    I’ve been reading various perl threading articles and posts for months, and yours is the first I’ve seen that actually offers anything like a real-world usage example. In other words, it is EXACTLY what I’ve been looking for, and now I’ll be spending time this weekend playing around with threads :) Thanks bud!

    • Fred said:

      Thank you very much. I'd experienced the same issue when this post was written: most of the tutorials about multi-threading in Perl were not simple to use in the real world (like most of the examples, for instance). So I'm glad that this post helps you: it was the objective ;-)

      regards.
