Let’s look at how to perform Change Data Capture (CDC) from PostgreSQL to topics in Azure Event Hubs.

Scenario

Suppose you have a banking application running on PostgreSQL. There are two tables: “users” and “transactions.” You want to sync these tables in real time to Event Hubs topics. Let’s see how to make this happen in a few minutes with a few SQL commands using PeerDB.

Prerequisites

  1. Enable logical decoding in PostgreSQL. Ensure that the following settings/GUCs are properly configured:
    1. wal_level: logical
    2. max_wal_senders: >1
    3. max_replication_slots: 4
  2. Enable replication access for the PostgreSQL user that PeerDB connects as: ALTER USER pg_user REPLICATION;
  3. Ensure that both tables have primary keys. Composite primary keys are also fine. If a table has no primary key, make sure it is set to REPLICA IDENTITY FULL.
  4. If you are using a managed PostgreSQL service in the cloud, refer to the provider's guide for enabling logical replication:
    1. AWS RDS and Aurora PostgreSQL
    2. Azure Database for PostgreSQL - Flexible Server
    3. GCP Cloud SQL PostgreSQL
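
The prerequisites above can be checked and applied from a psql session on the source database. The following is a minimal sketch rather than a definitive procedure. It assumes a self-managed PostgreSQL instance where ALTER SYSTEM is permitted (managed services usually expose these settings through their own parameter configuration instead). The numeric values and the table name public.transactions are illustrative assumptions; pg_user is the user name from the list above.

```sql
-- Inspect the current values of the settings needed for logical decoding.
SHOW wal_level;
SHOW max_wal_senders;
SHOW max_replication_slots;

-- Self-managed PostgreSQL only: persist the settings.
-- All three require a server restart before they take effect.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_wal_senders = 4;        -- illustrative; any value > 1
ALTER SYSTEM SET max_replication_slots = 4;  -- value from the list above

-- Grant replication access to the user PeerDB connects as.
ALTER USER pg_user REPLICATION;

-- Only needed for a table that has no primary key.
ALTER TABLE public.transactions REPLICA IDENTITY FULL;
```

After the restart, running the three SHOW commands again is a quick way to confirm that the new values are active.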

Step 1: Add PostgreSQL and Event Hubs Peers

Run the following commands to let PeerDB know about the existing PostgreSQL and Event Hubs Peers.

-- Connect to PeerDB
psql "port=9900 host=localhost password=peerdb"

-- Add PostgreSQL and Event Hubs peers
CREATE PEER postgres_peer FROM POSTGRES (...);
CREATE PEER eventhubs_peer FROM EVENTHUBS (...);

Make sure to replace (...) with the appropriate connection details for both the PostgreSQL and Event Hubs instances. More details on adding Peers are available here.

Step 2: Real-Time CDC from PostgreSQL to Event Hubs

The mirror from PostgreSQL to Event Hubs has two distinctive features: you can sync source tables across namespaces, and you can specify a column whose values are used to route data into the partitions of the event hub.

To facilitate real-time Change Data Capture (CDC) from PostgreSQL to Event Hubs, set up your peers and then create a mirror using the following SQL syntax:

CREATE MIRROR IF NOT EXISTS <mirror-name>
FROM <postgres-peer-name> TO <eventhubs-peer-name>
WITH TABLE MAPPING(
    <source-schema>.<table>:<namespace_name>.<eventhub_name>.<partition_key_column>,
    ... -- Repeat as required for multiple tables
)
WITH(
    max_batch_size = <number>,
    publication_name = '<publication-name>'
);

Example:

CREATE MIRROR IF NOT EXISTS test_eh_mirror
FROM test_pg_peer TO test_eh_peer
WITH TABLE MAPPING(
    schema1.table1:mynamespace1.hub1.id,
    schema1.table2:mynamespace2.hub2.id
    -- Add more tables as required
)
WITH(
    do_initial_copy = false,
    max_batch_size = 300000,
    publication_name = 'test_publication'
);

Parameters:

  • mirror-name: Desired name for the mirror.
  • postgres-peer-name: Name of the PostgreSQL peer.
  • eventhubs-peer-name: Name of the Event Hubs group peer.
  • namespace_name: Name of the Event Hubs namespace that contains the target event hub.
  • eventhub_name: Name of the event hub to sync the data into. PeerDB creates the event hub for you if it doesn’t exist already.
  • partition_key_column: Column in the source table whose values will be used to route data into the partitions of the event hub.
  • do_initial_copy: Whether to perform an initial load of the existing rows before streaming changes (set to false in the example above).
  • max_batch_size: Maximum number of records in a batch.
  • publication_name: Name of the PostgreSQL publication to use for the mirror.

Remember to replace the placeholder values (<...>) with your specific details and preferences. The example above has been abbreviated for clarity; ensure you provide all the necessary mappings and configurations in practice.
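
Applied to the banking scenario, the pieces could fit together roughly as follows. This is a sketch under assumptions, not a tested configuration. The publication name bank_publication, the namespace bank_namespace, the event hub names, the id partition key columns, and the batch size are all hypothetical. It also assumes that both tables live in the public schema and that the publication named in publication_name is expected to exist on the source already. The peer names are the ones from Step 1, and the mirror name matches the one used in the DROP MIRROR step.

```sql
-- On the source PostgreSQL database:
-- create a publication covering the two tables to be mirrored.
CREATE PUBLICATION bank_publication
    FOR TABLE public.users, public.transactions;

-- On PeerDB (connected as shown in Step 1):
-- map each source table to an event hub and a partition key column.
CREATE MIRROR IF NOT EXISTS real_time_cdc
FROM postgres_peer TO eventhubs_peer
WITH TABLE MAPPING(
    public.users:bank_namespace.users.id,                -- hypothetical names
    public.transactions:bank_namespace.transactions.id   -- hypothetical names
)
WITH(
    do_initial_copy = false,   -- same choice as the example above
    max_batch_size = 1000,     -- illustrative value
    publication_name = 'bank_publication'
);
```

Routing on a column such as id means the values of that column decide which partition of the event hub each change is sent to.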

Step 3: Validate the Mirror

Validate the mirror by checking that the number of messages in each topic matches the row count of the corresponding source table. The source row count can be queried through PeerDB:

SELECT COUNT(*) FROM postgres_peer.public.transactions;

Step 4: Monitor the MIRROR

You can connect to localhost:8085 to get full visibility into the jobs and steps that PeerDB runs under the hood to manage the MIRROR.
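
As a complement to the dashboard, the state of logical replication can also be inspected on the source side. The query below is a rough sketch that assumes direct psql access to the source database and PostgreSQL 10 or later. It uses only standard PostgreSQL catalog views and functions and makes no assumption about how PeerDB names its replication slot, so it lists every logical slot.

```sql
-- Run on the source PostgreSQL database (not on PeerDB).
SELECT
    slot_name,
    active,                -- true while a consumer is connected to the slot
    confirmed_flush_lsn,   -- position up to which the consumer has confirmed
    pg_size_pretty(
        pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
    ) AS wal_not_yet_confirmed
FROM pg_replication_slots
WHERE slot_type = 'logical';
```

A wal_not_yet_confirmed value that keeps growing while the slot is inactive suggests the mirror is not consuming changes, and that WAL is accumulating on the source.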

Managing the Mirror

Step 5: DROP MIRROR

To simplify cleanup in your development and test environments, PeerDB also provides the DROP MIRROR command. DROP MIRROR drops all the underlying objects that CREATE MIRROR generates. More details are available in this PR.

-- Drop the mirror (substitute the name of your own mirror)
DROP MIRROR real_time_cdc;
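
After dropping a mirror, one rough way to confirm the cleanup is to list the replication objects that remain on the source. The sketch below assumes direct psql access to the source PostgreSQL database. The object names that PeerDB generates are not covered here, so the queries simply list everything. Whether a publication you created yourself is also removed is not stated above, so treat that as something to verify.

```sql
-- Run on the source PostgreSQL database (not on PeerDB).

-- Replication slots still present; a leftover slot keeps retaining WAL.
SELECT slot_name, slot_type, active
FROM pg_replication_slots;

-- Publications still defined in the current database.
SELECT pubname
FROM pg_publication;
```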