|
|
|
|
… dann lebt sie noch heute.
Ja, sie lebt noch.
|
|
Nicht elegant, aber praktisch
-- Step 1: for the concept 'Österreich' (a child of 'Berichtsregion'), list
-- every fact table/column that references it and the term used there.
select facttablename, columnname, term
from concept c1, relation r, concept c2, term t
where c1.canonicalname='Berichtsregion'
  and c1.id=r.parent
  and c2.id=r.child
  and c2.canonicalname='Österreich'
  and t.concept_id=c2.id;
-- Step 2: one independent query per fact table (presumably built from the
-- rows returned by step 1 -- table, column and term differ per fact table).
select * from facttable_gen_preise f where f.berichtsregion='295';
select * from facttable_gen_finanz f where f.berichtsregion='295';
select * from facttable_kon_eh f where f.berichtsregion='10';
select * from facttable_eurostat_comext f where f.rep='38';
Offensichtlich über Facttables parallelisierbar!
Das war ein bisschen vereinfacht.
In Wirklichkeit sieht das eher so aus:
-- Terms of column 'erhebungszeitpunkt' of facttable_gen_tourismus, joined
-- with their concept and relation rows, restricted to terms that actually
-- occur in fact rows matching the already selected thema/saison/berichtsregion.
SELECT facttablename, columnname, term, concept_id, t.hidden, language,
       register, id, canonicalname, description, parent, type, r.sortorder
FROM term t, concept c, relation r
WHERE facttablename='facttable_gen_tourismus'
  AND columnname='erhebungszeitpunkt'
  AND EXISTS (
        SELECT 1
        FROM facttable_gen_tourismus f
        WHERE f.erhebungszeitpunkt=t.term
          AND thema IN (E'10')
          AND saison IN (E'3')
          AND berichtsregion IN (E'295.06')
      )
  AND c.id=concept_id
  AND r.child=concept_id
Aber die Parallelisierbarkeit bleibt.
create or replace function mb_nextlevel_zmq(variadic integer[]) returns setof mb_treenode as $$
    # Forwards an "mb_nextlevel" request with the given ids to a service
    # reachable over ZeroMQ and returns the decoded reply.
    use strict;
    use warnings;
    use 5.10.0;
    use utf8;
    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw(ZMQ_REQ);
    use WDS::Macrobond::Utils;

    # ASCII group, record and unit separators
    my ($GS, $RS, $US) = ("\x{1D}", "\x{1E}", "\x{1F}");

    # The argument is treated as text of the form {1,2,3}: strip the braces
    # and split on commas.
    my ($id_list) = @_;
    $id_list =~ s/{(.*)}/$1/;
    my @id_args = split(/,/, $id_list);

    # Service endpoint: taken from the current user's userconfig row, with a
    # local default when mb_dal_url is not set.
    my $cfg_cursor = spi_query("select * from userconfig where logname=user");
    my $cfg_row    = spi_fetchrow($cfg_cursor);
    my $endpoint   = $cfg_row->{mb_dal_url} // "tcp://127.0.0.1:21887";
    spi_cursor_close($cfg_cursor);

    # One request, one reply (REQ socket).
    my $zmq_ctx   = zmq_init();
    my $requester = zmq_socket($zmq_ctx, ZMQ_REQ);
    zmq_connect($requester, $endpoint);
    my $request = join($US, "mb_nextlevel", @id_args);
    zmq_send($requester, $request);

    my $reply = zmq_msg_init();
    zmq_msg_recv($reply, $requester);

    my $decoded = WDS::Macrobond::Utils::decode_result(zmq_msg_data($reply));
    my $rows    = WDS::Macrobond::Utils::aoh($decoded);
    return $rows;
