[libdbd-pg-perl] 17/35: crude replication test to prove COPY_BOTH changes

Christoph Berg myon at debian.org
Wed Sep 27 17:41:03 UTC 2017


This is an automated email from the git hooks/post-receive script.

myon pushed a commit to branch master
in repository libdbd-pg-perl.

commit 790efd7fc1882f9e228c617f1d4dbce70e160fe5
Author: William Cox <mydimension at gmail.com>
Date:   Wed Sep 20 16:59:01 2017 -0400

    crude replication test to prove COPY_BOTH changes
---
 t/40replication.t     | 137 ++++++++++++++++++++++++++++++++++++++++++++++++++
 t/dbdpg_test_setup.pl |  15 ++++++
 2 files changed, 152 insertions(+)

diff --git a/t/40replication.t b/t/40replication.t
new file mode 100644
index 0000000..96e8780
--- /dev/null
+++ b/t/40replication.t
@@ -0,0 +1,137 @@
+#!perl
+
+## Test the COPY functionality
+
+use 5.006;
+use strict;
+use warnings;
+use Data::Dumper;
+use Data::HexDump;
+use DBD::Pg ':async';
+use Test::More;
+use lib 't','.';
+require 'dbdpg_test_setup.pl';
+select(($|=1,select(STDERR),$|=1)[1]);
+
+use constant {
+    DUP_OBJ => '42710',
+    USECS   => 1_000_000,
+    PG_TO_UNIX_EPOCH_DELTA => 946_684_800,
+};
+
+my $slot = 'dbd_pg_test';
+my $plugin = 'test_decoding';
+
+my $dbh = connect_database();
+
+my $repl_dbh;
+if ($dbh) {
+    if ($dbh->{pg_server_version} >= 9.4) {
+        $repl_dbh = DBI->connect("$ENV{DBI_DSN};replication=database", $ENV{DBI_USER}, '',
+                                 {RaiseError => 1, PrintError => 0, AutoCommit => 1});
+        $repl_dbh->{pg_enable_utf8} = 0;
+    } else {
+        plan skip_all => 'Cannot test logical replication on Postgres < 9.4';
+    }
+}
+else {
+	plan skip_all => 'Connection to database failed, cannot continue testing';
+}
+
+ok defined $repl_dbh, 'Connect to database for logical replication testing';
+
+my ($systemid, $timeline, $xlogpos, $dbname) = $repl_dbh->selectrow_array('IDENTIFY_SYSTEM');
+
+ok $dbname, "connected to specific dbname=$dbname";
+
+my $rv;
+
+eval {
+    $rv = $repl_dbh->do(sprintf 'CREATE_REPLICATION_SLOT %s LOGICAL %s',
+                        $repl_dbh->quote_identifier($slot), $repl_dbh->quote_identifier($plugin));
+};
+if ($@) {
+    unless ($repl_dbh->state eq DUP_OBJ) {
+        die sprintf 'err: %s; errstr: %s; state: %s', $repl_dbh->err, $repl_dbh->errstr, $repl_dbh->state;
+    } else {
+        $rv = 1;
+    }
+}
+ok $rv, 'replication slot created';
+
+$rv = $repl_dbh->do(sprintf 'START_REPLICATION SLOT %s LOGICAL 0/0', $repl_dbh->quote_identifier($slot));
+ok $rv, 'replication started';
+
+my $lastlsn = 0;
+my $tx_watch;
+while (1) {
+    my @status = ('r', $lastlsn, $lastlsn, 0, ((time - PG_TO_UNIX_EPOCH_DELTA) * USECS), 0);
+    my $status = pack 'Aq>4b', @status;
+    $repl_dbh->pg_putcopydata($status)
+        or die sprintf 'err: %s; errstr: %s; state: %s', $repl_dbh->err, $repl_dbh->errstr, $repl_dbh->state;
+
+    unless ($tx_watch) {
+        $dbh->do('set client_min_messages to ERROR');
+        $dbh->do('drop table if exists dbd_pg_repltest');
+        $dbh->do('create table dbd_pg_repltest (id int)');
+        $dbh->do('insert into dbd_pg_repltest (id) values (1)');
+        $tx_watch = $dbh->selectrow_array('select txid_current()');
+        $dbh->commit;
+    }
+
+    my $n = $repl_dbh->pg_getcopydata_async(my $msg);
+
+    if ($n == 0) {
+        # nothing ready
+        sleep 1;
+        next;
+    }
+
+    if ($n == -1) {
+        # COPY closed
+        last;
+    }
+
+    if ($n == -2) {
+        die 'could not read COPY data: ' . $repl_dbh->errstr;
+    }
+
+    if ('k' eq substr $msg, 0, 1) {
+        my ($type, $lsnpos, $ts, $reply) = unpack 'Aq>2b', $msg;
+
+        $ts = ($ts / USECS) + PG_TO_UNIX_EPOCH_DELTA;
+
+        next;
+    }
+
+    if ('w' ne substr $msg, 0, 1) {
+        die sprintf 'unrecognized streaming header: "%s"', substr($msg, 0, 1);
+    }
+
+    my ($type, $startpos, $lsnpos, $ts, $record) = unpack 'Aq>3a*', $msg;
+
+    $ts = ($ts / USECS) + PG_TO_UNIX_EPOCH_DELTA;
+
+    if ($record eq 'table dbd_pg_testschema.dbd_pg_repltest: INSERT: id[integer]:1') {
+        pass 'saw insert event';
+        last;
+    } elsif ($tx_watch and my ($tx) = $record =~ /^COMMIT (\d+)$/) {
+        if ($tx > $tx_watch) {
+            fail 'saw insert event';
+            last;
+        }
+    }
+
+    $lastlsn = $lsnpos;
+}
+
+$repl_dbh->disconnect;
+
+# cleanup the replication slot and test table
+$dbh->do('select pg_drop_replication_slot(?)', undef, $slot);
+$dbh->do('drop table if exists dbd_pg_repltest');
+$dbh->commit;
+
+$dbh->disconnect;
+
+done_testing;
diff --git a/t/dbdpg_test_setup.pl b/t/dbdpg_test_setup.pl
index a4cecc4..7d4e03b 100644
--- a/t/dbdpg_test_setup.pl
+++ b/t/dbdpg_test_setup.pl
@@ -518,6 +518,21 @@ version: $version
 			print $cfh "log_filename = 'postgres%Y-%m-%d.log'\n";
 			print $cfh "log_rotation_size = 0\n";
 
+			if ($version >= 9.4) {
+				print $cfh "wal_level = logical\n";
+				print $cfh "max_replication_slots = 1\n";
+				print $cfh "max_wal_senders = 1\n";
+
+				open my $hba, '>>', "$testdir/data/pg_hba.conf"
+					or die qq{Could not open "$testdir/data/pg_hba.conf": $!\n};
+
+				print $hba "local\treplication\tall\ttrust\n";
+				print $hba "host\treplication\tall\t127.0.0.1/32\ttrust\n";
+				print $hba "host\treplication\tall\t::1/128\ttrust\n";
+
+				close $hba or die qq{Could not close "$testdir/data/pg_hba.conf": $!\n};
+			}
+
 			print $cfh "listen_addresses='127.0.0.1'\n" if $^O =~ /Win32/;
 			print $cfh "\n";
 			close $cfh or die qq{Could not close "$conf": $!\n};

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libdbd-pg-perl.git



More information about the Pkg-perl-cvs-commits mailing list