接着上篇安装完 postgresql connect ,我们再安装es connect就容易多了; 安装es connector plugins
因为docker 安装的connect容器里没有es的connect plugins,所以我们去 confluent 官网下载(搜索 Kafka Connect Elasticsearch下载即可)
下载解压后放至 connect目录(上篇中设置的挂载目录)中,如果不记得将容器目录挂载到哪可通过如下命令查看: docker inspect 容器id |grep Mounts -A 20
放置完成后重启connect 容器,并请求如下http验证: get ip:8093/connector-plugins
创建es sink connector post ip:8093/connectors 为何不可为大牛? { "name": "es-sink1", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url": "http://ip:9200", "connection.username": "elastic", "connection.password": "elastic_xdeas", "type.name": "_doc", "key.ignore": "false", "topics": "know.knowledge.formal_new", "write.method": "upsert", "behavior.on.null.values": "delete", "transforms": "key,ExtractFieldObject", "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.key.field": "id", "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractFieldObject.field": "after" } } (为何不可为大牛?) 这里的es connector 配置的着重解析一下: 一开始不知道怎么配认证,翻遍了国内外官方/非官方博客文档都没有找到,几乎要放弃了,最后在stackoverflow找到了 https://stackoverflow.com/questions/58381240/how-to-kafka-connect-elasticsearch-with-ssl (强烈吐槽!官方文档能不能详细点!) key.ignore 如果设置为true,ES里面_id的值会自动生成,这样的话表里某行记录只要一变化,es就会增加一条数据,所以一定要设置为false; topics:需要订阅的topic,即上篇配置完pg connector后生成的topic; transforms:数据转换有关; transforms.key.type和transforms.key.field这里配置的意思是将表中的id作为es里面的文档id; "transforms.ExtractFieldObject.field": "after" 字段筛选,我们只需要"after"字段的数据, 因为如果没有transforms.ExtractFieldObject.type 和 transforms.ExtractFieldObject.field的配置,其他的一些无关紧要的元数据也会进入es,索引里数据会是下面这样: (再次吐槽官方文档,这里也是花了很多时间才摸索这试出来,太难了) "payload":{"before":null, "after":{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"}, "source":{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null}, "op":"u","ts_ms":1591006642869,"transaction":null}} 验证:获取所有的connectors:get ip:8093/connectors/ 同步验证
如上述操作没问题,修改表数据,能看到es中自动创建了索引并将最新数据同步了过来,索引名即对应上步配置的topics :know.knowledge.formal_new
总结:kafka connector 是kafka内置的数据传输工具,上文我们创建了一个postgresql connector(依赖debezium的PostgresConnector)其实就是等价于我们在kafka的config目录中添加了一个connect-file-source.properties配置文件(source代表数据来源);这里我们创建的 es sink connector 等价于在config目录添加了一个connect-file-sink.properties配置文件(sink代表数据输出);这里采用docker 和api管理kafka的connector就显得方便多了;
转发自http://www.forenose.com/column/content/427083577.html
相关推荐
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
研究了一段时间后,根据网上的例子,做大量的削减及根据需要做出的最简化使用实例,并且加入了获取kafka的server端的状态信息,根据状态信息配置启动时读写位置
kafka-connect-ui 这是Kafka Connect的网络工具,用于设置和管理多个连接集群的连接器。现场演示与Docker独立运行docker run --... 例如:CONNECT_URL = “ 另外,您可以通过在端点URL后面附加分号和群集名称来为Connec
Kafka Connect Redis 用于Redis的Kafka源和接收器连接器... 该演示将引导您在本地计算机上设置Kubernetes,安装连接器,并使用连接器将数据写入Redis集群或将数据从Redis引入Kafka。 兼容性 需要Redis 2.6或更高版本。
Kafka Connect Elasticsearch来源:从elastic-search获取数据并将其发送到kafka。 连接器仅使用严格的增量/时间字段(例如时间戳或增量ID)来获取新数据。 它支持动态模式和嵌套对象/数组。 要求: Elasticsearch ...
docker-kafka-连接 Dockerized (分布式模式) 支持的标签 0.10.0.0 (2.11) 0.10.1.1 (2.11) 最新的0.10.2.0 (2.12) 快速开始 使用Docker Compose 像这样编写docker-compose.yml ,然后执行docker-...
您可以通过使用kafka9-connect-mongodb分支将此连接器用于Kafka9。 用于Kafka Connect的MongoDB接收器连接器提供了从Kafka主题或主题集到MongoDB集合或多个集合的简单,连续的链接。 连接器使用Kafka消息,重命名...
kafka-connect-elasticsearch-5.4.1.zip包含了kafka-connect-elasticsearch-5.4.1.jar包。 欢迎大家下载,大家也可关注我的博客,欢迎一起交流,如有疑问请留言!
转换器控制将数据写入源连接器的Kafka或从Kafka接收器的连接器读取的数据格式。兼容性2.x版本系列与Kafka Connect 5.x兼容。 以及更高版本(较早的版本已经过验证,可以一直使用到Kafka Connect 3.2.0,尽管我们...
Kafka Connect Elasticsearch连接器 kafka-connect-elasticsearch是一个用于在Kafka和Elasticsearch之间复制数据。发展要构建开发版本,您需要Kafka的最新版本。 您可以使用标准生命周期阶段,使用Maven构建kafka-...
kafka-connect-jdbc是一个用于向与JDBC兼容的数据库之间加载数据。 可以在找到该连接器的文档。 发展 要构建开发版本,您需要Kafka的最新版本。 您可以使用标准生命周期阶段在Maven中构建kafka-connect-jdbc。 ...
该库的目的是帮助从字符串到正确的Kafka Connect数据类型的转换。 希望该库可用于减少麻痹字符串解析代码的麻烦。 类型解析 这部分代码将有助于从字符串到正确的Kafka Connect数据类型的转换。 希望该库在减少将...
StreamX:Kafka Connect for S3 从很棒的 StreamX是基于kafka连接的连接器,用于将数据从Kafka复制到对象存储,例如Amazon s3,Google Cloud Storage和Azure Blob存储。 它专注于可靠和可扩展的数据复制。 它可以以...
kafka数据同步工具kafka2x-elasticsearch-master.zip
介绍通过安装该Kafka Connect连接器提供了监视目录的文件和在将新文件写入输入目录时读取数据的功能。 输入文件中的每个记录将根据用户提供的模式进行转换。 CSVRecordProcessor支持读取CSV或TSV文件。 它可以将CSV...
Kafka Connect JDBC连接器 kafka-connect-jdbc是一个用于与任何兼容JDBC的数据库之间加载数据。 可以在找到该连接器的文档。发展要构建开发版本,您需要Kafka的最新版本以及一系列上游Confluent项目,您必须从其相应...
kafka-connect-mqtt 此仓库包含用于Apache Kafka的MQTT源和接收器连接器。 已通过Kafka 2+进行了测试。 使用源连接器,您可以订阅MQTT主题,并将这些消息写到Kafka主题。 接收器连接器以相反的方式工作。 笔记: ...
MaxCompute技术公开课第四季之如何将Kafka数据同步至MaxCompute.pdf
使用kafka主题/连接从rdbms提取数据并将数据推送到es 定制卡夫卡消费者 使用kafka连接 Kafka Connect JDBC cd ./connect别名kconnect ='$ {PATH_2_KAFKA} /bin/connect-standalone.sh ./worker.properties ./...
Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载