Canal实时解析mysql binlog数据实战

一、说明

通过canal实时监听mysql binlog日志文件的变化,并将数据解析出来

二、环境准备

1、创建maven项目并修改pom.xml配置文件

 <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>

《Canal实时解析mysql binlog数据实战》

 

2、嗦代码

 特别说明:在解析数据时,相当于程序是客户端,客户端在连接canal服务端时是不需要用户名和密码 

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {

    // 获取连接
    CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111),
            "example","","");

    while(true)
    {
       // 连接
        canalConnector.connect();
        // 订阅数据库
        canalConnector.subscribe("CanalDb.*");
        // 获取数据
        Message message = canalConnector.get(100);
        // 获取Entry集合
        List<CanalEntry.Entry> entries=message.getEntries();
        // 判断集合是否为空,如果为空,则线程等待一分钟再拉取数据
        if (entries.size()<=0)
        {
            System.out.println("档次抓取没有数据,休息一会儿。。。");
            Thread.sleep(2000);
        }
        else
        {
            // 遍历entries,单条解析
            for (CanalEntry.Entry entry:entries)
            {
               // 1,获取表名
                String tableName=entry.getHeader().getTableName();
                // 2,获取类型
                CanalEntry.EntryType entryType=entry.getEntryType();
                // 3,获取序列化后的数据
                ByteString storeValue=entry.getStoreValue();
                // 4.判断当前entryType类型是否为ROWDATA
                if (CanalEntry.EntryType.ROWDATA.equals(entryType))
                {
                    //5.反序列化数据
                    CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue);
                    //6.获取当前事件的操作类型
                    CanalEntry.EventType eventType=rowChange.getEventType();
                    //7.获取数据集
                    List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList();
                    //8.遍历rowDataList并打印数据集
                    for(CanalEntry.RowData rowData:rowDataList)
                    {
                        JSONObject beforData=new JSONObject();
                        List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList();
                        for (CanalEntry.Column column:beforClountList)
                        {
                            beforData.put(column.getName(),column.getValue());
                        }
                        JSONObject afterData=new JSONObject();
                        List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList();
                        for (CanalEntry.Column column:afterClountList)
                        {
                            afterData.put(column.getName(),column.getValue());
                        }
                        // 打印数据
                        System.out.println(""+tableName+
                                ",EventType:"+eventType+
                                ",Before:"+beforData+
                                ",After:"+afterData);
                    }

                }
                else
                {
                    System.out.println("当前操作类型为"+entryType);
                }
            }
        }
    }
  }
}

三、项目效果

《Canal实时解析mysql binlog数据实战》

 

    原文作者:SportSky
    原文地址: https://www.cnblogs.com/sportsky/p/16530806.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