$$
language plperlu;
my $context = zmq_init();
# Requests are handled one at a time: each one is answered before the next
# is read, which is exactly what a REP socket provides.
my $qry_socket = zmq_socket($context, ZMQ_REP);
zmq_bind($qry_socket, "tcp://*:$qry_port");
...
while (1) {
    zmq_msg_recv($msg, $qry_socket);
    my $started = time;

    # Request layout: command name followed by its parameters, separated
    # by $US; every field is decoded from UTF-8.
    my ($command, @args) = map { decode_utf8($_) } split($US, zmq_msg_data($msg));
    ++$req_no;
    print STDERR "$0: $started: received request $req_no: $command @args\n";

    # Dispatch through %cmd; unknown commands get an ERROR result.
    my $handler = $cmd{$command};
    my $result;
    if ($handler) {
        $result = $handler->(@args);
    } else {
        say STDERR "unrecognized command $command";
        $result = { success => 'ERROR', error => "unrecognized command $command" };
    }
    my $reply = WDS::Macrobond::Utils::encode_result($result);
    zmq_msg_send($reply, $qry_socket);

    my $ended = time;
    print STDERR "$0: $ended: finished request $req_no: $command @args (" . ($ended - $started) . " s)\n";

    # Terminate once has_changed() reports a change relative to $treestate0.
    exit 0 if WDS::Macrobond::Utils::has_changed($treestate0);
}
# Object initialisation hook: creates the ROUTER socket, binds it, and starts
# the configured number of worker processes.
#
# Reads:  $self->url, $self->workers, $self->worker, $self->schema
# Sets:   url (when none was given), utils, socket, worker pids
# Dies:   when the socket cannot be bound or a worker cannot be forked.
sub BUILD {
    my ($self, $args) = @_;

    my $context = zmq_init();
    my $socket  = zmq_socket($context, ZMQ_ROUTER);

    if ($self->url) {
        # Explicit endpoint: a failed bind must not go unnoticed, otherwise
        # the workers are started against an endpoint nobody listens on.
        zmq_bind($socket, $self->url) == 0
            or die "cannot bind to " . $self->url . ": $!";
    } else {
        # No endpoint configured: pick a random local port from
        # 0xC000..0xFFFF and retry a few times if it is already taken.
        my $bound = 0;
        for my $attempt (1 .. 20) {
            my $port = int(rand(0x4000)) + 0xC000;
            my $url  = "tcp://127.0.0.1:$port";
            next unless zmq_bind($socket, $url) == 0;
            $self->_set_url($url);
            $bound = 1;
            last;
        }
        die "cannot bind to a random local port: $!" unless $bound;
    }

    $self->_set_utils(WDS::Macrobond::Utils->new);
    $self->_set_socket($socket);

    # Start the workers; each one gets the endpoint URL, the literal "wds"
    # and the schema as command line arguments.
    my $worker_pids = {};
    for (1 .. $self->workers) {
        my $rc = fork();
        if (!defined $rc) {
            die "cannot fork: $!";
        } elsif ($rc == 0) {
            # Child: exec only returns on failure.
            # NOTE(review): this die runs in the forked child and propagates
            # to whatever called the constructor there -- confirm that is
            # acceptable.
            exec($self->worker, $self->url, "wds", $self->schema);
            die "cannot exec " . $self->worker . ": $!";
        } else {
            # Parent: remember the child's pid.
            $worker_pids->{$rc} = 1;
        }
    }
    $self->_worker_pids($worker_pids);
}
# Runs all queries in @$queries on the worker pool and waits for every result.
#
# Parameters:
#   $queries - arrayref of query strings
# Returns:
#   arrayref of results in the same order as @$queries. Each element is a
#   hashref with the elements
#     success   (OK, ERROR)
#     header    (arrayref of column names)
#     error     (error message)
#     resultset (AofA)
# Side effects: resets and accumulates $self->query_time, logs elapsed times.
sub sync_query {
    my ($self, $queries) = @_;

    $self->query_time(0);
    my $t0 = time;
    my $socket = $self->socket;

    # list of ready workers
    my $ready = $self->_ready;

    # outstanding requests by worker id.
    # we store an index into @$queries plus some stats
    my $outstanding = {};

    # results, filled in by process_worker_request
    my $finished = [];

    # Iterate by index with explicit dereferencing: "each $aref" and
    # "shift($aref)" relied on the experimental autoderef feature, which no
    # longer exists in Perl 5.24 and later.
    for my $i (0 .. $#$queries) {
        my $q = $queries->[$i];

        # No idle worker: block until one reports back.
        # process_worker_request always puts the reporting worker on @$ready.
        if (@$ready == 0) {
            $self->process_worker_request($finished, $outstanding);
        }

        # send request: worker identity, empty delimiter frame, payload
        my $worker_id = shift(@$ready);
        zmq_msg_send($worker_id, $socket, ZMQ_SNDMORE);
        zmq_msg_send("", $socket, ZMQ_SNDMORE);
        my $msg = join($US, "anonymous", "secret", $q);
        zmq_msg_send($msg, $socket);
        $outstanding->{$worker_id} = { qidx => $i, start => time };
    }

    # Collect the results of the requests that are still running.
    while (keys %$outstanding) {
        $self->process_worker_request($finished, $outstanding);
    }

    my $t1 = time;
    $self->log('INFO', "elapsed = " . ($t1 - $t0) . "s, total = " . $self->query_time . "s");
    return $finished;
}
# Receives one message from a worker, stores its result (if it answers an
# outstanding request) and marks the worker as ready again.
#
# Parameters:
#   $finished    - arrayref of results, indexed like the query list
#   $outstanding - hashref worker id => { qidx, start } of running requests
# Dies: on a malformed envelope or an unknown status field.
sub process_worker_request {
    my ($self, $finished, $outstanding) = @_;

    my $ready  = $self->_ready;
    my $socket = $self->socket;

    my $msg = zmq_msg_init();

    # from REQ to ROUTER: (at least) 3 parts:
    # worker identity, empty delimiter, payload
    zmq_msg_recv($msg, $socket);
    my $worker_id = zmq_msg_data($msg);
    zmq_msg_recv($msg, $socket);
    my $empty = zmq_msg_data($msg);
    die "protocol error: expected empty delimiter frame from worker"
        unless $empty eq "";
    zmq_msg_recv($msg, $socket);
    my $payload = zmq_msg_data($msg);

    # Limit 3 keeps a trailing empty field: an empty result set would
    # otherwise leave $resultset undefined.
    my ($success, $header, $resultset) = split($GS, $payload, 3);

    # Messages from workers without an outstanding request (e.g. their
    # initial announcement) only make the worker available.
    if (exists $outstanding->{$worker_id}) {
        my $i     = $outstanding->{$worker_id}{qidx};
        my $start = $outstanding->{$worker_id}{start};

        $finished->[$i] = {
            success => $success,
        };
        if ($success eq 'OK') {
            $finished->[$i]{header}    = [ split($US, $header) ];
            $finished->[$i]{resultset} = [];
            # Limit -1 keeps trailing empty records and fields; a field
            # consisting of a single NUL byte stands for undef.
            # NOTE: an empty $resultset always decodes to zero rows.
            for my $record (split($RS, $resultset // "", -1)) {
                my @row = map { $_ eq "\0" ? undef : $_ } split($US, $record, -1);
                push @{ $finished->[$i]{resultset} }, \@row;
            }
        } elsif ($success eq 'ERROR') {
            # for errors the second field carries the message
            $finished->[$i]{error} = $header;
        } else {
            die "cannot happen";
        }
        delete $outstanding->{$worker_id};

        my $now = time;
        my $dt  = $now - $start;
        $self->log('INFO', "finished request $i: $start + $dt = $now");
        $self->query_time($self->query_time + $dt);
    }
    push @$ready, $worker_id;
}
# Worker side: connect to the boss with a REQ socket, announce ourselves,
# then answer one query per received message.
my $req_sck = zmq_socket($context, ZMQ_REQ);
zmq_connect($req_sck, $boss);
my $dbh = DBIx::SimpleConnect->connect($db, { PrintError => 0, RaiseError => 1});
...
# Send an unsolicited "result"
zmq_send($req_sck, "NOP");
for (;;) {
    my $msg = zmq_msg_init();
    zmq_msg_recv($msg, $req_sck);
    my $message = zmq_msg_data($msg);
    # Limit 3: everything after the second separator belongs to the query.
    my ($user, $auth, $query) = split($US, $message, 3);
    my $error = 0;
    # XXX - check auth token and switch to user here (NYI).
    my $result;
    eval {
        my $sth = $dbh->prepare($query);
        $sth->execute;
        my $header = join($US, @{$sth->{NAME}});
        # Fetch from the handle that was just executed.
        # $dbh->selectall_arrayref($sth) would execute the statement a
        # second time.
        my $rows = $sth->fetchall_arrayref;
        # Rows are joined with $RS, fields with $US; undef is sent as a
        # single NUL byte.
        my $resultset = join($RS,
            map { join($US, map { $_ // "\0" } @$_) }
            @$rows);
        $result = join($GS, "OK", $header, $resultset);
        1;
    } or do {
        # RaiseError is on, so database errors end up here.
        $result = join($GS, "ERROR", $@);
        $error = 1;
    };
    zmq_send($req_sck, $result);
    if ($error) {
        ...
    }
}
# Run a single query on the worker pool: the distinct concept ids and
# canonical names of term rows that name a fact table and column but have
# no term.
$result = $workers->sync_query([
    q{select distinct concept_id, canonicalname
from term join concept on concept_id=concept.id
where facttablename is not null and columnname is not null and term is null},
]);
# Build one member query per (fact table, real column) and run them all on
# the worker pool in one go.
for my $di (@$dim_instances) {
    my $facttablename = $di->{facttablename};

    # Numeric "column names" are collected as meta column ids, everything
    # else is a real column of the fact table.
    my @meta_column_ids;
    my @real_column_names;
    for my $columnname (@{ $di->{columnname} }) {
        if (looks_like_number($columnname)) {
            push @meta_column_ids, $columnname;
        } else {
            push @real_column_names, $columnname;
        }
    }
    ...
    if (@real_column_names) {
        # The complicated case: We have to extract matching members from the fact table.
        # we have different column names, so we need a different subquery for each.
        # the path condition is the same for each
        # not sure if rolling them all into one query is ideal
        my $path_cond = "";
        for my $node (@{ $self->{facttables}{$facttablename}{path} }) {
            my $columnname = $node->{columnname};
            next if looks_like_number($columnname);

            # NOTE(review): the column name is UTF-8 encoded here but not in
            # the loop below -- confirm which form the query string needs.
            $columnname = encode('UTF-8', $columnname);
            my @terms = map quote($_->{term}), @{ $node->{members} };
            if (@terms) {
                $path_cond .= " and $columnname in ("
                            . join(", ", @terms)
                            . ")";
            } else {
                # "in ()" is a syntax error in PostgreSQL; an empty member
                # list can match nothing.
                $path_cond .= " and false";
            }
        }
        for my $columnname (@real_column_names) {
            my $cmd = "select facttablename, columnname, term, concept_id, t.hidden, language, register, id, canonicalname,
description, parent, type, r.sortorder";
            $cmd .= " from term t, concept c, relation r";
            $cmd .= " where facttablename='$facttablename' and";
            $cmd .= " columnname='$columnname' and exists (select 1 from $facttablename f where f.$columnname=t.term $path_cond)";
            $cmd .= " and c.id=concept_id and r.child=concept_id";
            push @cmd, $cmd;
        }
    }
}
my $result = $self->{workers}->sync_query(\@cmd);
"Echte" Query (Alle Daten von Statistik Austria für Berichtsregion Salzburg) inkl. nicht-parallelisiertem Anteil
Frischer DAL-Prozess, aber "vorgewärmte" Datenbank.
Erheblicher Anteil an nicht-parallelisierten Querys.