钦娅芬 发表于 2025-10-31 11:20:10

17、Canal监听MySQL-Binlog实现数据监听

一、Canal简介:

Canal 是阿里巴巴开源的一款基于数据库增量日志解析的中间件,主要用于实现数据库变更数据的实时同步。

Canal源码
 
二、工作原理:

1、MySQL主备复制原理:

 
(1)、MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
(2)、MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
(3)、MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2、canal工作原理:

(1)、canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
(2)、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
(3)、canal 解析 binary log 对象(原始为 byte 流)
 
三、MySQL 配置(开启 Binlog):

1、开启 Binlog(ROW 模式):

# MySQL 配置文件
# Linux:my.cnf配置文件(/etc/mysql/)
# Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
# 开启 Binlog
log_bin = mysql-bin

# 选择 ROW 模式(记录行级变更)
binlog-format = ROW

# 配置数据库唯一 ID(与 Canal 服务端的 slaveId 不同)
server-id = 1

2、重启 MySQL 并验证:

# 打开命令提示符(cmd/services.msc):
# 按 Win + R 键,输入 cmd,然后按 Enter 键打开命令提示符窗口。
# 停止MySQL服务:
net stop MySQL57

# 启动MySQL服务:
net start MySQL57

# 验证
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

3、创建 Canal 专用账号(权限最小化):

-- 1. 创建支持远程连接的用户(% 表示任意 IP)
-- CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予权限
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- 2. 创建支持本地连接的用户(localhost)
CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';

-- 授予相同权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';

-- 刷新权限,使配置生效
FLUSH PRIVILEGES; 
四、Canal 服务端配置:

1、下载并解压 Canal 服务端:

github-canal包

2、配置 Canal 实例:

(1)、instance.properties配置:
# MySQL 主库地址(Canal 连接的 MySQL 地址)
canal.instance.master.address=127.0.0.1:3306

# MySQL 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
(2)、windows启动 Canal 服务端:
1)、双击启动bin/startup.bat:
 
2)、存在黑屏闪退,修改bin/startup.bat,重启:

3)、日志:
 



 
五、SpringBoot整合Canal实现MySQL数据监听:

1、POM配置:

      <dependency>
            <groupId>com.alibaba.otter</groupId>
            canal.client</artifactId>
            <version>1.1.8</version>
      </dependency>
      <dependency>
            <groupId>com.alibaba.otter</groupId>
            canal.protocol</artifactId>
            <version>1.1.8</version>
      </dependency>2、YML配置:

canal:
# 自动启动同步标志位
auto-sync: true
instances:
    # 第一个实例
    instance1:
      host: 127.0.0.1
      port: 11111
      # canal server 中配置的实例名(canal.destinations = example)
      name: example
      # 批量拉取条数
      batch-size: 100
      # 无数据时休眠时间(ms)
      sleep-time: 10003、Entity类声明:

CanalProperties.class
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

/**
* Canal配置属性类(映射YAML配置)
*/
@Data
@Component
@ConfigurationProperties(prefix = "canal")
public class CanalProperties {

    // 是否自动启动同步
    private boolean autoSync = true;
    // 多实例配置
    private Map<String, InstanceConfig> instances = new HashMap<>();

    @Data
    public static class InstanceConfig {
      private String host;
      private Integer port;
      private String name;
      private Integer batchSize = 100;
      private Integer sleepTime = 1000;
    }

}DataEventTypeEnum.enum
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public enum DataEventTypeEnum {

    INSERT("INSERT"),
    UPDATE("UPDATE"),
    DELETE("DELETE");

    private final String name;

    DataEventTypeEnum(String name) {
      this.name = name;
    }

    public String NAME() {
      return name;
    }

    private static final Map<String, DataEventTypeEnum> NAME_MAP =
            Arrays.stream(DataEventTypeEnum.values())
                  .collect(Collectors.toMap(DataEventTypeEnum::NAME, Function.identity()));

    public static DataEventTypeEnum getEnum(String name) {
      if (!StringUtils.hasText(name)) {
            return null;
      }
      return NAME_MAP.get(name);
    }
}JsonMessageType.class
import lombok.Data;

@Data
public class JsonMessageType {

    /**
   * 库名
   */
    private String schemaName;

    /**
   * 表名
   */
    private String tableName;

    /**
   * 事件类型
   * (INSERT/UPDATE/DELETE)
   */
    private String eventType;

    /**
   * 数据JSON字符串
   */
    private String data;

}4、CanalRunnerAutoConfig启动Canal配置:

import com.iven.canal.entity.CanalProperties;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Canal自动配置
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class CanalRunnerAutoConfig {

    private final CanalProperties canalProperties;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    @Bean
    public ApplicationRunner canalApplicationRunner() {
      return args -> {
            if (!canalProperties.isAutoSync()) {
                log.info("Canal自动同步已关闭");
                return;
            }
            // 如果没有任何Work,则不启动Canal
            if (!workRegistry.hasWork()) {
                log.info("无表同步处理器,不启动Canal");
                return;
            }
            // 启动所有配置的Canal实例
            canalProperties.getInstances().forEach((instanceKey, config) -> {
                CanalRunner runner = new CanalRunner(
                        config.getHost(),
                        config.getPort(),
                        config.getName(),
                        config.getBatchSize(),
                        config.getSleepTime(),
                        jsonMessageParser,
                        workRegistry
                );
                runner.start();
            });
      };
    }
}5、CanalRunner拉取数据:

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWork;
import com.iven.canal.handle.CanalWorkRegistry;
import com.iven.canal.utils.JsonMessageParser;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Canal运行器
* 手动管理生命周期
*
* 1、启动Canal实例
* 2、处理解析后的数据
*/
@Slf4j
public class CanalRunner {

    private Thread thread;
    private final String canalIp;
    private final Integer canalPort;
    private final String canalInstance;
    private final Integer batchSize;
    private final Integer sleepTime;
    private final JsonMessageParser jsonMessageParser;
    private final CanalWorkRegistry workRegistry;

    public CanalRunner(String canalIp, Integer canalPort, String canalInstance, Integer batchSize,
                     Integer sleepTime, JsonMessageParser jsonMessageParser, CanalWorkRegistry workRegistry) {
      this.canalIp = canalIp;
      this.canalPort = canalPort;
      this.canalInstance = canalInstance;
      this.batchSize = batchSize;
      this.sleepTime = sleepTime;
      this.jsonMessageParser = jsonMessageParser;
      this.workRegistry = workRegistry;
    }

    /**
   * 启动Canal实例
   */
    public void start() {
      if (thread == null || !thread.isAlive()) {
            thread = new Thread(this::run, "canal-runner-" + canalInstance);
            thread.start();
            log.info("Canal实例[{}]启动成功", canalInstance);
      }
    }

    /**
   * 停止Canal实例
   */
    public void stop() {
      if (thread != null && !thread.isInterrupted()) {
            thread.interrupt();
      }
    }

    private void run() {
      log.info("Canal实例[{}]启动中...", canalInstance);
      CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), canalInstance, "", "");
      try {
            connector.connect();
            // 订阅所有表(后续通过Work过滤)
            connector.subscribe();
            connector.rollback();

            while (!thread.isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();

                if (batchId == -1 || entries.isEmpty()) {
                  Thread.sleep(sleepTime);
                } else {
                  // 解析数据并处理
                  Map<String, List<JsonMessageType>> parsedData = jsonMessageParser.parse(entries);
                  processParsedData(parsedData);
                  // 确认处理成功
                  connector.ack(batchId);
                }
            }
      } catch (InterruptedException e) {
            log.info("Canal实例[{}]被中断", canalInstance);
      } catch (Exception e) {
            log.error("Canal实例[{}]运行异常", canalInstance, e);
            // 处理失败回滚
            connector.rollback();
      } finally {
            connector.disconnect();
            log.info("Canal实例[{}]已停止", canalInstance);
      }
    }

    /**
   * 调用Work处理解析后的数据
   *
   * @param parsedData
   */
    private void processParsedData(Map<String, List<JsonMessageType>> parsedData) {
      parsedData.forEach((tableKey, dataList) -> {
            // 获取该表的所有Work
            List<CanalWork> works = workRegistry.getWorksByTable(tableKey);
            if (!works.isEmpty() && !dataList.isEmpty()) {
                // 转换数据格式(Json字符串 -> Map)
                List<Map<String, Object>> dataMaps = dataList.stream()
                        .map(item -> JSON.<Map<String, Object>>parseObject(item.getData(), Map.class))
                        .collect(Collectors.toList());
                String schemaName = dataList.get(0).getSchemaName();
                // 调用每个Work的处理方法
                works.forEach(work -> work.handle(dataMaps, dataList.get(0).getEventType(), schemaName));
            }
      });
    }
   
}6、JsonMessageParser解析数据:

MessageParser
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.util.List;

/**
* 消息解析器接口
*
*/
public interface MessageParser<T> {

    T parse(List<CanalEntry.Entry> canalEntryList);

} JsonMessageParserimport com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.iven.canal.entity.DataEventTypeEnum;
import com.iven.canal.entity.JsonMessageType;
import com.iven.canal.handle.CanalWorkRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;

