46.1. Logical Decoding Examples

The following example demonstrates controlling logical decoding using the SQL interface.

Before you can use logical decoding, you must set wal_level to logical and max_replication_slots to at least 1. Then connect to the target database (postgres in the example below) as a superuser.
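
The two settings named above can be checked before a slot is created. The following is a minimal sketch, not part of PostgreSQL: check_logical_prereqs is a hypothetical helper name, and the psql invocation shown in the trailing comment assumes a local server reachable with default connection settings.

```shell
# Hypothetical helper (not shipped with PostgreSQL): checks the two
# prerequisite values and prints "ok" or a description of the first problem.
check_logical_prereqs() {
    wal_level=$1
    max_slots=$2
    if [ "$wal_level" != "logical" ]; then
        echo "wal_level is '$wal_level', expected 'logical'"
        return 1
    fi
    if [ "$max_slots" -lt 1 ]; then
        echo "max_replication_slots is $max_slots, expected at least 1"
        return 1
    fi
    echo "ok"
}

# Against a running server, the current values could be fetched with psql
# (assumed connection settings; -A and -t strip alignment and headers):
#   check_logical_prereqs "$(psql -d postgres -Atc 'SHOW wal_level')" \
#                         "$(psql -d postgres -Atc 'SHOW max_replication_slots')"
```

The function takes the values as arguments rather than querying the server itself, so it can be exercised without a database connection.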

postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    | xlog_position
-----------------+---------------
 regression_slot | 0/16B1970
(1 row)

postgres=# SELECT * FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | datoid | database | active |  xmin  | catalog_xmin | restart_lsn
-----------------+---------------+-----------+--------+----------+--------+--------+--------------+-------------
 regression_slot | test_decoding | logical   |  12052 | postgres | f      |        |          684 | 0/16A4408
(1 row)

postgres=# -- There are no changes to see yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

postgres=# -- DDL isn't replicated, so all you'll see is the transaction
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location  | xid |    data
-----------+-----+------------
 0/16D5D48 | 688 | BEGIN 688
 0/16E0380 | 688 | COMMIT 688
(2 rows)

postgres=# -- Once changes are read, they're consumed and not emitted
postgres=# -- in a subsequent call:
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location | xid | data
----------+-----+------
(0 rows)

postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
 location  | xid |                          data
-----------+-----+---------------------------------------------------------
 0/16E0478 | 689 | BEGIN 689
 0/16E0478 | 689 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 0/16E0580 | 689 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 0/16E0650 | 689 | COMMIT 689
(4 rows)

postgres=# INSERT INTO data(data) VALUES('3');

postgres=# -- You can also peek ahead in the change stream without consuming changes
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
 location  | xid |                          data
-----------+-----+---------------------------------------------------------
 0/16E09C0 | 690 | BEGIN 690
 0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/16E0B90 | 690 | COMMIT 690
(3 rows)

postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
 location  | xid |                          data
-----------+-----+---------------------------------------------------------
 0/16E09C0 | 690 | BEGIN 690
 0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/16E0B90 | 690 | COMMIT 690
(3 rows)

postgres=# -- Options can be passed to the output plugin to influence the formatting
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
 location  | xid |                          data
-----------+-----+---------------------------------------------------------
 0/16E09C0 | 690 | BEGIN 690
 0/16E09C0 | 690 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 0/16E0B90 | 690 | COMMIT 690 (at 2014-02-27 16:41:51.863092+01)
(3 rows)

postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
postgres=# -- server resources:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------

(1 row)

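The following example shows how logical decoding is controlled over the streaming replication protocol, using the program pg_recvlogical included in the PostgreSQL distribution. This requires that client authentication is set up to allow replication connections (see Section 25.2.5.1) and that max_wal_senders is set high enough to allow an additional connection.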

$ pg_recvlogical -d postgres --slot test --create-slot
$ pg_recvlogical -d postgres --slot test --start -f -
Control+Z
$ psql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
BEGIN 693
table public.data: INSERT: id[integer]:4 data[text]:'4'
COMMIT 693
Control+C
$ pg_recvlogical -d postgres --slot test --drop-slot
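
Because the text emitted in the session above is line-oriented, output saved from pg_recvlogical can be post-processed with standard tools. The following is a minimal sketch that counts changes per table and action. It assumes the line layout shown above (table schema.name: ACTION: ...), and count_changes is a hypothetical helper name, not part of PostgreSQL. test_decoding is an example plugin, so its format may not suit production parsing.

```shell
# Hypothetical helper: reads test_decoding-style lines on stdin and prints
# "<table> <action> <count>" for every table/action pair, sorted.
count_changes() {
    awk '$1 == "table" {
             table = $2; sub(/:$/, "", table)     # strip trailing colon
             action = $3; sub(/:$/, "", action)   # strip trailing colon
             n[table " " action]++
         }
         END { for (k in n) print k, n[k] }' | sort
}

# Example input: the three lines printed in the session above.
printf '%s\n' \
    'BEGIN 693' \
    "table public.data: INSERT: id[integer]:4 data[text]:'4'" \
    'COMMIT 693' | count_changes
# prints: public.data INSERT 1
```

BEGIN and COMMIT lines are ignored because their first field is not "table".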