PeerDB introduces CREATE MIRROR command for blazing fast syncs between Peers. This section captures different types of MIRRORs and how to kick them off.

Command Reference


This type of MIRROR streams change feed in real-time from source peer to target peer. For BigQuery, Snowflake and Postgres destinations specifically, we also support automatic propagation of table schema changes. Any new column detected in the source tables will be created in the corresponding destination table, and rows with the new columns will be transferred as well. Dropped columns in the source tables are currently ignored.

FROM <source_peer> TO <target_peer>
    from: <schema_name>.<source_table_name>,
    to: <schema_name>.<target_table_name>,
    key: <partition_key>
  } -- syntax only for EventHub peers
  do_initial_copy = <include_initial_copy>,
  snapshot_num_rows_per_partition = <500000>,
  snapshot_max_parallel_workers = <number_of_threads_per_table>,
  snapshot_num_tables_in_parallel = <number_of_tables_in_parallel>
  1. do_initial_copy: true if you want to include initial snapshot of tables as a part of the MIRROR, else false.
  2. snapshot_sync_mode: sql uses a multi-insert command and avro converts data to avro format before syncing to target. Both use the COPY command under the covers. avro is the only supported mode for Snowflake destinations and sql is the only supported mode for Postgres destinations.
  3. snapshot_num_tables_in_parallel: In the initial snapshot, number of tables to be snapshotted at parallely.
  4. snapshot_max_parallel_workers: In the initial snapshot, number of threads used per table. Max number of concurrent connections on source will be snapshot_num_tables_in_parallel*snapshot_max_parallel_workers
  5. snapshot_num_rows_per_partition: In the initial snapshot, number of rows per table streamed in a given batch.
  6. If key is specified, the table will be partitioned on the target peer based on the partition_key. If not specified, the table will be partitioned on the target peer based on the ctid column.
  7. if_not_exists: If specified, the MIRROR will be created only if it does not exist.

CDC Modes for Mirror

We also support the sql and avro modes for CDC for some destinations such as BigQuery and Snowflake. This can be configured with:

  cdc_sync_mode = 'avro' // or 'sql'
  cdc_staging_path = '<stage>' // Needed for AVRO mode only

The stage depends on your destination peer:

  • For BigQuery, it would be your GCS bucket.
  • For Snowflake, it would be either your S3 bucket URL or an empty string for PeerDB to stage AVRO files internally.

MIRROR for Streaming Query Results

This type of MIRROR periodically streams query results on the source peer to the target peer.

	<source_peer> TO <target_peer> FOR
  SELECT * FROM <source_table_name> WHERE
  <watermark_column> BETWEEN {{.start}} AND {{.end}}
	destination_table_name = '<schema_qualified_destination_table_name>',
	watermark_column = '<watermark_column>',
	watermark_table_name = '<table_on_source_on_which_watermark_filter_should_be_applied>',
	mode = '<mode>',
	unique_key_columns = '<unique_key_columns>',
	parallelism = <parallelism>,
	refresh_interval = <refresh_interval_in_seconds>,
	sync_data_format = '<sync_data_format>'
  num_rows_per_partition = 100000,
  initial_copy_onle = <true|false>,
  setup_watermark_table_on_destination = <true|false>,

The OPTIONS clause provides granular control when configuring the MIRROR:

  1. watermark_column (Required) represents a sequentially incrementing integer or timestamp column. PeerDB uses this column to keep track of the processed rows and the ones that need to be processed.
  2. watermark_table (Required) is the table that PeerDB iterates sequentially based on the watermark_column. The fact table is a good candidate for the watermark_table.
  3. mode (Required) tells PeerDB whether it should append data to the target or perform an upsert operation (update if the row exists).
    1. If your data is append-only, a sequential ID or a created_at column can be a good watermark, and you can choose the mode append.
    2. If your data is updated, make sure to have an “updated_at” column for PeerDB to identify the most recent version of the row. Choose the mode upsert to update already existing rows on the target.
      1. unique_key: In the upsert mode, you need to specify a set of columns (unique_key) that uniquely identify a row. This helps perform the upsert operation.
  4. parallelism (Optional) represents the number of threads used to read from the source and sync to the target. This helps with workload management on the source and target.
    1. If you have a powerful PostgreSQL server or if it is a read-replica, you can configure a higher parallelism value. In scenarios where you want to put less load on the source, you can choose a lower value.
  5. refresh_interval (Optional) helps configure the frequency (in seconds) at which the sync should run. If not specified, PeerDB attempts to sync new rows every 10 seconds.
  6. initial_copy_only (Optional): If only a one-time copy needs to be done, this can be set to true. The mirror stops after a one-time data load. Apart from specifying append or upsert modes, a new mode overwrite can be specified when this option is set to true. This truncates all destination tables before starting the mirror, and is functionally identical to append otherwise.
  7. num_rows_per_parition (Required): PeerDB internally splits a table into partitions of a certain size which it distributes amongst the worker threads. This option allows you to tune the size of each partition based on memory or network requirements. 8 setup_watermark_table_on_destination (Optional): If set to true, a table with the same schema as the watermark table is created on the destination peer. Its name will be set to destination_table_name.
  8. More details on the OPTIONs can be found here.
  9. if_not_exists: If specified, the MIRROR will be created only if it does not exist.

XMIN Query Replication

PeerDB supports xmin as watermark column. The query needs typecasting like below to work with xmin column. Note that DELETEs are not supported in XMIN Query Replication.

CREATE MIRROR xmin_mirror_1 FROM pg_test_peer TO sf_test_peer FOR
$$ SELECT * FROM users WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}} $$
	destination_table_name = 'public.users',
	watermark_column = 'xmin',
	watermark_table_name = 'public.users',
	mode = 'append', -- upsert (only updates) and overwrite are also supported
	parallelism = 10,
	refresh_interval = 30,
	num_rows_per_partition = 100,
	sync_data_format = 'avro'