Sun, 01 Jan 2017

Perl 6 By Example: Silent Cron, a Cron Wrapper



This blog post is part of my ongoing project to write a book about Perl 6.

If you're interested, please sign up for the mailing list at the bottom of the article, or here. It will be low volume (less than an email per month, on average).


On Linux and UNIX-like systems, a program called cron periodically executes user-defined commands in the background. It is used for system maintenance tasks such as refreshing or removing caches, rotating and deleting old log files, and so on.

If such a command produces any output, cron typically sends an email containing the output so that an operator can look at it and judge if some action is required.

But not all command line programs are written for use with cron. For example, they might produce output even on successful execution, and indicate failure only through a non-zero exit code. Or they might hang, or otherwise misbehave.

To deal with such commands, we'll develop a small program called silent-cron, which wraps such commands and suppresses output when the exit code is zero. It also allows you to specify a timeout that kills the wrapped program if it takes too long:

$ silent-cron -- command-that-might-fail args
$ silent-cron --timeout=5 -- command-that-might-hang

Running Commands Asynchronously

When you want to run external commands, Perl 6 gives you basically two choices: run, a simple, synchronous interface, and Proc::Async, an asynchronous and slightly more complex option. Even though we will omit the timeout in the first iteration, we need to be aware that implementing the timeout is easier in the asynchronous interface, so that's what we'll use:

#!/usr/bin/env perl6

sub MAIN(*@cmd) {
    my $proc = Proc::Async.new(|@cmd);
    my $collector = Channel.new;
    for $proc.stdout, $proc.stderr -> $supply {
        $supply.tap: { $collector.send($_) }
    }
    my $result = $proc.start.result;
    $collector.close;
    my $output = $collector.list.join;

    my $exitcode = $result.exitcode;
    if $exitcode != 0 {
        say "Program @cmd[] exited with code $exitcode";
        print "Output:\n", $output if $output;
    }
    exit $exitcode;
}

There's a big chunk of new features and concepts in here, so let's go through the code bit by bit.

sub MAIN(*@cmd) {
    my $proc = Proc::Async.new(|@cmd);

This collects all the command line arguments in the array variable @cmd, where the first element is the command to be executed, and any further elements are arguments passed to this command. The second line creates a new Proc::Async instance, but doesn't yet run the command.

We need to capture all output from the command; thus we capture the output of the STDOUT and STDERR streams (file descriptors 1 and 2 on Linux), and combine it into a single string. In the asynchronous API, STDOUT and STDERR are modeled as objects of type Supply, and hence are streams of events. Since supplies can emit events in parallel, we need a thread-safe data structure for collecting the result, and Perl 6 conveniently provides a Channel for that:

my $collector = Channel.new;

To actually get the output from the program, we need to tap into the STDOUT and STDERR streams:

for $proc.stdout, $proc.stderr -> $supply {
    $supply.tap: { $collector.send($_) }
}

Each supply executes the block { $collector.send($_) } for each string it receives. The string can be a character, a line or something larger if the stream is buffered. All we do with it is put the string into the channel $collector via the send method.
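
To see the tap-and-collect pattern in isolation, here is a minimal sketch that replaces the external program with a hand-made supply. The Supplier and the strings it emits are stand-ins chosen for illustration; they are not part of silent-cron:

```perl6
my $collector = Channel.new;

# A Supplier lets us emit values by hand; it stands in for $proc.stdout
my $supplier = Supplier.new;
$supplier.Supply.tap: { $collector.send($_) };

# Each emit runs the tap block, which forwards the chunk into the channel
$supplier.emit($_) for "first chunk\n", "second chunk\n";

# Closing the channel lets .list terminate once all items are consumed
$collector.close;
my $output = $collector.list.join;
print $output;
```

The chunks come out of the channel in the order in which they were sent, so the joined string reproduces the original output.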

Now that the streams are tapped, we can start the program and wait for it to finish:

my $result = $proc.start.result;

Proc::Async.start executes the external process and returns a Promise. A promise wraps a piece of code that potentially runs on another thread, has a status (Planned, Kept or Broken), and once it's finished, a result. Accessing the result automatically waits for the wrapped code to finish. Here the code is the one that runs the external program and the result is an object of type Proc (which happens to be the same type that the run() function from the synchronous interface returns).

After this line, we can be sure that the external command has terminated, and thus no more output will come from $proc.stdout and $proc.stderr. Hence we can safely close the channel and access all its elements through Channel.list:

$collector.close;
my $output = $collector.list.join;

Finally it's time to check if the external command was successful -- by checking its exit code -- and to exit the wrapper with the command's exit code:

my $exitcode = $result.exitcode;
if $exitcode != 0 {
    say "Program @cmd[] exited with code $exitcode";
    print "Output:\n", $output if $output;
}
exit $exitcode;
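
For comparison, here is a rough sketch of what capturing output looks like with the synchronous run interface mentioned at the start of this section. It assumes an echo command is available on the system, and it is only meant to show the shape of the API, not a replacement for the code above:

```perl6
# :out and :err ask run to capture the two streams instead of inheriting them
my $proc = run 'echo', 'hello', :out, :err;

# Reading happens one stream after the other
my $stdout = $proc.out.slurp(:close);
my $stderr = $proc.err.slurp(:close);

say "exit code: ", $proc.exitcode;
print $stdout;
```

This is shorter, but the two streams are read one after the other rather than as the output arrives, and there is no obvious place to hook in a timeout. That is why the asynchronous interface is the better fit for silent-cron.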

Implementing Timeouts

The idiomatic way to implement timeouts in Perl 6 is to use the Promise.anyof combinator together with a timer:

sub MAIN(*@cmd, :$timeout) {
    my $proc = Proc::Async.new(|@cmd);
    my $collector = Channel.new;
    for $proc.stdout, $proc.stderr -> $supply {
        $supply.tap: { $collector.send($_) }
    }
    my $promise = $proc.start;
    my $waitfor = $promise;
    $waitfor = Promise.anyof(Promise.in($timeout), $promise)
        if $timeout;
    await $waitfor;

The initialization of $proc hasn't changed. But instead of accessing $proc.start.result, we store the promise returned from $proc.start. If the user specified a timeout, we run this piece of code:

$waitfor = Promise.anyof(Promise.in($timeout), $promise)

Promise.in($seconds) returns a promise that will be fulfilled in $seconds seconds. It's basically the same as start { sleep $seconds }, but the scheduler can be a bit smarter about not allocating a whole thread just for sleeping.

Promise.anyof($p1, $p2) returns a promise that is fulfilled as soon as one of the arguments (which should also be promises) is kept or broken. So we wait either until the external program has finished, or until the sleep promise is fulfilled.
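
The following small sketch shows the combinator on its own, with two timers instead of an external program. The durations are arbitrary values picked for the demonstration:

```perl6
my $slow = Promise.in(2);      # stands in for a long-running command
my $fast = Promise.in(0.1);    # stands in for the timeout

my $start = now;
await Promise.anyof($slow, $fast);

say $fast.status;   # Kept
say $slow.status;   # Planned
say "waited less than a second" if now - $start < 1;
```

The slower promise is still Planned after the await, which is exactly the situation silent-cron has to handle when the timeout fires first.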

With await $waitfor; the program waits for the promise to be fulfilled (or broken). When that is the case, we can't simply access $promise.result as before, because $promise (which is the promise for the external program) might not be fulfilled in the case of a timeout. So we have to check the status of the promise first and only then can we safely access $promise.result:

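if !$timeout || $promise.status ~~ Kept {
    # the command has finished, so no more output can arrive
    $collector.close;
    my $output = $collector.list.join;

    my $exitcode = $promise.result.exitcode;
    if $exitcode != 0 {
        say "Program @cmd[] exited with code $exitcode";
        print "Output:\n", $output if $output;
    }
    exit $exitcode;
}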
else {
    ...
}

In the else { ... } branch, we need to handle the timeout case. This might be as simple as printing a statement that a timeout has occurred, and when silent-cron exits immediately afterwards, that might be acceptable. But we might want to do more in the future, so we should kill the external program. And if the program doesn't terminate after the friendly kill signal, it should receive a kill(9), which on UNIX systems forcefully terminates the program:

else {
    $proc.kill;
    say "Program @cmd[] did not finish after $timeout seconds";
    sleep 1 if $promise.status ~~ Planned;
    $proc.kill(9);
    await $promise;
    exit 2;
}

await $promise returns the result of the promise, which here is a Proc object. Proc has a built-in safety feature: if the command exited with a non-zero exit code, evaluating the object in sink (void) context throws an exception.

Since we explicitly handle the non-zero exit code in the code, we can suppress the generation of this exception by assigning the return value from await to a dummy variable:

my $dummy = await $promise

Since we don't need the value, we can also assign it to an anonymous variable instead:

$ = await $promise
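
The same safety feature can be observed with the synchronous run function, which makes for a compact demonstration. This sketch assumes a UNIX-like system where the false command exists and exits with code 1:

```perl6
# Assigned to a variable, the Proc is not in sink context: no exception
my $proc = run 'false';
say $proc.exitcode;

# As a statement of its own, the Proc is sunk, and that throws
try {
    run 'false';
    say "not reached";
}
my $error = $!;
say "caught: ", $error.^name;

# The anonymous variable keeps the Proc out of sink context
$ = run 'false';
say "still running";
```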

More on Promises

If you have worked with concurrent or parallel programs in other languages, you might have come across threads, locks, mutexes, and other low-level constructs. These exist in Perl 6 too, but their direct usage is discouraged.

The problem with such low-level primitives is that they don't compose well. You can have two libraries that use threads and work fine on their own, but lead to deadlocks when combined within the same program. Or different components might launch threads on their own, which can lead to too many threads and high memory consumption when several such components come together in the same process.

Perl 6 provides higher-level primitives. Instead of spawning a thread, you use start to run code asynchronously, and the scheduler decides which thread to run it on. If there are more start calls than threads available to run them, some of the code blocks wait their turn and run serially.

Here is a very simple example of running a computation in the background:

sub count-primes(Int $upto) {
    (1..$upto).grep(&is-prime).elems;
}

my $p = start count-primes 10_000;
say $p.status;
await $p;
say $p.result;

It gives this output:

Planned
1229

You can see that the main line of execution continued after the start call, and $p immediately had a value -- the promise, with status Planned.

As we've seen before, there are combinators for promises, anyof and allof. You can also chain actions to a promise using the then method:

sub count-primes(Int $upto) {
    (1..$upto).grep(&is-prime).elems;
}

my $p1 = start count-primes 10_000;
my $p2 = $p1.then({ say .result });
await $p2;
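
The allof combinator has not appeared in the code so far, so here is a minimal sketch of it. The squaring is just a placeholder for real work:

```perl6
# Three independent computations, started in the background
my @promises = (1..3).map: -> $n { start { $n * $n } };

# allof is kept once every promise in the list has finished
await Promise.allof(@promises);
my @squares = @promises.map(*.result);
say @squares;    # [1 4 9]
```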

If an exception is thrown inside asynchronously executing code, the status of the promise becomes Broken, and calling its .result method re-throws the exception.
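
A short sketch of this behavior, with an error message made up for the example:

```perl6
my $p = start { die "something went wrong" };

# await (like .result) re-throws the exception, so guard it with try
try await $p;

say $p.status;           # Broken
say $p.cause.message;    # something went wrong
```

The cause method gives access to the exception without re-throwing it, which is handy for logging.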

As a demonstration of the scheduler distributing tasks, let's consider a small Monte Carlo simulation to calculate an approximation for π. We generate a pair of random numbers between zero and one, and interpret them as dots in a square. A quarter circle with radius one covers the area of π/4, so the ratio of randomly placed dots within the quarter circle to the total number of dots approaches π/4, if we use enough dots.

sub pi-approx($iterations) {
    my $inside = 0;
    for 1..$iterations {
        my $x = 1.rand;
        my $y = 1.rand;
        $inside++ if $x * $x + $y * $y <= 1;
    }
    return ($inside / $iterations) * 4;
}
my @approximations = (1..1000).map({ start pi-approx(80) });
await @approximations;

say @approximations.map({.result}).sum / @approximations;

The program starts one thousand computations asynchronously, but if you look at a system monitoring tool while it runs, you'll observe only 16 threads running. This magic number comes from the default thread scheduler, and we can override it by providing our own instance of a scheduler above the previous code:

my $*SCHEDULER = ThreadPoolScheduler.new(:max_threads(3));

For CPU-bound tasks like this Monte Carlo simulation, it is a good idea to limit the number of threads roughly to the number of (possibly virtual) CPU cores; if many threads are stuck waiting for IO, a higher number of threads can yield better performance.

Possible Extensions

If you want to play with silent-cron, you could add a retry mechanism. If a command fails because of an external dependency (like an API or an NFS share), it might take time for that external dependency to recover. Hence you should add a quadratic or exponential backoff, that is, the wait time between retries should increase quadratically (1, 4, 9, 16, 25, ...) or exponentially (1, 2, 4, 8, 16, 32, ...).
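
As a starting point, the two sequences and a possible retry loop could look like the sketch below. The sub retry-with-backoff and its parameters :attempts and :unit are hypothetical names invented for this example, and the action being retried is a dummy block that succeeds on its third call:

```perl6
# Wait times in seconds for five retries
my @quadratic   = (1..5).map(* ** 2);     # 1, 4, 9, 16, 25
my @exponential = (0..4).map(2 ** *);     # 1, 2, 4, 8, 16

# Hypothetical helper: run &action until it returns a true value,
# sleeping $unit * 2 ** $attempt seconds between attempts
sub retry-with-backoff(&action, :$attempts = 5, :$unit = 1) {
    for ^$attempts -> $attempt {
        return True if action();
        sleep $unit * 2 ** $attempt if $attempt < $attempts - 1;
    }
    return False;
}

my $calls = 0;
my $ok = retry-with-backoff({ ++$calls >= 3 }, :unit(0.01));
say "succeeded after $calls calls" if $ok;
```

In silent-cron the action would run the wrapped command and report whether its exit code was zero.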

Summary

We've seen an asynchronous API for running external programs and how to use Promises to implement timeouts. We've also discussed how promises are distributed to threads by a scheduler, allowing you to start an arbitrary number of promises without overloading your computer.
