This topic describes how to capture DDL events by using triggers when DDL changes occur in the source database of data migration. This way, you can ensure that DML operations are synchronized properly.
Background information
When you use the data migration service to perform incremental synchronization from a source PostgreSQL database, by default only DML statements (INSERT, DELETE, and UPDATE) are supported. However, if there are DDL changes in the source database (such as adding new table fields), and the data migration service cannot obtain the new schema information, subsequent DML synchronizations related to this object will fail, causing the task to be interrupted. Therefore, you need to create a DDL record table and triggers in the source PostgreSQL database to capture DDL information to ensure normal DML synchronization.
Limitations
The source PostgreSQL database must be of version V10.x or later.
Procedure
Log in to the source database and switch to the database to be migrated.
Take psql as an example. Run the
\c <database name>command to switch to the database to be migrated.Create the following two tables to record DDL event changes.
Table for non-DROP DDL events
----postgreSQL DDL EVENT TABLE---- CREATE TABLE public.oms_non_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", objid integer, top_objid integer, object_identity text COLLATE pg_catalog."default", top_object_identity text COLLATE pg_catalog."default", top_schema text COLLATE pg_catalog."default", all_children_table text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;Table for DROP DDL events
----postgreSQL DROP DDL EVENT TABLE---- CREATE TABLE public.oms_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", objid integer, object_type text COLLATE pg_catalog."default", object_name text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
Create the following two event trigger functions.
- Event trigger function for non-DROP DDL events
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; DECLARE obj record; DECLARE top_objid integer; DECLARE parent_oid integer; DECLARE top_object_identity text; DECLARE top_schema text; DECLARE all_children_table text; BEGIN SELECT current_query() INTO ddl_text; FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA', 'COMMENT')) LOOP record_object:=obj; SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid; WHILE parent_oid IS NOT NULL LOOP SELECT inhparent FROM pg_inherits WHERE inhrelid = parent_oid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO top_objid; IF top_objid IS NULL THEN top_objid := parent_oid; exit; END IF; parent_oid := top_objid; END LOOP; IF top_objid IS NULL THEN top_objid :=record_object.objid; ELSE END IF; IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN WITH RECURSIVE child_tables AS ( SELECT oid AS table_objid FROM pg_class WHERE oid = top_objid UNION SELECT c.oid AS table_objid FROM pg_class c JOIN pg_inherits i ON c.oid = i.inhrelid JOIN child_tables ct ON i.inhparent = ct.table_objid ) SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid; ELSE END IF; SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid; top_object_identity:=obj.relname; top_schema:=obj.nspname; IF TG_TAG ='CREATE TABLE' AND record_object.object_type='table' THEN EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL'; ELSE END IF; INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table); END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS ddl trigger error during command process: %', SQLERRM; END $BODY$;
* DROP DDL event trigger function
```SQL
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped()
RETURNS event_trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF SECURITY DEFINER
AS $BODY$
DECLARE ddl_text text;
DECLARE record_object record;
BEGIN
SELECT current_query() INTO ddl_text;
FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP
IF record_object.object_type = 'type' THEN
ELSE
INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name)
VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name);
END IF;
END LOOP;
EXCEPTION
WHEN OTHERS THEN
RAISE LOG 'OMS drop ddl trigger error during command process: %', SQLERRM;
END
$BODY$;
```
4. Change the owner of the created function to the account used to connect to the source database. Here is an example of `postgres`:
```SQL
ALTER FUNCTION public.oms_capture_ddl_for_non_dropped()
OWNER TO postgres;
ALTER FUNCTION public.oms_capture_ddl_for_dropped()
OWNER TO postgres;
Run the following command to create a global event trigger.
CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA','COMMENT') EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped(); CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
Next steps
After you complete the data migration task, log in to the source database and execute the following commands to drop the trigger functions and tables:
drop EVENT trigger oms_capture_ddl_for_dropped;
drop EVENT trigger oms_capture_ddl_for_non_dropped;
drop function public.oms_capture_ddl_for_non_dropped();
drop function public.ons_capture_ddl_for_dropped();
drop table public.oms_non_dropped_ddl_command;
drop table public.oms_dropped_ddl_command;