#!/usr/bin/perl
use warnings;
use strict;

use Getopt::Long;
use Time::HiRes qw(time);

# Number of sorter processes to fork. Left undefined here so that the CPU
# count is only probed when --processes was not given on the command line.
my $processes;
# Indexes of the fields that form the sort key. They are used as a Perl list
# slice on the split line, so they are 0-based and negative values count from
# the end. With no keys the key is empty and whole lines are compared.
my @keys;
# Pattern on which each input line is split into fields.
my $fieldseparator = '\s+';

GetOptions(
    'processes=i'         => \$processes,
    'keys=s'              => \@keys,
    't|field-separator=s' => \$fieldseparator,
) or die "Usage: $0 [--processes N] [--keys I[,J...]] [-t REGEX] [file ...]\n";

$processes = get_num_cpus() unless defined $processes;
# The round-robin distribution computes "% $processes", so zero (or a
# negative count) must be rejected up front with a readable message.
die "--processes must be at least 1 (got $processes)\n" if $processes < 1;

# --keys may be repeated and each occurrence may hold a comma-separated list.
@keys = split(/,/,join(',',@keys));
for my $key (@keys) {
    die "--keys expects integer field indexes, got '$key'\n"
        unless $key =~ /\A-?\d+\z/;
}

my ($pipe_sorter, $pipe_merger) = create_sorters($processes);

# Deal the input lines round-robin to the sorter processes.
my $ch = 0;
while (my $line = <>) {
    # The last line of an input file may lack a trailing newline. Written
    # as-is it would fuse with the next line sent down the same pipe, so
    # terminate it explicitly.
    $line .= "\n" unless substr($line, -1) eq "\n";
    # print STDERR "$$ sending line to $ch\n";
    print {$pipe_sorter->[$ch]} $line
        or die "cannot write to sorter $ch: $!";
    $ch = ($ch + 1) % $processes;
}
# Closing the write ends signals EOF; the sorters only start sorting once
# they have read their complete input.
for my $ch (0 .. $processes-1) {
    print STDERR "$$: p: " . sprintf("%.3f", time - $^T) . ": closing $ch\n";
    # Buffered write errors surface at close time, so check it.
    close($pipe_sorter->[$ch])
        or die "cannot close pipe to sorter $ch: $!";
}

# Merge phase: every sorter delivers records of the form "<key>\0<line>",
# one per line, already sorted. @buffers holds the current head record of
# each sorter that still has data, as [ channel, record ] pairs kept in
# ascending record order.
my @buffers;
for my $ch (0 .. $processes-1) {
    my $fh = $pipe_merger->[$ch];
    my $rec = <$fh>;
    # A sorter that received no input lines produces no output at all.
    next unless defined $rec;
    # The sorters order their records without the line terminator; compare
    # the same strings here so that both orderings agree (a trailing "\n"
    # sorts after e.g. a tab and would otherwise flip some pairs).
    chomp $rec;
    push @buffers, [ $ch, $rec ];
}
@buffers = sort { $a->[1] cmp $b->[1] } @buffers;
while (@buffers) {
    my $buf = shift @buffers;
    #print STDERR "got line from channel $buf->[0]: $buf->[1]\n";
    # Strip the "<key>\0" prefix and emit only the original line.
    print substr($buf->[1], index($buf->[1], "\0") + 1), "\n";
    my $fh = $pipe_merger->[$buf->[0]];
    my $rec = <$fh>;
    #print STDERR "read line from channel $buf->[0]: $rec\n";
    next unless defined $rec;
    chomp $rec;
    # Insert the channel's next record after all records that compare less
    # than or equal to it, keeping @buffers sorted.
    my $i = 0;
    $i++ while ($i < @buffers && $rec ge $buffers[$i][1]);
    splice(@buffers, $i, 0, [$buf->[0], $rec]);
}

# Reap the sorter processes. A sorter that failed may have delivered an
# incomplete stream, so report it through the exit status.
my $failed = 0;
while ((my $pid = wait()) != -1) {
    next if $? == 0;
    warn "$$: p: sorter process $pid ended with wait status $?\n";
    $failed = 1;
}
print STDERR "$$: p: " . sprintf("%.3f", time - $^T) . ": done\n";
exit($failed ? 1 : 0);


