#!/usr/bin/perl -w
#
# Copyright (c) 2019 SUSE Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# Forward notifications to the source server
#

BEGIN {
  my ($wd) = $0 =~ m-(.*)/- ;
  $wd ||= '.';
  unshift @INC,  "$wd";
}

use POSIX;
use Data::Dumper;
use Fcntl qw(:DEFAULT :flock);
use XML::Structured ':bytes';
use Getopt::Long ();
use Time::HiRes;

use BSConfiguration;
use BSRPC ':https';
use BSUtil;

use strict;

my $bsdir = $BSConfig::bsdir || "/srv/obs";
my $rundir = "$bsdir/run";
my $eventdir = "$bsdir/events";


my $myeventdir = "$eventdir/notifyforward";

sub parse_options {
  my %opts;
  if (!Getopt::Long::GetOptionsFromArray(\@_, \%opts,
    'stop|exit',
    'restart',
    'logfile=s',
  )) {
    print_usage();
    die("Invalid option(s)\n");
  }
  return (\%opts, @_);
}

sub print_usage {
  $0 =~ /([^\/]+$)/;
  print "Usage: $1 [options]

Options:
  --stop|--exit          - graceful shutdown daemon
  --restart              - restart daemon
  --logfile file         - redirect output to logfile

";
}

sub check_exitrestart {
  if (-e "$rundir/bs_notifyforward.exit") {
    close(RUNLOCK);
    unlink("$rundir/bs_notifyforward.exit");
    BSUtil::printlog("exiting...");
    exit(0);
  }
  if (-e "$rundir/bs_notifyforward.restart") {
    close(RUNLOCK);
    unlink("$rundir/bs_notifyforward.restart");
    BSUtil::printlog("restarting...");
    exec($0, @ARGV);
    die("$0: $!\n");
  }
}

sub markdone {
  my ($markfd, $markoff) = @_;
  defined(sysseek($markfd, $markoff, Fcntl::SEEK_SET)) || die("sysseek $markoff: $!\n");
  syswrite($markfd, "|", 1) == 1 || die("syswrite: $!\n");
}

my $multirequestsock;
my $multirequestcnt = 0;
my $multirequeststart = 0;

sub ansreqreceiver {
  my ($ansreq, $param) = @_;
  return unless $ansreq->{'headers'}->{'x-multirequest'};
  BSHTTP::read_data($ansreq, undef, 1);
  die("ansreq contains data: $ansreq->{'__data'}\n") if length($ansreq->{'__data'});
  open($multirequestsock, '+<&', $ansreq->{'__socket'});
  die("socket dup failed\n") unless $multirequestsock;
  print "entering multirequest mode\n";
  $multirequestcnt = 0;
  $multirequeststart = time();
}

sub multirequestrpc {
  my ($sock, $redisdata) = @_;

  my $pkg = BSUtil::tostorable($redisdata);
  substr($pkg, 0, 0, sprintf("CL%030d", length($pkg)));
  BSHTTP::swrite($sock, $pkg);
  my $ans = '';
  while (1) {
    my $r = sysread($sock, $ans, 1);
    last if $r;
    die("received truncated answer: $!\n") if !defined($r) && $! != POSIX::EINTR && $! != POSIX::EWOULDBLOCK;
    die("received truncated answer\n") if defined $r;
  }
  if ($ans ne '0') {
    my ($status) = BSRPC::readanswerheaderblock($sock, $ans);
    die("remote error: $status\n");
  }
}

sub forwardredis {
  my ($redisdata, $markfd, $markoffs) = @_;

  if ($multirequestsock && ($multirequestcnt++ >= 64 || $multirequeststart + 300 < time())) {
    close($multirequestsock);
    $multirequestsock = undef;
  }
  if ($multirequestsock) {
    local $SIG{'ALRM'} = sub { alarm(0); die("multirequest rpc timeout\n") };
    eval {
      alarm(300);
      multirequestrpc($multirequestsock, $redisdata);
    };
    alarm(0);
    if (!$@) {
      markdone($markfd, $_) for @$markoffs;
      print "forwarded ".@$markoffs." redis notifications (multirequest mode)\n";
      return;
    }
    warn($@);
    close($multirequestsock);
    $multirequestsock = undef;
  }

  my $param = {
    'uri' => "$BSConfig::srcserver/notify/redis",
    'request' => 'POST',
    'timeout' => 300,
    'headers' => [ 'Content-Type: application/octet-stream' ],
    'data' => BSUtil::tostorable($redisdata),
    'receiver' => \&ansreqreceiver,
  };
  my @args;
  push @args, 'multirequest=1' if $BSConfig::srcserver =~ /^http:/;	# sock dup only works for http
  BSRPC::rpc($param, undef, @args);
  markdone($markfd, $_) for @$markoffs;
  print "forwarded ".@$markoffs." redis notifications\n";
}

