#!/usr/bin/perl -w
#
# Copyright (c) 2019 SUSE Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# Forward notifications to the source server
#

BEGIN {
  # Put the directory this script lives in at the front of the module
  # search path. Fall back to the current directory if $0 contains no
  # directory part (or an empty/false one).
  my $wd = '.';
  $wd = $1 if $0 =~ m{(.*)/} && $1;
  unshift(@INC, $wd);
}

use Data::Dumper;
use XML::Structured ':bytes';
use Getopt::Long ();
use Time::HiRes;

use BSConfiguration;
use BSUtil;
use BSRedis ':tls';

use strict;

my $bsdir = $BSConfig::bsdir || "/srv/obs";
my $rundir = "$bsdir/run";
my $eventdir = "$bsdir/events";


my $myeventdir = "$eventdir/redis";
my $runname = 'bs_redis';

# Parse the command line options passed as arguments.
#
# Returns a reference to the hash of recognized options followed by the
# remaining (non-option) arguments. Prints the usage text and dies if an
# invalid option was given. The caller's array is not modified.
sub parse_options {
  my @argv = @_;
  my %opts;
  my @spec = ('stop|exit', 'restart', 'logfile=s');
  unless (Getopt::Long::GetOptionsFromArray(\@argv, \%opts, @spec)) {
    print_usage();
    die("Invalid option(s)\n");
  }
  return (\%opts, @argv);
}

# Print a short usage text to STDOUT, using the basename of $0 as the
# program name.
sub print_usage {
  my ($prog) = $0 =~ m{([^/]+)$};
  my @text = (
    "Usage: $prog [options]",
    "",
    "Options:",
    "  --stop|--exit          - graceful shutdown daemon",
    "  --restart              - restart daemon",
    "  --logfile file         - redirect output to logfile",
    "",
  );
  print join("\n", @text) . "\n";
}

# Check for the "<runname>.exit" and "<runname>.restart" flag files in the
# run directory and act on them.
#
# An exit request closes the run lock, removes the flag file and terminates
# the process. A restart request does the same but re-executes the program
# with the original @ARGV instead. Returns normally if neither flag exists.
sub check_exitrestart {
  my $flagbase = "$rundir/$runname";
  if (-e "$flagbase.exit") {
    close(RUNLOCK);
    unlink("$flagbase.exit");
    BSUtil::printlog("exiting...");
    exit(0);
  }
  return unless -e "$flagbase.restart";
  close(RUNLOCK);
  unlink("$flagbase.restart");
  BSUtil::printlog("restarting...");
  exec($0, @ARGV);
  # only reached if the exec failed
  die("$0: $!\n");
}

# Lua scripts that are run on the redis server with EVAL. forwarddata()
# passes "result.<prpa>" (and, for the two-key scripts, "jobs.<prpa>") as
# KEYS and the remaining fields of the queue entry as ARGV.

# deleteresult: remove both keys completely.
my $lua_deleteresult = q{
  redis.call('DEL', KEYS[1], KEYS[2])
  return 'OK'
};

# updateresult without any package/detail pair: same effect as
# deleteresult, both keys are removed.
my $lua_updateresult_empty = q{
  redis.call('DEL', KEYS[1], KEYS[2])
  return 'OK'
};

# updateresult: ARGV is a list of (pkg, detail) pairs. Every pair is stored
# as a field of the hash KEYS[1]; afterwards all fields of KEYS[1] that were
# not part of ARGV are removed. Fields of the hash KEYS[2] are removed
# unless their name equals one of the passed detail values.
# NOTE(review): this presumably relies on the detail value naming the job
# of the package -- confirm against the code that writes the queue.
my $lua_updateresult = q{
  local pkgs = {}
  local details = {}
  local i = 1 ; while 1 do
    local pkg = ARGV[i]
    if not pkg then break end
    local detail = ARGV[i + 1]
    redis.call('HSET', KEYS[1], pkg, detail)
    pkgs[pkg] = true
    details[detail] = true
    i = i + 2
  end
  for _,k in ipairs(redis.call('HKEYS', KEYS[1])) do
    if not pkgs[k] then redis.call('HDEL', KEYS[1], k) end
  end
  for _,k in ipairs(redis.call('HKEYS', KEYS[2])) do
    if not details[k] then redis.call('HDEL', KEYS[2], k) end
  end
  return 'OK'
};

