#!/usr/bin/perl
use warnings;
use strict;

use Getopt::Long;
use Time::HiRes qw(time);

# Command line state.
#   $processes      - total number of processes to use; stays undef until
#                     --processes is parsed or the CPUs are counted
#   @keys           - indexes of the fields that form the sort key; they are
#                     used as a list slice on the split fields, so they are
#                     0-based and negative values count from the end
#   $fieldseparator - pattern handed to split() to cut a line into fields
my $processes;
my @keys;
my $fieldseparator = '\s+';

# GetOptions has already reported the offending option on STDERR when it
# returns false; stop instead of sorting with settings the user did not ask for.
GetOptions(
    'processes=i'         => \$processes,
    'keys=s'              => \@keys,
    't|field-separator=s' => \$fieldseparator,
) or die "usage: $0 [--processes N] [--keys I[,J...]] [-t|--field-separator REGEX]\n";

# Count the CPUs only when no explicit process count was given, so that
# --processes also works where /proc/cpuinfo cannot be read.
$processes = get_num_cpus() unless defined $processes;

# The root process needs at least two children (one slot for itself plus one
# per child), hence the minimum of 3.
$processes = 3 if $processes < 3;

# --keys may be repeated and each occurrence may hold a comma separated list.
@keys = split(/,/, join(',', @keys));
for my $key (@keys) {
    die "invalid key '$key': keys must be integer field indexes\n"
        unless $key =~ /\A-?\d+\z/;
}

split_sort_merge($processes, 1, 0);

# Buffered write errors (full disk, closed pipe) only surface on close.
close(STDOUT) or die "cannot write output: $!\n";
exit(0);

# Returns the number of lines in /proc/cpuinfo that match /^processor\s*:/
# (0 when no line matches).
# Dies when /proc/cpuinfo cannot be opened.
sub get_num_cpus {
    open my $fh, '<', '/proc/cpuinfo'
        or die "cannot open /proc/cpuinfo: $!";
    my $num_cpus = 0;
    # Read into a named lexical: an implicit while (<$fh>) would assign to
    # the global $_ and clobber whatever the caller kept there.
    while (my $line = <$fh>) {
        $num_cpus++ if $line =~ /^processor\s*:/;
    }
    close($fh);
    return $num_cpus;
}


# Sorts STDIN to STDOUT using a binary tree of forked worker processes.
#
# One slot of the process budget is used up by the calling process itself.
# If at least two slots remain, two children are forked and connected through
# pipes; the input lines are dealt out to them alternately and their sorted
# output streams are merged. Otherwise the calling process sorts its input
# itself through sorter().
#
# Parameters:
#   $processes      - process budget for this subtree, including this process
#   $final          - true when this process writes the end result: the
#                     "<key>\0" prefix is removed from every record before it
#                     is printed. Children are always started with 0.
#   $process_number - label used in the progress messages on STDERR
#
# The leaf branch does not look at $final (sorter() always writes records
# with their key prefix), so a $final caller needs a budget of at least 3.
#
# Dies when a pipe, fork or redirection cannot be set up, when writing fails,
# or when a child process ends with a non-zero wait status.
sub split_sort_merge {
    my ($processes, $final, $process_number) = @_;

    $processes -= 1;    # this process itself uses one slot of the budget
    if ($processes < 2) {
        # not enough budget left for two children: sort in this process
        sorter($process_number);
        return;
    }

    # create child processes

    _progress($process_number, "creating pipes for subprocess 1");
    my $subprocesses1 = int($processes / 2);
    pipe(my $split_in1, my $split_out1) or die "cannot create pipe: $!";
    pipe(my $merge_in1, my $merge_out1) or die "cannot create pipe: $!";
    my $pid1 = fork();
    die "cannot fork: $!" unless defined $pid1;
    if ($pid1 == 0) {
        # child 1: drop the parent ends, then continue as a worker
        close($split_out1);
        close($merge_in1);
        _redirect_stdio($split_in1, $merge_out1);

        split_sort_merge($subprocesses1, 0, $process_number + 1);

        close(STDOUT) or die "cannot write output: $!";
        exit(0);
    }
    _progress($process_number, "closing split_in1");
    close($split_in1);
    _progress($process_number, "closing merge_out1");
    close($merge_out1);
    _progress($process_number, "still open: split_out1, merge_in1");

    _progress($process_number, "creating pipes for subprocess 2");
    my $subprocesses2 = $processes - $subprocesses1;
    pipe(my $split_in2, my $split_out2) or die "cannot create pipe: $!";
    pipe(my $merge_in2, my $merge_out2) or die "cannot create pipe: $!";
    my $pid2 = fork();
    die "cannot fork: $!" unless defined $pid2;
    if ($pid2 == 0) {
        # these are connected to the other child; keeping them open would
        # stop child 1 from ever seeing end-of-file on its input
        close($split_out1);
        close($merge_in1);
        # and these are the parent ends
        close($split_out2);
        close($merge_in2);

        # only our own ends are left - dup them to stdin/stdout
        _redirect_stdio($split_in2, $merge_out2);

        split_sort_merge($subprocesses2, 0, $process_number + 1 + $subprocesses1);

        close(STDOUT) or die "cannot write output: $!";
        exit(0);
    }
    _progress($process_number, "closing split_in2 and merge_out2");
    close($split_in2);
    close($merge_out2);

    _progress($process_number, "splitting");
    # Distribute input to the child processes, alternating line by line.
    # Every worker reads its whole input before it writes anything, so all
    # input can be handed out here before any merged output is read back.
    while (my $line = <STDIN>) {
        my $pipe = $. % 2 ? $split_out1 : $split_out2;
        print {$pipe} $line or die "cannot write to subprocess: $!";
    }
    close($split_out1) or die "cannot write to subprocess 1: $!";
    close($split_out2) or die "cannot write to subprocess 2: $!";

    _progress($process_number, "merging");
    # Merge output from the child processes. Records are compared WITHOUT
    # their trailing newline, exactly like sorter() orders them; comparing
    # with the newline attached gives a different order whenever one record
    # is a prefix of another and the next character sorts below "\n" (a tab,
    # for instance), which would corrupt the merged order.
    my $rec1 = <$merge_in1>;
    my $rec2 = <$merge_in2>;
    chomp $rec1 if defined $rec1;
    chomp $rec2 if defined $rec2;
    while (defined $rec1 || defined $rec2) {
        # take stream 1 when stream 2 is exhausted or when its record sorts
        # first; ties go to stream 1
        if (!defined $rec2 || (defined $rec1 && $rec1 le $rec2)) {
            my $out = $final ? substr($rec1, index($rec1, "\0") + 1) : $rec1;
            print STDOUT $out, "\n" or die "cannot write merged output: $!";
            $rec1 = <$merge_in1>;
            chomp $rec1 if defined $rec1;
        }
        else {
            my $out = $final ? substr($rec2, index($rec2, "\0") + 1) : $rec2;
            print STDOUT $out, "\n" or die "cannot write merged output: $!";
            $rec2 = <$merge_in2>;
            chomp $rec2 if defined $rec2;
        }
    }
    close($merge_in1);
    close($merge_in2);

    # Reap the children. End-of-file on a merge pipe also happens when a
    # child crashes, so its wait status is the only sign of truncated output.
    for my $child ($pid1, $pid2) {
        if (waitpid($child, 0) == $child && $? != 0) {
            die "$$: $process_number: subprocess $child failed (wait status $?)\n";
        }
    }

    _progress($process_number, "done");
    return;
}

