#!/usr/bin/perl -I.
#
# Spool loader for "loot" messages.  One script, four modes:
#
#   -g      Gearman worker: registers "loot_spool" (and, with -P,
#           "loot_process"), stores every incoming job in the data
#           directory (-d) and then runs it through spool_it().
#   -s      Sender: loops over the directories given on the command line
#           and submits every plain file synchronously as a "loot_spool"
#           job; a file is unlinked once the worker answered 'ok'.
#   -S N    Parallel sender: same idea, but keeps up to N asynchronous
#           tasks in flight, driven by a Danga::Socket event loop.
#   (none)  Local mode: every file given on the command line is run
#           through spool_it() directly and unlinked on success.
#
use warnings;
use strict;

use Getopt::Long qw(:config no_ignore_case bundling);
use Sys::Hostname;
use TMA::Config;
use Gearman::Worker;
use Time::HiRes qw/ time /;
use File::stat;
use TMA::Util;
use Gearman::Client;
use Data::Dumper;
use Storable;
use TMA::Loot;
use TMA::URL;

$| = 1;

my $opt_v = 0;      # verbose
my $opt_p = 0;      # paranoid // do readback
my $opt_s = 0;      # sender mode
my $opt_S = 0;      # parallel sender mode
my $opt_g = 0;      # receiver gearman mode
my $opt_P = 0;      # gearman, offer processing; no gearman, dont process;
my $opt_r = 3;      # retries for mogfs ops
my $opt_d = ".";    # datadir
my $opt_l = 0;      # limit number of files processed

GetOptions(
    'r=i' => \$opt_r,
    'l=i' => \$opt_l,
    'd=s' => \$opt_d,
    'p'   => \$opt_p,
    'g'   => \$opt_g,
    'P'   => \$opt_P,
    's'   => \$opt_s,
    'S=i' => \$opt_S,
    'v'   => \$opt_v,
) or die "funnyargs";

# Short host name; used by file_store() to build unique spool file names.
my $hn = hostname;
$hn =~ s/\..*$//;
die "no hostname" unless $hn;

# Per-process counter, also part of the spool file name.
my $cnt = 0;

##########
# -g: gearman worker mode
##########

if ($opt_g) {
    # Normalise the data directory: collapse repeated slashes and drop a
    # trailing one.
    $opt_d =~ s,/+,/,g;
    $opt_d =~ s,/$,,g;
    die "datadir missing: '$opt_d'" unless -d $opt_d;

    my $worker = Gearman::Worker->new(job_servers => $TMA::Config::gearman_servers_spool);
    $worker->register_function("loot_spool" => \&loot_gearspool);
    if ($opt_P) {
        $worker->register_function("loot_process" => \&loot_gearprocess);
    }

    # Each work() call is told to stop as soon as its second stop_if
    # argument is true, so -l bounds the number of work() rounds.
    my $tc = 0;
    while (!$opt_l || $opt_l > $tc) {
        $worker->work(
            stop_if => sub { $_[1]; },
        );
        $tc++;
    }
    # NOTE(review): despite the message this IS reached once the -l limit
    # has been hit; kept as-is because callers may rely on the exit status.
    die "never here";
}
die "internal error" if $opt_g;

die "not gearmode and no argv" unless @ARGV;

