|
|
|
… dann lebt sie noch heute.
Ja, sie lebt noch.
|
Nicht elegant, aber praktisch
-- Look up the fact-table columns and terms attached to the concept
-- 'Österreich' where it is a child of the concept 'Berichtsregion'.
SELECT facttablename, columnname, term
FROM concept c1
JOIN relation r ON r.parent = c1.id
JOIN concept c2 ON c2.id = r.child
JOIN term t ON t.concept_id = c2.id
WHERE c1.canonicalname = 'Berichtsregion'
  AND c2.canonicalname = 'Österreich'
-- One independent query per fact table; each filters on that table's own
-- column and term for the region.
SELECT *
FROM facttable_gen_preise AS f
WHERE f.berichtsregion = '295';

SELECT *
FROM facttable_gen_finanz AS f
WHERE f.berichtsregion = '295';

SELECT *
FROM facttable_kon_eh AS f
WHERE f.berichtsregion = '10';

SELECT *
FROM facttable_eurostat_comext AS f
WHERE f.rep = '38';
Offensichtlich über Facttables parallelisierbar!
Das war ein bisschen vereinfacht.
In Wirklichkeit sieht das eher so aus:
-- Fetch the terms (with their concept and relation data) of one fact-table
-- column, restricted to terms that actually occur in the fact table for the
-- given thema / saison / berichtsregion values.
SELECT facttablename, columnname, term, concept_id, t.hidden, language,
       register, id, canonicalname, description, parent, type, r.sortorder
FROM term t, concept c, relation r
WHERE facttablename = 'facttable_gen_tourismus'
  AND columnname = 'erhebungszeitpunkt'
  AND EXISTS (
        -- only keep terms that are used by at least one matching fact row
        SELECT 1
        FROM facttable_gen_tourismus f
        WHERE f.erhebungszeitpunkt = t.term
          AND thema IN (E'10')
          AND saison IN (E'3')
          AND berichtsregion IN (E'295.06')
      )
  AND c.id = concept_id
  AND r.child = concept_id
Aber die Parallelisierbarkeit bleibt.
-- mb_nextlevel_zmq(variadic integer[]) returns setof mb_treenode
--
-- Sends an "mb_nextlevel" request with the given ids over a ZeroMQ REQ
-- socket and returns the decoded reply. The service URL is read from
-- userconfig.mb_dal_url for the current user and defaults to
-- tcp://127.0.0.1:21887. Any failing ZeroMQ call raises an error instead
-- of silently decoding an empty or stale message.
create or replace function mb_nextlevel_zmq(variadic integer[])
returns setof mb_treenode as $$
    use strict;
    use warnings;
    use 5.10.0;
    use utf8;

    use ZMQ::LibZMQ3;
    use ZMQ::Constants qw(ZMQ_REQ);
    use WDS::Macrobond::Utils;

    # The array argument is handled in its text form "{1,2,3}": strip the
    # surrounding braces and split on commas.
    my ($ids) = @_;
    $ids =~ s/\{(.*)\}/$1/;
    my @ids = split(/,/, $ids);

    # Request fields are separated by the ASCII unit separator.
    my $US = "\x{1F}";

    # Per-user service URL, with a local default.
    my $sth = spi_query("select * from userconfig where logname=user");
    my $row = spi_fetchrow($sth);
    my $url = $row->{mb_dal_url} // "tcp://127.0.0.1:21887";
    spi_cursor_close($sth);

    my $context = zmq_init()
        or die "mb_nextlevel_zmq: cannot create ZeroMQ context: $!";
    my $req_sck = zmq_socket($context, ZMQ_REQ)
        or die "mb_nextlevel_zmq: cannot create REQ socket: $!";
    zmq_connect($req_sck, $url) == 0
        or die "mb_nextlevel_zmq: cannot connect to $url: $!";

    my $qry_msg = join($US, "mb_nextlevel", @ids);
    my $sent = zmq_send($req_sck, $qry_msg);
    die "mb_nextlevel_zmq: cannot send request to $url: $!"
        if !defined $sent || $sent == -1;

    my $res_msg = zmq_msg_init();
    my $received = zmq_msg_recv($res_msg, $req_sck);
    die "mb_nextlevel_zmq: cannot receive reply from $url: $!"
        if !defined $received || $received == -1;

    my $result = WDS::Macrobond::Utils::decode_result(zmq_msg_data($res_msg));
    my $aoh = WDS::Macrobond::Utils::aoh($result);
    return $aoh;
