`

Hadoop_Avro数据类型与模式

阅读更多
1.Avro基本数据类型
类型 描述   模式示例
null          The absence of a value      "null"
boolean    A binary value       "boolean"
int    32位带符号整数         "int"
long    64位带符号整数           "long"
float    32位单精度浮点数         "float"
double    64位双精度浮点数         "double"
bytes    byte数组(8位无符号字节序列)     "bytes"
string    Unicode字符串       "string"
【Avro基本数据类型也可以用更冗长的形式,通过type属性来指定,例如{"type":"null"}】
2.Avro复杂数据类型
数据类型 类型描述 模式示例
array        An ordered collection of objects.                 {              
             All objects in a particular                       "type": "array",
            array must have the same schema.                  "items": "long"
                                                                } 
                                                               
                                                                            
map        An unordered collection of key-value pairs.         {                                 
           Keys must be strings and values may be any type,       "type": "map",    
           although within a particular map,                      "values": "string"
           all values must have the same schema.                  }
          
                                                                                       
record A collection of named fields of any type. {                                      
                                                                "type": "record",                      
                                                                "name": "WeatherRecord",               
                                                                "doc": "A weather reading.",           
                                                                "fields": [                            
                                                                {"name": "year", "type": "int"},       
                                                                {"name": "temperature", "type": "int"},
                                                                {"name": "stationId", "type": "string"}
                                                                ]                                      
                                                                }                                      


enum A set of named values. {                                     
                                                                "type": "enum",                       
                                                                "name": "Cutlery",                    
                                                                "doc": "An eating utensil.",          
                                                                "symbols": ["KNIFE", "FORK", "SPOON"] 
                                                                }                                     

fixed A fixed number of 8-bit unsigned bytes. {
                                                                "type": "fixed",                      
                                                                "name": "Md5Hash",                    
                                                                "size": 16                            
                                                                }                                     


union A union of schemas. A union is represented by a JSON [                                  
            array, where each element in the array is a schema.       "null",                            
             Data represented by a union must match                   "string",                          
one of the schemas in the union.                        {"type": "map", "values": "string"}
                                                                      ] 

如上图所示,可以通过程序将本地的小文件打包,组装成一个大文件保存到HDFS中,每个本地小文件成为Avro文件中的一条记录。具体的程序如下面的代码所示:
//对Avro数据文件的写入
public class AVRO_WRITE { 
    public static final String FIELD_CONTENTS = "contents"; 
    public static final String FIELD_FILENAME = "filename"; 
    public static final String SCHEMA_JSON = "{\"type\": \"record\",\"name\": \"SmallFilesTest\", " 
            + "\"fields\": [" 
            + "{\"name\":\"" 
            + FIELD_FILENAME 
            + "\",\"type\":\"string\"}," 
            + "{\"name\":\"" 
            + FIELD_CONTENTS 
            + "\", \"type\":\"bytes\"}]}"; 
    public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON); 
 
    public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException { 
        DataFileWriter<Object> writer = new  DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100); 
        writer.setCodec(CodecFactory.snappyCodec()); 
        writer.create(SCHEMA, outputStream); 
        for (Object obj : FileUtils.listFiles(srcPath, null, false)){ 
            File file = (File) obj; 
            String filename = file.getAbsolutePath(); 
            byte content[] = FileUtils.readFileToByteArray(file); 
            GenericRecord record = new GenericData.Record(SCHEMA); 
            record.put(FIELD_FILENAME, filename); 
            record.put(FIELD_CONTENTS, ByteBuffer.wrap(content)); 
            writer.append(record); 
            System.out.println(file.getAbsolutePath() + ":"+ DigestUtils.md5Hex(content)); 
        } 
        IOUtils.cleanup(null, writer); 
        IOUtils.cleanup(null, outputStream); 
    } 
 
    public static void main(String args[]) throws Exception { 
        Configuration config = new Configuration(); 
        FileSystem hdfs = FileSystem.get(config); 
        File sourceDir = new File(args[0]); 
        Path destFile = new Path(args[1]); 
        OutputStream os = hdfs.create(destFile); 
        writeToAvro(sourceDir, os); 
    } 
}

//对Avro数据文件的读取
public class AVRO_READ{ 
    private static final String FIELD_FILENAME = "filename"; 
    private static final String FIELD_CONTENTS = "contents"; 
 
    public static void readFromAvro(InputStream is) throws  IOException { 
        DataFileStream<Object> reader = new DataFileStream<Object>(is,new GenericDatumReader<Object>()); 
        for (Object o : reader) { 
            GenericRecord r = (GenericRecord) o; 
            System.out.println(r.get(FIELD_FILENAME)+ ":"+DigestUtils.md5Hex(((ByteBuffer)r.get(FIELD_CONTENTS)).array())); 
        } 
        IOUtils.cleanup(null, is); 
        IOUtils.cleanup(null, reader); 
    } 
 
    public static void main(String... args) throws Exception { 
        Configuration config = new Configuration(); 
        FileSystem hdfs = FileSystem.get(config); 
        Path destFile = new Path(args[0]); 
        InputStream is = hdfs.open(destFile); 
        readFromAvro(is); 
    } 
}  
  • 大小: 39.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics