#!/usr/bin/perl -w
#
# Copyright (c) 2013 OBS Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program (see the file COPYING); if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
#
################################################################
#
# A little binary proxy to reduce reposerver load
#

BEGIN {
  # Work from the directory part of the script path (or the current
  # directory if there is none) and put that directory plus its build/
  # subdirectory at the front of the module search path.
  my $wd = $0 =~ m-(.*)/- ? $1 : undef;
  $wd = '.' unless $wd;
  chdir($wd);
  unshift @INC, $wd, "$wd/build";
}

use XML::Structured ':bytes';
use POSIX;
use Digest::MD5 ();
use Digest::SHA ();
use Data::Dumper;
use Storable ();
use Symbol;

use BSConfiguration;
use BSRPC ':https';
use BSServer;
use BSStdServer;
use BSUtil;
use Build;

use strict;

# suffixes of the binary package formats we handle and a regexp
# alternation that matches any of them literally
my @binsufs = ('rpm', 'deb', 'pkg.tar.gz', 'pkg.tar.xz', 'pkg.tar.zst');
my $binsufsre = join('|', map {quotemeta($_)} @binsufs);

# timeout handed to the BSRPC calls
my $gettimeout = 3600;

# listen port and protocol, optionally taken from the configured
# getbinariesproxyserver setting
my $port = 5254;
my $proto = 'http';
if ($BSConfig::getbinariesproxyserver) {
  $port = $1 if $BSConfig::getbinariesproxyserver =~ /:(\d+)$/;
  $proto = $1 if $BSConfig::getbinariesproxyserver =~ /^(https):/;
}

# where the cache lives and how big it may grow
my $cachedir = "$BSConfig::bsdir/getbinariesproxycache";
my $cachesize = 1024 * 1024 * 1024;	# default: 1G
if ($BSConfig::getbinariesproxyserver_cachesize) {
  # the configured number is scaled by 1024 * 1024
  $cachesize = $BSConfig::getbinariesproxyserver_cachesize * 1024 * 1024;
}

# scratch area for downloads and temporary hard links
my $cachetmpdir = "$cachedir/tmp";

my $maxopen;	# max number of fds we can open

# Probe how many file descriptors this process can open by duplicating a
# /dev/null descriptor until dup fails (or 65536 descriptors are held).
# The count is stored in $maxopen and printed; all probe descriptors are
# closed again. Dies if /dev/null cannot be opened.
sub set_maxopen() {
  my $first = POSIX::open('/dev/null', O_RDONLY);
  die("cannot open /dev/null: $!\n") unless defined $first;
  my @held = ($first);
  while (@held < 65536) {
    my $dup = POSIX::dup($first);
    last unless defined $dup;
    push @held, $dup;
  }
  POSIX::close($_) for @held;
  $maxopen = scalar(@held);
  print "file descriptor limit is $maxopen\n";
}

# Map a prpa ("project/repository/arch") and a key to a cache entry.
# Returns the cache id (sha256 hex digest of "<prpa>/<key>") and the file
# name of the entry; entries are spread over subdirectories named after
# the first two characters of the id.
sub get_cachefile {
  my ($prpa, $key) = @_;
  my $cacheid = Digest::SHA::sha256_hex($prpa . '/' . $key);
  my $bucket = substr($cacheid, 0, 2);
  return ($cacheid, "$cachedir/$bucket/$cacheid");
}

# Hard-link the file $path into the cache as entry $cacheid, together with
# its non-empty meta file if there is one. The meta file is looked up as
# "<path without binary suffix>.meta", or "<path>.meta" if the path has no
# known binary suffix. A stale meta file of the entry is removed when no
# new one gets installed.
# Returns 0 if the payload could not be linked, 1 otherwise; dies if a
# rename into place fails.
sub move_entry_into_cache {
  my ($cacheid, $path) = @_;
  my $subdir = "$cachedir/".substr($cacheid, 0, 2);
  my $cachefile = "$subdir/$cacheid";
  my $tmpfile = "$cachefile.$$";
  mkdir_p($subdir);
  # link under a per-process name first, then rename into place
  unlink($tmpfile);
  link($path, $tmpfile) || return 0;
  rename($tmpfile, $cachefile) || die("rename $tmpfile $cachefile: $!\n");
  my $mpath = $path =~ /^(.*)\.(?:$binsufsre)$/ ? "$1.meta" : "$path.meta";
  my $metafile = "$cachefile.meta";
  my $tmpmeta = "$metafile.$$";
  if (-s $mpath) {
    unlink($tmpmeta);
    if (link($mpath, $tmpmeta)) {
      rename($tmpmeta, $metafile) || die("rename $tmpmeta $metafile: $!\n");
      return 1;
    }
  }
  # no usable meta: make sure no old one stays around
  unlink($metafile);
  return 1;
}