# updateoneresult: set field ARGV[1] of the hash KEYS[1] to ARGV[2]. If a
# third argument is given, that field is removed from the hash KEYS[2].
my $lua_updateoneresult = q{
  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
  if ARGV[3] then
    redis.call('HDEL', KEYS[2], ARGV[3])
  end
  return 'OK'
};

# updatejobstatus with a value: set field ARGV[1] of the hash KEYS[1] to
# ARGV[2].
my $lua_updatejobstatus = q{
  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
  return 'OK'
};

# updatejobstatus without a value: remove field ARGV[1] from the hash
# KEYS[1].
my $lua_updatejobstatus_del = q{
  redis.call('HDEL', KEYS[1], ARGV[1])
  return 'OK'
};

my $red;	# the redis handle, set up once the server configuration is parsed

# Mark a queue entry as done by overwriting its first byte with a '|'.
#
# $markfd  - read/write handle on the queue file
# $markoff - byte offset of the first character of the entry
#
# Uses unbuffered I/O (sysseek/syswrite) so that the mark hits the file
# right away. Dies if the seek fails or the byte could not be written.
sub markdone {
  my ($markfd, $markoff) = @_;
  # load Fcntl ourselves instead of relying on some other module having
  # pulled it in; the explicit call syntax also compiles under strict
  require Fcntl;
  defined(sysseek($markfd, $markoff, Fcntl::SEEK_SET())) || die("sysseek $markoff: $!\n");
  # syswrite returns undef on error, so check for that before comparing
  my $written = syswrite($markfd, "|", 1);
  die("syswrite: $!\n") unless defined($written) && $written == 1;
  return 1;
}

# Send all pending entries of the queue to the redis server.
#
# $fd is a read handle on the queue.send file. Each line has the form
# "cmd|prpa|arg|arg...\n" with percent-encoded fields. An entry that was
# sent gets its first character overwritten with '|' through a second,
# read/write handle on queue.send, so that a later retry skips it (its
# first field is empty then).
#
# Dies on malformed lines, on unknown commands and whenever $red->run()
# or markdone() dies.
sub forwarddata {
  my ($fd) = @_;
  my $sendfile = "$myeventdir/queue.send";
  open(my $markfd, '+<', $sendfile) || die("$sendfile: $!\n");
  my $sent = 0;
  my $nextoff = 0;	# file offset of the line that will be read next
  while (defined(my $entry = <$fd>)) {
    my $entryoff = $nextoff;
    $nextoff += length($entry);
    die("bad line\n") unless chop($entry) eq "\n";
    my @fields = split('\|', $entry);
    # skip entries that are empty or already marked as done
    next unless @fields && $fields[0];
    s/%([a-fA-F0-9]{2})/chr(hex($1))/ge for @fields;
    my ($cmd, $prpa, @args) = @fields;
    die unless $prpa;
    my ($resultkey, $jobskey) = ("result.$prpa", "jobs.$prpa");
    # select the lua script and the redis keys it works on
    my ($script, @keys);
    if ($cmd eq 'updatejobstatus') {
      # just the job name means that the status is to be removed
      $script = @args == 1 ? $lua_updatejobstatus_del : $lua_updatejobstatus;
      @keys = ($jobskey);
    } elsif ($cmd eq 'updateoneresult') {
      $script = $lua_updateoneresult;
      @keys = ($resultkey, $jobskey);
    } elsif ($cmd eq 'updateresult') {
      die("odd number of arguments in $cmd\n") if @args % 2;
      $script = @args ? $lua_updateresult : $lua_updateresult_empty;
      @keys = ($resultkey, $jobskey);
    } elsif ($cmd eq 'deleteresult') {
      $script = $lua_deleteresult;
      @keys = ($resultkey, $jobskey);
    } else {
      die("unknown redis command '$cmd'\n");
    }
    $red->run('EVAL', $script, scalar(@keys), @keys, @args);
    markdone($markfd, $entryoff);
    $sent++;
  }
  close $markfd;
  print "sent $sent redis notifications\n";
}

my $noprogress;	# time of the first failure since the last success/critical log

# Forward the contents of the queue.send file opened as $fd.
#
# On success the queue.send file is removed, $fd is closed and undef is
# returned. On failure the error is printed as a warning, $fd is closed
# and the time of the next retry (60 seconds from now) is returned. A
# critical log entry is written if there was no success for more than
# ten minutes.
sub doforward {
  my ($fd) = @_;
  eval { forwarddata($fd) };
  my $err = $@;
  if (!$err) {
    $noprogress = undef;
    # remove the file before closing the handle
    unlink("$myeventdir/queue.send");
    close($fd);
    return undef;
  }
  warn($err);
  close($fd);
  print "retrying in 60 seconds\n";
  my $now = time();
  $noprogress = $now unless $noprogress;
  if ($now - $noprogress > 10 * 60) {
    BSUtil::logcritical("no progress sending redis events since 10 minutes");
    # restart the interval so that we do not log on every retry
    $noprogress = $now;
  }
  return $now + 60;
}

