#!/usr/bin/perl -T

#------------------------------------------------------------------------------
# This is amavisd-hub, a set of supervisor processes for amavisd-new.
#
# Author: Mark Martinec <mark.martinec@ijs.si>
# Copyright (C) 2012  Mark Martinec,  All Rights Reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
# * Neither the name of the author, nor the name of the "Jozef Stefan"
#   Institute, nor the names of contributors may be used to endorse or
#   promote products derived from this software without specific prior
#   written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
# OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
# OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
# OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
#(the license above is the new BSD license, and pertains to this program only)
#
# Patches and problem reports are welcome.
# The latest version of this program is available at:
#   http://www.ijs.si/software/amavisd/
#------------------------------------------------------------------------------

use strict;
use re 'taint';
use warnings;
use warnings FATAL => qw(utf8 void);
no warnings 'uninitialized';

use Errno qw(ESRCH ENOENT);
use POSIX qw(strftime);
use Time::HiRes ();
use ZMQ qw(:all);
use ZMQ::Raw qw(zmq_poll zmq_device);

our $VERSION = 2.000;

# Account that the hub switches to at startup (see drop_priv).
my $daemon_user  = 'vscan';
my $daemon_group = 'vscan';

# Time-to-live limits in seconds, enforced by the child supervisor:
# a child that idles longer than $idle_ttl, or stays busy longer than
# $active_ttl, is sent a SIGTERM (escalated to SIGKILL if it lingers).
my $idle_ttl   = 4*60*60;
my $active_ttl = 15*60;

# Directory holding the inner IPC socket.
my $MYHOME = '/var/amavis';

# Inner socket: amavisd child processes publish here; the forwarder binds it.
my $inner_sock_specs = "ipc://$MYHOME/amavisd-zmq.sock";

# Outer socket: the forwarder republishes here for subscribing clients.
# Alternative forms:  tcp://lo0:23232   tcp://[2001:db8::1]:23232
my $outer_sock_specs = "tcp://127.0.0.1:23232";

# SNMP socket: the summarizer publishes aggregated SNMP variables here.
# To listen on all interfaces:  tcp://*:23233
my $snmp_sock_specs  = "tcp://127.0.0.1:23233";

# ZMQ context and sockets of whichever role this process runs;
# they are closed again by the END block.
my($zmq_ctx, $inner_sock, $outer_sock, $snmp_sock);

my $debug = 0;  # incremented by -d; a true value also implies -f


# Permanently drop privileges by switching the real and effective user and
# group IDs to the given account.
#
#   $desired_user  - a user name, or a numeric UID
#   $desired_group - a group name, a numeric GID, or '' to use the primary
#                    group of $desired_user from the password database
#                    (looked up by name, or by UID when numeric)
#
# Dies if the user or group cannot be resolved, if setgid/setuid fails, or
# if the process still runs with a real or effective UID of 0 afterwards.
#
sub drop_priv($$) {
  my($desired_user,$desired_group) = @_;
  local($1);
  my($uid,$gid);
  if ($desired_user =~ /^(\d+)\z/) {
    # Numeric UID: still consult the password database for its primary
    # group, otherwise an empty $desired_group could never be resolved.
    $uid = $1;
    $gid = (getpwuid($uid))[3];
  } else {
    ($uid,$gid) = (getpwnam($desired_user))[2,3];
  }
  defined $uid or die "drop_priv: No such username: $desired_user\n";
  if ($desired_group eq '') { $desired_group = $gid }  # for logging purposes
  else { $gid = $desired_group=~/^(\d+)\z/ ? $1 : getgrnam($desired_group) }
  defined $gid or die "drop_priv: No such group: $desired_group\n";
  $( = $gid;  $) = "$gid $gid";   # real and effective GID
  POSIX::setgid($gid) or die "drop_priv: Can't setgid to $gid: $!";
  POSIX::setuid($uid) or die "drop_priv: Can't setuid to $uid: $!";
  $> = $uid; $< = $uid;  # just in case
# print STDERR "desired user=$desired_user ($uid), current: EUID: $> ($<)\n";
# print STDERR "desired group=$desired_group ($gid), current: EGID: $) ($()\n";
  $> != 0 or die "drop_priv: Still running as root, aborting\n";
  $< != 0 or die "Effective UID changed, but Real UID is 0, aborting\n";
}