# Directories (sender modes) or files (local mode) from the command line,
# with trailing slashes removed.
my @stack = ();
for (@ARGV) {
    s,/*$,,;
    push @stack, $_;
}

# Loop counter and running totals: bytes, files, directories.
my ($loop, $tb, $tf, $td) = (0, 0, 0, 0);
my $CLIENT;

##########
# -s: synchronous sender mode
##########

while ($opt_s) {
    $loop++;
    printf "DEB: starting loop %i ...\n", $loop;
    my $st = time;
    my ($lb, $lf, $ld) = (0, 0, 0);    # per-loop bytes, files, dirs

    for my $d (@stack) {
        die "not a dir: $d" unless -d $d;
        $ld++;
        $td++;
        opendir my $dh, $d or die "cant opendir($d): $!";
        my @f = readdir $dh;
        closedir $dh;

        for my $f (@f) {
            next if $f =~ /^\./;
            my $fn = $d."/".$f;
            next unless -f $fn;
            $lf++;
            $tf++;
            printf "DEB: loading '%s' ...", $fn;

            my $s = stat($fn) or die "cant stat($fn): $!";
            die "its too big! $fn" if $s->size > 100*1000*1000;
            my $fs = $s->size;
            printf " reading %i bytes ...", $fs;

            # Slurp the whole file.
            open my $in, "<", $fn or die "cant open($fn): $!";
            my $D;
            {
                local $/;
                $D = <$in>;
            }
            close $in;
            $lb += $fs;
            $tb += $fs;
            die "size mismatch" unless length $D == $s->size;

            # Result is unused here; the call is kept from the original.
            my ($head, $body) = TMA::Util::msg_split($D);

            printf " sending";
            my $client = $CLIENT ||= Gearman::Client->new(job_servers => $TMA::Config::gearman_servers_spool);
            my $res = $client->do_task("loot_spool", $D, { timeout => 300 });
            if ($res && ref $res eq 'SCALAR' && $$res eq 'ok') {
                printf " ok ...";
                unlink $fn or die "cant unlink($fn): $!";
            }
            else {
                printf " res: '%s' ...", $res;
                print Dumper($res);
                warn "bad res";
            }
            print "\n";
            last if $opt_l && $tf >= $opt_l;
        }
        last if $opt_l && $tf >= $opt_l;
    }

    my $dt = time - $st;
    printf "DEB: finished loop %s in %.2f sec, LOOP %s bytes in %s files in %s dirs, TOTAL %s bytes in %s files in %s dirs...\n",
        $loop, $dt, $lb, $lf, $ld, $tb, $tf, $td;
    exit 23 if $opt_l && $tf >= $opt_l;
    sleep 1;
}
die "internal error" if $opt_s;

##########
# -S: parallel sender mode
##########

# Task bookkeeping, keyed "file:<name>" (in flight, value [task, start
# time]), "done:<name>" and "fail:<name>" (value 1).
my %TASKS    = ();
my @STACK    = ();    # directories still to be scanned in this pass
my @FILES    = ();    # files found but not yet handed out
my $LASTSCAN = 0;     # time of the last rescan start

# Return the next file name to send, or nothing when there is currently
# none.  Starts a new pass over @stack when both queues are empty, but at
# most once per second.  Sub-directories found while scanning are queued
# for scanning too.
sub _get_file {
    unless (@STACK || @FILES) {
        if (($LASTSCAN+1) > time) {
            printf "DEB: skipping rescan...\n";
            return;
        }
        $LASTSCAN = time;
        if ($loop) {
            printf "DEB: finished loop %s, TOTAL %s bytes in %s files in %s dirs...\n", $loop, $tb, $tf, $td;
        }
        $loop++;
        printf "DEB: starting loop %i ...\n", $loop;
        @STACK = @stack;
    }

    while (@STACK && !@FILES) {
        my $d = shift @STACK;
        die "not a dir: $d" unless -d $d;
        opendir my $dh, $d or die "cant opendir($d): $!";
        my @f = readdir $dh;
        closedir $dh;
        $td++;

        for my $f (@f) {
            next if $f =~ /^\./;
            my $fn = $d."/".$f;
            unshift @STACK, $fn if -d $fn;
            next unless -f $fn;
            $tf++;
            push @FILES, $fn;
        }

        # Forget completed tasks after every directory scan.
        for my $key (keys %TASKS) {
            next unless $key =~ /^done:/;
            delete $TASKS{$key};
        }
    }

    return unless @FILES;
    return shift @FILES;
}

my $TIMER2;
{
    # A Gearman task that remembers which spool file it carries and which
    # callback to poke once it has completed.
    package SpoolTask;
    use base 'Gearman::Task';
    use fields qw(filename launcher);
    use Data::Dumper;

    # SpoolTask->new($dataref, $filename, $launcher)
    # Registers the task under "file:$filename" in %TASKS and sets up a
    # "loot_spool" task with a 300 second timeout.
    sub new {
        my ($class, $data, $filename, $launcher) = @_;
        my $self = fields::new($class);
        $self->{filename} = $filename;
        $self->{launcher} = $launcher;
        $TASKS{"file:$filename"} = [$self, time,];
        $self->SUPER::new(
            'loot_spool',
            $data,
            {
                timeout     => 300,
                on_complete => sub { $self->on_complete(@_); },
                on_fail     => sub { $self->on_fail(@_); },
            });
        return $self;
    }

    # Failure callback: move the file from "file:" to "fail:" so that it
    # is not picked up again by _launch_tasks().
    sub on_fail {
        my ($self, $reason) = @_;
        my $filename = $self->{filename};
        die "no filename" unless $filename;
        my ($task, $ts) = @{ delete $TASKS{"file:$filename"} || [$self, time,] };
        die "internal error: $task vs $self" unless $task eq $self;
        $TASKS{"fail:$filename"} = 1;
        printf "DEB: %s->on_fail(%s): %s\n", $self, $filename, (defined $reason ? "'$reason'" : 'NONE');
    }

    # Completion callback: mark the file done, unlink it when the worker
    # answered 'ok', and schedule the launcher.  "LATE" is printed when
    # the "file:" entry is already gone (e.g. removed by the TIMEOUT sweep
    # in _launch_tasks()).
    sub on_complete {
        my ($self, $res) = @_;
        my $filename = $self->{filename};
        die "no filename" unless $filename;
        if ($TASKS{"file:$filename"}) {
            my ($task, $ts) = @{ delete $TASKS{"file:$filename"} || [$self, time,] };
            die "internal error: $task vs $self" unless $task eq $self;
        }
        else {
            printf "LATE ";
        }
        $TASKS{"done:$filename"} = 1;
        printf "DEB: %s->on_complete(%s) ...", $self, $filename;
        if ($res && ref $res eq 'SCALAR' && $$res eq 'ok') {
            printf " ok ...";
            unlink $filename or die "cant unlink($filename): $!";
        }
        else {
            printf " res: '%s' ...", $res;
            print Dumper($res);
            warn "bad res";
        }
        print "\n";
        $TIMER2 ||= Danga::Socket->AddTimer(0, $self->{launcher});
    }
}

my $TIMER;

# Timer-driven scheduler for -S mode.  Re-arms itself every 0.05 s, drops
# "file:" entries older than 120 s, and then starts new tasks until $opt_S
# are in flight, at most 5 new ones per invocation.
sub _launch_tasks {
    die "internal error" unless $opt_S;
    require Danga::Socket;

    if ($TIMER2) {
        $TIMER2->cancel();
        undef $TIMER2;
    }
    if ($TIMER) {
        $TIMER->cancel();
        undef $TIMER;
    }
    $TIMER = Danga::Socket->AddTimer(0.05, \&_launch_tasks);

    my $now = time;
    for my $key (keys(%TASKS)) {
        next unless $key =~ /^file:/;
        my ($task, $ts) = @{ $TASKS{$key} };
        if ($ts < $now-120) {
            printf "TIMEOUT: %s\n", $task;
            delete $TASKS{$key};
        }
    }

    my $c = 0;
    while (scalar(grep(/^file:/, keys(%TASKS))) < $opt_S && $c < 5) {
        my $fn = _get_file();
        return unless defined $fn;
        next if $TASKS{"file:$fn"};
        next if $TASKS{"done:$fn"};
        next if $TASKS{"fail:$fn"};
        $c++;

        # Summarise the task table by state for the debug line.
        my %count_of = ();
        for my $key (keys %TASKS) {
            die "whatis $key" unless $key =~ /^(\w+):/;
            $count_of{$1}++;
        }
        my $summary = "$opt_S max";
        for my $state (keys %count_of) {
            $summary .= sprintf ", %s %s", $count_of{$state}, $state;
        }
        printf "DEB: tasks %s ... loading '%s' ...", $summary, $fn;

        my $s = stat($fn) or die "cant stat($fn): $!";
        die "its too big! $fn" if $s->size > 100*1000*1000;
        my $fs = $s->size;
        printf " reading %i bytes ...", $fs;

        # Slurp the whole file.
        open my $in, "<", $fn or die "cant open($fn): $!";
        my $D;
        {
            local $/;
            $D = <$in>;
        }
        close $in;
        $tb += $fs;
        die "size mismatch" unless length $D == $s->size;

        # Result is unused here; the call is kept from the original.
        my ($head, $body) = TMA::Util::msg_split($D);

        printf " sending";
        require Gearman::Client::Async;
        my $client = $CLIENT ||= Gearman::Client::Async->new(job_servers => $TMA::Config::gearman_servers_spool);
        require Gearman::Task;
        my $task = SpoolTask->new(\$D, $fn, \&_launch_tasks);
        $client->add_task($task);
        printf " ... %s\n", $task;
    }
}

if ($opt_S) {
    require Danga::Socket;
    _launch_tasks();
    Danga::Socket->EventLoop();
    die "never here";
}
die "internal error" if $opt_S;

##########
# default: local mode, process the files named on the command line
##########

#@stack = splice @stack, 0, 10;
while (@stack) {
    my $n = shift @stack;
    die "does not exist '$n'" unless -e $n;
    my $s = stat($n) or die "cant stat($n): $!";
    die "its too big! $n" if $s->size > 100*1000*1000;
    if (-f $n) {
        open my $in, "<", $n or die "cant open($n): $!";
        my $d;
        {
            local $/;
            $d = <$in>;
        }
        close $in;
        die "size mismatch" unless length($d) == $s->size;
        printf "LOADED %s ... %i byte ...", $n, length($d);
        my $res = spool_it(\$d);
        if ($res eq 'ok') {
            print " ok ...";
            unlink $n or die "cant unlink($n): $!";
        }
        else {
            die "res: $res";
        }
        print "\n";
    }
    else {
        die "FUNNY: $n";
    }
}
die "internal error" if @stack;
exit 0;

###

# spool_it($dataref) -> "ok"
#
# Takes a reference to one raw spool message, splits it with
# TMA::Util::msg_split, thaws the payload, checks the embedded size and
# SHA-1 against the inner data, stores the data as a TMA::Loot and, when a
# URL is present, records the (url, loot) sighting in loot.l2n.  Dies on
# any inconsistency.
sub spool_it {
    my ($dd) = @_;
    die "data not ref" unless ref $dd;
    die "bad dataref: $dd" unless ref $dd eq 'SCALAR';

    my ($h, $d) = TMA::Util::msg_split($dd);
    # NOTE(review): Storable::thaw on data that may arrive over the
    # network (gearman jobs) can instantiate arbitrary objects -- make
    # sure only trusted senders can reach this worker.
    my $r = Storable::thaw($d);
    undef($d);

    if ($opt_v) {
        print "\n";
        for my $k (keys %$r) {
            my $v = $r->{$k};
            my $l = defined $v ? length($v) : 0;
            # Only short, harmless looking values are printed verbatim.
            $v = '' unless defined $v;
            $v = '' unless $v =~ /^[-<>\/,.\w :]+$/;
            printf "DEB: '%s' -> %i bytes -> '%s'\n", $k, $l, $v;
            if ($k eq 'data') {
                # Dump the payload to data.<pid> for inspection.
                my $fn = "data.$$";
                open my $out, ">", $fn or die "cant open $fn: $!";
                print {$out} $r->{$k};
                close $out or die "close: $!";
            }
        }
    }

    die "no inner data" unless exists $r->{data};
    die "empty inner data" unless length $r->{data};
    my $D = \$r->{data};
    my $size = length($$D);
    die "no inner size" unless $r->{size};
    die "inner size mismatch" unless $r->{size} == $size;

    # Binary SHA-1 digest of the payload must match the one in the message.
    require Digest::SHA;
    my $s1 = Digest::SHA->new();
    $s1->add($$D);
    my $sha1 = $s1->digest();
    undef $s1;
    die "no inner sha1" unless $r->{sha1};
    die "inner sha1 mismatch" unless $r->{sha1} eq $sha1;

    my $loot = TMA::Loot->new(
        data => $$D,
        sha1 => $sha1,
    );
    die "no loot" unless $loot;
    $loot->lock();
    my $n2n = $loot->n2n();
    my $lid = $n2n->{lid};
    die "no lid" unless $lid;
    die "bad lid" unless $lid =~ /^\d+$/;
    $loot->store();

    if (defined $r->{nourl} && length $r->{nourl}) {
        die "nourl and url" if exists $r->{url};
        # do nothing
    }
    elsif (defined $r->{url} && length $r->{url}) {
        die "url and nourl" if exists $r->{nourl};
        my $url = $r->{url};
        die "no url" unless length $url;
        die "bad url '$url'" unless $url =~ /^(http(?:s)?):\/\//;
        my $uid = TMA::URL->store_url($url);

        # mtime/atime fall back to "when"; atime finally falls back to mtime.
        my $mtime = $r->{mtime} || $r->{when};
        die "no mtime" unless $mtime;
        die "bad mtime $mtime" unless $mtime =~ /^\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d$/;
        my $atime = $r->{atime} || $r->{when};
        unless ($atime) {
            printf " no atime ...";
            $atime = $r->{mtime};
        }
        die "no atime" unless $atime;
        die "bad atime $atime" unless $atime =~ /^\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d$/;
        my $code = $r->{code};
        die "no code" unless $code;
        die "bad code $code" unless $code =~ /^\d\d\d$/;

        # Insert the sighting, or widen the time ranges and bump the
        # counter when the row already exists.
        my $dbh = get_dbh();
        my $sth = $dbh->prepare_cached(qq{
            insert into loot.l2n set
                uid = ?, lid = ?, code = ?,
                minatime = ?, maxatime = minatime,
                minmtime = ?, maxmtime = minmtime
            on duplicate key update
                minatime = least(minatime, ?),
                maxatime = greatest(maxatime, ?),
                minmtime = least(minmtime, ?),
                maxmtime = greatest(maxmtime, ?),
                seen = seen + 1
        }) or die $dbh->errstr;
        $sth->execute($uid, $lid, $code, $atime, $mtime, $atime, $atime, $mtime, $mtime)
            or die $sth->errstr;
    }
    else {
        die "neither url no nourl";
    }

    if (!$opt_P || $opt_g) {
        $loot->process();
    }
    $loot->unlock();
    return "ok";
}

# Cached database handle and the pid that created it.  These declarations
# sit after "exit 0" on purpose: they are never executed, they only have to
# be compiled before get_dbh().
my $db_h;
my $db_pid;

# Return a connected DBI handle for the loot database, reconnecting when
# the process id changed or the cached handle no longer answers ping().
sub get_dbh {
    if ($db_pid && $db_pid != $$) {
        warn "pid change, discarding dbh $db_h";
        # The handle was inherited from another process: do not let its
        # destruction shut down the connection the original owner uses.
        $db_h->{InactiveDestroy} = 1 if $db_h;
        $db_h = undef;
    }
    if ($db_h && !$db_h->ping()) {
        warn "ping failed, discarding dbh $db_h";
        $db_h = undef;
    }
    return $db_h if $db_h;

    my @par = (
        $TMA::Config::loot_dsn,
        $TMA::Config::loot_user,
        $TMA::Config::loot_pass,
    );
    require DBI;
    $db_h = DBI->connect(@par) or die "cant connect: ".DBI->errstr;
    $db_h->{mysql_auto_reconnect} = 1;
    $db_pid = $$;
    return $db_h;
}

# Gearman "loot_process" handler: thaw the job payload, build a TMA::Loot
# from it and run process() on it.  Always returns "ok" (or dies).
sub loot_gearprocess {
    my $job = shift;
    my $dd = $job->argref;
    die "data not ref" unless ref $dd;
    die "bad dataref: $dd" unless ref $dd eq 'SCALAR';

    my ($h, $d) = TMA::Util::msg_split($dd);
    # NOTE(review): see spool_it() -- thaw on job data requires trusted senders.
    my $r = Storable::thaw($d);
    undef($d);
    printf "PROCESS: %s", Dumper($r);

    my $loot = TMA::Loot->new( %$r );
    die "no loot" unless $loot;
    $loot->process();
    print "\n";
    return "ok";
}

# Gearman "loot_spool" handler: persist the job payload to the data
# directory first, then try spool_it().  The spool file is removed only
# when spool_it() succeeded; otherwise it stays behind and the error is
# just logged.  The job itself is always answered with "ok".
sub loot_gearspool {
    my $job = shift;
    my $D = $job->argref;
    my $fn = file_store($D);
    printf "SPOOLED: %s ... %i byte ...", $fn, length $$D;

    my $res = eval { spool_it($D); };
    if ($res && $res eq 'ok') {
        print " done!";
        unlink $fn or die "cant unlink($fn): $!";
    }
    else {
        if ($res) {
            chomp $res;
            print " res:'$res' ..." if $res;
        }
        if ($@) {
            chomp $@;
            print " exc:'$@' ..." if $@;
        }
    }
    print "\n";
    return "ok";
}

# file_store($dataref) -> file name
#
# Write the data to "<datadir>/<time>.<host>.<pid>.<counter>" via a hidden
# temp file and rename, verify the resulting size and, in paranoid mode
# (-p), read the file back and compare the content.
sub file_store {
    my ($D) = @_;
    die "not ref" unless ref $D;

    my $id = sprintf "%i.%s.%i.%i", time, $hn, $$, $cnt++;
    my $fn = sprintf "%s/%s", $opt_d, $id;
    my $tn = sprintf "%s/.%s.tmp", $opt_d, $id;
    die "file exists: $fn" if -e $fn;
    die "file exists: $tn" if -e $tn;

    open my $out, ">", $tn or die "cant open $tn: $!";
    print {$out} $$D or die "cant write $tn: $!";
    close $out or die "cant close $tn: $!";
    rename $tn, $fn or die "cant rename($tn,$fn): $!";
    die "still present $tn" if -e $tn;
    die "not present $fn" unless -e $fn;

    my $s = stat($fn);
    die "stat($fn) failed: $!" unless $s;
    die "size mismatch" unless $s->size == length($$D);

    if ($opt_p) {
        open my $in, "<", $fn or die "cant readopen($fn): $!";
        my $d;
        {
            local $/;
            $d = <$in>;
        }
        close $in;
        die "data mismatch" unless $d eq $$D;
    }
    return $fn;
}