# Delete the cache entry $cacheid: first the payload, then its meta file.
# Errors from unlink are not checked.
sub remove_entry_from_cache {
  my ($cacheid) = @_;
  my $bucket = substr($cacheid, 0, 2);
  my $entry = "$cachedir/$bucket/$cacheid";
  unlink($entry);
  unlink("$entry.meta");
}

# Update the cache index ("$cachedir/content") and prune the cache.
#
#   $prunesize - size budget: walking the index from the top, entries are
#                kept while the sum of their recorded sizes fits; all
#                entries after that point are removed from the cache
#   $cacheold  - optional array ref of [cacheid, size] entries that were
#                just used; they are put at the top of the index
#   $cachenew  - optional array ref of [cacheid, size, path] entries; the
#                path is popped off each entry and the file is linked into
#                the cache with move_entry_into_cache
#
# Returns without doing anything if BSUtil::lockopen does not succeed
# (presumably a non-blocking lock attempt because of the trailing 1 -
# confirm against BSUtil). Dies if the new index cannot be stored; the
# cache files of all new entries are removed first in that case.
sub manage_cache {
  my ($prunesize, $cacheold, $cachenew) = @_;
  # get the lock
  local *F;
  BSUtil::lockopen(\*F, '+>>', "$cachedir/content", 1) || return;
  my $content;
  if (-s F) {
    # the handle was opened in append mode, rewind before reading
    seek(F, 0, 0);
    $content = Storable::fd_retrieve(\*F);
  }
  $content ||= [];
  # cacheid => size of all entries the index knows about
  my %content = map {$_->[0] => $_->[1]} @$content;
  # put cacheold, cachenew at the top
  splice(@$content, 0, 0, @$cacheold) if $cacheold && @$cacheold;
  if ($cachenew) {
    # reverse so that repeated unshift keeps the original order
    for my $c (reverse @$cachenew) {
      # pop removes the path, leaving [cacheid, size] for the index
      next unless move_entry_into_cache($c->[0], pop(@$c));
      unshift @$content, $c;
      $content{$c->[0]} = $c->[1];
    }
  }
  # prune cache
  for my $c (@$content) {
    # $c aliases the list element, so assigning undef marks it for removal
    if (!defined delete $content{$c->[0]}) {
      # unknown id or an id we already saw further up: drop the duplicate
      $c = undef;
    } elsif (($prunesize -= $c->[1]) < 0) {
      # budget exhausted: this entry and everything after it goes away
      remove_entry_from_cache($c->[0]);
      $c = undef;
    }
  }
  # store content
  @$content = grep {defined $_} @$content;
  eval {
    # write to a new file and rename it over the old index
    Storable::nstore($content, "$cachedir/content.new");
    rename("$cachedir/content.new", "$cachedir/content") || die("rename $cachedir/content.new $cachedir/content");
  };
  if ($@) {
    # could not update content, delete all new entries
    remove_entry_from_cache($_->[0]) for @$cachenew;
    die($@);
  }
  close F;
}

# Return the md5 checksum of the content of $file as hex string,
# or undef if the file cannot be opened.
sub calcmd5 {
  my ($file) = @_;
  my $fh;
  return undef unless open($fh, '<', $file);
  my $digest = Digest::MD5->new->addfile($fh)->hexdigest();
  close($fh);
  return $digest;
}

# Check that the content of $file has the md5 checksum $md5 (hex string).
# Returns 1 on a match, undef on a mismatch or if the file cannot be opened.
sub checkmd5 {
  my ($file, $md5) = @_;
  my $fh;
  open($fh, '<', $file) || return undef;
  my $got = Digest::MD5->new->addfile($fh)->hexdigest();
  close($fh);
  return undef unless $got eq $md5;
  return 1;
}