# Count the CPUs listed in a Linux-style cpuinfo file.
#
# Parameters:
#   $cpuinfo - optional path of the file to scan; defaults to /proc/cpuinfo.
#
# Returns the number of lines starting with "processor" followed by optional
# whitespace and a colon, but never less than 1: when the file cannot be
# opened (a warning is printed) or contains no such line, 1 is returned so
# that callers always get a usable process count.
sub get_num_cpus {
    my ($cpuinfo) = @_;
    $cpuinfo = '/proc/cpuinfo' unless defined $cpuinfo;

    my $num_cpus = 0;
    if (open my $fh, '<', $cpuinfo) {
        # A lexical line variable avoids clobbering the caller's $_.
        while (my $line = <$fh>) {
            $num_cpus++ if $line =~ /^processor\s*:/;
        }
        close $fh;
    }
    else {
        warn "cannot open $cpuinfo: $!; assuming 1 cpu\n";
    }
    return $num_cpus > 0 ? $num_cpus : 1;
}


# Fork the sorter processes and connect each one to the parent with two pipes.
#
# Parameters:
#   $processes - number of sorter processes to fork.
#
# Returns two array references, in this order:
#   - the write handles (one per sorter) on which the parent sends input lines;
#   - the read handles (one per sorter) from which the parent reads that
#     sorter's sorted records.
#
# Each sorter reads lines until EOF on its input pipe and turns every line
# into a record "<key>\0<line>". <key> is built from the fields selected by
# the file-level @keys (list-slice indexes into the line split on
# $fieldseparator), joined with "\1". The records are sorted as plain strings
# and printed one per line. A sorter never returns from this sub: it exits
# when done.
#
# Dies if a pipe cannot be created or a fork fails; a sorter dies (and thus
# exits non-zero) if it cannot write its output.
sub create_sorters {
    my ($processes) = @_;

    my (@pipe_sorter, @pipe_merger);
    for my $i (0 .. $processes-1) {
        # $sorter_in and $merger_out are the child's ends. In the parent they
        # go out of scope at the end of this iteration, which closes the
        # parent's copies so that EOF can propagate on both pipes.
        pipe(my $sorter_in, my $sorter_out)
            or die "cannot create pipe to sorter $i: $!";
        pipe(my $merger_in, my $merger_out)
            or die "cannot create pipe from sorter $i: $!";
        push @pipe_sorter, $sorter_out;
        push @pipe_merger, $merger_in;
        my $pid = fork();
        die "cannot fork: $!" unless defined $pid;
        if ($pid == 0) {
            # close all the other pipe ends: the parent's ends of this
            # sorter's pipes and the inherited ones of the sorters forked
            # earlier. Keeping a write end open here would keep that pipe's
            # reader from ever seeing EOF.
            for my $fh (@pipe_sorter, @pipe_merger) {
                close($fh);
            }

            my @recs;
            print STDERR "$$: $i: " . sprintf("%.3f", time - $^T) . ": reading lines\n";
            while (my $line = <$sorter_in>) {
                chomp $line;
                # A key index beyond the fields of this line yields undef;
                # treat such a field as empty instead of warning per line.
                # $fieldseparator is passed as a plain string on purpose so
                # that split keeps its special handling of a single space.
                my $v = join("\1",
                    map { defined $_ ? $_ : '' }
                        (split($fieldseparator, $line))[@keys]);
                push @recs, "$v\0$line";
            }
            print STDERR "$$: $i: " . sprintf("%.3f", time - $^T) . ": sorting lines\n";
            @recs = sort @recs;
            print STDERR "$$: $i: " . sprintf("%.3f", time - $^T) . ": printing lines\n";
            for my $rec (@recs) {
                print {$merger_out} $rec, $/
                    or die "sorter $i: cannot write record: $!";
            }
            # Buffered write errors only show up when the handle is closed.
            close($merger_out)
                or die "sorter $i: cannot close output pipe: $!";
            print STDERR "$$: $i: " . sprintf("%.3f", time - $^T) . ": done\n";
            exit(0);
        }
    }
    return \@pipe_sorter, \@pipe_merger;
}