/**
* Json消息解析器
*
* 1、遍历原始数据列表接收
* 2、解析行级变更数据
* 3、封装为 JsonParseType
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class JsonMessageParser implements MessageParser<Map<String, List<JsonMessageType>>> {

    private final CanalWorkRegistry workRegistry;

    @Override
    public Map<String, List<JsonMessageType>> parse(List<CanalEntry.Entry> canalEntryList) {
      Map<String, List<JsonMessageType>> dataMap = new HashMap<>();
      for (CanalEntry.Entry entry : canalEntryList) {
            if (!CanalEntry.EntryType.ROWDATA.equals(entry.getEntryType())) {
                continue;
            }

            // 1. 获取库名、表名、带库名的表标识
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            String fullTableName = schemaName + "." + tableName;

            // 2. 检查是否有对应的处理器(支持两种格式)
            boolean hasFullTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(fullTableName));
            boolean hasSimpleTableWork = !CollectionUtils.isEmpty(workRegistry.getWorksByTable(tableName));
            if (!hasFullTableWork && !hasSimpleTableWork) {
                log.debug("表[{}]和[{}]均无同步处理器,跳过", fullTableName, tableName);
                continue;
            }

            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                rowChange.getRowDatasList().forEach(rowData -> {
                  JsonMessageType jsonMessageType = parseRowData(entry.getHeader(), rowChange.getEventType(), rowData);
                  if (jsonMessageType != null) {
                        // 3. 按存在的处理器类型,分别添加到数据映射中
                        if (hasFullTableWork) {
                            dataMap.computeIfAbsent(fullTableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                        if (hasSimpleTableWork) {
                            dataMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(jsonMessageType);
                        }
                  }
                });
            } catch (Exception e) {
                log.error("解析数据失败", e);
            }
      }
      return dataMap;
    }

    private JsonMessageType parseRowData(CanalEntry.Header header, CanalEntry.EventType eventType,
                                       CanalEntry.RowData rowData) {
      // 获取库名
      String schemaName = header.getSchemaName();
      // 获取表名
      String tableName = header.getTableName();
      if (eventType == CanalEntry.EventType.DELETE) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.DELETE.NAME(), rowData.getBeforeColumnsList());
      } else if (eventType == CanalEntry.EventType.INSERT) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.INSERT.NAME(), rowData.getAfterColumnsList());
      } else if (eventType == CanalEntry.EventType.UPDATE) {
            return dataWrapper(schemaName, tableName, DataEventTypeEnum.UPDATE.NAME(), rowData.getAfterColumnsList());
      }
      return null;
    }

    private JsonMessageType dataWrapper(String schemaName, String tableName, String eventType,
                                        List<CanalEntry.Column> columns) {
      Map<String, String> data = new HashMap<>();
      columns.forEach(column -> data.put(column.getName(), column.getValue()));
      JsonMessageType result = new JsonMessageType();
      result.setSchemaName(schemaName);
      result.setTableName(tableName);
      result.setEventType(eventType);
      result.setData(JSON.toJSONString(data));
      return result;
    }

}7、CanalWorkRegistry匹配处理器:

CanalWork
import java.util.List;
import java.util.Map;

/**
* Canal-Work处理器
*
*/
public interface CanalWork {
    /**
   * 返回需要处理的表名(如:tb_user)
   */
    String getTableName();

    /**
   * 处理表数据的方法
   * @param dataList 表数据列表(每条数据是字段名-值的Map)
   * @param eventType 事件类型(INSERT/UPDATE/DELETE)
   * @param schemaName 库名(用于区分不同库的表)
   */
    void handle(List<Map<String, Object>> dataList, String eventType, String schemaName);

}CanalWorkRegistry
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* 处理器注册器,
* 扫描并缓存所有CanalWork实现类,按表名分组管理,提供查询表对应处理器的方法
*/
@Slf4j
@Component
public class CanalWorkRegistry implements ApplicationContextAware {

    /**
   * 表名 -> Work列表(支持一个表多个Work)
   */
    private final Map<String, List<CanalWork>> tableWorkMap = new HashMap<>();

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      // 扫描所有CanalWork实现类
      Map<String, CanalWork> workMap = applicationContext.getBeansOfType(CanalWork.class);
      // 按表名分组
      tableWorkMap.putAll(workMap.values().stream()
                .collect(Collectors.groupingBy(CanalWork::getTableName)));
      log.info("已注册的表同步处理器: {}", tableWorkMap.keySet());
    }

    /**
   * 获取指定表的Work列表
   *
   * @param tableName
   * @return
   */
    public List<CanalWork> getWorksByTable(String tableName) {
      return tableWorkMap.getOrDefault(tableName, Collections.emptyList());
    }

    /**
   * 判断是否有表需要处理
   *
   * @return
   */
    public boolean hasWork() {
      return !tableWorkMap.isEmpty();
    }

}8、CanalWork实现类处理数据:

