1. Pitfalls and Bookmarks

Environment

OS: CentOS 7.9
Oracle: oracle-database-ee-19c-1.0-1.x86_64.rpm
ZooKeeper: apache-zookeeper-3.7.0-bin.tar.gz
Kafka: kafka_2.12-2.7.0.tgz

References
Debezium Connector for Oracle :: Debezium Documentation
Apache Kafka: real-time monitoring and syncing of database changes, this framework really is a gem
Introduction to Debezium | Baeldung
debeziumEmbedded: hand-written code that uses Debezium to access a database
Installing Oracle 19c in Docker
Oracle 12c PDB database not open
Oracle supplemental logging (Supplemental Logging)
Oracle archivelog and noarchivelog modes
Migrating an Oracle database from noarchivelog to archivelog mode
Fixing Oracle login error ORA-01033: ORACLE initialization or shutdown in progress
Debezium: capturing data from Oracle into Kafka _ Kafka Connect
Kafka Connect: introduction and deployment
Keyword: oracle lrm-00109: could not open parameter file '/opt/oracle - adodo1
Using Debezium with Kafka to sync Oracle data in real time | BlackC
Fixing ORA-00942: table or view does not exist
oracle - Maven including ocijdbc19 in java.library.path - Stack Overflow
Differences between the OCI and thin JDBC drivers
Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}: how to handle it properly

2. Monitoring Oracle

Debezium offers two ways to monitor the database, corresponding to Oracle's two connection methods.

LogMiner: essentially the JDBC thin driver, pure Java and platform-independent.
XStream API: essentially the JDBC OCI driver, which works by calling the OCI client's native C library.

Quoting the official description:
The JDBC Thin driver is a pure Java, Type IV driver that can be used in applications and applets. It is platform-independent and does not require any additional Oracle software on the client-side. The JDBC Thin driver communicates with the server using SQL*Net to access Oracle Database.
The JDBC Thin driver allows a direct connection to the database by providing an implementation of SQL*Net on top of Java sockets. The driver supports the TCP/IP protocol and requires a TNS listener on the TCP/IP sockets on the database server.
The JDBC OCI driver is a Type II driver used with Java applications. It requires an Oracle client installation and, therefore, is Oracle platform-specific. It supports all installed Oracle Net adapters, including interprocess communication (IPC), named pipes, TCP/IP, and Internetwork Packet Exchange/Sequenced Packet Exchange (IPX/SPX).
The JDBC OCI driver, written in a combination of Java and C, converts JDBC invocations to calls to OCI, using native methods to call C-entry points. These calls communicate with the database using SQL*Net.
The JDBC OCI driver uses the OCI libraries, C-entry points, Oracle Net, core libraries, and other necessary files on the client computer where it is installed.
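The two adapters therefore imply different JDBC URL styles. As an illustrative sketch (host, port and service name below are placeholders, not taken from any particular setup), the two URL shapes look like this:

```java
// Illustrative only: the two Oracle JDBC URL styles behind Debezium's adapters.
class OracleJdbcUrls {

    // Thin driver: pure Java, no client install needed (used by the LogMiner adapter).
    static String thinUrl(String host, int port, String service) {
        return "jdbc:oracle:thin:@//" + host + ":" + port + "/" + service;
    }

    // OCI driver: requires an Oracle client / Instant Client on the machine,
    // which is why the XStream route below needs LD_LIBRARY_PATH configured.
    static String ociUrl(String host, int port, String service) {
        return "jdbc:oracle:oci:@//" + host + ":" + port + "/" + service;
    }

    public static void main(String[] args) {
        System.out.println(thinUrl("192.168.10.132", 1521, "ORCLCDB"));
        System.out.println(ociUrl("192.168.10.132", 1521, "ORCLCDB"));
    }
}
```

The OCI driver also accepts TNS aliases; the EZConnect form shown here keeps the two styles directly comparable.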
The steps below assume Oracle 19c is already installed; see the referenced guide on installing Oracle 19c on CentOS 8.
2.1 LogMiner

Switch to the oracle user.

Connect to Oracle and change the sys password; this matches the statements in the Debezium documentation so they can be used verbatim.

```sql
sqlplus / as sysdba
connect / as sysdba
alter user sys identified by top_secret;
exit;
```
Enable archivelog mode:

```sql
sqlplus / as sysdba
connect sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- should now show "Database log mode: Archive Mode"
archive log list
exit;
```
Switch to root, create the db_recovery_file_dest directory and grant permissions on it, then switch back to the oracle user:

```bash
su root
mkdir -p /opt/oracle/oradata/recovery_area
chmod 777 /opt/oracle/oradata/recovery_area
su oracle
```
Each octal digit 7 means r (read), w (write) and x (execute); 777 grants rwx to the file owner, the group, and all other users.
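As a quick sanity check of the octal notation, each digit can be decoded into its rwx string:

```java
// Decode octal permission digits: each digit is a 3-bit rwx mask
// for owner, group and others respectively.
class ModeBits {

    // Decode a single octal digit (0-7) into "rwx" form.
    static String decode(int bits) {
        return ((bits & 4) != 0 ? "r" : "-")
             + ((bits & 2) != 0 ? "w" : "-")
             + ((bits & 1) != 0 ? "x" : "-");
    }

    // Decode a whole mode string, e.g. "777" -> "rwxrwxrwx".
    static String decodeMode(String octal) {
        StringBuilder sb = new StringBuilder();
        for (char c : octal.toCharArray()) sb.append(decode(c - '0'));
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("777 -> " + decodeMode("777")); // rwxrwxrwx
        System.out.println("755 -> " + decodeMode("755")); // rwxr-xr-x
    }
}
```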
Enable minimal supplemental logging at the database level (the DROP form disables it again):

```sql
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE DROP SUPPLEMENTAL LOG DATA;
```
To enable supplemental logging for a single table only (say stuinfo), use the statement below.
If the ALTER fails even though the table exists, first run select * from C##TEST.STUINFO; if that reports the table does not exist, try select * from C##TEST."STUINFO" instead (the quoted form is case-sensitive).

```sql
ALTER TABLE C##TEST.STUINFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```
Create the connector user and grant privileges:

```sql
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz
  DEFAULT TABLESPACE logminer_tbs
  QUOTA UNLIMITED ON logminer_tbs
  CONTAINER=ALL;

GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE TO c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
exit;
```
Create a table and enable supplemental logging on it:

```sql
sqlplus / as sysdba
conn c##dbzuser/dbz;
CREATE TABLE STU ("s_id" INT PRIMARY KEY, "s_name" VARCHAR(255));
ALTER TABLE C##DBZUSER.STU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
exit;
```
With the steps above complete, the database can be monitored either through the Java API or through Kafka Connect. The Java API route is considerably simpler.
Java API

Create a Spring Boot project with the following pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>1.6.2.Final</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>1.6.2.Final</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>1.6.2.Final</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-oracle</artifactId>
            <version>1.6.2.Final</version>
        </dependency>
        <dependency>
            <groupId>com.oracle.ojdbc</groupId>
            <artifactId>ojdbc8</artifactId>
            <version>19.3.0.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
Create logback.xml under resources:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="true">
    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <Target>System.out</Target>
        <encoder>
            <pattern>%-5p [%d][%mdc{mdc_userId}] %C:%L - %m %n</pattern>
            <charset>utf-8</charset>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
    </appender>
    <root level="info">
        <appender-ref ref="stdout"/>
    </root>
</configuration>
```
Create the OracleDebezium_19c class:

```java
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.relational.history.FileDatabaseHistory;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OracleDebezium_19c {

    public static void main(String[] args) {
        Properties props = genProps();
        DebeziumEngine<ChangeEvent<String, String>> engine = engineBuild(props);
        runSoftware(engine);
    }

    private static Properties genProps() {
        Properties props = new Properties();
        props.setProperty("name", "oracle-engine-0033");
        props.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "D:\\temp\\oracle4.txt");
        props.setProperty("offset.flush.interval.ms", "6000");
        props.setProperty("database.hostname", "192.168.10.132");
        props.setProperty("database.port", "1521");
        props.setProperty("database.user", "C##DBZUSER");
        props.setProperty("database.password", "dbz");
        props.setProperty("database.server.id", "85701");
        props.setProperty("table.include.list", "C##DBZUSER.STU");
        props.setProperty("database.history", FileDatabaseHistory.class.getCanonicalName());
        // Must be a different file from offset.storage.file.filename,
        // otherwise offsets and schema history overwrite each other.
        props.setProperty("database.history.file.filename", "D:\\temp\\oracle4-history.txt");
        props.setProperty("database.server.name", "my-oracle-connector-0023");
        props.setProperty("database.dbname", "ORCLCDB");
        props.setProperty("key.converter.schemas.enable", "false");
        props.setProperty("value.converter.schemas.enable", "false");
        props.setProperty("database.serverTimezone", "UTC");
        props.setProperty("database.connection.adapter", "logminer");
        return props;
    }

    public static void runSoftware(DebeziumEngine<ChangeEvent<String, String>> engine) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
    }

    public static DebeziumEngine<ChangeEvent<String, String>> engineBuild(Properties props) {
        return DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    System.out.println("record.key() = " + record.key());
                    System.out.println("record.value() = " + record.value());
                })
                .using((success, message, error) -> {
                    if (!success && error != null) {
                        System.out.println("----------error------");
                        System.out.println(message);
                        error.printStackTrace();
                    }
                })
                .build();
    }
}
```
Start the project. The embedded engine logs a numbered sequence of startup steps; once all six complete without errors, startup succeeded.
Then connect to the database and insert a row into the monitored table. Change events for that row will appear in the console log, which confirms that capture is working.
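The record.value() strings printed by the callback use Debezium's standard change-event envelope (before, after, op, source). Below is a minimal sketch of pulling the operation type out of such a value; the sample string is hand-written for illustration, not captured output, and a real consumer should use a proper JSON library such as Jackson rather than string scanning:

```java
// Minimal sketch of inspecting record.value() from the notifying() callback.
class ChangeEventOp {

    // Naive field extraction so the sketch needs no JSON library.
    static String extractOp(String json) {
        int i = json.indexOf("\"op\"");
        if (i < 0) return null;
        int start = json.indexOf('"', json.indexOf(':', i) + 1) + 1;
        return json.substring(start, json.indexOf('"', start));
    }

    public static void main(String[] args) {
        // Hand-written sample in the shape of a Debezium insert event.
        String value = "{\"before\":null,\"after\":{\"s_id\":1,\"s_name\":\"tom\"},\"op\":\"c\"}";
        // "c" = insert, "u" = update, "d" = delete, "r" = snapshot read
        System.out.println("op = " + extractOp(value));
    }
}
```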
kafka-connector

Extract the Java, ZooKeeper and Kafka packages under /opt/module; Java also needs its environment variables configured.
From the Central Repository (io/debezium/debezium-connector-oracle), download the plugin version you need, e.g. debezium-connector-oracle-1.6.2.Final-plugin.tar.gz.
Create a directory to hold the Kafka Connect plugin.
Extract the downloaded plugin and copy its entire contents twice: once into the kafka-plugin directory and once into Kafka's libs directory.
From the Oracle Instant Client Downloads page, download the Basic Package (ZIP) for your operating system, extract it, and copy the ojdbc8.jar inside it into Kafka's libs directory.
Configure Kafka Connect:

```bash
cd /opt/module/kafka_2.12-2.7.0/
vi config/connect-distributed.properties
```

Set plugin.path to the kafka-plugin directory created above and save:

```properties
plugin.path=/opt/kafka-plugin
```

That completes the configuration.
Go to the ZooKeeper directory, copy the sample configuration file (zkServer.sh expects conf/zoo.cfg), and start ZooKeeper:

```bash
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start
```
Go to the Kafka directory and start Kafka first; once it is up, start Kafka Connect:

```bash
bin/kafka-server-start.sh config/server.properties
bin/connect-distributed.sh config/connect-distributed.properties
```
Open a browser or Postman and send a GET to port 8083; version information should come back.
POST to ip:8083/connectors with the configuration JSON below to register a connector:

```json
{
  "name": "stu2",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "server2",
    "database.hostname": "192.168.10.132",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "table.include.list": "C##DBZUSER.STU2",
    "database.history.kafka.bootstrap.servers": "192.168.10.132:9092",
    "database.history.kafka.topic": "schema-changes.stu2"
  }
}
```
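The same registration can be done from plain Java instead of Postman. The /connectors endpoint and JSON layout are Kafka Connect's standard REST API; host, credentials and table come from the setup above. The sketch below only assembles and prints the body, with the actual POST left commented out:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch of registering the Oracle connector via Kafka Connect's REST API.
class RegisterConnector {

    // Assemble the same registration body as the JSON shown above.
    static String registrationJson(String name, String host, String table) {
        return "{\"name\":\"" + name + "\",\"config\":{"
                + "\"connector.class\":\"io.debezium.connector.oracle.OracleConnector\","
                + "\"tasks.max\":\"1\","
                + "\"database.server.name\":\"server2\","
                + "\"database.hostname\":\"" + host + "\","
                + "\"database.port\":\"1521\","
                + "\"database.user\":\"c##dbzuser\","
                + "\"database.password\":\"dbz\","
                + "\"database.dbname\":\"ORCLCDB\","
                + "\"table.include.list\":\"" + table + "\","
                + "\"database.history.kafka.bootstrap.servers\":\"" + host + ":9092\","
                + "\"database.history.kafka.topic\":\"schema-changes." + name + "\"}}";
    }

    // POST the body to the Connect REST API; expects 201 Created on success.
    static int post(String endpoint, String body) throws Exception {
        HttpURLConnection con = (HttpURLConnection) new URL(endpoint).openConnection();
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "application/json");
        con.setDoOutput(true);
        try (OutputStream os = con.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return con.getResponseCode();
    }

    public static void main(String[] args) {
        String body = registrationJson("stu2", "192.168.10.132", "C##DBZUSER.STU2");
        System.out.println(body);
        // post("http://192.168.10.132:8083/connectors", body); // run on a host that can reach Connect
    }
}
```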
Kafka Connect creates the topics automatically, normally named server.database.table. Characters that are illegal in topic names, such as #, are replaced with _, so server2.C##DBZUSER.STU2 becomes server2.C__DBZUSER.STU2. You can see this in the logs while the connector registers.
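The character mapping can be sketched as follows. This is a minimal re-implementation of the behavior observed above, not Debezium's actual code; the constraint it encodes is that Kafka topic names may only contain ASCII letters, digits, '.', '_' and '-':

```java
// Re-implementation of the topic-name mapping observed in the Connect logs.
class TopicNames {

    // Replace any character that is illegal in a Kafka topic name with '_'.
    static String sanitize(String name) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            boolean ok = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                      || (c >= '0' && c <= '9') || c == '.' || c == '_' || c == '-';
            sb.append(ok ? c : '_');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(sanitize("server2.C##DBZUSER.STU2")); // server2.C__DBZUSER.STU2
    }
}
```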
From the Kafka directory, list all topics:

```bash
bin/kafka-topics.sh --list --zookeeper 192.168.10.132:2181
```
Consume the topic to check whether database changes are being captured:

```bash
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.132:9092 --topic server2.C__DBZUSER.STU2
```

If change events like these show up, capture is working.
2.2 XStream API

Switch to the oracle user.

Connect to Oracle and change the sys password; this matches the statements in the Debezium documentation so they can be used verbatim.

```sql
sqlplus / as sysdba
connect / as sysdba
alter user sys identified by top_secret;
exit;
```
Enable archivelog mode (XStream additionally requires enable_goldengate_replication):

```sql
CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- should show "Database log mode: Archive Mode"
archive log list
exit;
```
Switch to root, create the db_recovery_file_dest directory and grant permissions on it, then switch back to the oracle user:

```bash
su root
mkdir -p /opt/oracle/oradata/recovery_area
chmod 777 /opt/oracle/oradata/recovery_area
su oracle
```
Each octal digit 7 means r (read), w (write) and x (execute); 777 grants rwx to the file owner, the group, and all other users.
Enable minimal supplemental logging at the database level:

```sql
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
```
To enable supplemental logging for a single table only (say stuinfo), use the statement below.
If the ALTER fails even though the table exists, first run select * from C##TEST.STUINFO; if that reports the table does not exist, try select * from C##TEST."STUINFO" instead (the quoted form is case-sensitive).

```sql
ALTER TABLE C##TEST.STUINFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```
Configure the XStream admin user:

```sql
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzadmin IDENTIFIED BY dbz
  DEFAULT TABLESPACE xstream_adm_tbs
  QUOTA UNLIMITED ON xstream_adm_tbs
  CONTAINER=ALL;

GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;

BEGIN
   DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
      grantee                 => 'c##dbzadmin',
      privilege_type          => 'CAPTURE',
      grant_select_privileges => TRUE,
      container               => 'ALL'
   );
END;
/
exit;
```
Create the XStream user:

```sql
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;

sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz
  DEFAULT TABLESPACE xstream_tbs
  QUOTA UNLIMITED ON xstream_tbs
  CONTAINER=ALL;

GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE TO c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
exit;
```
Create the XStream outbound server:

```sql
sqlplus c##dbzadmin/dbz@//localhost:1521/ORCLCDB

DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
  tables(1)  := NULL;
  schemas(1) := 'debezium';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name  => 'dbzxout',
    table_names  => tables,
    schema_names => schemas
  );
END;
/
exit;
```
Configure the XStream user account to connect to the XStream outbound server:

```sql
sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba

BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'dbzxout',
    connect_user => 'c##dbzuser'
  );
END;
/
exit;
```
Create a table and enable supplemental logging on it:

```sql
sqlplus / as sysdba
conn c##dbzuser/dbz;
CREATE TABLE STU ("s_id" INT PRIMARY KEY, "s_name" VARCHAR(255));
ALTER TABLE C##DBZUSER.STU ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
exit;
```
With the steps above complete, the database can be monitored either through the Java API or through Kafka Connect. The Java API route is considerably simpler.
Java API

Not yet written up.

kafka-connector

Temperamental: sometimes no data is captured, yet no errors are reported either.
Extract the Java, ZooKeeper and Kafka packages under /opt/module; Java also needs its environment variables configured.
From the Central Repository (io/debezium/debezium-connector-oracle), download the plugin version you need, e.g. debezium-connector-oracle-1.6.2.Final-plugin.tar.gz.
Create a directory to hold the Kafka Connect plugin.
Extract the downloaded plugin and copy its entire contents twice: once into the kafka-plugin directory and once into Kafka's libs directory.
From the Oracle Instant Client Downloads page, download the Basic Package (ZIP) for your operating system, extract it, and copy the ojdbc8.jar and xstream.jar inside it into Kafka's libs directory.
Make the extracted Instant Client directory visible to the OCI driver: export LD_LIBRARY_PATH, save it, then reload the environment.

```bash
export LD_LIBRARY_PATH=/opt/instantclient_19_12
```
Configure Kafka Connect:

```bash
cd /opt/module/kafka_2.12-2.7.0/
vi config/connect-distributed.properties
```

Set plugin.path to the kafka-plugin directory created above and save:

```properties
plugin.path=/opt/kafka-plugin
```

That completes the configuration.
Go to the ZooKeeper directory, copy the sample configuration file (zkServer.sh expects conf/zoo.cfg), and start ZooKeeper:

```bash
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start
```
Go to the Kafka directory and start Kafka first; once it is up, start Kafka Connect:

```bash
bin/kafka-server-start.sh config/server.properties
bin/connect-distributed.sh config/connect-distributed.properties
```
Open a browser or Postman and send a GET to port 8083; version information should come back.
POST to ip:8083/connectors with the configuration JSON below to register a connector:

```json
{
  "name": "stu2",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "server6",
    "database.hostname": "192.168.10.131",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "table.include.list": "C##DBZUSER.STU",
    "database.history.kafka.bootstrap.servers": "192.168.10.131:9092",
    "database.history.kafka.topic": "schema-changes.stu2",
    "database.connection.adapter": "xstream",
    "database.out.server.name": "dbzxout"
  }
}
```
As before, Kafka Connect creates the topics automatically, normally named server.database.table, with illegal characters such as # replaced by _ (for example server2.C##DBZUSER.STU2 becomes server2.C__DBZUSER.STU2). You can see this in the logs while the connector registers.
From the Kafka directory, list all topics:

```bash
bin/kafka-topics.sh --list --zookeeper 192.168.10.132:2181
```
Consume the topic to check whether database changes are being captured:

```bash
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.132:9092 --topic server2.C__DBZUSER.STU2
```
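When the connector registers cleanly but nothing arrives on the topic (the "no data, no errors" situation noted above), the Connect REST API is the first place to look: a GET to /connectors/&lt;name&gt;/status reports the connector and task states, including stack traces for failed tasks. A small sketch of building that URL, assuming Connect runs on 192.168.10.132:8083 as in the commands above:

```java
// Build the Kafka Connect status endpoint for a named connector.
class ConnectorStatusUrl {

    static String statusUrl(String host, String connector) {
        return "http://" + host + ":8083/connectors/" + connector + "/status";
    }

    public static void main(String[] args) {
        // Fetch this with curl or a browser; look for "state":"RUNNING" vs "FAILED".
        System.out.println(statusUrl("192.168.10.132", "stu2"));
    }
}
```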