# Detach from the invoking terminal and session with the classic double fork.
# The original process exits after the first fork. The child becomes a
# session leader via setsid() and forks again, and only the grandchild
# returns from this function. Because the grandchild is not a session
# leader, opening a terminal device later cannot give it a controlling
# terminal. Before returning, the grandchild changes to '/', points STDIN
# and STDOUT at /dev/null, and makes STDERR a duplicate of STDOUT.
# Dies on any failure.
#
sub daemonize() {
  STDOUT->autoflush(1);
  STDERR->autoflush(1);

  # Fork once; the parent exits and only the child returns.
  # $n (1 or 2) numbers the fork in error messages.
  my $fork_and_exit_parent = sub {
    my($n) = @_;
    my $child_pid;
    my $forked = eval { $child_pid = fork(); 1 };
    if (!$forked) {
      my $eval_stat = $@ ne '' ? $@ : "errno=$!";
      chomp $eval_stat;
      die "Error forking #$n: $eval_stat";
    }
    defined $child_pid  or die "Can't fork #$n: $!";
    exit  if $child_pid;  # parent process terminates here
    return;
  };

  # first fork: lets the shell return, and guarantees we are not a process
  # group leader, so that setsid() below can succeed
  $fork_and_exit_parent->(1);

  my $session = POSIX::setsid();
  die "Can't start a new session: $!"
    unless defined $session && $session >= 0;

  # second fork: give up session leadership for good
  $fork_and_exit_parent->(2);

  chdir('/')  or die "Can't chdir to '/': $!";

  close(STDIN)                or die "Can't close STDIN: $!";
  close(STDOUT)               or die "Can't close STDOUT: $!";
  open(STDIN,  '</dev/null')  or die "Can't open /dev/null: $!";
  open(STDOUT, '>/dev/null')  or die "Can't open /dev/null: $!";
  close(STDERR)               or die "Can't close STDERR: $!";
  open(STDERR, '>&STDOUT')    or die "Can't dup STDOUT: $!";
}

# Apply one process-status message to the in-memory process table.
#
#   $process_states_ref - hashref keyed by PID (see check_proc)
#   $msg                - message object from recvmsg; may be false when the
#                         receive failed, in which case nothing is applied
#
# The payload is expected to be "am.st PID TIME STATE [TASK_ID]" with a
# positive PID:
#   - a STATE of 'exiting' or 'purged' removes the PID from the table;
#   - any other STATE creates or updates the entry, storing '-' as ' ';
#   - a TIME above 1e9 is absolute Unix time in seconds and resets the
#     base timestamp;
#   - a smaller TIME is a delta in milliseconds from the known base, or is
#     used to estimate a base when none is known yet.
# The entry's 'tick' is set to the time of receipt. Always returns 1.
#
sub process_message($$) {
  my($process_states_ref,$msg) = @_;
  if (!$msg) {
    # recvmsg failed: calling ->data on it would die and take the whole
    # supervisor down, so report and ignore it instead
    my $err = "$!";  # capture errno before any I/O can clobber it
    printf("got: %s\n", "err: $err") if $debug;
    print STDERR "Error receiving a message: $err\n";
    return 1;
  }
  my $val = $msg->data;
  printf("got: %s\n", $val) if $debug;
  # require a positive PID: PID 0 would make check_proc's kill() target
  # the whole process group of the supervisor
  if ($val !~ /^am\.st [1-9]\d*\s+/s) {
    print STDERR "Unrecognized message received: $val\n";
  } else {
    my($subscription_chan, $pid, $time, $state, $task_id) = split(' ',$val);
    if ($state eq 'exiting' || $state eq 'purged') {
      delete $process_states_ref->{$pid};  # may or may not exist
    } else {
      $state = ' ' if $state eq '-';
      my $p = $process_states_ref->{$pid};
      if ($p) {
        $p->{state} = $state;
        $p->{task_id} = $task_id;
      } else {  # new process appeared
        $process_states_ref->{$pid} = $p = {
          state     => $state,
          task_id   => $task_id,
          timestamp => undef,
          base_timestamp => undef,
        };
      }
      my $now = Time::HiRes::time;
      if ($time > 1e9) {  # Unix time in seconds with fraction (> Y2000)
        $p->{base_timestamp} = $p->{timestamp} = $time;
      } elsif (!$p->{base_timestamp}) {  # delta time but no base
        $p->{timestamp} = $now;
        $p->{base_timestamp} = $p->{timestamp} - $time/1000;  # estimate
      } else {  # delta time since base_timestamp in ms
        $p->{timestamp} = $p->{base_timestamp} + $time/1000;
      }
      $p->{tick} = $now;
    }
  }
  1;
}

