#!/usr/bin/perl -T

#------------------------------------------------------------------------------
# This is amavis-services, a set of supervisor processes for amavisd-new.
#
# Author: Mark Martinec <mark.martinec@ijs.si>
# Copyright (C) 2012  Mark Martinec,  All Rights Reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
# * Neither the name of the author, nor the name of the "Jozef Stefan"
#   Institute, nor the names of contributors may be used to endorse or
#   promote products derived from this software without specific prior
#   written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
# OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#(the license above is the new BSD license, and pertains to this program only)
#
# Patches and problem reports are welcome.
# The latest version of this program is available at:
#   http://www.ijs.si/software/amavisd/
#------------------------------------------------------------------------------

use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';

use vars qw($VERSION);  $VERSION = 2.002;

use Errno qw(ESRCH ENOENT);
use POSIX qw(strftime);
use Time::HiRes ();
use Unix::Syslog qw(:macros :subs);

use vars qw($myproduct_name $myversion_id $myversion_date $myversion);
BEGIN {
  # Program identification, assembled at compile time so that it is already
  # available to any code compiled later in this file.
  ($myproduct_name, $myversion_id, $myversion_date) =
    ('amavis-services', '2.8.0', '20120417');
  $myversion = sprintf('%s-%s (%s)',
                       $myproduct_name, $myversion_id, $myversion_date);
}

