When I use the morphline Solr sink, there is an error:
java.lang.NoClassDefFoundError: org/kitesdk/morphline/api/MorphlineCompilationException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:190)
    at org.apache.flume.sink.solr.morphline.MorphlineSink.start(MorphlineSink.java:93)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
solution:
* clone flume from git
* cd flume
* edit flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml and do the following:
** make cdk-morphlines-all required by commenting out this blurb: <optional>true</optional>
** add the following mvn blurb to the <build> element in order to copy the dependency jars into the target/lib dir:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>copy-dependencies</goal>
          </goals>
          <configuration>
            <outputDirectory>${project.build.directory}/lib</outputDirectory>
            <includeScope>runtime</includeScope>
            <!-- excludes test jars; see http://jira.codehaus.org/browse/MDEP-128 -->
            <excludeScope>provided</excludeScope>
          </configuration>
        </execution>
      </executions>
    </plugin>

* mvn -Dhadoop.profile=2 clean package -pl flume-ng-sinks/flume-ng-morphline-solr-sink
* find flume-ng-sinks/flume-ng-morphline-solr-sink/target -name '*.jar'
* copy the jars printed out by the above find command into the flume lib dir (a shell sketch of this step follows below)
See: https://groups.google.com/a/cloudera.org/forum/#!msg/cdk-dev/7T4pTebdWN4/sBHGkoS70LkJ
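The last step above (copying the built jars) has no explicit command in the thread; a minimal shell sketch, assuming flume is installed in /root/aflume as in the rest of these notes:

# copy the rebuilt morphline sink jar and its dependency jars into flume's lib dir
find flume-ng-sinks/flume-ng-morphline-solr-sink/target -name '*.jar' \
    -exec cp {} /root/aflume/lib/ \;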
---------
The Solr server version is 4.10.1.
error
solution:
Find Solr's lib directory:
ls /home/tomcat/solr/WEB-INF/lib/
antlr-runtime-3.5.jar                hadoop-hdfs-2.2.0.jar                  lexicon                               lucene-suggest-4.10.1.jar
asm-4.1.jar                          hppc-0.5.2.jar                         log4j-1.2.16.jar                      mahout-collections-1.0.jar
asm-commons-4.1.jar                  httpclient-4.3.1.jar                   lucene-analyzers-common-4.10.1.jar    mahout-math-0.6.jar
attributes-binder-1.2.2.jar          httpcore-4.3.jar                       lucene-analyzers-kuromoji-4.10.1.jar  noggit-0.5.jar
carrot2-core-3.10.0-SNAPSHOT.jar     httpmime-4.3.1.jar                     lucene-analyzers-phonetic-4.10.1.jar  org.restlet-2.1.1.jar
commons-cli-1.2.jar                  inok-solr-dataimportscheduler-1.1.jar  lucene-analyzers-smartcn-4.10.1.jar   org.restlet.ext.servlet-2.1.1.jar
commons-codec-1.9.jar                jackson-core-asl-1.9.13.jar            lucene-codecs-4.10.1.jar              protobuf-java-2.5.0.jar
commons-configuration-1.6.jar        jackson-mapper-asl-1.9.13.jar          lucene-core-4.10.1.jar                simple-xml-2.7.jar
commons-fileupload-1.2.1.jar         jcl-over-slf4j-1.6.6.jar               lucene-expressions-4.10.1.jar         slf4j-api-1.6.6.jar
commons-io-2.3.jar                   jcseg-analyzer-1.9.5.jar               lucene-grouping-4.10.1.jar            slf4j-log4j12-1.6.6.jar
commons-lang-2.6.jar                 jcseg-core-1.9.5.jar                   lucene-highlighter-4.10.1.jar         solr-core-4.10.1.jar
concurrentlinkedhashmap-lru-1.2.jar  jcseg-core-1.9.5.jar.old               lucene-join-4.10.1.jar                solr-solrj-4.10.1.jar
dom4j-1.6.1.jar                      jcseg.properties                       lucene-memory-4.10.1.jar              spatial4j-0.4.1.jar
guava-14.0.1.jar                     jcseg-solr-1.9.5.jar                   lucene-misc-4.10.1.jar                wstx-asl-3.2.7.jar
hadoop-annotations-2.2.0.jar         jcseg-solr-1.9.5.jar.old               lucene-queries-4.10.1.jar             zookeeper-3.4.6.jar
hadoop-auth-2.2.0.jar                joda-time-2.2.jar                      lucene-queryparser-4.10.1.jar
hadoop-common-2.2.0.jar              jul-to-slf4j-1.6.6.jar                 lucene-spatial-4.10.1.jar
#cp /home/tomcat/solr/WEB-INF/lib/lucene-*-4.10.1.jar /root/aflume/lib/
#cp /home/tomcat/solr/WEB-INF/lib/solr-*-4.10.1.jar /root/aflume/lib/
#cp /home/tomcat/solr/WEB-INF/lib/http*-4.3.1.jar /root/aflume/lib/
Then delete the old, conflicting jars in /root/aflume/lib/, such as lucene-*-4.3.0.jar, solr-*-4.3.0.jar, and http*-4.2.1.jar (see the sketch below).
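A minimal shell sketch of that cleanup (the old version numbers are the ones listed above; adjust if your flume lib dir contains different ones):

# remove the stale Lucene/Solr/HttpClient jars so they no longer shadow
# the 4.10.1 and 4.3.1 jars copied in from the Solr webapp
rm /root/aflume/lib/lucene-*-4.3.0.jar
rm /root/aflume/lib/solr-*-4.3.0.jar
rm /root/aflume/lib/http*-4.2.1.jar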
-------
When running:
#flume-ng agent -c ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n a1
flume/conf/flume.conf
# Define a memory channel called ch1 on agent1
a1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
a1.sources.mysqlbinlog.channels = ch1
a1.sources.mysqlbinlog.type = com.inoknok.mysqlbinlog.source.InokMysqlBinlogSource
a1.sources.mysqlbinlog.user=zhaohj
a1.sources.mysqlbinlog.password=zhaohj111
a1.sources.mysqlbinlog.host=192.168.0.135
a1.sources.mysqlbinlog.port=3306
a1.sources.mysqlbinlog.serverId=2
a1.sources.mysqlbinlog.autoReconnect=false

# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
a1.sinks.k1.channel = ch1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile = /root/aflume/conf/morphline.conf

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
a1.channels = ch1
a1.sources = mysqlbinlog
a1.sinks = k1
flume/conf/morphline.conf
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]
    commands : [
      { readClob { charset : UTF-8 } }
      {
        split {
          inputField : message
          outputFields : ["", "", "", user_id, username, age]
          separator : "\u0001"
          isRegex : false
          addEmptyStrings : false
          trim : true
        }
      }
      {
        sanitizeUnknownSolrFields {
          solrLocator : {
            solrUrl : "192.168.10.204:8983"
            solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
            #collection : "@${tablename}"
            #zkHost : "192.168.10.204:8983"
          }
        }
      }
      {
        logDebug {
          format : "xxxxxxxxx xxxxxx My output record: {}"
          args : ["@{}"]
        }
      }
      # load the record into a Solr server or MapReduce Reducer
      {
        loadSolr {
          solrLocator : {
            solrUrl : "192.168.10.204:8983"
            solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
            #collection : "@${tablename}"    # Name of solr collection
            #zkHost : "192.168.10.204:8983"  # ZooKeeper ensemble
          }
        }
      }
    ]
  }
]
error:
Solution: use the full HTTP URL of the Solr core in the solrLocator:

solrUrl : "http://192.168.10.204:8983/test.inok_user"
solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
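Before restarting the agent, the corrected core URL can be sanity-checked from the Flume host (a sketch; it assumes the core's default /select handler is enabled):

# should return a Solr response with numFound=... if the core URL is reachable
curl "http://192.168.10.204:8983/test.inok_user/select?q=*:*&rows=0"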
----------
Morphline - Choose collection for loadSolr at run time
https://groups.google.com/a/cloudera.org/forum/#!topic/search-user/z9A_Xe5FviM
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Custom_Command
Use case: I want to set the solrLocator dynamically at run time, but this failed. The workaround is to use the tryRules command, with a hard-coded solrLocator in each rule.
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "com.cloudera.**", "org.apache.solr.**"]
    commands : [
      { readClob { charset : UTF-8 } }
      {
        tryRules {
          catchExceptions : false
          throwExceptionIfAllRulesFailed : true
          rules : [
            { // test.inok_user
              commands : [
                { contains { tablename : [test.inok_user] } }
                { logDebug { format : "YYYYYYYYY 1111 My output record: {}, tablename={}", args : ["@{}", "@{tablename}"] } }
                { split { inputField : message, outputFields : [updatetime, "", "", user_id, username, age], separator : "\u0001", isRegex : false, addEmptyStrings : false, trim : true } }
                {
                  convertTimestamp {
                    field : updatetime
                    inputFormats : ["unixTimeInMillis"]
                    inputTimezone : UTC
                    outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
                    outputTimezone : PRC
                  }
                }
                {
                  sanitizeUnknownSolrFields {
                    solrLocator : {
                      solrUrl : "http://192.168.10.204:8983/test.inok_user/"
                      solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
                    }
                  }
                }
                { logDebug { format : "xxxxxxxxx 1111 My output record: {}, tablename={}", args : ["@{}", "@{tablename}"] } }
                # load the record into a Solr server or MapReduce Reducer
                {
                  loadSolr {
                    solrLocator : {
                      solrUrl : "http://192.168.10.204:8983/test.inok_user"
                      solrHomeDir : "/home/tomcat/solrHome/test.inok_user"
                      batchSize : 1
                    }
                  }
                }
              ]
            } // end rule1

            # test.inoktest
            { // test.inoktest
              commands : [
                { contains { tablename : [test.inoktest] } }
                { logDebug { format : "YYYYYYYYY 2222 My output record: {}, tablename={}", args : ["@{}", "@{tablename}"] } }
                { split { inputField : message, outputFields : [updatetime, "", "", id, content], separator : "\u0001", isRegex : false, addEmptyStrings : false, trim : true } }
                {
                  convertTimestamp {
                    field : updatetime
                    inputFormats : ["unixTimeInMillis"]
                    inputTimezone : UTC
                    outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
                    outputTimezone : PRC
                  }
                }
                {
                  sanitizeUnknownSolrFields {
                    solrLocator : {
                      solrUrl : "http://192.168.10.204:8983/test.inoktest/"
                      solrHomeDir : "/home/tomcat/solrHome/test.inoktest"
                    }
                  }
                }
                { logDebug { format : "xxxxxxxxx 2222 My output record: {}, tablename={}", args : ["@{}", "@{tablename}"] } }
                # load the record into a Solr server or MapReduce Reducer
                {
                  loadSolr {
                    solrLocator : {
                      solrUrl : "http://192.168.10.204:8983/test.inoktest"
                      solrHomeDir : "/home/tomcat/solrHome/test.inoktest"
                      batchSize : 1
                    }
                  }
                }
              ]
            } // end rule2
          ]
        } // end tryRules
      }
    ]
  }
]
--------------------
When I configure an Avro sink in agent a and an Avro source in agent b, with the two agents running on different hosts:
flume.conf on host a
# Define a memory channel called ch1 on agent1
a1.channels.ch1.type = memory

# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
a1.sources.mysqlbinlog.channels = ch1
a1.sources.mysqlbinlog.type = com.inoknok.mysqlbinlog.source.InokMysqlBinlogSource
a1.sources.mysqlbinlog.user=zhaohj
a1.sources.mysqlbinlog.password=zhaohj111
a1.sources.mysqlbinlog.host=192.168.0.135
a1.sources.mysqlbinlog.port=3306
a1.sources.mysqlbinlog.serverId=2
a1.sources.mysqlbinlog.autoReconnect=false

#sink to solr
a1.sinks.k1.channel = ch1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.morphlineFile = /root/aflume/conf/morphline.conf

#sink to avro
a1.sinks.k3.type = avro
a1.sinks.k3.channel = ch1
a1.sinks.k3.hostname = 192.168.0.135
a1.sinks.k3.port = 4545

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
a1.channels = ch1
a1.sources = mysqlbinlog
a1.sinks = k3
flume.conf on host b
# Define a memory channel called ch1 on agent1
a1.channels.ch1.type = memory

#source avro
a1.sources.s1.channels = ch1
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 4545

#sink to hdfs
a1.sinks.k1.channel = ch1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/flume/mysqlbinlog/%{tablename}
a1.sinks.k1.hdfs.filePrefix = %{tablename}-
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
a1.channels = ch1
a1.sources = s1
a1.sinks = k1
There is an error:
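To isolate whether the Avro hop itself is failing, host b's Avro source can be exercised directly with Flume's built-in Avro client (a sketch; <host-b-ip> and the test file are placeholders to fill in):

# send the lines of a small test file as events to the avro source on host b, port 4545
flume-ng avro-client -H <host-b-ip> -p 4545 -F /tmp/test-event.txt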
--------------------
1. Install Flume on the same machine as Solr.
2. Add zookeeper-3.4.6.jar to the flume/lib dir.
3. Put the plugin in the plugins.d dir:
plugins.d/
└── mysqlbinlog
    ├── lib
    │   └── mysqlbinlog-sources.jar
    └── libext
        └── open-replicator-1.0.7.jar
4. Set up MySQL master/slave replication and start the master and slave MySQL daemons (see the check after this list).
5. Start up Solr.
6. Test with the command:
flume-ng agent -c ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n a1
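Step 4 can be sanity-checked from the shell once the master is configured (a sketch, assuming a root login on the master; it only verifies that binary logging is enabled, which the mysqlbinlog source needs, and shows the master's server_id, which must differ from the source's serverId=2):

# check that the binlog is on and inspect the master's server_id and current binlog position
mysql -u root -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'server_id'; SHOW MASTER STATUS;"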
References
http://flume.apache.org/FlumeUserGuide.html#morphlinesolrsink
http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html
http://cloudera.github.io/cdk/docs/current/cdk-morphlines/morphlinesReferenceGuide.html
http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/search_flume_morphline_solr_sink_config_options.html
http://www.slideshare.net/cloudera/using-morphlines-for-onthefly-etl
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html#Implementing_your_own_Custom_Command
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html