Hadoop samplers

 

Hadoop comes with a set of samplers for you to choose from. The idea behind sampling is that you can get a fairly even set of partitions by sampling the key space: you look at a small subset of the keys to approximate the key distribution, which is then used to construct the partitions.
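
Before looking at each sampler, it helps to see where a sampler plugs in. You normally hand one to InputSampler.writePartitionFile(), which runs the sampler on the client, sorts the sampled keys, and writes cut points to a partition file that TotalOrderPartitioner later reads to route each key range to its own reducer. Here is a minimal driver sketch, assuming the new-API classes in org.apache.hadoop.mapreduce.lib.partition; the key/value types, reducer count, and partition-file path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class TotalSortDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "total sort");
        // ... input/output formats, paths and key/value classes go here ...

        // One reducer per partition; the partition file will hold
        // numReduceTasks - 1 cut points taken from the sample.
        job.setNumReduceTasks(30);
        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
            new Path("/tmp/partitions.lst"));  // hypothetical path

        // Sample keys with probability 0.1, keeping at most 10,000 keys
        // and reading at most 10 splits.
        InputSampler.Sampler<IntWritable, Text> sampler =
            new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        // Depending on the input you might prefer:
        //   new InputSampler.SplitSampler<IntWritable, Text>(10000, 10)   // random order
        //   new InputSampler.IntervalSampler<IntWritable, Text>(0.1, 10)  // sorted input
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }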

 

  1. SplitSampler: this sampler takes only the first n records in a split. It is not a good fit for sorted data, because it doesn't select keys from throughout the split. In many applications some of the input is already sorted, or at least partially sorted, and in those circumstances SplitSampler will produce a skewed sample.
      /**
       * Samples the first n records from s splits.
       * Inexpensive way to sample random data.
       */
      public static class SplitSampler<K,V> implements Sampler<K,V> {
    
        private final int numSamples;
        private final int maxSplitsSampled;
    
        /**
         * Create a SplitSampler sampling <em>all</em> splits.
         * Takes the first numSamples / numSplits records from each split.
         * @param numSamples Total number of samples to obtain from all selected
         *                   splits.
         */
        public SplitSampler(int numSamples) {
          this(numSamples, Integer.MAX_VALUE);
        }
    
        /**
         * Create a new SplitSampler.
         * @param numSamples Total number of samples to obtain from all selected
         *                   splits.
         * @param maxSplitsSampled The maximum number of splits to examine.
         */
        public SplitSampler(int numSamples, int maxSplitsSampled) {
          this.numSamples = numSamples;
          this.maxSplitsSampled = maxSplitsSampled;
        }
    
        /**
         * From each split sampled, take the first numSamples / numSplits records.
         */
        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
        public K[] getSample(InputFormat<K,V> inf, Job job) 
            throws IOException, InterruptedException {
          List<InputSplit> splits = inf.getSplits(job);
          ArrayList<K> samples = new ArrayList<K>(numSamples);
          int splitsToSample = Math.min(maxSplitsSampled, splits.size());
          int samplesPerSplit = numSamples / splitsToSample;
          long records = 0;
          for (int i = 0; i < splitsToSample; ++i) {
            TaskAttemptContext samplingContext = new TaskAttemptContext(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<K,V> reader = inf.createRecordReader(
                splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);
            while (reader.nextKeyValue()) {
              samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                               reader.getCurrentKey(), null));
              ++records;
              if ((i+1) * samplesPerSplit <= records) {
                break;
              }
            }
            reader.close();
          }
          return (K[])samples.toArray();
        }
      }
     
  2. IntervalSampler: this sampler chooses keys at regular intervals through the split, which makes it a better choice for sorted data.
      /**
       * Sample from s splits at regular intervals.
       * Useful for sorted data.
       */
      public static class IntervalSampler<K,V> implements Sampler<K,V> {
        private final double freq;
        private final int maxSplitsSampled;
    
        /**
         * Create a new IntervalSampler sampling <em>all</em> splits.
         * @param freq The frequency with which records will be emitted.
         */
        public IntervalSampler(double freq) {
          this(freq, Integer.MAX_VALUE);
        }
    
        /**
         * Create a new IntervalSampler.
         * @param freq The frequency with which records will be emitted.
         * @param maxSplitsSampled The maximum number of splits to examine.
         * @see #getSample
         */
        public IntervalSampler(double freq, int maxSplitsSampled) {
          this.freq = freq;
          this.maxSplitsSampled = maxSplitsSampled;
        }
    
        /**
         * For each split sampled, emit when the ratio of the number of records
         * retained to the total record count is less than the specified
         * frequency.
         */
        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
        public K[] getSample(InputFormat<K,V> inf, Job job) 
            throws IOException, InterruptedException {
          List<InputSplit> splits = inf.getSplits(job);
          ArrayList<K> samples = new ArrayList<K>();
          int splitsToSample = Math.min(maxSplitsSampled, splits.size());
          long records = 0;
          long kept = 0;
          for (int i = 0; i < splitsToSample; ++i) {
            TaskAttemptContext samplingContext = new TaskAttemptContext(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<K,V> reader = inf.createRecordReader(
                splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);
            while (reader.nextKeyValue()) {
              ++records;
              if ((double) kept / records < freq) {
                samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                     reader.getCurrentKey(), null));
                ++kept;
              }
            }
            reader.close();
          }
          return (K[])samples.toArray();
        }
      }
     
  3. RandomSampler: this is a good general-purpose sampler. It selects keys with a given probability (freq), keeping at most numSamples keys in total and examining at most maxSplitsSampled splits.
      /**
       * Sample from random points in the input.
       * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
       * each split.
       */
      public static class RandomSampler<K,V> implements Sampler<K,V> {
        private double freq;
        private final int numSamples;
        private final int maxSplitsSampled;
    
        /**
         * Create a new RandomSampler sampling <em>all</em> splits.
         * This will read every split at the client, which is very expensive.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected
         *                   splits.
         */
        public RandomSampler(double freq, int numSamples) {
          this(freq, numSamples, Integer.MAX_VALUE);
        }
    
        /**
         * Create a new RandomSampler.
         * @param freq Probability with which a key will be chosen.
         * @param numSamples Total number of samples to obtain from all selected
         *                   splits.
         * @param maxSplitsSampled The maximum number of splits to examine.
         */
        public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
          this.freq = freq;
          this.numSamples = numSamples;
          this.maxSplitsSampled = maxSplitsSampled;
        }
    
        /**
         * Randomize the split order, then take the specified number of keys from
         * each split sampled, where each key is selected with the specified
         * probability and possibly replaced by a subsequently selected key when
         * the quota of keys from that split is satisfied.
         */
        @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
        public K[] getSample(InputFormat<K,V> inf, Job job) 
            throws IOException, InterruptedException {
          List<InputSplit> splits = inf.getSplits(job);
          ArrayList<K> samples = new ArrayList<K>(numSamples);
          int splitsToSample = Math.min(maxSplitsSampled, splits.size());
    
          Random r = new Random();
          long seed = r.nextLong();
          r.setSeed(seed);
          LOG.debug("seed: " + seed);
          // shuffle splits
          for (int i = 0; i < splits.size(); ++i) {
            InputSplit tmp = splits.get(i);
            int j = r.nextInt(splits.size());
            splits.set(i, splits.get(j));
            splits.set(j, tmp);
          }
          // our target rate is in terms of the maximum number of sample splits,
          // but we accept the possibility of sampling additional splits to hit
          // the target sample keyset
          for (int i = 0; i < splitsToSample ||
                         (i < splits.size() && samples.size() < numSamples); ++i) {
            TaskAttemptContext samplingContext = new TaskAttemptContext(
                job.getConfiguration(), new TaskAttemptID());
            RecordReader<K,V> reader = inf.createRecordReader(
                splits.get(i), samplingContext);
            reader.initialize(splits.get(i), samplingContext);
            while (reader.nextKeyValue()) {
              if (r.nextDouble() <= freq) {
                if (samples.size() < numSamples) {
                  samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                                   reader.getCurrentKey(), null));
                } else {
                  // When exceeding the maximum number of samples, replace a
                  // random element with this one, then adjust the frequency
                  // to reflect the possibility of existing elements being
                  // pushed out
                  int ind = r.nextInt(numSamples);
                  if (ind != numSamples) {
                    samples.set(ind, ReflectionUtils.copy(job.getConfiguration(),
                                     reader.getCurrentKey(), null));
                  }
                  freq *= (numSamples - 1) / (double) numSamples;
                }
              }
            }
            reader.close();
          }
          return (K[])samples.toArray();
        }
      }
    
     
  4. If none of these suits your application, you can write your own implementation of the Sampler interface; a minimal sketch follows this list. Remember that the point of sampling is to produce partitions that are approximately equal in size.
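
For illustration, here is a sketch of a custom sampler (hypothetical, not part of Hadoop) that applies classic reservoir sampling (Algorithm R) across the splits, mirroring the reader setup used by the built-in samplers above, so that every record has an equal chance of being selected regardless of how records are distributed across splits:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler;
    import org.apache.hadoop.util.ReflectionUtils;

    /**
     * Hypothetical sampler: reservoir sampling over all records in all
     * splits, so each record has the same chance of being kept.
     */
    public class ReservoirSampler<K,V> implements Sampler<K,V> {

      private final int numSamples;

      public ReservoirSampler(int numSamples) {
        this.numSamples = numSamples;
      }

      @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
      public K[] getSample(InputFormat<K,V> inf, Job job)
          throws IOException, InterruptedException {
        List<InputSplit> splits = inf.getSplits(job);
        ArrayList<K> samples = new ArrayList<K>(numSamples);
        Random r = new Random();
        long seen = 0;
        for (InputSplit split : splits) {
          // Same reader construction as the built-in samplers above; on
          // newer Hadoop releases TaskAttemptContext is an interface and
          // TaskAttemptContextImpl is instantiated instead.
          TaskAttemptContext samplingContext = new TaskAttemptContext(
              job.getConfiguration(), new TaskAttemptID());
          RecordReader<K,V> reader = inf.createRecordReader(split, samplingContext);
          reader.initialize(split, samplingContext);
          while (reader.nextKeyValue()) {
            ++seen;
            if (samples.size() < numSamples) {
              // Fill the reservoir with the first numSamples keys.
              samples.add(ReflectionUtils.copy(job.getConfiguration(),
                                               reader.getCurrentKey(), null));
            } else {
              // Replace a random slot with probability numSamples / seen.
              long ind = (long) (r.nextDouble() * seen);
              if (ind < numSamples) {
                samples.set((int) ind, ReflectionUtils.copy(job.getConfiguration(),
                    reader.getCurrentKey(), null));
              }
            }
          }
          reader.close();
        }
        return (K[]) samples.toArray();
      }
    }

Note that, like RandomSampler created without a split cap, this reads every split at the client, which can be expensive for large inputs; a production version would probably also cap the number of splits it examines.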