# Print one statistics line consisting of the hit count, hit kbytes,
# miss count, miss kbytes, request type and prpa, in that order.
sub printstats {
  my ($nhit, $khit, $nmiss, $kmiss, $type, $prpa) = @_;
  my $line = 'getbinariesproxy statistics: ';
  $line .= "$nhit $khit $nmiss $kmiss ";
  $line .= "$type $prpa";
  print $line."\n";
}

# Handle a /getbinaries request: reply with a cpio archive containing the
# requested binaries (and their .meta files) of $projid/$repoid/$arch.
# Binaries found in the local cache are served from there, the rest is
# downloaded from the server given in $cgi->{'server'} and then added to
# the cache.
#
# $cgi->{'binaries'} is a comma separated list. Each element is either
# "@<name>" or "<hdrmd5>[<metamd5>]:<sizek>:<suffix>@<name>". Elements
# without the version part are resolved with a getbinaryversions call.
# $cgi->{'nometa'} suppresses the .meta files in the reply,
# $cgi->{'metaonly'} suppresses the binaries.
#
# Dies on a bad specification, if both nometa and metaonly are set, if
# more files would have to be kept open than $maxopen allows (16
# descriptors are kept in reserve), and if the download or the cache
# update fails. Returns undef after calling BSServer::reply_cpio.
sub getbinaries {
  my ($cgi, $projid, $repoid, $arch) = @_;

  my $server = $cgi->{'server'};
  my $nometa = $cgi->{'nometa'};
  my $metaonly = $cgi->{'metaonly'};
  my $prpa = "$projid/$repoid/$arch";
  die("nometa and metaonly?\n") if $nometa && $metaonly;
  if ($cgi->{'now'}) {
    my $waited = time() - $cgi->{'now'};
    print "waited $waited seconds to accept call\n" if $waited > 60;
  }
  mkdir_p($cachedir);
  mkdir_p($cachetmpdir);
  set_maxopen() unless defined $maxopen;

  my ($nhit, $khit, $nmiss, $kmiss) = (0, 0, 0, 0);
  my @binaries = split(',', $cgi->{'binaries'});
  my %bv;
  my @missingbvs;
  for my $bin (@binaries) {
    die("bad binary specification\n") unless $bin =~ /^(?:([0-9a-f]{32})([0-9a-f]{32})?:(\d*):(\S+))?\@(.+)/s;
    # $bin aliases the list element: @binaries holds the plain names afterwards
    $bin = $5;
    if (!$1) {
      push @missingbvs, $bin;
      next;
    }
    $bv{$bin}->{'hdrmd5'} = $1;
    $bv{$bin}->{'metamd5'} = $2 if $2;
    $bv{$bin}->{'sizek'} = $3 || 0;
    $bv{$bin}->{'name'} = "$bin.$4";
  }

  # get missing bvs from server
  if (@missingbvs) {
    print "missingbvs: @missingbvs\n";
    my $callers_time = time();
    my @args;
    push @args, "project=$projid";
    push @args, "repository=$repoid";
    push @args, "arch=$arch";
    push @args, "nometa" if $nometa;
    push @args, map {"module=$_"} @{$cgi->{'module'} || []};
    push @args, "binaries=".join(',', @missingbvs);
    push @args, "now=$callers_time";
    my $bvl;
    eval {
      $bvl = BSRPC::rpc({
        'uri' => "$server/getbinaryversions",
        'timeout' => $gettimeout,
      }, $BSXML::binaryversionlist, @args);
    };
    # not fatal: binaries without version data are simply downloaded below
    warn($@) if $@;
    for (@{$bvl->{'binary'} || []}) {
      if ($_->{'error'}) {
        $bv{$_->{'name'}} = $_;
      } else {
        next unless $_->{'name'} =~ /(.*)\.(?:$binsufsre)$/;
        $bv{$1} = $_;
      }
    }
  }

  my @cpio;

  # check the cache
  my $downloadsizek = 0;
  my @cacheold;
  my @cachenew;
  my @downloadbins;
  my $openfds = 0;

  my $tmpprefix = $$.'_';

  for my $bin (@binaries) {
    my $bv = $bv{$bin};
    if (!$bv) {
      push @downloadbins, $bin;
      next;
    }
    if ($bv->{'error'}) {
      push @cpio, {'name' => $bin, 'error' => $bv->{'error'}};
      next;
    }
    my ($cacheid, $cachefile) = get_cachefile($prpa, $bv->{'hdrmd5'});
    my $usecache;
    my @s;
    my $binfd;
    my $metafd;
    if ($metaonly) {
      $usecache = 1;
    } else {
      my $fd = gensym;
      my $tmpname = "$cachetmpdir/$tmpprefix$bv->{'name'}";
      # work on a private hard link so that the entry cannot be replaced
      # between the check and the open
      if (link($cachefile, $tmpname)) {
        # check hdrmd5 to be sure we got the right bin
        my $id;
        eval {
          $id = Build::queryhdrmd5($tmpname);
        };
        if ($id && $id eq $bv->{'hdrmd5'} && open($fd, '<', $tmpname)) {
          $binfd = $fd;
          $usecache = 1;
          unlink($tmpname);
          @s = stat($fd);
          die unless @s;
        } else {
          unlink($tmpname);
        }
      }
    }
    if ($usecache && !$nometa && $bv->{'metamd5'}) {
      # the cached meta must match the requested metamd5 as well
      my $fd = gensym;
      if (open($fd, '<', "$cachefile.meta")) {
        my $ctx = Digest::MD5->new;
        $ctx->addfile($fd);
        # rewind, the handle is sent to the client later
        seek($fd, 0, 0) || die;
        if ($ctx->hexdigest() ne $bv->{'metamd5'}) {
          $usecache = undef;
          close $fd;
          close $binfd unless $metaonly;
        } else {
          $metafd = $fd;
        }
      } else {
        close $binfd unless $metaonly;
        $usecache = undef;
      }
    }
    @s = stat($cachefile) if !@s && $usecache;
    push @cacheold, [$cacheid, $s[7]] if @s && $usecache;
    if (!$usecache) {
      push @downloadbins, $bin;
      $downloadsizek += $bv->{'sizek'};
    } else {
      $nhit++;
      # @s is empty if the cache file could not be stat()ed (metaonly case)
      $khit += int(($s[7] || 0) / 1024);
      push @cpio, {'name' => $bv->{'name'}, 'filename' => $binfd} if $binfd;
      push @cpio, {'name' => "$bin.meta", 'filename' => $metafd} if $metafd;
      $openfds++ if $binfd;
      $openfds++ if $metafd;
    }
    die("too many files to send\n") if $openfds + 16 > $maxopen;	# sorry
  }

  # get files not in cache
  if (@downloadbins) {
    print "downloading: @downloadbins\n";
    my $toomany;
    my %downloadbins = map {$_ => 1} @downloadbins;
    # make room first if the download is bigger than 1% of the cache size
    if ($downloadsizek * 1024 * 100 > $cachesize) {
      manage_cache($cachesize - $downloadsizek * 1024);
    }
    my $callers_time = time();
    my @args;
    push @args, "project=$projid";
    push @args, "repository=$repoid";
    push @args, "arch=$arch";
    push @args, map {"module=$_"} @{$cgi->{'module'} || []};
    push @args, "now=$callers_time";
    my $res;
    eval {
      $res = BSRPC::rpc({
        'uri' => "$server/getbinaries",
        'directory' => $cachetmpdir,
        'map' => $tmpprefix,
        'timeout' => $gettimeout,
        'receiver' => \&BSHTTP::cpio_receiver,
      }, undef, @args, 'binaries='.join(',', @downloadbins));
      for my $r (@$res) {
        if ($r->{'name'} =~ /^\Q$tmpprefix\E(.*)\.($binsufsre)$/) {
          my $n = $1;
          my $suf = $2;
          next unless $downloadbins{$n};
          my @s = stat("$cachetmpdir/$r->{'name'}");
          die unless @s;
          my $id = Build::queryhdrmd5("$cachetmpdir/$r->{'name'}");
          $r->{'hdrmd5'} = $id;
          if (!$metaonly && !$toomany) {
            my $fd = gensym;
            open($fd, '<', "$cachetmpdir/$r->{'name'}") || die;
            push @cpio, {'name' => "$n.$suf", 'filename' => $fd};
            $openfds++;
          }
          # without a hdrmd5 there is no usable cache key, so the binary
          # is sent to the client but not put into the cache
          if ($id) {
            my ($cacheid) = get_cachefile($prpa, $id);
            push @cachenew, [$cacheid, $s[7], "$cachetmpdir/$r->{'name'}"];
          }
          $nmiss++;
          $kmiss += int($s[7] / 1024);
        } elsif ($r->{'name'} =~ /^\Q$tmpprefix\E(.*)\.meta$/) {
          my $n = $1;
          next unless $downloadbins{$n};
          if (!$nometa && !$toomany) {
            my $fd = gensym;
            open($fd, '<', "$cachetmpdir/$r->{'name'}") || die;
            push @cpio, {'name' => "$n.meta", 'filename' => $fd};
            $openfds++;
          }
        }
        # keep going so that everything still ends up in the cache,
        # the request itself is failed further down
        $toomany = 1 if $openfds + 16 > $maxopen;		# sorry
      }
      manage_cache($cachesize, \@cacheold, \@cachenew);
    };
    if ($@) {
      # clean up downloaded files
      my $err = $@;
      unlink("$cachetmpdir/$_") for grep {/^\Q$tmpprefix\E/} ls($cachetmpdir);
      die($err);
    }
    # clean up
    for my $r (@$res) {
      unlink("$cachetmpdir/$r->{'name'}");
    }
    die("too many files to send\n") if $toomany;
  }
  printstats($nhit, $khit, $nmiss, $kmiss, 'binary', $prpa);
  BSServer::reply_cpio(\@cpio);
  return undef;
}