use vars qw($zmq_mod_name $zmq_mod_version $zmq_lib_version);
# Locate a usable Perl binding for ZeroMQ at compile time, import its
# functions and constants into this package, and record which binding and
# which library version were found. Candidates are tried in this order:
# ZMQ::LibZMQ3, ZMQ::LibZMQ2, ZeroMQ; the program dies if none can be loaded.
# Sets $zmq_mod_name, $zmq_mod_version and $zmq_lib_version.
BEGIN {
  my($zmq_major, $zmq_minor, $zmq_patch);
  # each eval yields true only if both modules of a candidate could be loaded
  if (eval { require ZMQ::LibZMQ3 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ3';  # new interface module to zmq v3 or cx
    import ZMQ::LibZMQ3;  import ZMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ3::zmq_version();
  } elsif (eval { require ZMQ::LibZMQ2 && require ZMQ::Constants }) {
    $zmq_mod_name = 'ZMQ::LibZMQ2';  # new interface module to zmq v2
    import ZMQ::LibZMQ2;  import ZMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::LibZMQ2::zmq_version();
    # zmq v2/v3 incompatible renaming: make the v2 functions available under
    # the names zmq_recvmsg / zmq_sendmsg, which the rest of this file calls
    *zmq_recvmsg = \&ZMQ::LibZMQ2::zmq_recv;
    *zmq_sendmsg = \&ZMQ::LibZMQ2::zmq_send;
  } elsif (eval { require ZeroMQ::Constants && require ZeroMQ::Raw }) {
    $zmq_mod_name = 'ZeroMQ';  # old interface module to zmq v2
    import ZeroMQ::Raw;  import ZeroMQ::Constants qw(:all);
    ($zmq_major, $zmq_minor, $zmq_patch) = ZeroMQ::version();
    # zmq v2/v3 incompatible renaming: make the v2 functions available under
    # the names zmq_recvmsg / zmq_sendmsg, which the rest of this file calls
    *zmq_recvmsg = \&ZeroMQ::Raw::zmq_recv;
    *zmq_sendmsg = \&ZeroMQ::Raw::zmq_send;
  } else {
    die "Perl modules ZMQ::LibZMQ3 or ZMQ::LibZMQ2 or ZeroMQ not available\n";
  }
  # version of the Perl binding that was selected above
  $zmq_mod_version = $zmq_mod_name->VERSION;
  # version of the underlying library as a dotted string, e.g. for the
  # /^[012]\./ test that selects the zmq_poll timeout units
  $zmq_lib_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
}

# Describe the ZMQ binding in use as one printable string: the name and
# version of the Perl module, followed by the version of the library.
sub zmq_version {
  my @binding = ($zmq_mod_name, $zmq_mod_version);
  return join(' ', @binding) . ', lib ' . $zmq_lib_version;
};

# Time limits (in seconds) after which a child process is sent a SIGTERM:
my $idle_ttl   = 4 * 3600;  # applies to idle children
my $active_ttl = 15 * 60;   # applies to active (possibly stuck) children

my $MYHOME = '/var/amavis';

# logging
my $log_level       = 0;
my $syslog_ident    = $myproduct_name;
my $syslog_facility = LOG_MAIL;

# ZMQ endpoints
my $inner_sock_specs = 'ipc://' . $MYHOME . '/amavisd-zmq.sock';
my $outer_sock_specs = 'tcp://127.0.0.1:23232';
my $snmp_sock_specs  = 'tcp://127.0.0.1:23233';  # tcp://*:23233

# run-time state shared between the task subroutines and the main program
my($interrupted, $syslog_open);
my($zmq_ctx, $inner_sock, $outer_sock, $snmp_sock);

# Number of zmq_poll timeout units in one second: the timeout is given
# in milliseconds since zmq v3, and in microseconds with older libraries.
my $zmq_poll_units = $zmq_lib_version =~ /^[012]\./ ? 1000 * 1000 : 1000;

# Tell whether a message of the given log level would currently be logged,
# i.e. whether the level does not exceed the configured $log_level.
sub ll($) {
  return $_[0] <= $log_level;
}

# Log a message if its level does not exceed the configured $log_level.
#
#   $level  - verbosity level of the message; lower is more important
#   $errmsg - the message text, or a sprintf format string when further
#             arguments are supplied
#   @args   - optional arguments for the sprintf format
#
# The message goes to STDERR as long as syslog has not been opened
# ($syslog_open is false), otherwise to syslog with a priority derived
# from the level.
#
sub do_log($$;@) {
  my($level,$errmsg,@args) = @_;
  if ($level <= $log_level) {
    # treat $errmsg as sprintf format string if additional arguments provided
    if (@args) { $errmsg = sprintf($errmsg,@args) }
    if (!$syslog_open) {
      print STDERR $errmsg."\n";  # ignoring I/O status, except SIGPIPE
    } else {
      my $prio = $level <= -2 ? LOG_ERR
               : $level <= -1 ? LOG_WARNING
               : $level <=  0 ? LOG_NOTICE
               : $level <=  1 ? LOG_INFO
               :                LOG_DEBUG;
      # use the priority computed above; a fixed LOG_INFO would hide
      # errors and warnings from priority-based syslog filtering
      syslog($prio, "%s", $errmsg);
    }
  }
}

# Update the table of amavisd child process states from one received
# status message.
#
#   $process_states_ref - ref to a hash keyed by PID; each value is a hash
#                         with fields state, task_id, timestamp,
#                         base_timestamp and tick
#   $msg                - a received ZMQ message, or a false value if
#                         receiving failed
#
# A status message consists of whitespace-separated fields:
#   am.st <pid> <time> <state> [<task_id>]
# where <time> is either an absolute Unix time in seconds, or a delta
# in milliseconds relative to the last absolute time seen for that PID.
# Messages not matching this form are ignored. Always returns 1.
#
sub process_message($$) {
  my($process_states_ref,$msg) = @_;
  my $val;
  if ($msg) {
    $val = zmq_msg_data($msg);
    zmq_msg_close($msg); undef $msg;
    # log the payload here; $msg is no longer usable as a success indicator
    do_log(2, "got: %s\n", $val);
  } else {
    do_log(2, "got: %s\n", "err: $!");
  }
  if (!defined $val) {
    # should not happen (except on a failure of zmq_recvmsg)
  } elsif ($val =~ /^am\.st \d+\s+/s) {
    my($subscription_chan, $pid, $time, $state, $task_id) = split(' ',$val);
    if ($state eq 'FLUSH') {
      %$process_states_ref = ();  # flush all kept state (e.g. on a restart)
    } elsif ($state eq 'exiting' || $state eq 'purged') {
      delete $process_states_ref->{$pid};  # may or may not exist
    } else {
      $state = ' ' if $state eq '-';
      my $p = $process_states_ref->{$pid};
      if ($p) {
        $p->{state} = $state;
        $p->{task_id} = $task_id;
      } else {  # new process appeared
        $process_states_ref->{$pid} = $p = {
          state     => $state,
          task_id   => $task_id,
          timestamp => undef,
          base_timestamp => undef,
        };
      }
      my $now = Time::HiRes::time;
      if ($time > 1e9) {  # Unix time in seconds with fraction (> Y2000)
        $p->{base_timestamp} = $p->{timestamp} = $time;
      } elsif (!$p->{base_timestamp}) {  # delta time but no base
        $p->{timestamp} = $now;
        $p->{base_timestamp} = $p->{timestamp} - $time/1000;  # estimate
      } else {  # delta time since base_timestamp in ms
        $p->{timestamp} = $p->{base_timestamp} + $time/1000;
      }
      $p->{tick} = $now;  # local time when we last heard from this process
    }
  }
  1;
}

# Walk through all known amavisd child processes, drop those which no longer
# exist, and terminate those which are overdue.
#
#   $process_states_ref - ref to a hash keyed by PID, as maintained
#                         by process_message()
#
# A process is overdue when its age exceeds $idle_ttl (if idling) or
# $active_ttl (otherwise). An overdue process is first sent a SIGTERM;
# if it is still there after a waiting period it is sent a SIGKILL, which
# is retried with a waiting period growing by a factor of 1.5 each time.
# When the waiting period exceeds 600 seconds the process is given up on.
# For each process dropped from the table a 'purged' status message is
# published on the inner socket.
#
# Dies if the existence of a process can not be checked, or if sending
# a message fails.
#
sub check_proc($) {
  my($process_states_ref) = @_;
  do_log(2,"CHECK");
  # tell subscribers that the given PID is no longer being tracked
  my $announce_purged = sub {
    my($pid,$now) = @_;
    zmq_sendmsg($inner_sock,
                sprintf('am.st %s %014.3f purged', $pid,$now), 0) != -1
      or die "zmq_sendmsg failed: $!";
  };
  while (my($pid,$p) = each %$process_states_ref) {
    my $now = Time::HiRes::time;
    my $age = $now - $p->{base_timestamp};
    my $idling = $p->{task_id} eq '' && $p->{state} =~ /^[. ]\z/s;
    my $overdue = $age > ($idling ? $idle_ttl : $active_ttl);
    my $n;  # result of an existence check: 0 if the process is gone
    if (!$overdue && $now - $p->{tick} < 10) {
      $n = 1;  # recently heard from it, assume it is still there
      do_log(2, "PID %d skipped, recently heared from\n", $pid);
    } elsif (!$overdue && $idling &&
             $p->{last_checked_timestamp} &&
             $now - $p->{last_checked_timestamp} < 60) {
      $n = 1;  # recently checked, idle, assume it is still there
      do_log(2, "PID %d skipped, recently checked\n", $pid);
    } elsif (!$overdue && $p->{last_checked_timestamp} &&
                          $now - $p->{last_checked_timestamp} < 10) {
      $n = 1;  # recently checked, busy, assume it is still there
      do_log(2, "PID %d skipped, recently checked\n", $pid);
    } else {
      do_log(2, "PID %d checking\n", $pid);
      $p->{last_checked_timestamp} = $now;
      $n = kill(0,$pid);  # test if the process is still there
      if ($n == 0) {
        # ESRCH means there is no such process
        if ($! != ESRCH) {
          die "Can't check the process $pid: $!";
        } elsif (defined $p->{sig_sent}) {
          do_log(2, "PID %d sucessfully terminated by SIG%s, %s\n",
                    $pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
        } else {
          do_log(2, "PID %d went away, %s\n",
                    $pid, $p->{task_id} || $p->{state} );
        }
        # deleting the entry most recently returned by each() is safe
        delete $process_states_ref->{$pid};
        $announce_purged->($pid,$now);
      }
    }
    if ($n == 0) {
      # already dealt with
    } elsif (!$overdue) {  # life is good
      do_log(2, "PID %d: %s\n", $pid, $p->{task_id} || $p->{state} );
    } elsif (!$p->{sig_sent} ||
             $p->{sig_sent_timestamp} + $p->{sig_sent_retry_in} <= $now) {
      # overdue, terminate or kill, or retry the killing; a retry is only
      # attempted once the waiting period since the last signal has elapsed
      if (!$p->{sig_sent}) {
        $p->{sig_sent} = 'TERM';
        $p->{sig_sent_retry_in} = 20;
      } else {
        $p->{sig_sent} = 'KILL';
        $p->{sig_sent_retry_in} *= 1.5;  # increase the wait time for a retry
      }
      $p->{sig_sent_timestamp} = $now;
      if (kill($p->{sig_sent},$pid)) {
        do_log(2, "PID %d SIG%s, %s\n",
                  $pid, $p->{sig_sent}, $p->{task_id} || $p->{state});
      } elsif ($! == ESRCH) {
        # already gone by now, no fuss
      } else {
        warn "Can't $p->{sig_sent} the [$pid]: $!";
      }
      if ($p->{sig_sent_retry_in} > 600) {
        do_log(2, "Giving up on PID %d, %s\n",
                  $pid, $p->{task_id} || $p->{state});
        delete $process_states_ref->{$pid};
        $announce_purged->($pid,$now);
      }
    }
  }
}

# The childproc_minder process receives information about amavisd child
# processes from an outer socket, forwarded there by a forwarding process.
# Based on updates received, and based on its own process sanity checks,
# it maintains a state in memory of each amavisd child process.
#
# It must run on the same host as amavisd child processes in order to be
# able to check for lost or crashed processes.
#
# It kills amavisd child processes which are active longer than $active_ttl
# seconds, or are idling for more than $idle_ttl seconds. For killed or lost
# processes it sends 'purged' messages to the inner socket, and periodically
# sends a list of amavisd child process PIDs to the inner socket for the
# benefit of more ephemeral clients.
#
sub childproc_minder() {
  # a single context serves both sockets of this task
  $zmq_ctx = zmq_init(1)
    or die "Can't create ZMQ context: $!";

  # subscriber side: process status updates arrive from the outer socket
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB)
    or die "Can't create outer ZMQ socket: $!";
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
# zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY, 0) != -1
#   or die "zmq_setsockopt failed: $!";
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_connect($outer_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  # publisher side: our own findings are fed into the inner socket
  $inner_sock = zmq_socket($zmq_ctx, ZMQ_PUB)
    or die "Can't create inner ZMQ socket: $!";
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY, 0) != -1
#   or die "zmq_setsockopt IPV4ONLY failed: $!";

# my $hwm = $zmq_lib_version =~ /^[012]\./ && defined &ZMQ_HWM ? &ZMQ_HWM
#                                     : defined &ZMQ_SNDHWM ? &ZMQ_SNDHWM
#                                     : undef;
# if (defined $hwm) {
#   zmq_setsockopt($inner_sock, $hwm, 100) != -1
#     or die "zmq_setsockopt HWM failed: $!";
# }

  zmq_connect($inner_sock, $inner_sock_specs) != -1
    or die "zmq_connect to $inner_sock_specs failed: $!";

  my %process_states;  # known child processes, keyed by PID
  my($last_checked, $last_proclist_sent) = (0, 0);  # times of last actions

  # announce our start, asking for any previously kept state to be flushed
  zmq_sendmsg($inner_sock,
              sprintf('am.st %s %014.3f FLUSH', $$, Time::HiRes::time),
              0) != -1
    or die "zmq_sendmsg failed: $!";

  # invoked by zmq_poll whenever a status message is waiting to be read
  my $on_status_message = sub {
    my $msg = zmq_recvmsg($outer_sock);
    process_message(\%process_states, $msg);
  };

  for (;;) {
    # wait for an incoming message, but no longer than one second
    my @poll_items = (
      { socket   => $outer_sock,
        events   => ZMQ_POLLIN,
        callback => $on_status_message,
      },
    );
    zmq_poll(\@poll_items, 1 * $zmq_poll_units) != -1
      or die "zmq_poll failed: $!";

    my $t = Time::HiRes::time;

    # sanity checks on processes are done at most once per second
    next  if $last_checked + 1 >= $t;
    check_proc(\%process_states);
    $last_checked = $t;

    # lists of PIDs are published at most once per five seconds
    next  if $last_proclist_sent + 5 >= $t;
    my(@busy_pids, @idle_pids);
    for my $pid (sort { $a <=> $b } keys %process_states) {
      my $st = $process_states{$pid};
      my $is_idle = $st->{task_id} eq '' && $st->{state} =~ /^[. ]\z/s;
      push(@{ $is_idle ? \@idle_pids : \@busy_pids }, $pid);
    }
    zmq_sendmsg($inner_sock,
                'am.proc.busy ' . join(' ', @busy_pids), 0) != -1
      or die "zmq_sendmsg failed: $!";
    zmq_sendmsg($inner_sock,
                'am.proc.idle ' . join(' ', @idle_pids), 0) != -1
      or die "zmq_sendmsg failed: $!";
    $last_proclist_sent = $t;
  } # forever
  # not reached
}

# The snmp_aggregator process subscribes to 'am.snmp' messages on the outer
# socket, keeps the aggregated value of each variable in memory, and every
# $interval seconds publishes all variables on a socket of its own
# ($snmp_sock_specs), to which it binds.
#
# A received message consists of whitespace-separated fields:
#   am.snmp <key> <type> <value>
# Values of counter types (C32, C64, TIM) are added up, values of other
# types replace the previously stored value.
#
# Runs forever; dies on a failure of a ZMQ operation.
#
sub snmp_aggregator() {
  $zmq_ctx = zmq_init(1);
  $zmq_ctx or die "Can't create ZMQ context: $!";

  $outer_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $outer_sock or die "Can't create outer ZMQ socket: $!";
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
# zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY, 0) != -1
#   or die "zmq_setsockopt IPV4ONLY failed: $!";
  zmq_setsockopt($outer_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  zmq_connect($outer_sock, $outer_sock_specs) != -1
    or die "zmq_connect to $outer_sock_specs failed: $!";

  $snmp_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
  $snmp_sock or die "Can't create ZMQ socket: $!";
  zmq_setsockopt($snmp_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
# zmq_setsockopt($snmp_sock, ZMQ_IPV4ONLY, 0) != -1
#   or die "zmq_setsockopt IPV4ONLY failed: $!";

# my $hwm = $zmq_lib_version =~ /^[012]\./ && defined &ZMQ_HWM ? &ZMQ_HWM
#                                     : defined &ZMQ_SNDHWM ? &ZMQ_SNDHWM
#                                     : undef;
# if (defined $hwm) {
#    zmq_setsockopt($inner_sock, $hwm, 2000) != -1
#      or die "zmq_setsockopt HWM failed: $!";
# }

  zmq_bind($snmp_sock, $snmp_sock_specs) != -1
    or die "zmq_bind to $snmp_sock_specs failed: $!";

  my $interval = 5;  # seconds

  my(%snmp_var, %snmp_type);  # current value and type of each variable
  my $now = Time::HiRes::time;
  my $start_time = $now;
  my $last_send_time = 0;

  for (;;) {
    # wait for incoming messages until the next publishing is due
    $now = Time::HiRes::time;
    my $next_scheduled_dt = $last_send_time + $interval - $now;
    $next_scheduled_dt = 0  if $next_scheduled_dt < 0;
    zmq_poll(
      [
        { socket => $outer_sock,
          events => ZMQ_POLLIN,
          callback => sub {
            my($msg, $chan, $key, $type, $val);
            $msg = zmq_recvmsg($outer_sock);
            if ($msg) {
              $val = zmq_msg_data($msg);
              ($chan, $key, $type, $val) = split(' ', $val, 4);
              zmq_msg_close($msg); undef $msg;
            }
            # Leave the callback with a return: a loop control statement
            # here would exit the subroutine and unwind through zmq_poll.
            return  if $chan ne 'am.snmp';
            if (!$snmp_var{$key}) {
              $snmp_var{$key} = $val;
              $snmp_type{$key} = $type;
            } elsif ($type =~ /^(C32|C64|TIM)\z/) {  # a counter
              $snmp_var{$key} += $val;
            } else {
              $snmp_var{$key} = $val;
            }
          },
        },
      ],
      int($next_scheduled_dt * $zmq_poll_units)
    ) != -1  or die "zmq_poll failed: $!";

    $now = Time::HiRes::time;
    if ($now - $last_send_time > $interval) {
      $snmp_var{'sysUpTime'} = int($start_time);  # override by our time
      $snmp_type{'sysUpTime'} = 'INT';
      while (my($key,$val) = each %snmp_var) {
        zmq_sendmsg($snmp_sock,
                    join(' ', 'am.snmp', $key,
                              $snmp_type{$key}, $val), 0) != -1
          or die "zmq_sendmsg failed: $!"

      }
      $last_send_time = $now;
    }
  }
  # not reached
}

# Forward messages from inner ZMQ socket to outer ZMQ socket.
# Binding (not connecting) to both sockets provides a single stable point
# in the system.
#
# Amavisd child processes are dynamic and connect to the inner socket,
# supplying information. Similarly the childproc_minder process occasionally
# feeds its supplementary status updates to this inner socket.
#
# Dynamic clients like amavisd-nanny, amavisd-snmp-subagent, amavisd-agent,
# and a childproc_minder process connect to the outer socket to receive
# information from there.
#
# Runs forever, relaying every message received on the inner socket to the
# outer socket. Both sockets are bound (not connected) by this process.
# Only messages with prefixes 'am.log.', 'am.proc.', 'am.st ' and 'am.snmp '
# are subscribed to. With the old ZeroMQ module the forwarding is delegated
# to a ZMQ_FORWARDER device, otherwise it is done by an explicit loop which
# also preserves multipart messages. Dies on a failure of a ZMQ operation.
#
sub msg_forwarder() {
  $zmq_ctx = zmq_init(1);
  # report the reason too, like the other tasks do
  $zmq_ctx or die "Can't create ZMQ context: $!";

  # receive from amavisd child processes
  $inner_sock = zmq_socket($zmq_ctx, ZMQ_SUB);
  $inner_sock or die "Error creating inner ZMQ socket: $!";
  zmq_setsockopt($inner_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
# zmq_setsockopt($inner_sock, ZMQ_IPV4ONLY, 0) != -1
#   or die "zmq_setsockopt IPV4ONLY failed: $!";

# zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, '') != -1
#   or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.log.') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.proc.') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.st ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";
  zmq_setsockopt($inner_sock, ZMQ_SUBSCRIBE, 'am.snmp ') != -1
    or die "zmq_setsockopt SUBSCRIBE failed: $!";

  zmq_bind($inner_sock, $inner_sock_specs) != -1
    or die "zmq_bind to $inner_sock_specs failed: $!";

  # forward to a public outer socket
  # to clients like amavisd-nanny, amavisd-agent, amavisd-snmp-subagent
  $outer_sock = zmq_socket($zmq_ctx, ZMQ_PUB);
  $outer_sock or die "Error creating outer ZMQ socket: $!";
  zmq_setsockopt($outer_sock, ZMQ_LINGER, 2000) != -1  # milliseconds
    or die "zmq_setsockopt LINGER failed: $!";
# zmq_setsockopt($outer_sock, ZMQ_IPV4ONLY, 0) != -1
#   or die "zmq_setsockopt IPV4ONLY failed: $!";
  zmq_bind($outer_sock, $outer_sock_specs) != -1
    or die "zmq_bind to $outer_sock_specs failed: $!";

  # start forwarding
  if ($zmq_mod_name eq 'ZeroMQ') {
    zmq_device(ZMQ_FORWARDER, $inner_sock, $outer_sock);

  } else {  # ZMQ_FORWARDER device is no longer available in 3.1
#   $| = 1  if $debug;
    my $cnt = 0;  # number of messages forwarded, for the debugging below
    for (;;) {
      $cnt++;
      # relay all parts of one (possibly multipart) message
      for (;;) {
        my $message = zmq_recvmsg($inner_sock);
        $message or die "zmq_recvmsg failed: $!";
        # are there more parts of the same message still to be received?
        my $more = zmq_getsockopt($inner_sock, ZMQ_RCVMORE);
        $more != -1  or die "zmq_getsockopt RCVMORE failed: $!";
        zmq_sendmsg($outer_sock, $message, $more ? ZMQ_SNDMORE : 0) != -1
          or die "zmq_sendmsg failed: $!";
        last if !$more;
      }
#     if ($debug) {
#       print ".";
#       printf(" %d\n", $cnt)  if $cnt % 100 == 0;
#     }
    }
  }
  # not reached
}

# Returns a one-line usage synopsis, naming the program by the last
# component of the path it was invoked as ($0).
sub usage() {
  my $me = $0;
  # strip the leading directory part; the lookahead keeps a name intact
  # in the unlikely case that it ends with a slash
  $me =~ s{^.*/(?=[^/])}{}s;
  "Usage: $me [-d log_level] (msg-forwarder|childproc-minder|snmp-aggregator)";
}

# main program starts here

# Set before exiting on purpose in cases where the exit-time log entry
# written by the END block below is not wanted.
my $normal_termination = 0;

openlog($syslog_ident, LOG_PID | LOG_NDELAY, $syslog_facility);
$syslog_open = 1;  # from now on do_log writes to syslog instead of STDERR

# Log fatal errors and warnings. A die within an eval ($^S true) is not
# logged by the handler, it is left to the code which catches it.
$SIG{'__DIE__' } =
  sub { if (!$^S) { my($m) = @_; chomp($m); do_log(-1,"_DIE: %s", $m) } };
$SIG{'__WARN__'} =
  sub { my($m) = @_; chomp($m); do_log(0,"_WARN: %s",$m) };

my $task_name;
my $info_requested = 0;  # true if -h or -V was given

# Parse command line options, followed by exactly one task name.
my(@argv) = @ARGV;  # preserve @ARGV, may modify @argv
while (@argv >= 2 && $argv[0] =~ /^-[d]\z/ ||
       @argv >= 1 && $argv[0] =~ /^-/) {
  my($opt,$val);
  $opt = shift @argv;
  $val = shift @argv  if $opt !~ /^-[hVf-]\z/;  # these take no arguments
  if ($opt eq '--') {
    last;
  } elsif ($opt eq '-h') {  # -h  (help)
    printf STDERR ("%s\n\n%s\n", $myversion, usage());
    $info_requested = 1;
  } elsif ($opt eq '-V') {  # -V  (version)
    printf STDERR ("%s\n", $myversion);
    $info_requested = 1;
  } elsif ($opt eq '-d') {  # -d log_level
    # The file runs under 'use re "taint"', which keeps captures tainted;
    # lift that here so that the validated digits yield an untainted value.
    no re 'taint';
    $val =~ /^(\d+)\z/  or die "Bad value for option -d: $val\n";
    $log_level = $1;
  } else {
    printf STDERR ("Error in command line options: %s\n\n%s\n", $opt, usage());
    exit 1;
  }
}
if ($info_requested && !@argv) {
  # only help or version was asked for, there is no task to be run;
  # do not complain about a missing task name
  $normal_termination = 1;
  exit 0;
}
if (@argv == 1 &&
    $argv[0] =~ /^(?:msg-forwarder|childproc-minder|snmp-aggregator)\z/) {
  $task_name = $argv[0];
} else {
  printf STDERR ("Error parsing a command line %s\n\n%s\n",
                 join(' ',@ARGV), usage());
  exit 1;
}

do_log(0, "%s task '%s' [%d] started. %s\n",
          $myversion, $task_name, $$, zmq_version());

# Run the chosen task. Tasks loop forever, so the eval is only left through
# a die: either from the signal handler below, or because of a failure.
my $task_ok = eval {  # catch TERM and INT signals for a controlled shutdown
  my $h = sub { $interrupted = $_[0]; die "\n" };
  local $SIG{INT}  = $h;
  local $SIG{TERM} = $h;
  if ($task_name eq 'msg-forwarder') {
    msg_forwarder();
  } elsif ($task_name eq 'childproc-minder') {
    childproc_minder();
  } elsif ($task_name eq 'snmp-aggregator') {
    snmp_aggregator();
  } else {
    die "Unrecognized task name: $task_name";
  }
  1;
};  # until interrupted
if (!$task_ok && !defined $interrupted) {
  # The task died of an error, not of a signal. The __DIE__ handler stays
  # silent within an eval, so the reason must be reported here or be lost.
  my $err = $@;  chomp($err);
  do_log(-1, "Task '%s' [%d] failed: %s", $task_name, $$, $err);
}
do_log(0, "Task '%s' [%d] shutting down", $task_name, $$);

# Release ZMQ resources: sockets first, then the context.
if ($inner_sock) {
  do_log(0, "%s closing inner socket", $task_name);
  zmq_close($inner_sock);  # ignoring status
}
if ($outer_sock) {
  do_log(0, "%s closing outer socket", $task_name);
  zmq_close($outer_sock);  # ignoring status
}
if ($snmp_sock) {
  do_log(0, "%s closing SNMP socket", $task_name);
  zmq_close($snmp_sock);  # ignoring status
}
if ($zmq_ctx) {
  do_log(0, "%s closing context", $task_name);
  zmq_term($zmq_ctx);  # ignoring status
}

END {
  do_log(0, "Task '%s' [%d] exiting: %s",
            $task_name, $$, $interrupted) if !$normal_termination;
}
