功夫小当家

Flume ExecSource: extracting a specified column from each data line


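Requirement:

    Use Flume's exec source type to extract only a specified column from each line of data (the original post illustrates this with a screenshot)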

    

 

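Implementation:

1. Option 1: the exec source takes a Linux command, so awk can do the job

   Command: tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'   Note: fflush() is required. When awk writes to a pipe rather than a terminal it buffers its output, so without the flush no output arrives as lines are appended.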
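
The buffering behaviour behind the fflush() note can be illustrated with a small Java analogy. This is only a sketch of the general idea (buffered output does not reach the downstream consumer until it is flushed); it is not how awk is implemented, and the class and method names are hypothetical.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

class FlushSketch {

    // Writes a short text through a buffered writer and reports what the
    // downstream sink has received before and after an explicit flush().
    static String[] writeThenFlush(String text) {
        StringWriter downstream = new StringWriter();
        try (BufferedWriter out = new BufferedWriter(downstream)) {
            out.write(text);
            // The text is shorter than the writer's buffer, so it is still held there.
            String before = downstream.toString();
            out.flush();
            // After the flush the downstream sink sees the text.
            String after = downstream.toString();
            return new String[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] seen = writeThenFlush("b\n");
        System.out.println("before flush: [" + seen[0] + "]");
        System.out.println("after flush:  [" + seen[1] + "]");
    }
}
```

In the pipeline above, Flume plays the role of the downstream sink: without fflush() in the awk program, lines can sit in awk's buffer instead of reaching the exec source promptly.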

  

    The complete flume-exec.properties file:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'hd1'


hd1.sources=s1
hd1.sources.s1.type=exec
hd1.sources.s1.shell=/bin/bash -c
hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'

hd1.channels=c1
hd1.channels.c1.type=memory
hd1.channels.c1.capacity=1000
hd1.channels.c1.transactionCapacity=100

hd1.sinks=sk1
hd1.sinks.sk1.type=logger

# Bind the source and the sink to the channel
# One source can feed multiple channels (important)
hd1.sources.s1.channels=c1

# One sink reads from exactly one channel (important)
hd1.sinks.sk1.channel=c1

 

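2. Option 2: modify the source code and extend ExecSource

(1) What exactly needs to change?

Reading the source of ExecSource.java shows that ExecSource reads the process's InputStream through a BufferedReader, wraps each line it reads into an event, and sends the event to the channel. So we can replace the line content with what we need just before it is wrapped into an event.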
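
The idea can be sketched in isolation, without any Flume dependency. The class and method names below are hypothetical, and a plain byte array stands in for the event body that ExecSource builds; this is a minimal illustration of "rewrite the line before wrapping it", not the actual patch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineRewriteSketch {

    // Returns the column at colId (0-based), or null if the line has too few columns.
    static String pickColumn(String line, String delimiterRegex, int colId) {
        String[] parts = line.split(delimiterRegex);
        return (colId >= 0 && colId < parts.length) ? parts[colId] : null;
    }

    // Reads lines one by one, keeps only the requested column, and collects
    // the result as bytes (a stand-in for wrapping the line into an event).
    static List<byte[]> readBodies(BufferedReader reader, String delimiterRegex, int colId) {
        List<byte[]> bodies = new ArrayList<>();
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                String column = pickColumn(line, delimiterRegex, colId);
                if (column == null) {
                    continue; // skip lines that do not have the requested column
                }
                bodies.add(column.getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bodies;
    }

    public static void main(String[] args) {
        BufferedReader reader =
                new BufferedReader(new StringReader("1,alice,20\n2,bob,31\nshort\n"));
        for (byte[] body : readBodies(reader, ",", 1)) {
            System.out.println(new String(body, StandardCharsets.UTF_8));
        }
    }
}
```

Note that the column index here is 0-based, so index 1 selects the same field as `$2` in the awk command from option 1.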


 

(2) Implementation:

    Modify ExecSource.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * <p>
 * A {@link Source} implementation that executes a Unix process and turns each
 * line of text into an event.
 * </p>
 * <p>
 * This source runs a given Unix command on start up and expects that process to
 * continuously produce data on standard out (stderr ignored by default). Unless
 * told to restart, if the process exits for any reason, the source also exits and
 * will produce no further data. This means configurations such as <tt>cat [named pipe]</tt>
 * or <tt>tail -F [file]</tt> are going to produce the desired results where as
 * <tt>date</tt> will probably not - the former two commands produce streams of
 * data where as the latter produces a single event and exits.
 * </p>
 * <p>
 * The <tt>ExecSource</tt> is meant for situations where one must integrate with
 * existing systems without modifying code. It is a compatibility gateway built
 * to allow simple, stop-gap integration and doesn't necessarily offer all of
 * the benefits or guarantees of native integration with Flume. If one has the
 * option of using the <tt>AvroSource</tt>, for instance, that would be greatly
 * preferred to this source as it (and similarly implemented sources) can
 * maintain the transactional guarantees that exec can not.
 * </p>
 * <p>
 * <i>Why doesn't <tt>ExecSource</tt> offer transactional guarantees?</i>
 * </p>
 * <p>
 * The problem with <tt>ExecSource</tt> and other asynchronous sources is that
 * the source can not guarantee that if there is a failure to put the event into
 * the {@link Channel} the client knows about it. As a for instance, one of the
 * most commonly requested features is the <tt>tail -F [file]</tt>-like use case
 * where an application writes to a log file on disk and Flume tails the file,
 * sending each line as an event. While this is possible, there's an obvious
 * problem; what happens if the channel fills up and Flume can't send an event?
 * Flume has no way of indicating to the application writing the log file that
 * it needs to retain the log or that the event hasn't been sent, for some
 * reason. If this doesn't make sense, you need only know this: <b>Your
 * application can never guarantee data has been received when using a
 * unidirectional asynchronous interface such as ExecSource!</b> As an extension
 * of this warning - and to be completely clear - there is absolutely zero
 * guarantee of event delivery when using this source. You have been warned.
 * </p>
 * <p>
 * <b>Configuration options</b>
 * </p>
 * <table>
 * <tr>
 * <th>Parameter</th>
 * <th>Description</th>
 * <th>Unit / Type</th>
 * <th>Default</th>
 * </tr>
 * <tr>
 * <td><tt>command</tt></td>
 * <td>The command to execute</td>
 * <td>String</td>
 * <td>none (required)</td>
 * </tr>
 * <tr>
 * <td><tt>restart</tt></td>
 * <td>Whether to restart the command when it exits</td>
 * <td>Boolean</td>
 * <td>false</td>
 * </tr>
 * <tr>
 * <td><tt>restartThrottle</tt></td>
 * <td>How long in milliseconds to wait before restarting the command</td>
 * <td>Long</td>
 * <td>10000</td>
 * </tr>
 * <tr>
 * <td><tt>logStderr</tt></td>
 * <td>Whether to log or discard the standard error stream of the command</td>
 * <td>Boolean</td>
 * <td>false</td>
 * </tr>
 * <tr>
 * <td><tt>batchSize</tt></td>
 * <td>The number of events to commit to channel at a time.</td>
 * <td>integer</td>
 * <td>20</td>
 * </tr>
 * <tr>
 * <td><tt>batchTimeout</tt></td>
 * <td>Amount of time (in milliseconds) to wait, if the buffer size was not reached,
 * before data is pushed downstream.</td>
 * <td>long</td>
 * <td>3000</td>
 * </tr>
 * </table>
 * <p>
 * <b>Metrics</b>
 * </p>
 * <p>
 * TODO
 * </p>
 */
public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);

    private String shell;

    private String command;

    private SourceCounter sourceCounter;

    private ExecutorService executor;

    private Future<?> runnerFuture;

    private long restartThrottle;

    private boolean restart;

    private boolean logStderr;

    private Integer bufferCount;

    private long batchTimeout;

    private ExecRunnable runner;

    private Charset charset;

    // Switch: whether to split each line
    private boolean customSplitSwitchOn;

    // Delimiter used for the split
    private String customSplitDelimiter;

    // Index of the column to keep after the split
    private Integer customFetchColId;

    @Override
    public void start() {
        logger.info("Exec source starting with command: {}", command);

        // Start the counter before starting any threads that may access it.
        sourceCounter.start();

        executor = Executors.newSingleThreadExecutor();

        // Pass the three custom parameters to the constructor
        runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart,
                restartThrottle, logStderr, bufferCount, batchTimeout, charset,
                customSplitSwitchOn, customSplitDelimiter, customFetchColId);

        // Start the runner thread.
        runnerFuture = executor.submit(runner);

        // Mark the Source as RUNNING.
        super.start();

        logger.debug("Exec source started");
    }

    @Override
    public void stop() {
        logger.info("Stopping exec source with command: {}", command);
        if (runner != null) {
            runner.setRestart(false);
            runner.kill();
        }

        if (runnerFuture != null) {
            logger.debug("Stopping exec runner");
            runnerFuture.cancel(true);
            logger.debug("Exec runner stopped");
        }
        executor.shutdown();

        while (!executor.isTerminated()) {
            logger.debug("Waiting for exec executor service to stop");
            try {
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for exec executor service "
                        + "to stop. Just exiting.");
                Thread.currentThread().interrupt();
            }
        }

        sourceCounter.stop();
        super.stop();

        logger.debug("Exec source with command:{} stopped. Metrics:{}", command,
                sourceCounter);
    }

    @Override
    public void configure(Context context) {
        command = context.getString("command");

        Preconditions.checkState(command != null,
                "The parameter command must be specified");

        restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
                ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);

        restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
                ExecSourceConfigurationConstants.DEFAULT_RESTART);

        logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
                ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);

        bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
                ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);

        batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
                ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);

        charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
                ExecSourceConfigurationConstants.DEFAULT_CHARSET));

        shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);


        // Read the split switch setting
        customSplitSwitchOn = context
                .getBoolean(ExecSourceConfigurationConstants.CUSTOM_SPLIT_SWITCH_ON,
                        ExecSourceConfigurationConstants.DEFAULT_CUSTON_SWITCH_ON);
        // Read the delimiter setting
        customSplitDelimiter = context
                .getString(ExecSourceConfigurationConstants.CUSTOM_SPLIT_DELIMITER,
                        ExecSourceConfigurationConstants.DEFAULT_CUSTOM_SPLIT_DELIMITER);

        // Read the index of the column to keep after the split
        customFetchColId = context.getInteger(ExecSourceConfigurationConstants.CUSTOM_FETCH_COL,
                ExecSourceConfigurationConstants.DEFAULT_CUSTOM_FETCH_COL_ID);

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }

    private static class ExecRunnable implements Runnable {

        // The constructor takes the three additional custom parameters
        public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
                SourceCounter sourceCounter, boolean restart, long restartThrottle,
                boolean logStderr, int bufferCount, long batchTimeout, Charset charset,
                boolean customSplitSwitchOn, String customSplitDelimiter,
                Integer customFetchColId) {
            this.command = command;
            this.channelProcessor = channelProcessor;
            this.sourceCounter = sourceCounter;
            this.restartThrottle = restartThrottle;
            this.bufferCount = bufferCount;
            this.batchTimeout = batchTimeout;
            this.restart = restart;
            this.logStderr = logStderr;
            this.charset = charset;
            this.shell = shell;

            // Custom properties
            this.customSplitSwitchOn = customSplitSwitchOn;
            this.customSplitDelimiter = customSplitDelimiter;
            this.customFetchColId = customFetchColId;
        }

        private final String shell;

        private final String command;

        private final ChannelProcessor channelProcessor;

        private final SourceCounter sourceCounter;

        private volatile boolean restart;

        private final long restartThrottle;

        private final int bufferCount;

        private long batchTimeout;

        private final boolean logStderr;

        private final Charset charset;

        // Delimiter used for the split
        private String customSplitDelimiter;

        // Switch: whether splitting is enabled
        private boolean customSplitSwitchOn;

        // Index of the column to keep after the split
        private int customFetchColId;

        private Process process = null;

        private SystemClock systemClock = new SystemClock();

        private Long lastPushToChannel = systemClock.currentTimeMillis();

        ScheduledExecutorService timedFlushService;

        ScheduledFuture<?> future;

        @Override
        public void run() {
            do {
                String exitCode = "unknown";
                BufferedReader reader = null;
                String line = null;
                final List<Event> eventList = new ArrayList<Event>();

                timedFlushService = Executors.newSingleThreadScheduledExecutor(
                        new ThreadFactoryBuilder().setNameFormat(
                                "timedFlushExecService" +
                                        Thread.currentThread().getId() + "-%d").build());
                try {
                    if (shell != null) {
                        String[] commandArgs = formulateShellCommand(shell, command);
                        process = Runtime.getRuntime().exec(commandArgs);
                    } else {
                        String[] commandArgs = command.split("\\s+");
                        process = new ProcessBuilder(commandArgs).start();
                    }
                    reader = new BufferedReader(
                            new InputStreamReader(process.getInputStream(), charset));

                    // StderrLogger dies as soon as the input stream is invalid
                    StderrReader stderrReader = new StderrReader(new BufferedReader(
                            new InputStreamReader(process.getErrorStream(), charset)), logStderr);
                    stderrReader.setName("StderrReader-[" + command + "]");
                    stderrReader.setDaemon(true);
                    stderrReader.start();

                    future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                synchronized (eventList) {
                                    if (!eventList.isEmpty() && timeout()) {
                                        flushEventBatch(eventList);
                                    }
                                }
                            } catch (Exception e) {
                                logger.error("Exception occurred when processing event batch", e);
                                if (e instanceof InterruptedException) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                    }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);

                    String splits[];
                    while ((line = reader.readLine()) != null) {
                        sourceCounter.incrementEventReceivedCount();
                        synchronized (eventList) {
                            // If the split switch is on, split the line on the configured
                            // delimiter and keep only the configured column
                            if (customSplitSwitchOn) {
                                try {
                                    splits = line.split(customSplitDelimiter);
                                    if (splits.length > customFetchColId) {
                                        line = splits[customFetchColId];
                                    } else {
                                        logger.error("customFetchCol {} is out of range: line has only {} column(s)",
                                                customFetchColId, splits.length);
                                        continue;
                                    }
                                } catch (Exception e) {
                                    logger.error("Failed while split line: ", e);
                                    continue;
                                }
                            }

                            eventList.add(EventBuilder.withBody(line.getBytes(charset)));
                            if (eventList.size() >= bufferCount || timeout()) {
                                flushEventBatch(eventList);
                            }
                        }
                    }

                    synchronized (eventList) {
                        if (!eventList.isEmpty()) {
                            flushEventBatch(eventList);
                        }
                    }
                } catch (Exception e) {
                    logger.error("Failed while running command: " + command, e);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException ex) {
                            logger.error("Failed to close reader for exec source", ex);
                        }
                    }
                    exitCode = String.valueOf(kill());
                }
                if (restart) {
                    logger.info("Restarting in {}ms, exit code {}", restartThrottle,
                            exitCode);
                    try {
                        Thread.sleep(restartThrottle);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    logger.info("Command [" + command + "] exited with " + exitCode);
                }
            } while (restart);
        }

        private void flushEventBatch(List<Event> eventList) {
            channelProcessor.processEventBatch(eventList);
            sourceCounter.addToEventAcceptedCount(eventList.size());
            eventList.clear();
            lastPushToChannel = systemClock.currentTimeMillis();
        }

        private boolean timeout() {
            return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
        }

        private static String[] formulateShellCommand(String shell, String command) {
            String[] shellArgs = shell.split("\\s+");
            String[] result = new String[shellArgs.length + 1];
            System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
            result[shellArgs.length] = command;
            return result;
        }

        public int kill() {
            if (process != null) {
                synchronized (process) {
                    process.destroy();

                    try {
                        int exitValue = process.waitFor();

                        // Stop the Thread that flushes periodically
                        if (future != null) {
                            future.cancel(true);
                        }

                        if (timedFlushService != null) {
                            timedFlushService.shutdown();
                            while (!timedFlushService.isTerminated()) {
                                try {
                                    timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
                                } catch (InterruptedException e) {
                                    logger.debug(
                                            "Interrupted while waiting for exec executor service "
                                                    + "to stop. Just exiting.");
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }
                        return exitValue;
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
                return Integer.MIN_VALUE;
            }
            return Integer.MIN_VALUE / 2;
        }

        public void setRestart(boolean restart) {
            this.restart = restart;
        }
    }

    private static class StderrReader extends Thread {
        private BufferedReader input;

        private boolean logStderr;

        protected StderrReader(BufferedReader input, boolean logStderr) {
            this.input = input;
            this.logStderr = logStderr;
        }

        @Override
        public void run() {
            try {
                int i = 0;
                String line = null;
                while ((line = input.readLine()) != null) {
                    if (logStderr) {
                        // There is no need to read 'line' with a charset
                        // as we do not to propagate it.
                        // It is in UTF-16 and would be printed in UTF-8 format.
                        logger.info("StderrLogger[{}] = '{}'", ++i, line);
                    }
                }
            } catch (IOException e) {
                logger.info("StderrLogger exiting", e);
            } finally {
                try {
                    if (input != null) {
                        input.close();
                    }
                } catch (IOException ex) {
                    logger.error("Failed to close stderr reader for exec source", ex);
                }
            }
        }
    }
}

 

 

   Modify ExecSourceConfigurationConstants

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source;

import com.google.common.annotations.VisibleForTesting;

public class ExecSourceConfigurationConstants {

    /**
     * Should the exec'ed command restarted if it dies: : default false
     */
    public static final String CONFIG_RESTART = "restart";

    public static final boolean DEFAULT_RESTART = false;

    /**
     * Amount of time to wait before attempting a restart: : default 10000 ms
     */
    public static final String CONFIG_RESTART_THROTTLE = "restartThrottle";

    public static final long DEFAULT_RESTART_THROTTLE = 10000L;

    /**
     * Should stderr from the command be logged: default false
     */
    public static final String CONFIG_LOG_STDERR = "logStdErr";

    public static final boolean DEFAULT_LOG_STDERR = false;

    /**
     * Number of lines to read at a time
     */
    public static final String CONFIG_BATCH_SIZE = "batchSize";

    public static final int DEFAULT_BATCH_SIZE = 20;

    /**
     * Amount of time to wait, if the buffer size was not reached, before
     * to data is pushed downstream: : default 3000 ms
     */
    public static final String CONFIG_BATCH_TIME_OUT = "batchTimeout";

    public static final long DEFAULT_BATCH_TIME_OUT = 3000L;

    /**
     * Charset for reading input
     */
    public static final String CHARSET = "charset";

    public static final String DEFAULT_CHARSET = "UTF-8";

    /**
     * Optional shell/command processor used to run command
     */
    public static final String CONFIG_SHELL = "shell";

    /**
     * Custom delimiter; defaults to a comma
     */
    public static final String CUSTOM_SPLIT_DELIMITER = "customSplitDelimiter";

    public static final String DEFAULT_CUSTOM_SPLIT_DELIMITER = ",";

    /**
     * Switch for the split; off by default
     */
    public static final String CUSTOM_SPLIT_SWITCH_ON = "customSwitchOn";

    public static final boolean DEFAULT_CUSTON_SWITCH_ON = false;

    /**
     * Which column to fetch after the split; 0-based, like an array index
     */
    public static final String CUSTOM_FETCH_COL = "customFetchCol";

    public static final int DEFAULT_CUSTOM_FETCH_COL_ID = 0;
}
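
One caveat with the delimiter setting: the modified ExecSource passes it straight to String.split, which interprets its argument as a regular expression and drops trailing empty columns. A comma works as is, but a delimiter such as `|` would need quoting. The sketch below shows one possible way to treat the delimiter literally; the class and method names are hypothetical and not part of the patch above.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

class DelimiterSketch {

    // Splits on the delimiter taken as a literal string (not a regex) and
    // keeps trailing empty columns by passing a negative limit.
    static String[] splitLiteral(String line, String delimiter) {
        return line.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        String line = "a|b|";
        // As a regex, "|" matches the empty string, so the line is cut between characters.
        System.out.println(Arrays.toString(line.split("|")));
        // Quoted, the pipe is an ordinary separator and the empty last column is kept.
        System.out.println(Arrays.toString(splitLiteral(line, "|")));
    }
}
```

Whether trailing empty columns should be kept depends on the data; with the default limit, a line like `a,b,,` yields only two columns, so a customFetchCol of 2 or 3 would be reported as out of range.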

   

 

    Modify flume-custom.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'hd1'


hd1.sources=s1
hd1.sources.s1.type=exec
hd1.sources.s1.shell=/bin/bash -c

# Option 1
#hd1.sources.s1.command=tail -F /root/test.log | awk -F ',' '{print $2;fflush()}'

# Option 2: modified source code
hd1.sources.s1.command=tail -F /root/test.log
hd1.sources.s1.customSwitchOn=true
hd1.sources.s1.customFetchCol=1

hd1.channels=c1
hd1.channels.c1.type=memory
hd1.channels.c1.capacity=1000
hd1.channels.c1.transactionCapacity=100

hd1.sinks=sk1
hd1.sinks.sk1.type=logger

# Bind the source and the sink to the channel
# One source can feed multiple channels (important)
hd1.sources.s1.channels=c1

# One sink reads from exactly one channel (important)
hd1.sinks.sk1.channel=c1

 

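    After making the changes above, rebuild the flume-ng-core module and replace flume-ng-core-1.8.0.jar in the server's apache-flume-1.8.0-bin/lib directory with the newly built jar (adjust the version number to match the version you actually run). Then start Flume with the modified flume-custom.properties above and test.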


 

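A note on rebuilding after the source change in the previous step: Flume's pom.xml configures a code-style checking plugin. Comment that plugin out; otherwise the build fails because the modified code does not match the enforced coding style.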


 

 

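Takeaways:

  • Common Linux commands are worth mastering properly; when a problem comes up, analyze it patiently (fflush() is one example).
  • Don't stop at solving the immediate problem; think more broadly and explore alternative approaches.
  • Modifying source code is not as hard as it seems. What holds us back is often our own patience and attention to detail: slow source builds, missing dependencies, and similar issues can easily put you off.
  • Read the official documentation, look things up, and talk to more experienced engineers.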

  

 

 

 
