
Spark Study Notes 22: Converting an RDD to a DataFrame Using the Programmatic Interface

 

Converting an RDD to a DataFrame Using the Programmatic Interface
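When the schema is not known until runtime, Spark SQL (1.x API) lets you build a DataFrame programmatically in three steps: turn the original RDD into a JavaRDD<Row>, build a StructType describing the columns of those Rows, and apply the schema to the Row RDD with SQLContext.createDataFrame. The Java example below reads a text file of JSON records, parses each line into a Person object with fastjson, walks through those three steps, and then queries the result with SQL.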

 

 

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import com.alibaba.fastjson.JSONObject;

public class PersonDataFrame {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local").setAppName("PersonDataFrame");
		JavaSparkContext sc = new JavaSparkContext(conf);
		SQLContext sqlContext = new SQLContext(sc);

		// Read the source file; each line holds one JSON record
		JavaRDD<String> textFile = sc.textFile("d://json.txt");

		// Parse each JSON line into a Person object with fastjson
		JavaRDD<Person> mapPerson = textFile.map(new Function<String, Person>() {

			@Override
			public Person call(String line) throws Exception {
				return JSONObject.parseObject(line, Person.class);
			}
		});

		// Programmatic interface: build the schema dynamically from a field-name string
		String schemaString = "name age";
		List<StructField> fields = new ArrayList<StructField>();
		for (String fieldName : schemaString.split(" ")) {
			fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
		}
		// Or declare each field with its own type:
		// fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
		// fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));

		// Build the schema
		StructType schema = DataTypes.createStructType(fields);

		// Convert the Person RDD into an RDD of Rows that matches the schema
		JavaRDD<Row> rowRDD = mapPerson.map(new Function<Person, Row>() {

			@Override
			public Row call(Person p) throws Exception {
				// Both columns are declared StringType, so age is stored as a String
				return RowFactory.create(p.getName(), String.valueOf(p.getAge()));
			}
		});

		// Combine the Row RDD with the schema to produce a DataFrame
		DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

		// Register a temporary table and query it with SQL
		peopleDataFrame.registerTempTable("people");
		DataFrame results = sqlContext.sql("SELECT name FROM people");
		results.show();

		sc.close();
	}
}
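The listing assumes a Person JavaBean with name and age fields and an input file at d://json.txt containing one JSON object per line; neither appears in the original note, so the sketch below is only an assumption based on the getters the example calls.

import java.io.Serializable;

// Hypothetical Person bean matching the getters used above;
// fastjson populates it through the no-arg constructor and setters.
public class Person implements Serializable {

	private String name;
	private int age;

	public String getName() { return name; }
	public void setName(String name) { this.name = name; }

	public int getAge() { return age; }
	public void setAge(int age) { this.age = age; }
}

A matching input line in d://json.txt could look like {"name":"Tom","age":20} (sample data, not from the original); results.show() then prints a one-column table of the parsed names. If you switch age to the commented-out IntegerType variant, pass p.getAge() to RowFactory.create directly instead of converting it to a String, since the value stored in each Row has to match the declared column type.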

 