# Append a critical message to $logfile, prefixed with a timestamp and the
# pid of this process. Does nothing if no log file is given.
sub critlogger {
  my ($logfile, $msg) = @_;
  return if !$logfile;
  my $stamp = BSUtil::isotime(time);
  my $pidtag = "[$$]";
  BSUtil::appendstr($logfile, sprintf("%s: %-7s %s\n", $stamp, $pidtag, $msg));
}

# Daemon startup: parse options, set up logging and privileges, handle
# --stop/--restart requests, take the run lock and connect the redis handle.

# copy @ARGV to keep it untouched in case of restart
my ($options, @args) = parse_options(@ARGV);

BSUtil::mkdir_p_chown($bsdir, $BSConfig::bsuser, $BSConfig::bsgroup) || die("unable to create $bsdir\n");
# Open logfile if requested
BSUtil::openlog($options->{'logfile'}, $BSConfig::logdir, $BSConfig::bsuser, $BSConfig::bsgroup);
BSUtil::drop_privs_to($BSConfig::bsuser, $BSConfig::bsgroup);

# unbuffered output, and do not get killed by writes to a closed connection
$| = 1;
$SIG{'PIPE'} = 'IGNORE';
BSUtil::restartexit($options, 'redis', "$rundir/$runname", "$myeventdir/.ping");

my $critlogfile = "$BSConfig::logdir/redis.crit.log";
BSUtil::setcritlogger(sub { critlogger($critlogfile, $_[0]) });

mkdir_p($rundir);
BSUtil::openrunlock(\*RUNLOCK, "$rundir/$runname", 'redis');
BSUtil::printlog("starting build service redis forwarder");

mkdir_p($myeventdir);
BSUtil::openping(\*PING, "$myeventdir/.ping");

my $retry;	# time of the next retry after a failed transmission

die("No redis server configured\n") unless $BSConfig::redisserver;
# The captures are: scheme, optional password, server, optional port. The
# colon in front of the port is matched outside of the capture group so
# that just the digits are handed over as port number.
die("Redis server must be of scheme redis[s]://<server>[:port]\n") unless $BSConfig::redisserver =~ /^(rediss?):\/\/(?:([^\/\@]*)\@)?([^\/:]+)(?::(\d+))?$/;

$red = BSRedis->new('server' => $3, 'port' => $4, 'password' => $2, 'tls' => ($1 eq 'rediss' ? 1 : 0));

# A queue.send file that exists at startup is a leftover of a transmission
# that did not complete. Send it first; $retry is set to the time of the
# next attempt if that fails.
if (-e "$myeventdir/queue.send") {
  print "resuming transmission of old data\n";
  my $file;
  BSUtil::lockopen($file, '<', "$myeventdir/queue.send");
  $retry = doforward($file);
}

# Main loop: forward queued events until an exit/restart is requested.
while (1) {
  check_exitrestart();
  if ($retry) {
    # a transmission failed, wait for the retry time in one second steps
    # so that exit/restart requests are still noticed
    my $now = time();
    if ($now < $retry) {
      sleep(1);
      next;
    }
    undef $retry;
  }
  BSUtil::drainping(\*PING);
  # finish a partly sent queue.send file before touching new events
  if (-e "$myeventdir/queue.send") {
    my $file;
    BSUtil::lockopen($file, '<', "$myeventdir/queue.send");
    $retry = doforward($file);
    next if $retry;
  }
  if (-e "$myeventdir/queue") {
    # open the queue (lockopen presumably also locks it -- see BSUtil) and
    # move it to queue.send, then forward it through the already open handle
    my $file;
    BSUtil::lockopen($file, '<', "$myeventdir/queue");
    # queue.send must be gone at this point, the rename would overwrite it
    die if -e "$myeventdir/queue.send";
    rename("$myeventdir/queue", "$myeventdir/queue.send") || die("rename $myeventdir/queue $myeventdir/queue.send: $!\n");
    $retry = doforward($file);
    # short pause before looking at the queue again
    Time::HiRes::sleep(.5);
  } else {
    print "waiting for an event...\n";
    BSUtil::waitping(\*PING);
  }
}