# Sanity-check every tracked amavisd child process and enforce the TTLs.
#
#   $process_states_ref - hashref keyed by PID, with entries built by
#                         process_message (state, task_id, base_timestamp,
#                         tick, ...) plus bookkeeping kept here
#                         (last_checked_timestamp, sig_sent,
#                         sig_sent_timestamp, sig_sent_retry_in)
#
# Probing: a PID that is not overdue and was heard from or probed recently
# is assumed alive. Otherwise it is probed with kill(0). If it is gone
# (ESRCH), it is dropped and an "am.st PID TIME purged" message is published
# on the inner socket. Any other probe error is fatal.
#
# Enforcement: a process idle for more than $idle_ttl, or busy for more than
# $active_ttl (measured from base_timestamp), is sent SIGTERM. If it is
# still present once the retry interval (initially 20 s) has elapsed, it is
# sent SIGKILL, and each further retry waits 1.5 times longer. Once that
# interval exceeds 600 s the PID is given up on and purged as above.
#
# Deleting the key most recently returned by each() during the iteration is
# explicitly permitted by perl.
#
sub check_proc($) {
  my($process_states_ref) = @_;
  printf STDERR ("CHECK\n") if $debug;
  while (my($pid,$p) = each %$process_states_ref) {
    my $now = Time::HiRes::time;
    my $age = $now - $p->{base_timestamp};
    my $idling = $p->{task_id} eq '' && $p->{state} =~ /^[. ]\z/s;
    my $overdue = $age > ($idling ? $idle_ttl : $active_ttl);
    my $n;  # nonzero: process assumed or known to exist; 0: it is gone
    if (!$overdue && $now - $p->{tick} < 10) {
      $n = 1;  # recently heard from it, assume it is still there
      printf STDERR ("PID %d skipped, recently heared from\n", $pid) if $debug;
    } elsif (!$overdue && $idling &&
             $p->{last_checked_timestamp} &&
             $now - $p->{last_checked_timestamp} < 60) {
      $n = 1;  # recently checked, idle, assume it is still there
      printf STDERR ("PID %d skipped, recently checked\n", $pid) if $debug;
    } elsif (!$overdue && $p->{last_checked_timestamp} &&
                          $now - $p->{last_checked_timestamp} < 10) {
      $n = 1;  # recently checked, busy, assume it is still there
      printf STDERR ("PID %d skipped, recently checked\n", $pid) if $debug;
    } else {
      printf STDERR ("PID %d checking\n", $pid) if $debug;
      $p->{last_checked_timestamp} = $now;
      $n = kill(0,$pid);  # test if the process is still there
      if ($n == 0) {
        # ESRCH means there is no such process
        if ($! != ESRCH) {
          die "Can't check the process $pid: $!";
        } elsif (defined $p->{sig_sent}) {
          printf STDERR ("PID %d sucessfully terminated by SIG%s, %s\n",
                         $pid, $p->{sig_sent}, $p->{task_id} || $p->{state}
                        ) if $debug;
        } else {
          printf STDERR ("PID %d went away, %s\n",
                         $pid, $p->{task_id} || $p->{state} ) if $debug;
        }
        delete $process_states_ref->{$pid};
        $inner_sock->send(sprintf('am.st %s %014.3f purged', $pid,$now));
      }
    }
    if ($n == 0) {
      # already dealt with
    } elsif (!$overdue) {  # life is good
      printf STDERR ("PID %d: %s\n", $pid, $p->{task_id} || $p->{state} )
        if $debug;
    } elsif (!$p->{sig_sent} ||
             $now >= $p->{sig_sent_timestamp} + $p->{sig_sent_retry_in}) {
      # Overdue: send SIGTERM first. If the process is still present after
      # the retry interval has elapsed, escalate to SIGKILL and back off.
      # While waiting out the interval nothing is sent.
      if (!$p->{sig_sent}) {
        $p->{sig_sent} = 'TERM';
        $p->{sig_sent_retry_in} = 20;
      } else {
        $p->{sig_sent} = 'KILL';
        $p->{sig_sent_retry_in} *= 1.5;  # increase the wait time for a retry
      }
      $p->{sig_sent_timestamp} = $now;
      if (kill($p->{sig_sent},$pid)) {
        printf STDERR ("PID %d SIG%s, %s\n",
                       $pid, $p->{sig_sent}, $p->{task_id} || $p->{state}
                      ) if $debug;
      } elsif ($! == ESRCH) {
        # already gone by now, no fuss
      } else {
        warn "Can't $p->{sig_sent} the [$pid]: $!";
      }
      if ($p->{sig_sent_retry_in} > 600) {
        printf STDERR ("Giving up on PID %d, %s\n",
                       $pid, $p->{task_id} || $p->{state}) if $debug;
        delete $process_states_ref->{$pid};
        $inner_sock->send(sprintf('am.st %s %014.3f purged', $pid,$now));
      }
    }
  }
}

# Child supervisor role: track every amavisd child process and police it.
#
# Subscribes on the outer socket (published by the forwarder) to 'am.st '
# status messages and folds them into a per-PID table via process_message.
# About once a second it runs check_proc, which probes for lost or crashed
# children with kill(0), and terminates children that have been idle longer
# than $idle_ttl or active longer than $active_ttl. Because of the kill(0)
# probing, this role must run on the same host as the amavisd children.
#
# Results go back out through the inner socket: check_proc publishes
# 'purged' notices, and at most every 5 seconds 'am.proc.busy PIDs...' and
# 'am.proc.idle PIDs...' lists are sent for the benefit of short-lived
# clients. Never returns.
#
sub child_supervisor() {
  my %process_states;  # per-PID status records, keyed by PID

  $zmq_ctx = ZMQ::Context->new()
    or die "Can't create ZMQ context: $!";

  # status updates arrive through the forwarder's public socket
  $outer_sock = $zmq_ctx->socket(ZMQ_SUB)
    or die "Can't create outer ZMQ socket: $!";
  $outer_sock->setsockopt(ZMQ_IPV4ONLY, 0) == 0  or die "setsockopt failed";
  $outer_sock->setsockopt(ZMQ_SUBSCRIBE, 'am.st ');
  $outer_sock->connect($outer_sock_specs);

  # our own notices are fed back into the forwarder's inner socket
  $inner_sock = $zmq_ctx->socket(ZMQ_PUB)
    or die "Can't create inner ZMQ socket: $!";
  $inner_sock->setsockopt(ZMQ_IPV4ONLY, 0) == 0  or die "setsockopt failed";
  $inner_sock->setsockopt(ZMQ_SNDHWM, 100);
  $inner_sock->connect($inner_sock_specs);

  my($last_checked, $last_proclist_sent) = (0, 0);

  while (1) {
    # wait for (at most) one status message, then fall through to the
    # periodic housekeeping below
    # NOTE(review): libzmq >= 3 documents zmq_poll timeouts in milliseconds,
    # while this value assumes microseconds - confirm ZMQ::Raw's units.
    my $on_status = sub {
      process_message(\%process_states, $outer_sock->recvmsg);
    };
    zmq_poll([ { socket   => $outer_sock->socket,
                 events   => ZMQ_POLLIN,
                 callback => $on_status } ],
             1000000);  # microseconds

    my $now = Time::HiRes::time;
    next  unless $now > $last_checked + 1;

    check_proc(\%process_states);
    $last_checked = $now;

    next  unless $now > $last_proclist_sent + 5;

    my(@idle_pids, @busy_pids);
    for my $pid (sort { $a <=> $b } keys %process_states) {
      my $rec = $process_states{$pid};
      my $is_idle = $rec->{task_id} eq '' && $rec->{state} =~ /^[. ]\z/s;
      push(@{ $is_idle ? \@idle_pids : \@busy_pids }, $pid);
    }
    $inner_sock->send('am.proc.busy ' . join(' ', @busy_pids));
    $inner_sock->send('am.proc.idle ' . join(' ', @idle_pids));
    $last_proclist_sent = $now;
  }
  # not reached
}

# SNMP summarizer role: aggregate SNMP variables published by amavisd child
# processes and periodically republish the totals.
#
# Subscribes on the outer socket to 'am.snmp ' messages of the form
# "am.snmp KEY TYPE VALUE". The first value seen for a KEY (or one arriving
# while the stored value is false) is stored together with its TYPE. After
# that, values of counter types (C32, C64, TIM) are added to the stored
# value, and values of any other type replace it.
#
# Every $interval (5) seconds all variables are published on the SNMP
# socket (bound to $snmp_sock_specs) in the same format, with sysUpTime
# overridden by the integer start time of this summarizer, typed INT.
# Never returns.
#
sub snmp_summarizer() {
  $zmq_ctx = ZMQ::Context->new;
  $zmq_ctx or die "Can't create ZMQ context: $!";

  $outer_sock = $zmq_ctx->socket(ZMQ_SUB);
  $outer_sock or die "Can't create outer ZMQ socket: $!";
  $outer_sock->setsockopt(ZMQ_IPV4ONLY, 0) == 0 or die "setsockopt failed";
  $outer_sock->setsockopt(ZMQ_SUBSCRIBE, 'am.snmp ');
  $outer_sock->connect($outer_sock_specs);

  $snmp_sock = $zmq_ctx->socket(ZMQ_PUB);
  $snmp_sock or die "Can't create ZMQ socket: $!";
  $snmp_sock->setsockopt(ZMQ_IPV4ONLY, 0) == 0 or die "setsockopt failed";
  $snmp_sock->setsockopt(ZMQ_SNDHWM, 2000);
  $snmp_sock->bind($snmp_sock_specs);

  my $interval = 5;  # seconds

  my(%snmp_var, %snmp_type);
  my $now = Time::HiRes::time;
  my $start_time = $now;
  my $last_send_time = 0;

  for (;;) {
    $now = Time::HiRes::time;
    my $next_scheduled_dt = $last_send_time + $interval - $now;
    $next_scheduled_dt = 0  if $next_scheduled_dt < 0;
    zmq_poll([
      { socket => $outer_sock->socket,
        events => ZMQ_POLLIN,
        callback => sub {
          my $msg = $outer_sock->recvmsg;
          return if !$msg;  # receive failed, nothing to merge
          my($chan, $key, $type, $val) = split(' ', $msg->data, 4);
          # 'return', not 'next': this callback is a sub invoked from inside
          # zmq_poll, so 'next' would try to unwind to the caller's loop
          return if $chan ne 'am.snmp' || !defined $key;
          if (!$snmp_var{$key}) {
            $snmp_var{$key} = $val;
            $snmp_type{$key} = $type;
          } elsif ($type =~ /^(C32|C64|TIM)\z/) {  # a counter
            $snmp_var{$key} += $val;
          } else {
            $snmp_var{$key} = $val;
          }
        },
      },
    ], int($next_scheduled_dt*1000000));  # microseconds

    $now = Time::HiRes::time;
    if ($now - $last_send_time > $interval) {
      $snmp_var{'sysUpTime'} = int($start_time);  # override by our time
      $snmp_type{'sysUpTime'} = 'INT';
      for my $key (keys %snmp_var) {
        $snmp_sock->send(join(' ', 'am.snmp',
                              $key, $snmp_type{$key}, $snmp_var{$key}));
      }
      $last_send_time = $now;
    }
  }
  # not reached
}