sub forwarddata {
  my ($fd) = @_;

  my $markfd;
  open($markfd, '+<', "$myeventdir/queue.send") || die("$myeventdir/queue.send: $!\n");
  my $markoff = 0;

  my @redisdata;
  my @redismarkoffs;

  while (<$fd>) {
    my $len = length($_);
    die("bad line\n") unless chop($_) eq "\n";
    my @line = split('\|', $_);
    if (!@line || !$line[0]) {
	$markoff += $len;	# empty or marked as done
	next;
    }
    s/%([a-fA-F0-9]{2})/chr(hex($1))/ge for @line;
    my $type = shift @line;

    # batch redis notifications into chunks of 32 updates
    if (@redisdata && ($type ne 'redis' || @redisdata >= 32)) {
      forwardredis(\@redisdata, $markfd, \@redismarkoffs);
      @redisdata = ();
      @redismarkoffs = ();
    }
    if ($type eq 'redis') {
      push @redisdata, \@line;
      push @redismarkoffs, $markoff;
      $markoff += $len;
      next;
    }

    my $param = {
      'uri' => "$BSConfig::srcserver/notify/$type",
      'request' => 'POST',
      'formurlencode' => 1,
      'timeout' => 300,
    };
    BSRPC::rpc($param, undef, @line);
    markdone($markfd, $markoff);
    print "forwarded a $type notification\n";
    $markoff += $len;
  }

  forwardredis(\@redisdata, $markfd, \@redismarkoffs) if @redisdata;
  close $markfd;
}

my $noprogress;

sub doforward {
  my ($fd) = @_;
  eval {
    forwarddata($fd);
  };
  if ($@) {
    warn($@);
    close($fd);
    print "retrying in 60 seconds\n";
    my $now = time();
    $noprogress ||= $now;
    if ($now - $noprogress > 10 * 60) {
      BSUtil::logcritical("no progress forwarding events since 10 minutes");
      $noprogress = $now;
    }
    return $now + 60;
  }
  $noprogress = undef;
  unlink("$myeventdir/queue.send");
  close($fd);
  return undef;
}

# copy @ARGV to keep it untouched in case of restart
my ($options, @args) = parse_options(@ARGV);

BSUtil::mkdir_p_chown($bsdir, $BSConfig::bsuser, $BSConfig::bsgroup) || die("unable to create $bsdir\n");
# Open logfile if requested
BSUtil::openlog($options->{'logfile'}, $BSConfig::logdir, $BSConfig::bsuser, $BSConfig::bsgroup);
BSUtil::drop_privs_to($BSConfig::bsuser, $BSConfig::bsgroup);

$| = 1;
$SIG{'PIPE'} = 'IGNORE';
BSUtil::restartexit($options, 'notifyforward', "$rundir/bs_notifyforward", "$myeventdir/.ping");
BSUtil::printlog("starting build service notifyforward");

mkdir_p($rundir);
open(RUNLOCK, '>>', "$rundir/bs_notifyforward.lock") || die("$rundir/bs_notifyforward.lock: $!\n");
flock(RUNLOCK, LOCK_EX | LOCK_NB) || die("notifyforward is already running!\n");
utime undef, undef, "$rundir/bs_notifyforward.lock";

mkdir_p($myeventdir);
if (! -p "$myeventdir/.ping") {
  POSIX::mkfifo("$myeventdir/.ping", 0666) || die("$myeventdir/.ping: $!");
  chmod(0666, "$myeventdir/.ping");
}

sysopen(PING, "$myeventdir/.ping", POSIX::O_RDWR) || die("$myeventdir/.ping: $!");

my $retry;

if (-e "$myeventdir/queue.send") {
  print "resuming transmission of old data\n";
  my $file;
  BSUtil::lockopen($file, '<', "$myeventdir/queue.send");
  $retry = doforward($file);
}

while (1) {
  check_exitrestart();
  if ($retry) {
    my $now = time();
    if ($now < $retry) {
      sleep(1);
      next;
    }
    undef $retry;
  }
  BSUtil::drainping(\*PING);
  if (-e "$myeventdir/queue.send") {
    my $file;
    BSUtil::lockopen($file, '<', "$myeventdir/queue.send");
    $retry = doforward($file);
    next if $retry;
  }
  if (-e "$myeventdir/queue") {
    my $file;
    BSUtil::lockopen($file, '<', "$myeventdir/queue");
    die if -e "$myeventdir/queue.send";
    rename("$myeventdir/queue", "$myeventdir/queue.send") || die("rename $myeventdir/queue $myeventdir/queue.send: $!\n");
    $retry = doforward($file);
    Time::HiRes::sleep(.5);
  } else {
    print "waiting for an event...\n";
    BSUtil::waitping(\*PING);
  }
}

