
Incremental subscription & consumption based on the MySQL binlog

 

1. Preparation

  1. Enable binlog writing in MySQL and set binlog-format to ROW mode
    log-bin=mysql-bin # enable binlog
    binlog-format=ROW # use ROW format
    server_id=8 # must not be the same as canal's slaveId
     
     
  2. Create a MySQL account for canal with replication privileges
    CREATE USER canal IDENTIFIED BY 'canal'; 
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
     
     
  3. Install canal
    wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
    
    mkdir /usr/local/canal
    tar -zxvf canal.deployer-1.0.17.tar.gz -C /usr/local/canal
    
    
     
     
  4. Edit the instance configuration file conf/example/instance.properties
    # slaveId must differ from the MySQL server_id configured above
    canal.instance.mysql.slaveId=1234
    canal.instance.master.address=192.168.207.200:3306
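     The same file also carries the replication account created in step 2. The property names below come from the default example instance shipped with canal; values match the account from step 2:
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset=UTF-8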
    
     
  5. Start the canal server (bin/startup.sh), for example:
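     A quick way to start and verify, assuming the install directory from step 3 (log file locations may vary by canal version):
    cd /usr/local/canal
    ./bin/startup.sh
    # tail the instance log to confirm canal connected to MySQL
    tail -f logs/example/example.log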
  6. Write the Java client. Add the canal client dependency (choose a version compatible with your canal server):
    <dependency>
     <groupId>com.alibaba.otter</groupId>
     <artifactId>canal.client</artifactId>
     <version>1.1.3</version>
    </dependency>
     Connect to canal:
    @Bean
    public CanalConnector getCanalConnector() {
        canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, Integer.valueOf(canalPort))), canalDestination, canalUsername, canalPassword);
        canalConnector.connect();
        // subscribe with a filter in the form {database}.{table}; no filtering is done here, it is left to the consumer
        canalConnector.subscribe();
        // roll back to the last acknowledged position to recover from an earlier interruption
        canalConnector.rollback();
        logger.info("canal client started successfully");
        return canalConnector;
    }
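     The bean above references canalHost, canalPort, canalDestination, canalUsername, canalPassword and a canalConnector field that are not shown in the post. A minimal sketch of the enclosing configuration class, with assumed property names (not from the original):
    // CanalConfig.java -- package and property names are assumptions
    package com.demo.es.config;

    import java.net.InetSocketAddress;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.google.common.collect.Lists;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class CanalConfig {
        private static final Logger logger = LoggerFactory.getLogger(CanalConfig.class);

        // 11111 is the canal server's default client port
        @Value("${canal.host:127.0.0.1}")
        private String canalHost;
        @Value("${canal.port:11111}")
        private String canalPort;
        @Value("${canal.destination:example}")
        private String canalDestination;
        @Value("${canal.username:}")
        private String canalUsername;
        @Value("${canal.password:}")
        private String canalPassword;

        private CanalConnector canalConnector;

        // the getCanalConnector() @Bean method shown above lives in this class
    }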
     Pull data with a scheduled task:
    package com.demo.es.scheduling;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
    import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
    import com.alibaba.otter.canal.protocol.Message;
    import com.demo.es.event.DeleteAbstractCanalEvent;
    import com.demo.es.event.InsertAbstractCanalEvent;
    import com.demo.es.event.UpdateAbstractCanalEvent;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    /**
     * @version 1.0
     * @since 2017-08-26 22:44:00
     */
    @Component
    public class CanalScheduling implements Runnable, ApplicationContextAware {
        private static final Logger logger = LoggerFactory.getLogger(CanalScheduling.class);
        private ApplicationContext applicationContext;
    
        @Resource
        private CanalConnector canalConnector;
    
        @Scheduled(fixedDelay = 100)
        @Override
        public void run() {
            try {
                int batchSize = 1000;
    //            Message message = connector.get(batchSize);
                Message message = canalConnector.getWithoutAck(batchSize);
                long batchId = message.getId();
                logger.debug("scheduled_batchId=" + batchId);
                try {
                    List<Entry> entries = message.getEntries();
                    if (batchId != -1 && entries.size() > 0) {
                        entries.forEach(entry -> {
                            if (entry.getEntryType() == EntryType.ROWDATA) {
                                publishCanalEvent(entry);
                            }
                        });
                    }
                    canalConnector.ack(batchId);
                } catch (Exception e) {
                    logger.error("Failed to publish listener events! Rolling back, batchId=" + batchId, e);
                    canalConnector.rollback(batchId);
                }
            } catch (Exception e) {
                logger.error("canal_scheduled exception!", e);
            }
        }
    
        private void publishCanalEvent(Entry entry) {
            EventType eventType = entry.getHeader().getEventType();
            switch (eventType) {
                case INSERT:
                    applicationContext.publishEvent(new InsertAbstractCanalEvent(entry));
                    break;
                case UPDATE:
                    applicationContext.publishEvent(new UpdateAbstractCanalEvent(entry));
                    break;
                case DELETE:
                    applicationContext.publishEvent(new DeleteAbstractCanalEvent(entry));
                    break;
                default:
                    break;
            }
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }
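     The event classes published above (InsertAbstractCanalEvent, UpdateAbstractCanalEvent, DeleteAbstractCanalEvent) are not shown in the post. A minimal sketch, assuming each event simply wraps the CanalEntry.Entry as a Spring ApplicationEvent; note that the @Scheduled task also requires @EnableScheduling on a configuration class.
    // AbstractCanalEvent.java
    package com.demo.es.event;

    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import org.springframework.context.ApplicationEvent;

    public abstract class AbstractCanalEvent extends ApplicationEvent {
        private final Entry entry;

        public AbstractCanalEvent(Entry entry) {
            super(entry);
            this.entry = entry;
        }

        public Entry getEntry() {
            return entry;
        }
    }

    // InsertAbstractCanalEvent.java (UpdateAbstractCanalEvent and DeleteAbstractCanalEvent are analogous)
    public class InsertAbstractCanalEvent extends AbstractCanalEvent {
        public InsertAbstractCanalEvent(Entry entry) {
            super(entry);
        }
    }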
    
     Parse the data:
    package com.demo.es.listener;
    
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.demo.es.event.AbstractCanalEvent;
    import com.demo.es.model.DatabaseTableModel;
    import com.demo.es.model.IndexTypeModel;
    import com.demo.es.service.MappingService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.ApplicationListener;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @version 1.0
     * @since 2017-08-28 14:40:00
     */
    public abstract class AbstractCanalListener<EVENT extends AbstractCanalEvent> implements ApplicationListener<EVENT> {
        private static final Logger logger = LoggerFactory.getLogger(AbstractCanalListener.class);
    
        @Resource
        private MappingService mappingService;
    
        @Override
        public void onApplicationEvent(EVENT event) {
            Entry entry = event.getEntry();
            String database = entry.getHeader().getSchemaName();
            String table = entry.getHeader().getTableName();
            IndexTypeModel indexTypeModel = mappingService.getIndexType(new DatabaseTableModel(database, table));
            if (indexTypeModel == null) {
                return;
            }
            String index = indexTypeModel.getIndex();
            String type = indexTypeModel.getType();
            RowChange change;
            try {
                change = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                logger.error("canalEntry_parser_error, failed to parse RowChange from CanalEntry!", e);
                return;
            }
            change.getRowDatasList().forEach(rowData -> doSync(database, table, index, type, rowData));
        }
    
        Map<String, Object> parseColumnsToMap(List<Column> columns) {
            Map<String, Object> jsonMap = new HashMap<>();
            columns.forEach(column -> {
                if (column == null) {
                    return;
                }
                jsonMap.put(column.getName(), column.getIsNull() ? null : mappingService.getElasticsearchTypeObject(column.getMysqlType(), column.getValue()));
            });
            return jsonMap;
        }
    
        protected abstract void doSync(String database, String table, String index, String type, RowData rowData);
    }
    
     
    package com.demo.es.listener;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.demo.es.event.InsertAbstractCanalEvent;
    import com.demo.es.service.ElasticsearchService;
    import com.demo.es.service.MappingService;
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    
    /**
     * @version 1.0
     * @since 2017-08-26 22:32:00
     */
    @Component
    public class InsertCanalListener extends AbstractCanalListener<InsertAbstractCanalEvent> {
        private static final Logger logger = LoggerFactory.getLogger(InsertCanalListener.class);
    
        @Resource
        private MappingService mappingService;
    
        @Resource
        private ElasticsearchService elasticsearchService;
    
        @Override
        protected void doSync(String database, String table, String index, String type, RowData rowData) {
            List<Column> columns = rowData.getAfterColumnsList();
            String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
            Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
            if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
            logger.warn("insert_column_find_null_warn primary key not found in insert columns, database=" + database + ",table=" + table);
                return;
            }
            logger.debug("insert_column_id_info insert primary key id, database=" + database + ",table=" + table + ",id=" + idColumn.getValue());
            Map<String, Object> dataMap = parseColumnsToMap(columns);
            elasticsearchService.insertById(index, type, idColumn.getValue(), dataMap);
            logger.debug("insert_es_info ES insert synced successfully! database=" + database + ",table=" + table + ",data=" + JSONObject.toJSONString(dataMap));
        }
    }
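     The post only shows the insert listener; the update and delete listeners follow the same pattern. A minimal sketch of the delete listener, assuming ElasticsearchService exposes a deleteById(index, type, id) method (the method name is an assumption; an update listener would read rowData.getAfterColumnsList() and call an update/upsert method instead):
    // DeleteCanalListener.java
    package com.demo.es.listener;

    import com.alibaba.otter.canal.protocol.CanalEntry.Column;
    import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
    import com.demo.es.event.DeleteAbstractCanalEvent;
    import com.demo.es.service.ElasticsearchService;
    import com.demo.es.service.MappingService;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.List;
    import java.util.Optional;

    @Component
    public class DeleteCanalListener extends AbstractCanalListener<DeleteAbstractCanalEvent> {

        @Resource
        private MappingService mappingService;

        @Resource
        private ElasticsearchService elasticsearchService;

        @Override
        protected void doSync(String database, String table, String index, String type, RowData rowData) {
            // for DELETE events the row image is carried in the "before" columns
            List<Column> columns = rowData.getBeforeColumnsList();
            String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id");
            Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null);
            if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) {
                return;
            }
            // deleteById(index, type, id) is an assumed method on ElasticsearchService
            elasticsearchService.deleteById(index, type, idColumn.getValue());
        }
    }

     The mapping service used above, which resolves {database}.{table} to {index}.{type} from mapping.properties and converts MySQL column values, is implemented as follows: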
    
     
    package com.demo.es.service.impl;
    
    import com.demo.es.model.DatabaseTableModel;
    import com.demo.es.model.IndexTypeModel;
    import com.demo.es.service.MappingService;
    import com.google.common.collect.BiMap;
    import com.google.common.collect.HashBiMap;
    import com.google.common.collect.Maps;
    import org.apache.commons.lang.StringUtils;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.PropertySource;
    import org.springframework.stereotype.Service;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Optional;
    
    /**
     * @version 1.0
     * @since 2017-08-27 13:14:00
     */
    @Service
    @PropertySource("classpath:mapping.properties")
    @ConfigurationProperties
    public class MappingServiceImpl implements MappingService, InitializingBean {
        private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
        private Map<String, String> dbEsMapping;
        private BiMap<DatabaseTableModel, IndexTypeModel> dbEsBiMapping;
        private Map<String, String> tablePrimaryKeyMap;
        private Map<String, Converter> mysqlTypeElasticsearchTypeMapping;
    
        @Override
        public Map<String, String> getTablePrimaryKeyMap() {
            return tablePrimaryKeyMap;
        }
    
        @Override
        public void setTablePrimaryKeyMap(Map<String, String> tablePrimaryKeyMap) {
            this.tablePrimaryKeyMap = tablePrimaryKeyMap;
        }
    
        @Override
        public IndexTypeModel getIndexType(DatabaseTableModel databaseTableModel) {
            return dbEsBiMapping.get(databaseTableModel);
        }
    
        @Override
        public DatabaseTableModel getDatabaseTableModel(IndexTypeModel indexTypeModel) {
            return dbEsBiMapping.inverse().get(indexTypeModel);
        }
    
        @Override
        public Object getElasticsearchTypeObject(String mysqlType, String data) {
            Optional<Entry<String, Converter>> result = mysqlTypeElasticsearchTypeMapping.entrySet().parallelStream().filter(entry -> mysqlType.toLowerCase().contains(entry.getKey())).findFirst();
            return (result.isPresent() ? result.get().getValue() : (Converter) data1 -> data1).convert(data);
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            dbEsBiMapping = HashBiMap.create();
            dbEsMapping.forEach((key, value) -> {
                String[] keyStrings = StringUtils.split(key, ".");
                String[] valueStrings = StringUtils.split(value, ".");
                dbEsBiMapping.put(new DatabaseTableModel(keyStrings[0], keyStrings[1]), new IndexTypeModel(valueStrings[0], valueStrings[1]));
            });
    
            mysqlTypeElasticsearchTypeMapping = Maps.newHashMap();
            mysqlTypeElasticsearchTypeMapping.put("char", data -> data);
            mysqlTypeElasticsearchTypeMapping.put("text", data -> data);
            mysqlTypeElasticsearchTypeMapping.put("blob", data -> data);
            mysqlTypeElasticsearchTypeMapping.put("int", Long::valueOf);
            mysqlTypeElasticsearchTypeMapping.put("date", data -> LocalDateTime.parse(data, FORMATTER));
            mysqlTypeElasticsearchTypeMapping.put("time", data -> LocalDateTime.parse(data, FORMATTER));
            mysqlTypeElasticsearchTypeMapping.put("float", Double::valueOf);
            mysqlTypeElasticsearchTypeMapping.put("double", Double::valueOf);
            mysqlTypeElasticsearchTypeMapping.put("decimal", Double::valueOf);
        }
    
        public Map<String, String> getDbEsMapping() {
            return dbEsMapping;
        }
    
        public void setDbEsMapping(Map<String, String> dbEsMapping) {
            this.dbEsMapping = dbEsMapping;
        }
    
        private interface Converter {
            Object convert(String data);
        }
    }
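     DatabaseTableModel and IndexTypeModel are not shown in the post; because they are used as keys in the HashBiMap, they must implement equals() and hashCode(). A minimal sketch with field names inferred from how the classes are constructed above (IndexTypeModel is analogous with index/type fields):
    // DatabaseTableModel.java
    package com.demo.es.model;

    import java.util.Objects;

    public class DatabaseTableModel {
        private String database;
        private String table;

        public DatabaseTableModel(String database, String table) {
            this.database = database;
            this.table = table;
        }

        // getters and setters omitted for brevity

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof DatabaseTableModel)) {
                return false;
            }
            DatabaseTableModel that = (DatabaseTableModel) o;
            return Objects.equals(database, that.database) && Objects.equals(table, that.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(database, table);
        }
    }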
    
     