# Handle a /getpreinstallimage request: reply with the preinstall image
# identified by $prpa and $hdrmd5. The image is served from the cache if
# the entry's meta file contains "<hdrmd5>  :preinstallimage"; otherwise it
# is downloaded from "$cgi->{'server'}/build/$prpa/$cgi->{'path'}", put into
# the cache and then sent.
#
# Dies if the download, the open/stat of the downloaded file or the cache
# update fails; the temporary files are removed in that case as well.
# Returns undef after calling BSServer::reply_file.
sub getpreinstallimage {
  my ($cgi, $prpa, $hdrmd5) = @_;
  if ($cgi->{'now'}) {
    my $waited = time() - $cgi->{'now'};
    print "waited $waited seconds to accept call\n" if $waited > 60;
  }
  mkdir_p($cachedir);
  mkdir_p($cachetmpdir);
  set_maxopen() unless defined $maxopen;

  my ($cacheid, $cachefile) = get_cachefile($prpa, $hdrmd5);
  my $cachefilemeta = readstr("$cachefile.meta", 1) || '';
  if ($cachefilemeta eq "$hdrmd5  :preinstallimage\n") {
    # got it
    my $fd = gensym;
    if (open($fd, '<', $cachefile)) {
      # re-check to make races unlikely
      $cachefilemeta = readstr("$cachefile.meta", 1) || '';
      if ($cachefilemeta eq "$hdrmd5  :preinstallimage\n") {
        # use it!
        my @s = stat($fd);
        # put entry on top
        manage_cache($cachesize, [ [$cacheid, $s[7]] ], undef) if @s;
        printstats(1, int(($s[7] || 0) / 1024), 0, 0, 'preinstallimage', $prpa);
        BSServer::reply_file($fd, "Content-Type: application/octet-stream");
        return undef;
      }
      close($fd);
    }
  }

  # not in cache, fetch
  print "downloading: $prpa/$cgi->{'path'}\n";
  # make room for the announced size before downloading
  manage_cache($cachesize - $cgi->{'sizek'} * 1024) if $cgi->{'sizek'};
  my $tmpfilename = "$cachetmpdir/$$".'_preinstallimage';
  unlink($tmpfilename);
  my $fd = gensym;
  my @s;
  eval {
    BSRPC::rpc({
      'uri' => "$cgi->{'server'}/build/$prpa/$cgi->{'path'}",
      'timeout' => $gettimeout,
      'receiver' => \&BSHTTP::file_receiver,
      'filename' => $tmpfilename,
    });
    open($fd, '<', $tmpfilename) || die("$tmpfilename: $!\n");
    @s = stat($tmpfilename);
    die("stat: $!\n") unless @s;
    # the meta file is what marks the cache entry as preinstall image
    writestr("$tmpfilename.meta", undef, "$hdrmd5  :preinstallimage\n");
    manage_cache($cachesize, undef, [ [$cacheid, $s[7], $tmpfilename] ]);
  };
  my $err = $@;
  # the temporary files are no longer needed, neither on success (the
  # cache holds its own links and $fd stays open) nor on failure
  unlink("$tmpfilename.meta");
  unlink($tmpfilename);
  die($err) if $err;
  printstats(0, 0, 1, int($s[7] / 1024), 'preinstallimage', $prpa);
  BSServer::reply_file($fd, "Content-Type: application/octet-stream");
  return undef;
}

# Handle a /getbinaries_kiwiproduct request: reply with a cpio archive
# containing the requested files of package $packid in
# $projid/$repoid/$arch. Files found in the local cache are served from
# there, the rest is downloaded from $cgi->{'server'} and then added to
# the cache.
#
# Each element of $cgi->{'binary'} has the form
# "<md5>:<sizek>:<type>@<name>". For type "rpm" the md5 is compared with
# the hdrmd5 of the cached file, for type "extra" with the md5 checksum
# of the whole file.
#
# Dies on a bad specification, if more files would have to be kept open
# than $maxopen allows (16 descriptors are kept in reserve), and if the
# download or the cache update fails. Returns undef after calling
# BSServer::reply_cpio.
sub getbinaries_kiwiproduct {
  my ($cgi, $projid, $repoid, $arch, $packid) = @_;
  my $server = $cgi->{'server'};

  my $prpa = "$projid/$repoid/$arch";
  if ($cgi->{'now'}) {
    my $waited = time() - $cgi->{'now'};
    print "waited $waited seconds to accept call\n" if $waited > 60;
  }
  mkdir_p($cachedir);
  mkdir_p($cachetmpdir);
  my $tmpprefix = $$.'_';
  set_maxopen() unless defined $maxopen;

  my @binaries;
  for my $bin (@{$cgi->{'binary'}}) {
    die("bad binary specification\n") unless $bin =~ /^([0-9a-f]{32}):(\d*):(\S+)\@(.+)$/s;
    # the size part may be empty, treat that as 0
    if ($3 eq 'extra') {
      push @binaries, {'name' => $4, 'md5sum' => $1, 'sizek' => $2 || 0, 'cachekey' => "$1.extra"};
    } elsif ($3 eq 'rpm') {
      push @binaries, {'name' => $4, 'hdrmd5' => $1, 'sizek' => $2 || 0, 'cachekey' => $1};
    } else {
      die("bad binary specification\n");
    }
  }

  my @cpio;

  # check the cache
  my $downloadsizek = 0;
  my @cacheold;
  my @cachenew;
  my @downloadbins;
  my $openfds = 0;
  my ($nhit, $khit, $nmiss, $kmiss) = (0, 0, 0, 0);

  for my $bv (@binaries) {
    my ($cacheid, $cachefile) = get_cachefile($prpa, $bv->{'cachekey'});
    my $fd = gensym;
    my $binfd;
    my $tmpname = "$cachetmpdir/$tmpprefix$bv->{'name'}";
    my @s;
    # work on a private hard link so that the entry cannot be replaced
    # between the verification and the open
    if (link($cachefile, $tmpname)) {
      @s = stat($tmpname);
      # verify
      if ($bv->{'md5sum'}) {
        $binfd = $fd if @s && checkmd5($tmpname, $bv->{'md5sum'}) && open($fd, '<', $tmpname);
      } else {
        my $id = eval { Build::queryhdrmd5($tmpname) };
        $binfd = $fd if @s && $id && $id eq $bv->{'hdrmd5'} && open($fd, '<', $tmpname);
      }
      unlink($tmpname);
    }
    if (!$binfd) {
      push @downloadbins, $bv;
      $downloadsizek += $bv->{'sizek'};
      next;
    }
    push @cacheold, [$cacheid, $s[7]];
    $nhit++;
    $khit += int($s[7] / 1024);
    push @cpio, {'name' => $bv->{'name'}, 'filename' => $binfd};
    $openfds++;
    die("too many files to send\n") if $openfds + 16 > $maxopen;	# sorry
  }

  # get files not in cache
  if (@downloadbins) {
    print "downloading: ".join(' ', map {$_->{'name'}} @downloadbins)."\n";
    my $toomany;
    # make room first if the download is bigger than 1% of the cache size
    if ($downloadsizek * 1024 * 100 > $cachesize) {
      manage_cache($cachesize - $downloadsizek * 1024);
    }
    my %downloadbins = map {$_->{'name'} => $_} @downloadbins;
    my @args;
    push @args, 'view=cpio';
    push @args, "noajax=1" if $cgi->{'noajax'};
    push @args, "binary=$_->{'name'}" for @downloadbins;
    my $res;
    eval {
      $res = BSRPC::rpc({
        'uri' => "$server/build/$projid/$repoid/$arch/$packid",
        'directory' => $cachetmpdir,
        'map' => $tmpprefix,
        'timeout' => $gettimeout,
        'receiver' => \&BSHTTP::cpio_receiver,
      }, undef, @args);
      for my $r (@$res) {
        die unless $r->{'name'} =~ /^\Q$tmpprefix\E(.*)$/;
        my $bv = $downloadbins{$1};
        next unless $bv;
        my $tmpname = "$cachetmpdir/$r->{'name'}";
        my @s = stat($tmpname);
        die unless @s;

        if (!$toomany) {
          my $fd = gensym;
          open($fd, '<', $tmpname) || die;
          push @cpio, {'name' => $bv->{'name'}, 'filename' => $fd};
          $openfds++;
          # keep going so that everything still ends up in the cache,
          # the request itself is failed further down
          $toomany = 1 if $openfds + 16 > $maxopen;		# sorry
        }

        # the cache key is derived from what was actually received
        my $cachekey;
        if ($bv->{'md5sum'}) {
          # calcmd5 returns undef if the file cannot be read; do not
          # build a key from that
          my $md5 = calcmd5($tmpname);
          $cachekey = "$md5.extra" if $md5;
        } else {
          $cachekey = Build::queryhdrmd5($tmpname);
        }
        if ($cachekey) {
          my ($cacheid) = get_cachefile($prpa, $cachekey);
          push @cachenew, [$cacheid, $s[7], $tmpname];
        }
        $nmiss++;
        $kmiss += int($s[7] / 1024);
      }
      manage_cache($cachesize, \@cacheold, \@cachenew);
    };
    if ($@) {
      # clean up downloaded files
      my $err = $@;
      unlink("$cachetmpdir/$_") for grep {/^\Q$tmpprefix\E/} ls($cachetmpdir);
      die($err);
    }
    # clean up
    for my $r (@$res) {
      unlink("$cachetmpdir/$r->{'name'}");
    }
    die("too many files to send\n") if $toomany;
  }
  printstats($nhit, $khit, $nmiss, $kmiss, 'kiwiproduct', $prpa);
  BSServer::reply_cpio(\@cpio);
  return undef;
}

