
[Spark 93] Reading and Writing SequenceFiles in Spark

 

1. Code:

package spark.examples.fileformat

import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileTest {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("SequenceFileTest")
    conf.setMaster("local[3]")
    val sc = new SparkContext(conf)

    // A small (String, Int) data set; Spark converts the pairs to
    // (Text, IntWritable) when writing the SequenceFile.
    val data = List(("ABC", 1), ("BCD", 2), ("CDE", 3), ("DEF", 4), ("FGH", 5))
    val rdd = sc.parallelize(data, 1)

    // Write the RDD as a SequenceFile into a timestamped directory.
    val dir = "file:///D:/sequenceFile-" + System.currentTimeMillis()
    rdd.saveAsSequenceFile(dir)

    // Read the single output partition back; the implicit
    // WritableConverters turn Text/IntWritable back into String/Int.
    val rdd2 = sc.sequenceFile[String, Int](dir + "/part-00000")
    println(rdd2.collect().map(elem => (elem._1 + ", " + elem._2)).toList)

    sc.stop()
  }
}
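Since the RDD is built with a single partition, all records land in one part-00000 file under the timestamped directory, and the final println should output the five pairs that were written, i.e. List(ABC, 1, BCD, 2, CDE, 3, DEF, 4, FGH, 5).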

 

2. Contents of the SequenceFile:

[Screenshot: contents of the generated part-00000 SequenceFile]
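The file can also be inspected programmatically. The following is a minimal sketch, assuming Hadoop's client libraries are on the classpath, that prints the header and records via org.apache.hadoop.io.SequenceFile.Reader (DumpSequenceFile and the path handling are illustrative additions, not part of the original post):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}

object DumpSequenceFile {
  def main(args: Array[String]) {
    val conf = new Configuration()
    // e.g. file:///D:/sequenceFile-<timestamp>/part-00000
    val path = new Path(args(0))
    val fs = FileSystem.get(path.toUri, conf)
    val reader = new SequenceFile.Reader(fs, path, conf)
    try {
      // The file header records the Writable classes Spark chose:
      // org.apache.hadoop.io.Text and org.apache.hadoop.io.IntWritable.
      println("key class:   " + reader.getKeyClassName)
      println("value class: " + reader.getValueClassName)
      val key = new Text()
      val value = new IntWritable()
      while (reader.next(key, value)) {
        println(key.toString + " -> " + value.get)
      }
    } finally {
      reader.close()
    }
  }
}

Run against the output of the example above, this should report Text/IntWritable as the key/value classes, followed by the five records.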

3. Notes:

saveAsSequenceFile is a method defined on SequenceFileRDDFunctions, yet the code above never explicitly imports the implicit conversion that provides it. The reason is that the code runs on Spark 1.3, where the compiler finds these implicits automatically; the following comment in the Spark source explains the behavior:

  // The following implicit functions were in SparkContext before 1.3 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, we still keep the old functions in SparkContext for backward
  // compatibility and forward to the following functions directly.
  implicit def intWritableConverter(): WritableConverter[Int] =
    simpleWritableConverter[Int, IntWritable](_.get)

  implicit def longWritableConverter(): WritableConverter[Long] =
    simpleWritableConverter[Long, LongWritable](_.get)

  implicit def doubleWritableConverter(): WritableConverter[Double] =
    simpleWritableConverter[Double, DoubleWritable](_.get)

  implicit def floatWritableConverter(): WritableConverter[Float] =
    simpleWritableConverter[Float, FloatWritable](_.get)

  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
    simpleWritableConverter[Boolean, BooleanWritable](_.get)

  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
    simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
      // getBytes method returns an array which is longer than the data to be returned
      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
    }
  }

  implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

  implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
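As the comment notes, code compiled against Spark versions before 1.3 needs the explicit import to bring these converters into scope; a one-line sketch:

// Spark < 1.3 only: pull the implicit WritableConverters into scope.
import org.apache.spark.SparkContext._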

 

SequenceFileRDDFunctions, in turn, is defined for pair RDDs whose key and value types can both be viewed as Writable:

 

/**
 * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
 * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
 * we need more implicit parameters to convert our keys and values to Writable.
 *
 */
class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
    self: RDD[(K, V)],
    _keyWritableClass: Class[_ <: Writable],
    _valueWritableClass: Class[_ <: Writable])
  extends Logging
  with Serializable {
  // ... saveAsSequenceFile is defined here ...
}

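Because of the <% view bounds, any types viewable as Writable qualify, and you can equally work with the Writable classes directly. A minimal sketch, reusing sc and dir from the example above, that bypasses the implicit converters:

import org.apache.hadoop.io.{IntWritable, Text}

// Read the raw Writables and unwrap them by hand.
val rdd3 = sc.sequenceFile(dir + "/part-00000", classOf[Text], classOf[IntWritable])
  .map { case (k, v) => (k.toString, v.get) }
println(rdd3.collect().toList)

Note that Hadoop reuses the same Writable instances across records, which is why the values are copied out (toString / get) inside the map before anything is cached or collected.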
 

 

 
