Using PeerDB

Peer Creation

The building block of a PeerDB deployment is a peer, which could represent a database from a wide variety of providers. We can query these peers using the same Postgres dialect of SQL and also run mirrors to migrate data between two peers. To begin with, let us create two Postgres peers on the same cluster that PeerDB uses for internal state management. If you have another Postgres cluster, you can use that as well! Just be sure to substitute the connection details wherever needed.

Let’s begin by connecting to the PeerDB internal cluster (which we will call the catalog from now on):

psql "host=localhost user=postgres password=postgres dbname=postgres port=5432"

Then execute the following commands:

postgres=# CREATE DATABASE peerone;
CREATE DATABASE
postgres=# CREATE DATABASE peertwo;
CREATE DATABASE

Now switch over to peerone and create some sample tables.

postgres=# \c peerone
You are now connected to database "peerone" as user "postgres".
peerone=# CREATE TABLE tableone(id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c1 INT, c2 INT, t TEXT);
CREATE TABLE
peerone=# CREATE TABLE tabletwo(id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, c3 INT, c4 INT, u TEXT);
CREATE TABLE

All done! Keep this terminal open, though; we’ll come back to it later.

Now, back in the terminal that is connected to PeerDB itself, we run the following commands to create two Postgres peers.

kevin=> CREATE PEER peerone FROM POSTGRES WITH
(
    host = 'catalog',
    port = '5432',
    user = 'postgres',
    password = 'postgres',
    database = 'peerone'
);
OK
kevin=> CREATE PEER peertwo FROM POSTGRES WITH
(
    host = 'catalog',
    port = '5432',
    user = 'postgres',
    password = 'postgres',
    database = 'peertwo'
);
OK

Now check that we are able to query the two tables we created on peerone.

kevin=> SELECT * FROM peerone.public.tableone;
id | c1 | c2 | t
----+----+----+---
(0 rows)

kevin=> SELECT * FROM peerone.public.tabletwo;
id | c3 | c4 | u
----+----+----+---
(0 rows)

CDC Mirror

Now imagine the two tables we created in peerone have a lot of records being inserted, updated and deleted concurrently. We want a live copy of these tables in peertwo. We don’t want to change the data in any way, just Extract it from peerone and Load it in peertwo.

PeerDB supports this via CDC mirrors, which build on Postgres’ logical replication capabilities and keep the state of two tables efficiently synchronized.

kevin=> CREATE MIRROR cdc_mirror FROM peerone TO peertwo WITH TABLE MAPPING (public.tableone:public.tableone_dst, public.tabletwo:public.tabletwo_dst);
CREATE MIRROR cdc_mirror

After a few seconds, you should be able to see the destination tables created automatically on peertwo.

kevin=> SELECT * FROM peertwo.public.tableone_dst;
c1 | c2 | id | t
----+----+----+---
(0 rows)

kevin=> SELECT * FROM peertwo.public.tabletwo_dst;
c3 | c4 | id | u
----+----+----+---
(0 rows)

To simulate the desired usage patterns on the two tables, we’ll use pgbench, which is a standard tool for Postgres testing. It usually comes bundled with psql, so it should already be present on your system.
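Since the steps below rely on both psql and pgbench being installed, a quick check can save some confusion. The following is a minimal sketch for a POSIX shell; check_tool is a hypothetical helper name introduced here, not something PeerDB or Postgres provides.

```shell
# Hypothetical helper: report whether a command is available on PATH.
check_tool() {
  if command -v "$1" >/dev/null 2>&1; then
    echo "$1: found"
  else
    echo "$1: not found"
    return 1
  fi
}

# "|| true" keeps the script going even if a tool is missing.
check_tool psql || true
check_tool pgbench || true
```

If pgbench is reported as not found, it is typically available from the same package source that provided psql, though package names vary by platform.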

pgbench_insert.sql:

\set naccounts 1000000 * :scale
\set aid random(1, :naccounts)
\set bid random(1, :naccounts)
INSERT INTO public.tableone(c1,c2,t) VALUES (:aid,:bid,'pgbenchinsert'||substr(md5(random()::text), 0, 25));
INSERT INTO public.tabletwo(c3,c4,u) VALUES (:bid,:aid,'pgbenchinsert'||substr(md5(random()::text), 0, 25));

pgbench_update.sql:

\set naccounts 1000000 * :scale
\set aid random(1, :naccounts)
\set bid random(1, :naccounts)
UPDATE public.tableone SET t='pgbenchupdate'||substr(md5(random()::text), 0, 25) WHERE id = :aid;
UPDATE public.tabletwo SET u='pgbenchupdate'||substr(md5(random()::text), 0, 25) WHERE id = :bid;

pgbench_delete.sql:

\set naccounts 1000000 * :scale
\set aid random(1, :naccounts)
\set bid random(1, :naccounts)
DELETE FROM public.tableone WHERE id = :aid;
DELETE FROM public.tabletwo WHERE id = :bid;

The first of these 3 pgbench scripts inserts records with random data into the two tables, while the other two attempt to update or delete a record if it exists. By running them concurrently, we can roughly simulate the sort of load we want.

After saving these scripts to files, we run them in parallel like so:

pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_insert.sql -j 2 -c 8 -T 120 -P 60 \
& pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_update.sql -j 2 -c 8 -T 120 -P 60 \
& pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_delete.sql -j 2 -c 8 -T 120 -P 60

(For those who are curious, the flags ask pgbench to run each script on 8 parallel connections spread over 2 threads, for a period of 120 seconds, with a progress report every 60 seconds.)

(NOTE: Windows PowerShell or Command Prompt users cannot run commands in parallel like this. There are other ways, but it would probably be easier to run them in 3 separate terminals.)
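For POSIX shells, the parallel run can also be wrapped in a small script that names each flag and waits for all three runs to finish. This is only a sketch: run_scripts and the variable names are made up for illustration, the connection string is the one used above, and the PGBENCH override exists purely so the command can be dry-run (for example with echo).

```shell
# Connection string for the source database, as used in the commands above.
CONN="host=localhost port=5432 user=postgres password=postgres dbname=peerone"
# Program to launch; override (e.g. PGBENCH=echo) to preview the commands.
PGBENCH="${PGBENCH:-pgbench}"

CLIENTS=8      # -c: concurrent database connections per script
THREADS=2      # -j: worker threads per pgbench process
DURATION=120   # -T: how long to run, in seconds
PROGRESS=60    # -P: seconds between progress reports

# Hypothetical helper: start one background pgbench per script file,
# then block until all of them have exited.
run_scripts() {
  for script in "$@"; do
    "$PGBENCH" "$CONN" -f "$script" -j "$THREADS" -c "$CLIENTS" \
      -T "$DURATION" -P "$PROGRESS" &
  done
  wait
}
```

Calling `run_scripts pgbench_insert.sql pgbench_update.sql pgbench_delete.sql` should then behave like the three-line command above, except that the shell prompt only returns once every run has completed.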

As soon as the rows are inserted into the source tables, PeerDB will start receiving logical replication messages and perform the same operations on the destination tables. We can observe the process from psql in the other terminal.

kevin=> SELECT (SELECT COUNT(*) FROM peertwo.public.tableone_dst) AS one, (SELECT COUNT(*) FROM peertwo.public.tabletwo_dst) AS two;
one | two
-----+-----
0 |   0
(1 row)
kevin=> \watch 5
... (every 5s)

one  | two
------+------
7549 | 7543
(1 row)
... lines omitted ...
... (every 5s)

one   |  two
--------+--------
216463 | 216251
(1 row)

We can confirm that the records in both destination tables exactly match the records in the source tables. Do note that rows already present in the source tables before the mirror was created are currently not replicated.

kevin=> SELECT (SELECT COUNT(*) FROM peertwo.public.tableone_dst WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peertwo.public.tabletwo_dst WHERE u LIKE '%update%') AS two;
one  |  two
-------+-------
35061 | 34768
(1 row)

kevin=> SELECT (SELECT COUNT(*) FROM peerone.public.tableone) AS one, (SELECT COUNT(*) FROM peerone.public.tabletwo) AS two;
one   |  two
--------+--------
216463 | 216251
(1 row)

kevin=> SELECT (SELECT COUNT(*) FROM peerone.public.tableone WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peerone.public.tabletwo WHERE u LIKE '%update%') AS two;
one  |  two
-------+-------
35061 | 34768
(1 row)

QRep Mirror

After some brainstorming, we realize that having a copy of both tables is unnecessary when all we do is run this one query against them:

peertwo=# SELECT one.id, one.c1+two.c3 AS c13, one.c2+two.c4 AS c24 FROM public.tableone_dst AS one JOIN public.tabletwo_dst AS two ON one.id=two.id;

It would therefore be ideal if we could just get the results of this query in one table. Well, PeerDB can do this too! PeerDB supports something called query replication (QRep) mirrors. They allow you to Extract the data from the source tables, Transform it in some way and then Load it into the destination tables.

We need to first create the table that the results are replicated to, keeping in mind that the results of the query we run must map to the columns of the destination table.

peertwo=# CREATE TABLE qrep_dst(id INT PRIMARY KEY, c13 INT, c24 INT);
CREATE TABLE

Now, in the PeerDB terminal, we kick off the QRep mirror.

kevin=> CREATE MIRROR qrep_mirror FROM peerone TO peertwo FOR $$SELECT one.id, one.c1+two.c3 AS c13, one.c2+two.c4 AS c24 FROM public.tableone AS one JOIN public.tabletwo AS two ON one.id=two.id WHERE one.id BETWEEN {{.start}} AND {{.end}}$$ WITH (watermark_column='id', watermark_table_name='public.tableone', mode='append', parallelism=2, refresh_interval=30, num_rows_per_partition=100000, destination_table_name='public.qrep_dst');
CREATE MIRROR qrep_mirror

Note that this QRep mirror is operating in ‘append’ mode, where rows are inserted once and changes to them are not tracked. This is fine for our purposes because the query only reads c1, c2, c3 and c4, which our pgbench scripts never modify after a row is inserted.

Within a few seconds, you should see the query results for the existing data in peerone appear in the table we created on peertwo.
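The {{.start}} and {{.end}} placeholders in the mirror's query are tied to the watermark column, so each run of the query only covers one slice of ids. The sketch below illustrates that idea in plain shell, under the simplifying assumption that slices are fixed-width id ranges. render_partitions is a hypothetical helper written for this guide, and PeerDB's real partitioning logic may well differ (for instance when ids have gaps).

```shell
# Hypothetical helper (not part of PeerDB): print one query per id range.
# Arguments: query template, lowest id, highest id, ids per partition.
render_partitions() {
  template=$1
  start=$2
  max=$3
  size=$4
  while [ "$start" -le "$max" ]; do
    end=$((start + size - 1))
    # The last range is clipped so it never runs past the highest id.
    if [ "$end" -gt "$max" ]; then end=$max; fi
    # Fill in the two placeholders for this range.
    printf '%s\n' "$template" |
      sed -e "s/{{\.start}}/$start/g" -e "s/{{\.end}}/$end/g"
    start=$((end + 1))
  done
}

render_partitions \
  'SELECT id FROM public.tableone WHERE id BETWEEN {{.start}} AND {{.end}}' \
  1 250000 100000
```

With these example numbers the helper prints three queries, covering ids 1 to 100000, 100001 to 200000 and 200001 to 250000. Queries over disjoint ranges like these can be run independently of one another, which is presumably what settings such as parallelism and num_rows_per_partition take advantage of.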

kevin=> SELECT SUM(one.c1+two.c3), SUM(one.c2+two.c4) FROM peerone.public.tableone AS one JOIN peerone.public.tabletwo AS two ON one.id=two.id;
    sum      |     sum
--------------+--------------
178035703078 | 177984263847
(1 row)

kevin=> SELECT SUM(c13), SUM(c24) FROM peertwo.public.qrep_dst;
    sum      |     sum
--------------+--------------
178035703078 | 177984263847
(1 row)

We can even run our pgbench scripts from earlier, and the changes should be reflected in qrep_dst. Also, since our CDC mirror from before is still running, the new rows will be reflected there as well.

pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_insert.sql -j 2 -c 8 -T 120 -P 60 \
& pgbench "host=localhost port=5432 user=postgres password=postgres dbname=peerone" -f pgbench_update.sql -j 2 -c 8 -T 120 -P 60

kevin=> SELECT SUM(one.c1+two.c3), SUM(one.c2+two.c4) FROM peerone.public.tableone AS one JOIN peerone.public.tabletwo AS two ON one.id=two.id;
    sum      |     sum
--------------+--------------
502025807680 | 501974368449
(1 row)

kevin=> SELECT SUM(c13), SUM(c24) FROM peertwo.public.qrep_dst;
    sum      |     sum
--------------+--------------
502025807680 | 501974368449
(1 row)

kevin=> SELECT (SELECT COUNT(*) FROM peerone.public.tableone WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peerone.public.tabletwo WHERE u LIKE '%update%') AS two;
one   |  two
--------+--------
154848 | 154972
(1 row)

kevin=> SELECT (SELECT COUNT(*) FROM peertwo.public.tableone_dst WHERE t LIKE '%update%') AS one, (SELECT COUNT(*) FROM peertwo.public.tabletwo_dst WHERE u LIKE '%update%') AS two;
one   |  two
--------+--------
154848 | 154972
(1 row)