[libdbd-pg-perl] 17/35: crude replication test to prove COPY_BOTH changes
Christoph Berg
myon at debian.org
Wed Sep 27 17:41:03 UTC 2017
This is an automated email from the git hooks/post-receive script.
myon pushed a commit to branch master
in repository libdbd-pg-perl.
commit 790efd7fc1882f9e228c617f1d4dbce70e160fe5
Author: William Cox <mydimension at gmail.com>
Date: Wed Sep 20 16:59:01 2017 -0400
crude replication test to prove COPY_BOTH changes
---
t/40replication.t | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++
t/dbdpg_test_setup.pl | 15 ++++++
2 files changed, 152 insertions(+)
diff --git a/t/40replication.t b/t/40replication.t
new file mode 100644
index 0000000..96e8780
--- /dev/null
+++ b/t/40replication.t
@@ -0,0 +1,137 @@
+#!perl
+
+## Test logical replication over the COPY_BOTH protocol
+
+use 5.006;
+use strict;
+use warnings;
+use Data::Dumper;
+use Data::HexDump;
+use DBD::Pg ':async';
+use Test::More;
+use lib 't','.';
+require 'dbdpg_test_setup.pl';
+select(($|=1,select(STDERR),$|=1)[1]); ## turn on autoflush for STDOUT and STDERR, leaving STDOUT selected
+
+## SQLSTATE accepted when the slot already exists, plus the factors used to
+## convert Unix seconds to and from microseconds since 2000-01-01.
+use constant DUP_OBJ => '42710';
+use constant USECS => 1_000_000;
+use constant PG_TO_UNIX_EPOCH_DELTA => 946_684_800;
+
+my $slot = 'dbd_pg_test';        ## replication slot used by this test
+my $plugin = 'test_decoding';    ## logical decoding output plugin for the slot
+
+my $dbh = connect_database();
+
+## Skip the whole file unless a new-enough server is reachable.
+$dbh or plan skip_all => 'Connection to database failed, cannot continue testing';
+
+## pg_server_version is an integer such as 90400 for 9.4.0, so it must be
+## compared with the integer form (a decimal 9.4 would match every server).
+$dbh->{pg_server_version} >= 90400
+    or plan skip_all => 'Cannot test logical replication on Postgres < 9.4';
+
+## Second connection in replication mode; its traffic is packed binary
+## (see the pack/unpack calls further down), so UTF-8 handling is turned off.
+my $repl_dbh = DBI->connect("$ENV{DBI_DSN};replication=database", $ENV{DBI_USER}, '',
+    {RaiseError => 1, PrintError => 0, AutoCommit => 1});
+$repl_dbh->{pg_enable_utf8} = 0;
+
+ok defined($repl_dbh), 'Connect to database for logical replication testing';
+
+## IDENTIFY_SYSTEM yields a single row; only its fourth column is checked.
+my ($systemid, $timeline, $xlogpos, $dbname)
+    = $repl_dbh->selectrow_array('IDENTIFY_SYSTEM');
+ok $dbname, "connected to specific dbname=$dbname";
+
+my $rv;
+
+## Create the slot; a failure whose SQLSTATE is DUP_OBJ is treated as success.
+eval {
+    my @quoted = map { $repl_dbh->quote_identifier($_) } $slot, $plugin;
+    $rv = $repl_dbh->do(sprintf 'CREATE_REPLICATION_SLOT %s LOGICAL %s', @quoted);
+};
+if ($@) {
+    $repl_dbh->state eq DUP_OBJ
+        or die sprintf 'err: %s; errstr: %s; state: %s', $repl_dbh->err, $repl_dbh->errstr, $repl_dbh->state;
+    $rv = 1;
+}
+ok $rv, 'replication slot created';
+
+$rv = $repl_dbh->do(sprintf('START_REPLICATION SLOT %s LOGICAL 0/0', $repl_dbh->quote_identifier($slot)));
+ok $rv, 'replication started';
+
+my $lastlsn = 0;    ## WAL position echoed back to the server in status updates
+my $tx_watch;       ## txid of the transaction that performs the test insert
+while (1) {
+    ## Standby status update: 'r', 3 WAL positions, clock (usecs since 2000), reply flag
+    my $now_usecs = (time - PG_TO_UNIX_EPOCH_DELTA) * USECS;
+    my $status = pack 'Aq>4b', 'r', $lastlsn, $lastlsn, 0, $now_usecs, 0;
+    $repl_dbh->pg_putcopydata($status)
+        or die sprintf 'err: %s; errstr: %s; state: %s', $repl_dbh->err, $repl_dbh->errstr, $repl_dbh->state;
+
+    ## First pass only: commit the change this test expects to see streamed.
+    unless ($tx_watch) {
+        $dbh->do('set client_min_messages to ERROR');
+        $dbh->do('drop table if exists dbd_pg_repltest');
+        $dbh->do('create table dbd_pg_repltest (id int)');
+        $dbh->do('insert into dbd_pg_repltest (id) values (1)');
+        $tx_watch = $dbh->selectrow_array('select txid_current()');
+        $dbh->commit;
+    }
+
+    my $n = $repl_dbh->pg_getcopydata_async(my $msg);
+
+    if ($n == 0) {
+        # nothing ready
+        sleep 1;
+        next;
+    }
+
+    if ($n == -1) {
+        # COPY closed
+        last;
+    }
+
+    if ($n == -2) {
+        die 'could not read COPY data: ' . $repl_dbh->errstr;
+    }
+
+    my $header = substr $msg, 0, 1;
+    ## Keepalive: nothing to decode; the next status update doubles as the reply.
+    next if $header eq 'k';
+
+    if ($header ne 'w') {
+        die sprintf 'unrecognized streaming header: "%s"', $header;
+    }
+
+    ## XLogData: type byte, three big-endian 64-bit fields, then the payload.
+    my (undef, undef, $lsnpos, undef, $record) = unpack 'Aq>3a*', $msg;
+
+    if ($record eq 'table dbd_pg_testschema.dbd_pg_repltest: INSERT: id[integer]:1') {
+        pass 'saw insert event';
+        last;
+    }
+    elsif ($tx_watch and $record =~ /^COMMIT (\d+)$/) {
+        ## Seeing the watched transaction's own COMMIT (or a later one) without a
+        ## match means the insert was missed; >= fails now instead of hanging.
+        if ($1 >= $tx_watch) {
+            fail 'saw insert event';
+            last;
+        }
+    }
+
+    $lastlsn = $lsnpos;
+}
+
+$repl_dbh->disconnect; ## close the replication connection before dropping its slot
+
+# cleanup the replication slot and test table
+$dbh->do('select pg_drop_replication_slot(?)', undef, $slot);
+$dbh->do('drop table if exists dbd_pg_repltest');
+$dbh->commit; ## the cleanup statements above are committed explicitly
+
+$dbh->disconnect;
+
+done_testing;
diff --git a/t/dbdpg_test_setup.pl b/t/dbdpg_test_setup.pl
index a4cecc4..7d4e03b 100644
--- a/t/dbdpg_test_setup.pl
+++ b/t/dbdpg_test_setup.pl
@@ -518,6 +518,21 @@ version: $version
print $cfh "log_filename = 'postgres%Y-%m-%d.log'\n";
print $cfh "log_rotation_size = 0\n";
+ if ($version >= 9.4) {
+ print $cfh "wal_level = logical\n";
+ print $cfh "max_replication_slots = 1\n";
+ print $cfh "max_wal_senders = 1\n";
+
+ open my $hba, '>>', "$testdir/data/pg_hba.conf"
+ or die qq{Could not open "$testdir/data/pg_hba.conf": $!\n};
+
+ print $hba "local\treplication\tall\ttrust\n";
+ print $hba "host\treplication\tall\t127.0.0.1/32\ttrust\n";
+ print $hba "host\treplication\tall\t::1/128\ttrust\n";
+
+ close $hba or die qq{Could not close "$testdir/data/pg_hba.conf": $!\n};
+ }
+
print $cfh "listen_addresses='127.0.0.1'\n" if $^O =~ /Win32/;
print $cfh "\n";
close $cfh or die qq{Could not close "$conf": $!\n};
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libdbd-pg-perl.git
More information about the Pkg-perl-cvs-commits
mailing list