Adapter开发

OGG官方提供了一些中间件的Adapter,比如Kafka,但并未提供RabbitMQ的Adapter因此需要自行开发

下载开发包

在目标端的$OGG_HOME/ggjava/resources/lib目录下包含adapter开发所需的依赖,将其下载到本地,添加到项目依赖中,如果是Intellij idea,可以右键项目选择Module Setting添加本地依赖

这里是添加所有的依赖,实际上如果只是进行简单的开发,只需添加ggdbutil-19.1.0.0.5.007.jar即可,可以执行以下maven命令将该jar包安装到本地仓库

mvn install:install-file -Dfile="./ggdbutil-19.1.0.0.5.007.jar" -DgroupId=com.oracle -DartifactId=ggdbutil -Dversion=1.0 -Dpackaging=jar

那么只需在项目中添加依赖即可

<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ggdbutil</artifactId>
    <version>1.0</version>
    <scope>provided</scope>
</dependency>

本次开发需要用到所有的依赖如下,除了amqp-client其余ogg都提供,可以设置scope为provided

<dependencies>
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ggdbutil</artifactId>
            <version>1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.12.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.4</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

创建Adapter

Adapter需要继承oracle.goldengate.datasource.AbstractHandler类,有以下几个核心方法需要重载

import oracle.goldengate.datasource.*;
import static oracle.goldengate.datasource.GGDataSource.Status;
public class SampleHandler extends AbstractHandler {
        @Override
        public void init(DsConfiguration conf, DsMetaData metaData) {
            super.init(conf, metaData);
            // ... do additional config...
        }
        @Override
        public Status operationAdded(DsEvent e, DsTransaction tx, DsOperation op) { ... }
        @Override
        public Status transactionCommit(DsEvent e, DsTransaction tx) { ... }
        @Override
        public void destroy() { /* ... do cleanup ... */ }
        @Override
        public String reportStatus() { return "status report..."; }
}
  • init:初始化时调用,可以在里面做些初始化工作,比如加载配置文件等
  • operationAdded:每次数据进来都会调用,也是我们实现数据接收的入口
  • transactionCommit:数据提交完毕时会调用
  • destroy:停止作业时调用
  • reportStatus:用户在ggsci发送发送指令view report时会调用,ogg在关闭服务时也会进行调用,可以返回一些状态信息

所有数据都是通过函数operationAdded进行接收,metaData包含数据的定义和实际数据,我们只需从metaData中获取相应的信息即可,具体用法可以参考代码,这里就不展开

部署

打包

开发完需要将项目打包成jar格式,如果是通过maven进行管理,可以直接执行以下命令进行打包

mvn clean package -Dmaven.test.skip

命令执行成功后会将生成的jar包放到target目录下

上传

将程序jar包和所有相关非provided依赖的jar包上传到目标端服务器,放入指定目录,比如/home/ogg/u01/rabbitmq-ogg-plugin/lib

如果有用到第三方包也要将第三方包一起上传,比如本次用到了RabbitMQ的依赖,也要将依赖包上传,但ggjava中有的可以不用上传

创建配置文件javaue.properties,内容如下

gg.handlerlist=rabbitmq
gg.handler.rabbitmq.type=com.xinyou.ogg.handler.rabbitmq.RabbitMQHandler
gg.handler.rabbitmq.host=10.28.30.166
gg.handler.rabbitmq.port=5672
gg.handler.rabbitmq.username=admin
gg.handler.rabbitmq.password=admin
gg.handler.rabbitmq.vhost=/
goldengate.userexit.timestamp=utc
gg.classpath=/home/ogg/u01/rabbitmq-ogg-plugin/lib/*
gg.log.level=debug
jvm.bootoptions=-Xmx512m -Djava.class.path=ggjava/ggjava.jar
  • gg.handlerlist:adapter名称,可以自定义,但不能包含中文
  • gg.handler.{{adapterName}}.xxx:adapterName就是上面定义的名称,以该前缀的配置都会在Adapter初始化的时候传入到Adapter中,比如自定义的Adapter中有个变量叫做host,并且有getter和setter函数,那么在初始化时,ogg会自动将gg.handler.rabbitmq.host赋值给Adapter的host变量
  • gg.handler.rabbitmq.type:这个比较特殊,这个需要指定Adapter类路径
  • gg.classpath:配置classpath,就是你上传jar包的路径
  • gg.log.level:配置日志等级
  • jvm.bootoptions:配置启动参数,这里可以配置一些内存参数等,其中必须包含-Djava.class.path=ggjava/ggjava.jar参数

创建Replicat进程

  • 创建参数R1
./ggsci
GGSCI (MQmessage1) 1> edit param R1
REPLICAT R1
TARGETDB LIBFILE libggjava.so SET property=/home/ogg/u01/rabbitmq-ogg-plugin/javaue.properties
GROUPTRANSOPS 10000
MAP oggadmin.ogg_test1;, TARGET oggadmin.ogg_test1;
/home/ogg/u01/rabbitmq-ogg-plugin/javaue.properties是我们上一步中创建配置文件的绝对路径
  • 创建Replicat进程R1
./ggsci
GGSCI (MQmessage1) 1> add replicat R1, exttrail ./dirdat/mq
GGSCI (MQmessage1) 2> start RPE6
GGSCI (MQmessage1) 1> info all          
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING                                           
REPLICAT    RUNNING     R1          00:00:00      00:00:01
Trackback

no comment untill now

Add your comment now