# Forwarder role: relay every message from the inner ZMQ socket to the
# outer ZMQ socket.
#
# Both sockets are bound rather than connected, which makes this process the
# single stable rendezvous point of the system.
#   - Inner socket: the transient amavisd child processes connect to it and
#     publish status, and the supervisor feeds its own supplementary updates
#     into it. Only the topics 'am.log.', 'am.proc.', 'am.st ' and 'am.snmp '
#     are subscribed to.
#   - Outer socket: dynamic clients such as amavisd-nanny,
#     amavisd-snmp-subagent, amavisd-agent and the supervisor connect here
#     to receive the stream.
#
# Multi-part messages are relayed frame by frame, preserving the "more
# frames follow" flag. Never returns.
#
sub forwarder() {
  $zmq_ctx = ZMQ::Context->new()
    or die "Can't create a ZMQ context";

  $inner_sock = $zmq_ctx->socket(ZMQ_SUB)
    or die "Error creating inner ZMQ socket: $!";
  $inner_sock->setsockopt(ZMQ_IPV4ONLY, 0) == 0  or die "setsockopt failed";
  # selective subscriptions (an empty prefix would accept everything)
  $inner_sock->setsockopt(ZMQ_SUBSCRIBE, $_)
    for ('am.log.', 'am.proc.', 'am.st ', 'am.snmp ');
  $inner_sock->bind($inner_sock_specs);

  $outer_sock = $zmq_ctx->socket(ZMQ_PUB)
    or die "Error creating outer ZMQ socket: $!";
  $outer_sock->setsockopt(ZMQ_IPV4ONLY, 0) == 0  or die "setsockopt failed";
  $outer_sock->bind($outer_sock_specs);

  # Relay by hand: the ZMQ_FORWARDER device used previously, i.e.
  #   zmq_device(ZMQ_FORWARDER, $inner_sock->socket, $outer_sock->socket);
  # is no longer available as of libzmq 3.1.
  $| = 1;
  my $count = 0;
  while (1) {
    $count++;
    my $more;
    do {
      my $frame = $inner_sock->recvmsg;
      $more = $inner_sock->getsockopt(ZMQ_RCVMORE);
      $outer_sock->sendmsg($frame, $more ? ZMQ_SNDMORE : 0);
    } while ($more);
    if ($debug) {
      print ".";
      printf(" %d\n", $count)  if $count % 100 == 0;
    }
  }
  # not reached
}

# Print a command-line synopsis to STDERR, listing every option that the
# argument loop of the main program accepts.
#
sub usage() {
  print STDERR "Usage: $0 [-d] [-f] [-c | -s]\n",
               "  -d  print debugging output (implies -f)\n",
               "  -f  stay in the foreground, do not daemonize\n",
               "  -c  run the child process supervisor\n",
               "  -s  run the SNMP summarizer\n",
               "  (without -c or -s the forwarder is run)\n";
}

# main program starts here
#
# Command line options (see usage):
#   -d  debugging output, implies -f
#   -f  stay in the foreground
#   -c  run the child supervisor
#   -s  run the SNMP summarizer (ignored if -c is also given)
# With neither -c nor -s the forwarder is run. Privileges are dropped to
# $daemon_user/$daemon_group before daemonizing and opening any socket.

$SIG{INT} = sub { die "\n" };  # do the END code block when interrupted

my $supervizor;
my $summarizer;
my $daemonize = 1;

while (@ARGV) {
  my $opt = shift @ARGV;
  if    ($opt eq '-d') { $debug++ }
  elsif ($opt eq '-f') { $daemonize = 0 }
  elsif ($opt eq '-c') { $supervizor = 1 }
  elsif ($opt eq '-s') { $summarizer = 1 }
  else { usage(); exit 1 }  # unknown option: report failure to the caller
}
$daemonize = 0 if $debug;

my($zmq_major, $zmq_minor, $zmq_patch) = ZMQ::version();
my $zmq_version = join('.', $zmq_major, $zmq_minor, $zmq_patch);
printf("PID %s, version %s, %s\n", $$, ZMQ->VERSION, $zmq_version) if $debug;

if ($supervizor) {
  print "Running supervizor\n";
} elsif ($summarizer) {
  print "Running SNMP summarizer\n";
} else {
  print "Running forwarder\n";
}

if (defined $daemon_user) {
  drop_priv($daemon_user,$daemon_group);
}

if ($daemonize) {
  daemonize();
  srand();
}

if ($supervizor) {
  child_supervisor();
} elsif ($summarizer) {
  snmp_summarizer();
} else {
  forwarder();
}

# not reached

# Release ZMQ resources on any exit, including an interrupt via SIGINT.
# Only objects that were actually created are closed.
END {
  # ignoring errors
  $inner_sock->close if $inner_sock;
  $outer_sock->close if $outer_sock;
  $snmp_sock->close  if $snmp_sock;
  $zmq_ctx->term     if $zmq_ctx;
}
