`

FlumeNG 学习结合log4j使用

    博客分类:
  • web
阅读更多

1.有jdk6以上环境

2.安装flumeNG

wget http://mirrors.cnnic.cn/apache/flume/stable/apache-flume-1.5.0-bin.tar.gz
tar zxvf apache-flume-1.5.0-bin.tar.gz

 

3.启动 

[conf/example.conf]

# example.conf: one agent named "a1" wired as
#   netcat source r1 -> memory channel c1 -> logger sink k1
# The agent name must match the "-n a1" argument passed to bin/flume-ng.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# x.x.x.x is a placeholder in this article; replace it with the address the
# source should listen on.
a1.sources.r1.type = netcat
a1.sources.r1.bind = x.x.x.x
a1.sources.r1.port = 44444

# Describe the sink
# The logger sink writes received events to the agent's own log output.
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
# Per the Flume user guide: capacity is the maximum number of events held in
# the channel, transactionCapacity the maximum number per transaction.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Note the asymmetry: a source takes "channels" (plural), a sink takes a
# single "channel".
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

 

bin/flume-ng agent -c conf -f conf/example.conf  -n a1 -Dflume.root.logger=INFO,console
 

 

 

4. 其它配置(log4j 的 Flume appender 通过 Avro 发送日志,因此需要使用下面带 avro 源的配置启动 agent,而不是只有 netcat 源的 example.conf)

 [conf/avrobase.conf]

 

# avrobase.conf: agent "a1" with two sources feeding one channel:
#   avro source r2 (port 44443)   \
#                                  -> memory channel c1 -> logger sink k1
#   netcat source r1 (port 44444) /

# Name the components on this agent
a1.sources = r2 r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# x.x.x.x is a placeholder in this article; replace it with the address the
# sources should listen on.
# r2 is the Avro endpoint; its port (44443) is the one the log4j appender
# configuration in this article points at.
a1.sources.r2.type = avro
a1.sources.r2.bind = x.x.x.x
a1.sources.r2.port = 44443

# r1 keeps the plain-text netcat listener from example.conf.
a1.sources.r1.type = netcat
a1.sources.r1.bind = x.x.x.x
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Both sources write into the same channel c1.
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1

 

 [conf/fileavro.conf] 

 

# fileavro.conf: same two sources as avrobase.conf, but events are written
# to files instead of the agent log:
#   avro source r2 (port 44443)   \
#                                  -> memory channel c1 -> file_roll sink k1
#   netcat source r1 (port 44444) /

# Name the components on this agent
a1.sources = r2 r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# x.x.x.x is a placeholder in this article; replace it with the address the
# sources should listen on.
a1.sources.r2.type = avro
a1.sources.r2.bind = x.x.x.x
a1.sources.r2.port = 44443

a1.sources.r1.type = netcat
a1.sources.r1.bind = x.x.x.x
a1.sources.r1.port = 44444

# Describe the sink
# The logger sink is left here commented out; file_roll below replaces it.
#a1.sinks.k1.type = logger

# file_roll writes events to files under sink.directory.
# Per the Flume user guide, sink.rollInterval=0 disables time-based rolling,
# so all events go to a single file.
# NOTE(review): the directory is the author's local path; it presumably must
# exist and be writable before the agent starts -- confirm.
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/home/yxt/soft/flume/test
a1.sinks.k1.sink.rollInterval=0

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sinks.k1.channel = c1
 

 

 5.log4j配置

由于此日志在集群环境中使用,所以在日志格式中加上了日志所在机器的 IP(由代码通过 MDC 的 "ip" 键写入,对应 ConversionPattern 中的 %X{ip}),方便定位问题

# Root logger: INFO and above go to both the Flume appender and the console.
log4j.rootLogger=INFO,flume,stdout

# Console appender. The root logger references "stdout", so it has to be
# defined; without a definition log4j 1.x reports an error for the appender
# and nothing is printed to the console.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%X{ip}][%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} [%C : %M : %L]:%m%n

# Flume appender: sends each formatted log line to a Flume Avro source.
# Hostname is a placeholder (x.x.x.x) for the agent's address; Port must match
# the port of the agent's avro source (44443 in the configs of this article).
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=x.x.x.x
log4j.appender.flume.Port=44443
# NOTE(review): Log4jAppender does not appear to expose an "encoding"
# property, so log4j may only warn about this line -- confirm before
# relying on it.
log4j.appender.flume.encoding=UTF8
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
# %X{ip} prints the MDC value stored under the key "ip".
log4j.appender.flume.layout.ConversionPattern=[%X{ip}][%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} [%C : %M : %L]:%m%n

 

 6. 代码[转]&[加]

 

[pom.xml]

  	<!-- Flume log4j appender client. Supplies the
  	     org.apache.flume.clients.log4jappender.Log4jAppender class named in the
  	     log4j configuration; the version matches the 1.5.0 Flume download. -->
  	<dependency>
  		<groupId>org.apache.flume.flume-ng-clients</groupId>
  		<artifactId>flume-ng-log4jappender</artifactId>
  		<version>1.5.0</version>
  		<type>jar</type>
  		<scope>compile</scope>
  	</dependency>

 

[LogTestApp.java]

package flume.log4j.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.log4j.MDC;

/**
 * Console driver for trying out the Flume log4j appender.
 *
 * <p>Reads lines from standard input (decoded as UTF-8) and logs each one at
 * INFO level through log4j, so that whatever appenders are configured receive
 * them.
 */
public class LogTestApp {

  // Store a value under the MDC key "ip" when the class is loaded, so that a
  // layout pattern containing %X{ip} can print it.
  static {
    MDC.put("ip", "test127.0.0.1");
  }

  /**
   * Logs a start-up message, then echoes and logs every line read from
   * standard input until end of input.
   *
   * @param args command-line arguments (unused)
   * @throws IOException if reading from standard input fails
   */
  public static void main(String[] args) throws IOException {
    final Logger log = Logger.getLogger(LogTestApp.class);
    final InputStreamReader stdin =
        new InputStreamReader(System.in, Charset.forName("UTF-8"));
    final BufferedReader reader = new BufferedReader(stdin);

    System.out.println("Initializing Flume log4j appender test.");
    System.out.println("Each line entered will be sent to Flume.");
    // First event, sent before any user input is read.
    log.info("LogTestApp initialized");

    // readLine() returns null at end of input, which ends the loop.
    for (String entry = reader.readLine(); entry != null; entry = reader.readLine()) {
      System.out.println("Sending to log4j: " + entry);
      // The fixed prefix mixes ASCII and non-ASCII characters.
      log.info("ok国这e" + entry);
    }
  }
}

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics