1.前期间准备工作
- mysql 开启binlog写入功能,配置binlog-format为ROW模式
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=8 # 不要和 canal 的 slaveId 重复
- 创建mysql账号
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
- 安装canal
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz mkdir /usr/local/canal tar -zxvf canal.deployer-$version.tar.gz -C /usr/local/canal
- 修改配置文件example/instance.properties
canal.instance.mysql.slaveId=8 canal.instance.master.address=192.168.207.200:3306
- 启动canal服务 ./startup.sh
- 编写java程序 添加依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
连接canal@Bean public CanalConnector getCanalConnector() { canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, Integer.valueOf(canalPort))), canalDestination, canalUsername, canalPassword); canalConnector.connect(); // 指定filter,格式 {database}.{table},这里不做过滤,过滤操作留给用户 canalConnector.subscribe(); // 回滚寻找上次中断的位置 canalConnector.rollback(); logger.info("canal客户端启动成功"); return canalConnector; }
调度任务拉取数据package com.demo.es.scheduling; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.Message; import com.demo.es.event.DeleteAbstractCanalEvent; import com.demo.es.event.InsertAbstractCanalEvent; import com.demo.es.event.UpdateAbstractCanalEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; /** * @version 1.0 * @since 2017-08-26 22:44:00 */ @Component public class CanalScheduling implements Runnable, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(CanalScheduling.class); private ApplicationContext applicationContext; @Resource private CanalConnector canalConnector; @Scheduled(fixedDelay = 100) @Override public void run() { try { int batchSize = 1000; // Message message = connector.get(batchSize); Message message = canalConnector.getWithoutAck(batchSize); long batchId = message.getId(); logger.debug("scheduled_batchId=" + batchId); try { List<Entry> entries = message.getEntries(); if (batchId != -1 && entries.size() > 0) { entries.forEach(entry -> { if (entry.getEntryType() == EntryType.ROWDATA) { publishCanalEvent(entry); } }); } canalConnector.ack(batchId); } catch (Exception e) { logger.error("发送监听事件失败!batchId回滚,batchId=" + batchId, e); canalConnector.rollback(batchId); } } catch (Exception e) { logger.error("canal_scheduled异常!", e); } } private void publishCanalEvent(Entry entry) { EventType eventType = entry.getHeader().getEventType(); switch (eventType) { case INSERT: applicationContext.publishEvent(new InsertAbstractCanalEvent(entry)); break; case UPDATE: applicationContext.publishEvent(new UpdateAbstractCanalEvent(entry)); break; case DELETE: applicationContext.publishEvent(new DeleteAbstractCanalEvent(entry)); break; default: break; } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
解析数据package com.demo.es.listener; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.demo.es.event.AbstractCanalEvent; import com.demo.es.model.DatabaseTableModel; import com.demo.es.model.IndexTypeModel; import com.demo.es.service.MappingService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationListener; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @version 1.0 * @since 2017-08-28 14:40:00 */ public abstract class AbstractCanalListener<EVENT extends AbstractCanalEvent> implements ApplicationListener<EVENT> { private static final Logger logger = LoggerFactory.getLogger(AbstractCanalListener.class); @Resource private MappingService mappingService; @Override public void onApplicationEvent(EVENT event) { Entry entry = event.getEntry(); String database = entry.getHeader().getSchemaName(); String table = entry.getHeader().getTableName(); IndexTypeModel indexTypeModel = mappingService.getIndexType(new DatabaseTableModel(database, table)); if (indexTypeModel == null) { return; } String index = indexTypeModel.getIndex(); String type = indexTypeModel.getType(); RowChange change; try { change = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { logger.error("canalEntry_parser_error,根据CanalEntry获取RowChange失败!", e); return; } change.getRowDatasList().forEach(rowData -> doSync(database, table, index, type, rowData)); } Map<String, Object> parseColumnsToMap(List<Column> columns) { Map<String, Object> jsonMap = new HashMap<>(); columns.forEach(column -> { if (column == null) { return; } jsonMap.put(column.getName(), column.getIsNull() ? null : mappingService.getElasticsearchTypeObject(column.getMysqlType(), column.getValue())); }); return jsonMap; } protected abstract void doSync(String database, String table, String index, String type, RowData rowData); }
package com.demo.es.listener; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.demo.es.event.InsertAbstractCanalEvent; import com.demo.es.service.ElasticsearchService; import com.demo.es.service.MappingService; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; import java.util.Map; import java.util.Optional; /** * @version 1.0 * @since 2017-08-26 22:32:00 */ @Component public class InsertCanalListener extends AbstractCanalListener<InsertAbstractCanalEvent> { private static final Logger logger = LoggerFactory.getLogger(InsertCanalListener.class); @Resource private MappingService mappingService; @Resource private ElasticsearchService elasticsearchService; @Override protected void doSync(String database, String table, String index, String type, RowData rowData) { List<Column> columns = rowData.getAfterColumnsList(); String primaryKey = Optional.ofNullable(mappingService.getTablePrimaryKeyMap().get(database + "." + table)).orElse("id"); Column idColumn = columns.stream().filter(column -> column.getIsKey() && primaryKey.equals(column.getName())).findFirst().orElse(null); if (idColumn == null || StringUtils.isBlank(idColumn.getValue())) { logger.warn("insert_column_find_null_warn insert从column中找不到主键,database=" + database + ",table=" + table); return; } logger.debug("insert_column_id_info insert主键id,database=" + database + ",table=" + table + ",id=" + idColumn.getValue()); Map<String, Object> dataMap = parseColumnsToMap(columns); elasticsearchService.insertById(index, type, idColumn.getValue(), dataMap); logger.debug("insert_es_info 同步es插入操作成功!database=" + database + ",table=" + table + ",data=" + JSONObject.toJSONString(dataMap)); } }
package com.demo.es.service.impl; import com.demo.es.model.DatabaseTableModel; import com.demo.es.model.IndexTypeModel; import com.demo.es.service.MappingService; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.Maps; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Service; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; /** * @version 1.0 * @since 2017-08-27 13:14:00 */ @Service @PropertySource("classpath:mapping.properties") @ConfigurationProperties public class MappingServiceImpl implements MappingService, InitializingBean { private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private Map<String, String> dbEsMapping; private BiMap<DatabaseTableModel, IndexTypeModel> dbEsBiMapping; private Map<String, String> tablePrimaryKeyMap; private Map<String, Converter> mysqlTypeElasticsearchTypeMapping; @Override public Map<String, String> getTablePrimaryKeyMap() { return tablePrimaryKeyMap; } @Override public void setTablePrimaryKeyMap(Map<String, String> tablePrimaryKeyMap) { this.tablePrimaryKeyMap = tablePrimaryKeyMap; } @Override public IndexTypeModel getIndexType(DatabaseTableModel databaseTableModel) { return dbEsBiMapping.get(databaseTableModel); } @Override public DatabaseTableModel getDatabaseTableModel(IndexTypeModel indexTypeModel) { return dbEsBiMapping.inverse().get(indexTypeModel); } @Override public Object getElasticsearchTypeObject(String mysqlType, String data) { Optional<Entry<String, Converter>> result = mysqlTypeElasticsearchTypeMapping.entrySet().parallelStream().filter(entry -> mysqlType.toLowerCase().contains(entry.getKey())).findFirst(); return (result.isPresent() ? result.get().getValue() : (Converter) data1 -> data1).convert(data); } @Override public void afterPropertiesSet() throws Exception { dbEsBiMapping = HashBiMap.create(); dbEsMapping.forEach((key, value) -> { String[] keyStrings = StringUtils.split(key, "."); String[] valueStrings = StringUtils.split(value, "."); dbEsBiMapping.put(new DatabaseTableModel(keyStrings[0], keyStrings[1]), new IndexTypeModel(valueStrings[0], valueStrings[1])); }); mysqlTypeElasticsearchTypeMapping = Maps.newHashMap(); mysqlTypeElasticsearchTypeMapping.put("char", data -> data); mysqlTypeElasticsearchTypeMapping.put("text", data -> data); mysqlTypeElasticsearchTypeMapping.put("blob", data -> data); mysqlTypeElasticsearchTypeMapping.put("int", Long::valueOf); mysqlTypeElasticsearchTypeMapping.put("date", data -> LocalDateTime.parse(data, FORMATTER)); mysqlTypeElasticsearchTypeMapping.put("time", data -> LocalDateTime.parse(data, FORMATTER)); mysqlTypeElasticsearchTypeMapping.put("float", Double::valueOf); mysqlTypeElasticsearchTypeMapping.put("double", Double::valueOf); mysqlTypeElasticsearchTypeMapping.put("decimal", Double::valueOf); } public Map<String, String> getDbEsMapping() { return dbEsMapping; } public void setDbEsMapping(Map<String, String> dbEsMapping) { this.dbEsMapping = dbEsMapping; } private interface Converter { Object convert(String data); } }
相关推荐
k8s1.16的jenkins部署java项目cicd(cd手动)-kubernetes安装包和详细文档笔记整理
SQLyong 各个版本,免费下载 SQLyog是业界著名的Webyog公司出品的一款简洁高效、功能强大的图形化MySQL数据库管理工具。使用SQLyog可以快速直观地让您从世界的任何角落通过网络来维护远端的MySQL数据库。
Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。
SQLyog-12.5.0-0.x64Community
Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。
SQLyong.exe 各个版本,免费下载 SQLyog是业界著名的Webyog公司出品的一款简洁高效、功能强大的图形化MySQL数据库管理工具。使用SQLyog可以快速直观地让您从世界的任何角落通过网络来维护远端的MySQL数据库。
python课程设计大作业问答系统源码.zip python课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问答系统源码.zippython课程设计大作业问
强化学习_基于Pytorch的深度强化学习的模块化实现_附项目源码_优质项目实战
源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip 源代码-Gallery4U ASP图片管理系统 v1.25.zip
能源是工业的粮食,能源安全事关国家根本安全。当今国际局势风云变幻,全球地缘政治、经济、科技体系正经历深刻变化,能源局势将更加错综复杂,威胁能源安全的各种“灰犀牛”“黑天鹅”事件时有发生,促使国际能源版图深刻变迁。作为世界最大的能源消费国,如何有效保障国家能源安全、有力保障国家经济社会发展,始终是我国能源发展的首要问题。只有把能源的饭碗端在自己手里,充分保障国家能源安全,才能把握未来发展主动权,牢牢守住新发展格局的安全底线。当下,随着新一代信息技术的蓬勃发展,能源行业的数字化和智能化程度也在不断加深,网络与数据安全深刻影响着整体能源安全的各个方面。随之,能源领域敏感数据的泄露、滥用、篡改等安全威胁也接踵而至,影响整体能源安全,进而威胁国家安全。因此,制定相关政策和标准,从合规维度赋能能源网络与数据安全体系建设至关重要。
C#上位机(驱动实验).zip
计算机专业毕业时间之VB精品论文源代码资源
Python库是一组预先编写的代码模块,旨在帮助开发者实现特定的编程任务,无需从零开始编写代码。这些库可以包括各种功能,如数学运算、文件操作、数据分析和网络编程等。Python社区提供了大量的第三方库,如NumPy、Pandas和Requests,极大地丰富了Python的应用领域,从数据科学到Web开发。Python库的丰富性是Python成为最受欢迎的编程语言之一的关键原因之一。这些库不仅为初学者提供了快速入门的途径,而且为经验丰富的开发者提供了强大的工具,以高效率、高质量地完成复杂任务。例如,Matplotlib和Seaborn库在数据可视化领域内非常受欢迎,它们提供了广泛的工具和技术,可以创建高度定制化的图表和图形,帮助数据科学家和分析师在数据探索和结果展示中更有效地传达信息。
SQLyog-13.1.4-0.x64Community
源代码-Excel二维表数据转SELECT CASE 工具 v2.0.zip
计算机专业毕业设计VB精品论文资源
WX小程序源码影音娱乐提取方式是百度网盘分享地址
c语言文件读写操作代码 C语言读写文件,非常适合小白学习 C语言读写文件,非常适合小白学习 C语言读写文件,非常适合小白学习 C语言读写文件,非常适合小白学习 C语言读写文件,非常适合小白学习
大学生毕业答辨ppt免费模板【不要积分】下载可编辑可用(136).zip
计算机专业毕业设计VB精品论文资源