$$ language plperlu;
my $context = zmq_init();

# This service is strictly synchronous: every request is answered before
# the next one is considered, hence a REP socket.
my $qry_socket = zmq_socket($context, ZMQ_REP);
zmq_bind($qry_socket, "tcp://*:$qry_port");
...
for (;;) {
    zmq_msg_recv($msg, $qry_socket);
    my $t0 = time;

    # A request is a unit-separator delimited list: command, then parameters.
    my $req = zmq_msg_data($msg);
    my ($cmd, @param) = map { decode_utf8($_) } split($US, $req);

    ++$req_no;
    print STDERR "$0: $t0: received request $req_no: $cmd @param\n";

    # Dispatch to the registered handler, or build an error result for
    # unknown commands. Either way exactly one reply is sent.
    my $handler = $cmd{$cmd};
    my $result;
    if ($handler) {
        $result = $handler->(@param);
    }
    else {
        say STDERR "unrecognized command $cmd";
        $result = {
            success => 'ERROR',
            error   => "unrecognized command $cmd",
        };
    }
    my $reply = WDS::Macrobond::Utils::encode_result($result);
    zmq_msg_send($reply, $qry_socket);

    my $t1 = time;
    print STDERR "$0: $t1: finished request $req_no: $cmd @param (" . ($t1 - $t0) . " s)\n";

    # Exit once has_changed() reports a change relative to $treestate0.
    exit 0 if WDS::Macrobond::Utils::has_changed($treestate0);
}
# Set up the ROUTER socket and start the worker processes.
#
# If no url is set, a loopback URL with a random port in the range
# 0xC000 .. 0xFFFF is chosen and stored. The socket is bound to that URL,
# then $self->workers child processes are forked; each one execs
# $self->worker with the URL, "wds" and $self->schema as arguments. The
# child pids are recorded as keys of a hash handed to _worker_pids.
#
# Dies if the socket cannot be bound or a fork fails.
sub BUILD {
    my ($self) = @_;

    my $context = zmq_init();
    my $socket  = zmq_socket($context, ZMQ_ROUTER);

    unless ($self->url) {
        my $port = int(rand(0x4000)) + 0xC000;
        $self->_set_url("tcp://127.0.0.1:$port");
    }

    # A randomly chosen port may already be in use. Fail loudly here rather
    # than spawning workers that would connect to somebody else's socket.
    my $url = $self->url;
    zmq_bind($socket, $url) == 0
        or die "cannot bind to $url: $!";

    $self->_set_utils(WDS::Macrobond::Utils->new);
    $self->_set_socket($socket);

    my $worker_pids = {};
    for (1 .. $self->workers) {
        my $pid = fork();
        if (!defined $pid) {
            die "cannot fork: $!";
        }
        elsif ($pid == 0) {
            # Child: replace this process with the worker program.
            exec($self->worker, $self->url, "wds", $self->schema);
            die "cannot exec " . $self->worker . ": $!";
        }
        else {
            # Parent: remember the child.
            $worker_pids->{$pid} = 1;
        }
    }
    $self->_worker_pids($worker_pids);
}
# Run a batch of queries on the worker pool and wait for all of them.
#
# Parameters:
#   $queries - arrayref of query strings; each is sent to one worker.
#
# Returns an arrayref in the same order as @$queries. Each element is a
# hashref with the elements
#   success   (OK, ERROR)
#   header    (arrayref of column names)
#   error     (error message)
#   resultset (AofA)
# as filled in by process_worker_request.
#
# Side effects: resets and accumulates $self->query_time, and logs the
# elapsed wall-clock time and the summed per-query time.
sub sync_query {
    my ($self, $queries) = @_;

    $self->query_time(0);
    my $t0 = time;

    my $socket = $self->socket;

    # list of ready workers
    my $ready = $self->_ready;

    # outstanding requests by worker id.
    # we store an index into @$queries plus some stats
    my $outstanding = {};

    # results, same order as @$queries
    my $finished = [];

    # Iterate by index with explicit dereferencing: "each $aref" and
    # "shift($aref)" relied on the experimental autoderef feature, which
    # was removed in Perl 5.24.
    for my $i (0 .. $#{$queries}) {
        my $q = $queries->[$i];

        # No idle worker: block until one reports back.
        if (@$ready == 0) {
            $self->process_worker_request($finished, $outstanding);
        }

        # send request: worker id, empty delimiter frame, payload
        my $worker_id = shift(@$ready);
        zmq_msg_send($worker_id, $socket, ZMQ_SNDMORE);
        zmq_msg_send("", $socket, ZMQ_SNDMORE);
        my $msg = join($US, "anonymous", "secret", $q);
        zmq_msg_send($msg, $socket);

        $outstanding->{$worker_id} = { qidx => $i, start => time };
    }

    # Collect the replies that are still pending.
    while (keys %$outstanding) {
        $self->process_worker_request($finished, $outstanding);
    }

    my $t1 = time;
    $self->log('INFO', "elapsed = " . ($t1 - $t0) . "s, total = " . $self->query_time . "s");
    return $finished;
}
# Receive one message from a worker and record its result.
#
# Parameters:
#   $finished    - arrayref of results, indexed like the query list;
#                  the slot of the answered query is filled in here.
#   $outstanding - hashref worker id => { qidx, start }; the entry of the
#                  answering worker is removed.
#
# The payload is split on $GS into status, header and result set. If the
# worker has no outstanding request, the payload is ignored. In every case
# the worker id is appended to the ready list.
#
# Dies if the delimiter frame is not empty or the status is neither 'OK'
# nor 'ERROR'.
sub process_worker_request {
    my ($self, $finished, $outstanding) = @_;

    my $ready  = $self->_ready;
    my $socket = $self->socket;

    my $msg = zmq_msg_init();

    # from REQ to ROUTER: (at least) 3 parts:
    # worker id, empty delimiter, payload
    zmq_msg_recv($msg, $socket);
    my $worker_id = zmq_msg_data($msg);

    zmq_msg_recv($msg, $socket);
    my $empty = zmq_msg_data($msg);
    die unless $empty eq "";

    zmq_msg_recv($msg, $socket);
    my $payload = zmq_msg_data($msg);
    my ($success, $header, $resultset) = split($GS, $payload);

    if (exists $outstanding->{$worker_id}) {
        my $i     = $outstanding->{$worker_id}{qidx};
        my $start = $outstanding->{$worker_id}{start};

        $finished->[$i] = { success => $success };

        if ($success eq 'OK') {
            $finished->[$i]{header} = [ split($US, $header) ];

            # split() drops trailing empty fields, so an empty result set
            # leaves $resultset undefined; treat that as "no rows".
            my @rows;
            for my $record (split($RS, $resultset // '')) {
                # -1 keeps trailing empty columns; "\0" encodes NULL.
                my @row = map { $_ eq "\0" ? undef : $_ } split($US, $record, -1);
                push @rows, \@row;
            }
            # Explicit array handling: "push $aref, ..." relied on the
            # autoderef feature removed in Perl 5.24.
            $finished->[$i]{resultset} = \@rows;
        }
        elsif ($success eq 'ERROR') {
            # For errors the second field carries the message.
            $finished->[$i]{error} = $header;
        }
        else {
            die "cannot happen";
        }

        delete $outstanding->{$worker_id};

        my $now = time;
        my $dt  = $now - $start;
        $self->log('INFO', "finished request $i: $start + $dt = $now");
        $self->query_time($self->query_time + $dt);
    }

    # The worker is idle again, whether or not it answered a request.
    push @$ready, $worker_id;
}
# Worker: connect a REQ socket to the boss and serve queries forever.
my $req_sck = zmq_socket($context, ZMQ_REQ);
zmq_connect($req_sck, $boss);
my $dbh = DBIx::SimpleConnect->connect($db, { PrintError => 0, RaiseError => 1 });
...
# Send an unsolicited "result". A REQ socket has to send first.
zmq_send($req_sck, "NOP");
for (;;) {
    my $msg = zmq_msg_init();
    zmq_msg_recv($msg, $req_sck);
    my $message = zmq_msg_data($msg);

    # Request format: user, auth token, query, separated by $US.
    my ($user, $auth, $query) = split($US, $message);
    my $error = 0;

    # XXX - check auth token and switch to user here (NYI).

    my $result;
    eval {
        my $sth = $dbh->prepare($query);
        $sth->execute;
        my $header = join($US, @{ $sth->{NAME} });

        # Fetch from the handle that was just executed. Passing it to
        # $dbh->selectall_arrayref would execute the statement a second time.
        my $rows = $sth->fetchall_arrayref;

        # Encode rows: columns joined by $US, rows by $RS, NULL as "\0".
        my $resultset = join($RS, map { join($US, map { $_ // "\0" } @$_) } @$rows);
        $result = join($GS, "OK", $header, $resultset);
        1;
    } or do {
        $result = join($GS, "ERROR", $@);
        $error = 1;
    };
    zmq_send($req_sck, $result);
    if ($error) {
        ...
    }
}
# Run a single query through the worker pool: all distinct concepts that
# have a term row with fact table and column set but no term value.
$result = $workers->sync_query([
    "select distinct concept_id, canonicalname"
    . " from term join concept on concept_id=concept.id"
    . " where facttablename is not null and columnname is not null and term is null"
]);
# Build one member query per real fact-table column of every dimension
# instance, then run all of them through the worker pool in one batch.
for my $di (@$dim_instances) {
    my $facttablename = $di->{facttablename};

    # Numeric "column names" are meta column ids; everything else is a real
    # column of the fact table.
    my @columns           = @{ $di->{columnname} };
    my @meta_column_ids   = grep { looks_like_number($_) } @columns;
    my @real_column_names = grep { !looks_like_number($_) } @columns;
    ...
    if (@real_column_names) {
        # The complicated case: We have to extract matching members from the fact table.
        # we have different column names, so we need a different subquery for each.
        # the path condition is the same for each
        # not sure if rolling them all into one query is ideal
        my $path_cond = "";
        for my $node (@{ $self->{facttables}{$facttablename}{path} }) {
            my $columnname = $node->{columnname};
            next if looks_like_number($columnname);

            $columnname = encode('UTF-8', $columnname);
            my $members = join(", ", map { quote($_->{term}) } @{ $node->{members} });
            $path_cond .= " and $columnname in ($members)";
        }

        for my $columnname (@real_column_names) {
            push @cmd, join("",
                "select facttablename, columnname, term, concept_id, t.hidden, language, register, id, canonicalname, description, parent, type, r.sortorder",
                " from term t, concept c, relation r",
                " where facttablename='$facttablename' and",
                " columnname='$columnname' and exists (select 1 from $facttablename f where f.$columnname=t.term $path_cond)",
                " and c.id=concept_id and r.child=concept_id",
            );
        }
    }
}
my $result = $self->{workers}->sync_query(\@cmd);
"Echte" Query (Alle Daten von Statistik Austria für Berichtsregion Salzburg) inkl. nicht-parallelisiertem Anteil
Frischer DAL-Prozess, aber "vorgewärmte" Datenbank.
Erheblicher Anteil an nicht-parallelisierten Querys.