import com.iven.canal.entity.DataEventTypeEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;

/**
* tb_user表数据处理
*
* Canal服务 → 变更数据 → CanalRunner 拉取 → JsonMessageParser 解析 →
* 筛选出 tb_user 数据 → CanalWorkRegistry 获取 TbUserCanalWorkHandle →
* 调用 handle 方法 → 按事件类型(INSERT/UPDATE/DELETE)执行对应逻辑
*/
@Slf4j
@Component
public class TbUserCanalWorkHandle implements CanalWork {
    @Override
    public String getTableName() {
      return "demo.tb_user";
    }

    @Override
    public void handle(List<Map<String, Object>> dataList, String eventType, String schemaName) {
      log.info("开始处理[{}库]的tb_user表数据,事件类型:{},数据量:{}", schemaName, eventType, dataList.size());

      DataEventTypeEnum dataEventTypeEnum = DataEventTypeEnum.getEnum(eventType);
      
      // 根据事件类型分别处理
      switch (dataEventTypeEnum) {
            case INSERT:
                handleInsert(dataList, schemaName);
                break;
            case UPDATE:
                handleUpdate(dataList, schemaName);
                break;
            case DELETE:
                handleDelete(dataList, schemaName);
                break;
            default:
                log.warn("未处理的事件类型:{}", eventType);
      }
    }

    /**
   * 处理新增数据
   */
    private void handleInsert(List<Map<String, Object>> dataList, String schemaName) {
      log.info("处理[{}库]的tb_user新增数据,共{}条", schemaName, dataList.size());
      dataList.forEach(data -> {
            Object userId = data.get("id");
            Object username = data.get("name");
            // 新增逻辑:如同步到ES、缓存初始化等
            log.info("新增用户 - ID: {}, 用户名: {}", userId, username);
      });
    }

    /**
   * 处理更新数据
   */
    private void handleUpdate(List<Map<String, Object>> dataList, String schemaName) {
      log.info("处理[{}库]的tb_user更新数据,共{}条", schemaName, dataList.size());
      dataList.forEach(data -> {
            Object userId = data.get("id");
            Object newPhone = data.get("phone"); // 假设更新了手机号
            // 更新逻辑:如更新ES文档、刷新缓存等
            log.info("更新用户 - ID: {}, 新手机号: {}", userId, newPhone);
      });
    }

    /**
   * 处理删除数据
   */
    private void handleDelete(List<Map<String, Object>> dataList, String schemaName) {
      log.info("处理[{}库]的tb_user删除数据,共{}条", schemaName, dataList.size());
      dataList.forEach(data -> {
            Object userId = data.get("id");
            // 删除逻辑:如从ES删除、清除缓存等
            log.info("删除用户 - ID: {}", userId);
      });
    }
}
 
调度流程:
整个流程通过注册器管理处理器、解析器转换数据格式、运行器控制 Canal 客户端生命周期,最终将数据库变更事件分发到对应表的处理器,实现了变更数据的监听与业务处理解耦。用户只需实现CanalWork接口,即可自定义任意表的变更处理逻辑。
(1)、初始化阶段
1)、Spring 容器启动时,CanalWorkRegistry 扫描所有 CanalWork 实现类(如 TbUserCanalWorkHandle),按表名分组缓存到 tableWorkMap 中。
2)、CanalRunnerAutoConfig 检查配置(CanalProperties),若开启自动同步且存在 CanalWork,则为每个 Canal 实例创建 CanalRunner 并启动。
(2)、运行阶段
1)、CanalRunner 建立与 Canal 服务的连接,订阅数据库变更事件。
2)、循环拉取变更数据(Message),通过 JsonMessageParser 解析为表名 - 数据列表的映射(Map)。
3)、调用 processParsedData 方法,根据表名从 CanalWorkRegistry 获取对应的 CanalWork 列表,执行 handle 方法处理数据。
(3)、销毁阶段
程序停止时,CanalRunner 中断线程,断开与 Canal 服务的连接。
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

尹心菱 发表于 2025-11-9 22:48:04

热心回复!

湛恶 发表于 2025-11-22 00:40:37

谢谢楼主提供!

巨耗 发表于 2025-12-2 14:22:21

用心讨论,共获提升!

班闵雨 发表于 2025-12-4 05:37:05

东西不错很实用谢谢分享

缑娅瑛 发表于 2025-12-10 00:49:55

这个有用。
页: [1]
查看完整版本: 17、Canal监听MySQL-Binlog实现数据监听