# Handler for '/': return the fixed greeting element of this server.
# The cgi argument is accepted but not looked at.
sub hello {
  my ($cgi) = @_;
  my $greeting = '<hello name="Getbinaries Proxy Server" />';
  return $greeting."\n";
}

# 'run' hook of the server configuration: determine the file descriptor
# limit first, then hand the configuration over to BSServer::server.
sub run {
  my $conf = shift;
  set_maxopen();
  return BSServer::server($conf);
}

# request patterns and the subs that handle them
my $dispatches = [
  '/' => \&hello,
  '/getbinaries $project $repository $arch binaries: nometa:bool? metaonly:bool? module* workerid? now:num? server:' => \&getbinaries,
  '/getbinaries_kiwiproduct $project $repository $arch $package binary:filename+ noajax:bool? workerid? now:num? server:' => \&getbinaries_kiwiproduct,
  '/getpreinstallimage $prpa $hdrmd5:md5 path: sizek:num? workerid? now:num? server:' => \&getpreinstallimage,
];

# server configuration, built from the defaults plus the optional overrides
# from BSConfig
my $conf = {
  runname      => 'bs_getbinariesproxy',
  port         => $port,
  proto        => $proto,
  dispatches   => $dispatches,
  setkeepalive => 1,
  maxchild     => 40,
  run          => \&run,
};

if ($BSConfig::getbinariesproxyserver_maxchild) {
  $conf->{'maxchild'} = $BSConfig::getbinariesproxyserver_maxchild;
}
if ($BSConfig::getbinariesproxyserver_extraconf) {
  # extra settings win over everything set above
  $conf->{$_} = $BSConfig::getbinariesproxyserver_extraconf->{$_} for keys %{$BSConfig::getbinariesproxyserver_extraconf};
}

BSStdServer::server('getbinariesproxy', \@ARGV, $conf);