# Writes one progress line to STDERR: pid, process label, seconds since
# program start ($^T) and the message.
sub _progress {
    my ($process_number, $message) = @_;
    print STDERR "$$: $process_number: " . sprintf("%.3f", time() - $^T) . ": $message\n";
    return;
}

# Makes $in the new STDIN and $out the new STDOUT of the current process and
# closes the two original handles. Dies when a handle cannot be duplicated.
sub _redirect_stdio {
    my ($in, $out) = @_;
    open(STDIN,  '<&', $in)  or die "cannot redirect STDIN: $!";
    open(STDOUT, '>&', $out) or die "cannot redirect STDOUT: $!";
    close($in);
    close($out);
    return;
}

# Leaf worker: reads records in raw format (lines) from stdin, cooks them,
# sorts them and writes the sorted, cooked records to stdout.
#
# A cooked record is "<key>\0<raw line>". <key> is built from the fields
# selected by the file-level @keys, joined with "\1"; the fields come from
# splitting the line on the file-level $fieldseparator. Records are sorted
# as plain strings and each one is written followed by $/.
#
# Parameters:
#   $process_number - label used in the progress messages on STDERR
#
# Dies when a record cannot be written.
sub sorter {
    my ($process_number) = @_;
    my @recs;
    print STDERR "$$: $process_number: " . sprintf("%.3f", time() - $^T) . ": reading lines\n";
    while (my $line = <STDIN>) {
        chomp $line;
        # A key index beyond the fields of this line yields undef (split also
        # drops trailing empty fields); use an empty key part instead, which
        # is what join() made of it anyway, minus the warning on every line.
        my $key = join("\1",
            map { defined $_ ? $_ : '' } (split($fieldseparator, $line))[@keys]);
        push @recs, "$key\0$line";
    }
    print STDERR "$$: $process_number: " . sprintf("%.3f", time() - $^T) . ": sorting lines\n";
    @recs = sort @recs;
    print STDERR "$$: $process_number: " . sprintf("%.3f", time() - $^T) . ": printing lines\n";
    for my $rec (@recs) {
        print STDOUT $rec, $/ or die "cannot write sorted records: $!";
    }
    print STDERR "$$: $process_number: " . sprintf("%.3f", time() - $^T) . ": done\n";
    return;
}

# vim: ts=8 sw=4 expandtab tw=0
