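Flume ships with two built-in channel selectors: MultiplexingChannelSelector and ReplicatingChannelSelector.
ReplicatingChannelSelector sends every event to every channel. It returns all channels as required channels, and its optional-channel list is always empty.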
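package org.apache.flume.channel;

import java.util.Collections;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;

public class ReplicatingChannelSelector extends AbstractChannelSelector {

  private final List<Channel> emptyList = Collections.emptyList();

  // Every channel attached to the source is required for every event.
  @Override
  public List<Channel> getRequiredChannels(Event event) {
    return getAllChannels();
  }

  // No channel is ever optional.
  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return emptyList;
  }

  @Override
  public void configure(Context context) {
    // No configuration necessary
  }
}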
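To see the required/optional split in isolation, here is a minimal, dependency-free sketch that mirrors the replicating behaviour. SimpleChannel, ReplicatingSelector and deliver are hypothetical stand-ins, not Flume API. In a real agent, Flume's ChannelProcessor performs the delivery. It fails the transaction if writing to any required channel fails, and it only ignores failures on optional channels. This sketch models none of that error handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ReplicatingSketch {

    // Hypothetical stand-in for a Flume channel: a name plus a buffer of event headers.
    static final class SimpleChannel {
        final String name;
        final List<Map<String, String>> events = new ArrayList<>();

        SimpleChannel(String name) {
            this.name = name;
        }
    }

    // Mirrors ReplicatingChannelSelector: every channel is required, none is optional.
    static final class ReplicatingSelector {
        private final List<SimpleChannel> all;

        ReplicatingSelector(List<SimpleChannel> all) {
            this.all = all;
        }

        List<SimpleChannel> requiredChannels(Map<String, String> headers) {
            return all;
        }

        List<SimpleChannel> optionalChannels(Map<String, String> headers) {
            return Collections.emptyList();
        }
    }

    // Hypothetical delivery loop: put the event into each selected channel (no error handling).
    static void deliver(ReplicatingSelector selector, Map<String, String> headers) {
        for (SimpleChannel ch : selector.requiredChannels(headers)) {
            ch.events.add(headers);
        }
        for (SimpleChannel ch : selector.optionalChannels(headers)) {
            ch.events.add(headers);
        }
    }

    public static void main(String[] args) {
        SimpleChannel c1 = new SimpleChannel("c1");
        SimpleChannel c2 = new SimpleChannel("c2");
        ReplicatingSelector selector = new ReplicatingSelector(List.of(c1, c2));

        deliver(selector, Map.of("type", "login"));
        deliver(selector, Map.of("type", "logout"));

        // Both channels receive both events.
        System.out.println(c1.name + "=" + c1.events.size() + ", " + c2.name + "=" + c2.events.size());
    }
}
```

Running main prints c1=2, c2=2. Each event is copied into every channel, so this selector suits fan-out cases where, for example, the same events go to both HDFS and Kafka.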
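MultiplexingChannelSelector routes each event by the value of a configured header. It looks that value up in the mapping.* entries and sends the event to the channel(s) listed there. Events whose header is missing, blank, or has no mapping go to the default channel(s).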
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.channel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiplexingChannelSelector extends AbstractChannelSelector {

  public static final String CONFIG_MULTIPLEX_HEADER_NAME = "header";
  public static final String DEFAULT_MULTIPLEX_HEADER =
      "flume.selector.header";
  public static final String CONFIG_PREFIX_MAPPING = "mapping.";
  public static final String CONFIG_DEFAULT_CHANNEL = "default";

  @SuppressWarnings("unused")
  private static final Logger LOG = LoggerFactory
      .getLogger(MultiplexingChannelSelector.class);

  private static final List<Channel> EMPTY_LIST =
      Collections.emptyList();

  private String headerName;

  private Map<String, List<Channel>> channelMapping;

  private List<Channel> defaultChannels;

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    String headerValue = event.getHeaders().get(headerName);
    if (headerValue == null || headerValue.trim().length() == 0) {
      return defaultChannels;
    }

    List<Channel> channels = channelMapping.get(headerValue);

    //This header value does not point to anything
    //Return default channel(s) here.
    if (channels == null) {
      channels = defaultChannels;
    }

    return channels;
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return EMPTY_LIST;
  }

  @Override
  public void configure(Context context) {
    this.headerName = context.getString(CONFIG_MULTIPLEX_HEADER_NAME,
        DEFAULT_MULTIPLEX_HEADER);

    Map<String, Channel> channelNameMap = new HashMap<String, Channel>();
    for (Channel ch : getAllChannels()) {
      channelNameMap.put(ch.getName(), ch);
    }

    defaultChannels = getChannelListFromNames(
        context.getString(CONFIG_DEFAULT_CHANNEL), channelNameMap);

    if (defaultChannels.isEmpty()) {
      throw new FlumeException("Default channel list empty");
    }

    Map<String, String> mapConfig =
        context.getSubProperties(CONFIG_PREFIX_MAPPING);

    channelMapping = new HashMap<String, List<Channel>>();

    for (String headerValue : mapConfig.keySet()) {
      List<Channel> configuredChannels = getChannelListFromNames(
          mapConfig.get(headerValue),
          channelNameMap);

      //This should not go to default channel(s)
      //because this seems to be a bad way to configure.
      if (configuredChannels.size() == 0) {
        throw new FlumeException("No channel configured for when "
            + "header value is: " + headerValue);
      }

      if (channelMapping.put(headerValue, configuredChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }
    //If no mapping is configured, it is ok.
    //All events will go to the default channel(s).
  }

  //Given a list of channel names as space delimited string,
  //returns list of channels.
  private List<Channel> getChannelListFromNames(String channels,
      Map<String, Channel> channelNameMap) {
    List<Channel> configuredChannels = new ArrayList<Channel>();
    String[] chNames = channels.split(" ");
    for (String name : chNames) {
      Channel ch = channelNameMap.get(name);
      if (ch != null) {
        configuredChannels.add(ch);
      } else {
        throw new FlumeException("Selector channel not found: " + name);
      }
    }
    return configuredChannels;
  }
}
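The configure and routing logic above can be exercised without a running agent. The sketch below is a simplified, dependency-free re-implementation that rests on several assumptions. A plain Map<String, String> stands in for Flume's Context and holds the same keys the source reads: header, mapping.<value> and default. Channels are plain name strings. MultiplexSketch, configure and route are hypothetical names, not Flume API. In an agent properties file, the Flume user guide shows these keys under the selector prefix, for example a1.sources.r1.selector.type = multiplexing, a1.sources.r1.selector.header = state, a1.sources.r1.selector.mapping.CZ = c1 and a1.sources.r1.selector.default = c4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiplexSketch {

    private String headerName;
    private final Map<String, List<String>> channelMapping = new HashMap<>();
    private List<String> defaultChannels;

    // Mirrors configure(): read the header name, the default channels and every mapping.* entry.
    // The plain Map is a hypothetical stand-in for Flume's Context.
    void configure(Map<String, String> ctx, List<String> allChannels) {
        headerName = ctx.getOrDefault("header", "flume.selector.header");
        defaultChannels = namesToChannels(ctx.get("default"), allChannels);
        if (defaultChannels.isEmpty()) {
            throw new IllegalStateException("Default channel list empty");
        }
        for (Map.Entry<String, String> e : ctx.entrySet()) {
            if (!e.getKey().startsWith("mapping.")) {
                continue;
            }
            String headerValue = e.getKey().substring("mapping.".length());
            List<String> channels = namesToChannels(e.getValue(), allChannels);
            if (channels.isEmpty()) {
                throw new IllegalStateException("No channel configured for header value: " + headerValue);
            }
            channelMapping.put(headerValue, channels);
        }
    }

    // Mirrors getRequiredChannels(): a missing, blank or unmapped header value falls back to the defaults.
    List<String> route(Map<String, String> headers) {
        String value = headers.get(headerName);
        if (value == null || value.trim().isEmpty()) {
            return defaultChannels;
        }
        return channelMapping.getOrDefault(value, defaultChannels);
    }

    // Mirrors getChannelListFromNames(): names are space-delimited, and an unknown name is an error.
    private static List<String> namesToChannels(String names, List<String> allChannels) {
        List<String> result = new ArrayList<>();
        if (names == null) {
            return result; // sketch-only guard: a missing "default" triggers the empty-default check
        }
        for (String name : names.split(" ")) {
            if (!allChannels.contains(name)) {
                throw new IllegalStateException("Selector channel not found: " + name);
            }
            result.add(name);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("header", "state");
        ctx.put("mapping.CZ", "c1");
        ctx.put("mapping.US", "c2 c3");
        ctx.put("default", "c4");

        MultiplexSketch selector = new MultiplexSketch();
        selector.configure(ctx, List.of("c1", "c2", "c3", "c4"));

        System.out.println(selector.route(Map.of("state", "US"))); // [c2, c3]
        System.out.println(selector.route(Map.of("state", "JP"))); // [c4]
        System.out.println(selector.route(Map.of()));              // [c4]
    }
}
```

This sketch differs from the quoted source in one respect. If the default key is absent, the source passes null to channels.split(" ") and fails with a NullPointerException. The sketch instead returns an empty list, so its "Default channel list empty" check fires. The sketch also omits the "configured twice" check, because map keys are already unique.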