
Custom DBInputFormat: extracting a MySQL table stored across sharded databases


 

 

Background:

The MySQL table a is stored across sharded databases (one logical table spread over many physical databases); it now needs to be extracted into HDFS.

 

Key implementation points:

1. Customize DBInputFormat: for each shard database the table lives in, re-create the connection and then build the input splits.

2. Define a custom record class that the mapper uses to receive the rows read from each split.

3. In the mapper, write the received records out.

 

The sqoop.properties file is as follows:

 

driverName=com.mysql.jdbc.Driver
globaldb=jdbc:mysql://192.168.8.177:3306/mydb?tinyInt1isBit=false
dbProcInstances=jdbc:mysql://192.168.1.39:3306/mydb[1~64],192.168.1.42:3306/mydb[1~64],192.168.1.133:3306/mydb[65~128],192.168.1.136:3306/mydb[65~128]
mysqlUser=root
mysqlPw=testhive
maxSplitRowsCount=50000
dbUrlConnProps=tinyInt1isBit=false
jobName=coursewares_content.2016-08-21@testhive
outPutPath=/user/bigdata/tmp/tmp_ct_teach_coursewares_content
query=select coursewares_id,belong_type,content,school_id from ct_teach_coursewares_content 
queryCount=SELECT COUNT(1) FROM ct_teach_coursewares_content 
tableIsSharding=true
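
The dbProcInstances value drives split generation: each entry is a host followed by a database-name prefix and a shard-number range in square brackets. getSplits (in ShardingDBInputFormat below) expands every entry into one JDBC URL per shard and appends dbUrlConnProps as the query string; each shard is then further chunked into splits of at most maxSplitRowsCount rows. For the configuration above, the first entry expands to URLs like these (illustrative only):

jdbc:mysql://192.168.1.39:3306/mydb1?tinyInt1isBit=false
jdbc:mysql://192.168.1.39:3306/mydb2?tinyInt1isBit=false
...
jdbc:mysql://192.168.1.39:3306/mydb64?tinyInt1isBit=false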

 

 

Code:

 

Main class:

import com.xuele.bigdata.config.Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by zm on 16/7/12.
 */
public class ShardingDB {


    /**
     * commonRecord
     */
    public static class CommonRecord implements Writable, DBWritable {
        private String[] fields; // array of column names
        private String fieldsStr;  // the column names joined into a comma-separated string
        private Map<String,Object> kv=new HashMap<String,Object>(); // map of column name -> column value

        public CommonRecord() {

        }

        public String[] getFields(){
            return fields;
        }
        
        public void readFields(DataInput in) throws IOException {
            fieldsStr=Text.readString(in); // read the comma-separated column names
            fields=fieldsStr.split(","); // split into individual column names; each column's value is then read into the kv map
            for(String f:fields){ 
                kv.put(f, Text.readString(in));
            }

        }
        
        public void write(DataOutput out) throws IOException {
            Text.writeString(out,fieldsStr); // write out the comma-separated column names
            for(String f:fields){ // write out each column value
                Text.writeString(out, kv.get(f).toString());
            }

        }
        public void readFields(ResultSet result) throws SQLException {
            ResultSetMetaData metaData=result.getMetaData(); // get the result set metadata
            int num=metaData.getColumnCount();
            fields=new String[num];
            fieldsStr="";
            for(int i=1;i<=num;i++){

                fieldsStr+=metaData.getColumnLabel(i)+","; // append the column name
                kv.put(metaData.getColumnLabel(i),result.getObject(i)); // put <column name, column value> into the map
            }
            fieldsStr=fieldsStr.substring(0,fieldsStr.length()-1); // drop the trailing comma
            fields=fieldsStr.split(","); // build the array of column names

        }
        public void write(PreparedStatement stmt) throws SQLException {

            int i=1;
            for(String f:fields){
                stmt.setObject(i++, kv.get(f)); // iterate over the columns and bind each column's value from the map
            }
        }
        
        
        public String toString() {
            StringBuilder sb=new StringBuilder();
            for(String f:fields){
                if (kv.get(f) != null) { // strip CR/LF characters from the value, drop any embedded \001, and join the columns with \001
                    sb.append(kv.get(f).toString().replace("\r\n\n","").replace("\r\n", "").replace("\n", " ").replace("\r"," ").replace("\001","")).append("\001");

                } else {
                    sb.append("").append("\001");
                }
            }
            sb.deleteCharAt(sb.length()-1);
            return sb.toString();
        }
    }
    
    
    
    public static class DBInputMapper extends Mapper<LongWritable, CommonRecord,NullWritable,Text>{
        public void map(LongWritable key, CommonRecord value,Context context)
        throws IOException, InterruptedException {
            context.write(NullWritable.get(), new Text(value.toString()));
            //Thread.sleep(12000);
        }

    }
    public static void main(String[] args) throws Exception{
        try {
            if (args.length == 1) {
                Config propertiesConfig = new Config();
                propertiesConfig.init(args[0]);

                String globaldb = propertiesConfig.getValue("globaldb"); // jdbc:mysql://192.168.1.39:3306/mydb
                String dbProcInstances = propertiesConfig.getValue("dbProcInstances"); // jdbc:mysql://192.168.1.39:3306/,192.168.1.42:3306
                String dbUrlConnProps = propertiesConfig.getValue("dbUrlConnProps"); // tinyInt1isBit=false
                String driverName = propertiesConfig.getValue("driverName");// com.mysql.jdbc.Driver
                String mysqlUser = propertiesConfig.getValue("mysqlUser");// root
                String mysqlPw = propertiesConfig.getValue("mysqlPw"); // testhive

                String tableIsSharding = propertiesConfig.getValue("tableIsSharding"); // true or false
                String jobName = propertiesConfig.getValue("jobName");// e.g. ShardingDBTest
                String outPutPath = propertiesConfig.getValue("outPutPath");// HDFS output directory
                String maxSplitRowsCount = propertiesConfig.getValue("maxSplitRowsCount", "5000");


                String inputQuery = propertiesConfig.getValue("query");
                String inputCountQuery = propertiesConfig.getValue("queryCount");
                Configuration conf = new Configuration();
                conf.setBoolean("tableIsSharding", Boolean.parseBoolean(tableIsSharding));
                conf.set("globaldb", globaldb); // 单片库
                conf.set("dbProcInstances", dbProcInstances); // 分片库
                conf.set("dbUrlConnProps", dbUrlConnProps); // 分片库  增加针对 tinyiint类型的转义 
                conf.set("maxSplitRowsCount", maxSplitRowsCount); // 分片行数
                DBConfiguration.configureDB(conf, driverName, globaldb, mysqlUser, mysqlPw); //  使用hadoop的类连接   关系型数据库 

                Job job = Job.getInstance(conf, ShardingDB.class.getName());

                job.setJobName(jobName);
                job.setJarByClass(ShardingDB.class);
                job.setInputFormatClass(ShardingDBInputFormat.class);  // custom input format: builds splits for the sharded instances listed in dbProcInstances and reads their rows
                job.setMapperClass(DBInputMapper.class); // custom Mapper class

                job.setMapOutputKeyClass(NullWritable.class); // map output key type
                job.setMapOutputValueClass(Text.class); // map output value type
       

                job.setNumReduceTasks(10);

                FileOutputFormat.setOutputPath(job, new Path(outPutPath)); // set the output directory
                ShardingDBInputFormat.setInput(job, CommonRecord.class, inputQuery, inputCountQuery); // register the record class, query and count query

                System.exit(job.waitForCompletion(true) ? 0 : 1);
            } else {
                System.out.println("The args's number is wrong!");
            }
        } catch (Exception e) {
            System.err.println("Error: "+e);
        }
    }
}
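
The com.xuele.bigdata.config.Config helper imported above is an in-house utility that is not included in this post. A minimal sketch, assuming it is simply a thin wrapper around java.util.Properties whose init/getValue methods match the calls made in main(), might look like this (hypothetical, for completeness only):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class Config {
    private final Properties props = new Properties();

    // Load key=value pairs from the properties file passed on the command line (args[0]).
    public void init(String path) throws IOException {
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
    }

    public String getValue(String key) {
        return props.getProperty(key);
    }

    public String getValue(String key, String defaultValue) {
        return props.getProperty(key, defaultValue);
    }
}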


 

 

Custom sharded-database split (InputFormat) class:

 

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */



import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.db.*;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * An InputFormat that reads input data from a sharded SQL table.
 * <p>
 * ShardingDBInputFormat emits LongWritables containing the record number as
 * key and DBWritables as value.
 *
 * The SQL query and input class can be set using one of the two
 * setInput methods.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ShardingDBInputFormat<T extends DBWritable> extends DBInputFormat<T> {

  private static final Log LOG = LogFactory.getLog(ShardingDBInputFormat.class);
  
  protected String dbProductName = "DEFAULT";

  /**
   * A Class that does nothing, implementing DBWritable
   */
  @InterfaceStability.Evolving
  public static class NullDBWritable implements DBWritable, Writable {
    
    public void readFields(DataInput in) throws IOException { }
    
    public void readFields(ResultSet arg0) throws SQLException { }
    
    public void write(DataOutput out) throws IOException { }
    
    public void write(PreparedStatement arg0) throws SQLException { }
  }
  
  /**
   * An InputSplit that spans a set of rows. This custom split differs from the stock
   * DBInputSplit in that it also carries the jdbcUrl of the shard it belongs to
   * (the url is set in addInputSplit below and read back in createDBRecordReader).
   */
  @InterfaceStability.Evolving
  public static class ShardingDBInputSplit extends DBInputSplit{

    private String jdbcUrl;
    private long end = 0;
    private long start = 0;

    /**
     * Default Constructor
     */
    public ShardingDBInputSplit() {
    }


    /**
     * Convenience Constructor
     * @param jdbcUrl the jdbcUrl for this sharding
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public ShardingDBInputSplit(String jdbcUrl,long start, long end) {
      this.jdbcUrl=jdbcUrl;
      this.start = start;
      this.end = end;
    }

    /** {@inheritDoc} */
    public String[] getLocations() throws IOException {
      // TODO Add a layer to enable SQL "sharding" and support locality
      return new String[] {};
    }

    public String getJdbcUrl(){
      return jdbcUrl;
    }

    /**
     * @return The index of the first row to select
     */
    public long getStart() {
      return start;
    }

    /**
     * @return The index of the last row to select
     */
    public long getEnd() {
      return end;
    }

    /**
     * @return The total row count in this split
     */
    public long getLength() throws IOException {
      return end - start;
    }

    /** {@inheritDoc} */
    public void readFields(DataInput input) throws IOException { // deserialize: first the length of jdbcUrl, then the jdbcUrl bytes themselves, finally the start/end row positions
      int len=input.readInt();
      byte[] bytes=new byte[len];
      input.readFully(bytes);
      jdbcUrl=new String(bytes);
      start = input.readLong();
      end = input.readLong();
    }

    /** {@inheritDoc} */
    public void write(DataOutput output) throws IOException { // serialize: first the length of jdbcUrl, then the jdbcUrl itself, finally the start and end row positions
      int len=jdbcUrl.length();
      output.writeInt(len);
      output.writeBytes(jdbcUrl);
      output.writeLong(start);
      output.writeLong(end);
    }
  }

  protected String conditions;

  protected Connection connection;

  protected String tableName;

  protected String[] fieldNames;

  protected DBConfiguration dbConf;

  /** {@inheritDoc} */
  public void setConf(Configuration conf) {

    dbConf = new DBConfiguration(conf);

    try {
      this.connection = createConnection();

      DatabaseMetaData dbMeta = connection.getMetaData();
      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
    }
    catch (Exception ex) {
      ex.printStackTrace();
      throw new RuntimeException(ex);
    }

    tableName = dbConf.getInputTableName();
    fieldNames = dbConf.getInputFieldNames();
    conditions = dbConf.getInputConditions();
  }

  public Configuration getConf() {
    return dbConf.getConf();
  }
  
  public DBConfiguration getDBConf() {
    return dbConf;
  }

  public Connection getConnection() {
    // TODO Remove this code that handles backward compatibility.
    if (this.connection == null) {
      this.connection = createConnection();
    }

    return this.connection;
  }

  public Connection createConnection() {
    try {
      Connection newConnection = dbConf.getConnection();
      newConnection.setAutoCommit(false);
      newConnection.setTransactionIsolation(
          Connection.TRANSACTION_SERIALIZABLE);

      return newConnection;
    } catch (Exception e) {
      e.printStackTrace();
      throw new RuntimeException(e);
    }
  }

  Connection createConnection(String jdbcUrl){
    try{
      Class.forName(dbConf.getConf().get(DBConfiguration.DRIVER_CLASS_PROPERTY));

      return DriverManager.getConnection(jdbcUrl,
          dbConf.getConf().get(DBConfiguration.USERNAME_PROPERTY),
          dbConf.getConf().get(DBConfiguration.PASSWORD_PROPERTY));

    } catch(Exception e) {
      e.printStackTrace();
      throw new RuntimeException(e);
    }
  }

  public String getDBProductName() {
    return dbProductName;
  }

  protected RecordReader<LongWritable, T> createDBRecordReader(InputSplit splita,
                                                               Configuration conf) throws IOException {  // create a reader for one split; every row read from the split is deserialized into the class returned by dbConf.getInputClass()

    // if the table is sharded, re-configure the DB connection with the shard jdbcUrl carried inside the split
    if(conf.getBoolean("tableIsSharding",false)) {
        ShardingDBInputSplit split = (ShardingDBInputSplit) splita; // cast to the sharding split to get at its jdbcUrl
        DBConfiguration.configureDB(conf, conf.get(DBConfiguration.DRIVER_CLASS_PROPERTY),
                split.getJdbcUrl(), conf.get(DBConfiguration.USERNAME_PROPERTY), conf.get(DBConfiguration.PASSWORD_PROPERTY));
    }

    @SuppressWarnings("unchecked")
    DBInputSplit split = (DBInputSplit) splita;
    Class<T> inputClass = (Class<T>) (dbConf.getInputClass()); // the class registered via setInput(...) at the bottom of this file (dbConf.setInputClass(inputClass)); here it is CommonRecord
    // everything above is the modification to the stock DBInputFormat; the block below is copied from the original source
    try {  // pick a record reader by database product; it reads the split's rows and maps them into the input class (CommonRecord)
      // use database product name to determine appropriate record reader.
      if (dbProductName.startsWith("ORACLE")) {
        // use Oracle-specific db reader.
        return new OracleDBRecordReader<T>(split, inputClass,
                conf, createConnection(), getDBConf(), conditions, fieldNames,
                tableName);
      } else if (dbProductName.startsWith("MYSQL")) {
        // use MySQL-specific db reader.
        return new MySQLDBRecordReader<T>(split, inputClass,
                conf, createConnection(), getDBConf(), conditions, fieldNames,
                tableName);
      } else {
        // Generic reader.
        return new DBRecordReader<T>(split, inputClass,
                conf, createConnection(), getDBConf(), conditions, fieldNames,
                tableName);
      }
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }

  /** {@inheritDoc} */
  public RecordReader<LongWritable, T> createRecordReader(InputSplit split,
                                                          TaskAttemptContext context) throws IOException, InterruptedException {
      return createDBRecordReader(split, context.getConfiguration());

  }

  /**
   * For a horizontally sharded table, build splits per shard database so that sharding-count
   * mappers each pull that shard's table data (further chunked by row count).
   * For a non-sharded table, fall back to the default DBInputFormat behaviour.
   *
   **/
  public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<InputSplit>();
      String dbUrlConnProps=job.getConfiguration().get("dbUrlConnProps","");   // e.g. tinyInt1isBit=false
      boolean tableIsSharding=job.getConfiguration().getBoolean("tableIsSharding",false);  // true or false
      int maxSplitRowsCount=job.getConfiguration().getInt("maxSplitRowsCount",50000);
      if(maxSplitRowsCount>50000){
        maxSplitRowsCount=50000;
      }
      if(tableIsSharding) {
          // jdbc:mysql://192.168.1.40:3306/mydb[1~64]
          String dbProcInstances=job.getConfiguration().get("dbProcInstances");// jdbc:mysql://192.168.1.39:3306/mydb[1~64],192.168.1.42:3306/mydb[1~64]
          String protocol=dbProcInstances.substring(0,dbProcInstances.indexOf("//"))+"//"; // jdbc:mysql://
          String tmp=dbProcInstances.replace(protocol,"");
          String[] instances=tmp.split(","); // 192.168.1.39:3306/mydb[1~64],192.168.1.42:3306/mydb[1~64]

          String jdbcUrl=null;
          for(String instance:instances) { 
              if (instance.contains("/")) { // 192.168.1.39:3306/mydb[1~64]
                  String prefixBegin=instance.substring(0, instance.indexOf("/"))+"/";  // 192.168.1.39:3306/
                  String shardings=instance.replace(prefixBegin,""); // mydb[1~64]
                  String dbPrefix=shardings.substring(0,shardings.indexOf("[")); // mydb
                  tmp=shardings.substring(shardings.indexOf("[")+1).replace("]",""); // 1~64

                  if (tmp.contains("~")) {
                    //only support integer range split by ~
                      int start = Integer.parseInt(tmp.split("~")[0]); // 1
                      int end = Integer.parseInt(tmp.split("~")[1]); // 64 
                      for (int i = start; i <= end; i++) {
                        jdbcUrl =this.createJdbcUrl(protocol, prefixBegin, dbPrefix, i, dbUrlConnProps);  // e.g. jdbc:mysql://192.168.1.39:3306/mydb1?tinyInt1isBit=false
                          this.addInputSplit(splits,jdbcUrl,maxSplitRowsCount); 
                      }
                  } else if (tmp.contains("|")) {
                    //only support integer list split by |
                    String[] shardingNumArray = tmp.split("\\|");
                    if (ArrayUtils.isNotEmpty(shardingNumArray)) {
                      for (String shardingNumString : shardingNumArray) {
                        int shardingNum = Integer.parseInt(shardingNumString);
                        jdbcUrl =this.createJdbcUrl(protocol, prefixBegin, dbPrefix, shardingNum, dbUrlConnProps);
                        this.addInputSplit(splits,jdbcUrl,maxSplitRowsCount);
                      }
                    }

                  }
              }
          }
      }else{
        ResultSet results = null;
        Statement statement = null;
        try {
          statement = connection.createStatement();

          results = statement.executeQuery(getCountQuery());
          results.next();

          long count = results.getLong(1);
          int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
          long chunkSize = (count / chunks);

          results.close();
          statement.close();

          // Split the rows into n-number of chunks and adjust the last chunk
          // accordingly
          for (int i = 0; i < chunks; i++) {
            DBInputSplit split;

            if ((i + 1) == chunks)
              split = new DBInputSplit(i * chunkSize, count);
            else
              split = new DBInputSplit(i * chunkSize, (i * chunkSize)
                      + chunkSize);

            splits.add(split);
          }

          connection.commit();
          return splits;
        } catch (SQLException e) {
          throw new IOException("Got SQLException", e);
        } finally {
          try {
            if (results != null) { results.close(); }
          } catch (SQLException e1) {}
          try {
            if (statement != null) { statement.close(); }
          } catch (SQLException e1) {}

          closeConnection();
        }
      }



      return splits;
  }

  /**
   * Build the jdbc url for one shard database.
   * @param protocol
   * @param prefixBegin
   * @param dbPrefix
   * @param shardingNum
   * @param dbUrlConnProps
   * @return
   */
  private String createJdbcUrl(String protocol, String prefixBegin, String dbPrefix, int shardingNum, String dbUrlConnProps) {
    String jdbcUrl=protocol+prefixBegin+dbPrefix+shardingNum;
    if (StringUtils.isNotBlank(dbUrlConnProps)){
      jdbcUrl = jdbcUrl+"?"+dbUrlConnProps;
    }
    return jdbcUrl;
  }

  /**
   * Query the row count for the given shard db url and, based on maxSplitRowsCount,
   * cut it into one or more InputSplits.
   * @param splits
   * @param jdbcUrl
   * @param maxSplitRowsCount
   */
  private void addInputSplit(List<InputSplit> splits,String jdbcUrl,int maxSplitRowsCount) {
    Connection connection=null;
    ResultSet results = null;
    Statement statement = null;

    connection=createConnection(jdbcUrl);
    long count=0L;
    try {
      statement = connection.createStatement();
      results = statement.executeQuery(getCountQuery());
      results.next();

      count = results.getLong(1);
    }catch(Exception e){
      e.printStackTrace(); // log and continue; a shard whose rows cannot be counted simply yields one empty split (count stays 0)
    } finally {
      try {
        if (results != null) {
          results.close();
        }
        if (statement != null) {
          statement.close();
        }
        if (connection != null) {
          connection.close();
        }
      } catch (SQLException e) {
      }

    }
    if(count>maxSplitRowsCount){
      int chunks=(int)(count/maxSplitRowsCount);
      if(count%maxSplitRowsCount>0){
        chunks++;
      }

      long chunkSize = maxSplitRowsCount;
      for (int c = 0; c < chunks; c++) {
        ShardingDBInputSplit split;

        if ((c + 1) == chunks) {
          split = new ShardingDBInputSplit(jdbcUrl, c * chunkSize, count);  // the split is given the shard's jdbcUrl plus the start and end row of this chunk
        }else {
          split = new ShardingDBInputSplit(jdbcUrl, c * chunkSize, (c * chunkSize)
                  + chunkSize);
        }
        System.out.println("ShardingDBInputSplit:"+jdbcUrl+","+split.getStart()+","+split.getEnd());
        splits.add(split);
      }
    }else{
      ShardingDBInputSplit split=new ShardingDBInputSplit(jdbcUrl,0,count);
      System.out.println("no ShardingDBInputSplit:"+jdbcUrl+",0,"+count+")");
      splits.add(split);
    }
  }


  /** Returns the query for getting the total number of rows, 
   * subclasses can override this for custom behaviour.*/
  protected String getCountQuery() {
    
    if(dbConf.getInputCountQuery() != null) {
      return dbConf.getInputCountQuery();
    }
    
    StringBuilder query = new StringBuilder();
    query.append("SELECT COUNT(*) FROM " + tableName);

    if (conditions != null && conditions.length() > 0)
      query.append(" WHERE " + conditions);
    return query.toString();
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   * 
   * @param job The map-reduce job
   * @param inputClass the class object implementing DBWritable, which is the 
   * Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition which to select data with, 
   * eg. '(updated > 20070101 AND length > 0)'
   * @param orderBy the fieldNames in the orderBy clause.
   * @param fieldNames The field names in the table
   * @see #setInput(Job, Class, String, String)
   */
  public static void setInput(Job job,
                              Class<? extends DBWritable> inputClass,
                              String tableName, String conditions,
                              String orderBy, String... fieldNames) {
    job.setInputFormatClass(ShardingDBInputFormat.class);
    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }
  
  /**
   * Initializes the map-part of the job with the appropriate input settings.
   * 
   * @param job The map-reduce job
   * @param inputClass the class object implementing DBWritable, which is the 
   * Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example : 
   * "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns 
   * the number of records in the table. 
   * Example : "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(Job, Class, String, String, String, String...) 
   */
  public static void setInput(Job job,
                              Class<? extends DBWritable> inputClass,
                              String inputQuery, String inputCountQuery) {
    job.setInputFormatClass(ShardingDBInputFormat.class);
    DBConfiguration dbConf = new DBConfiguration(job.getConfiguration());
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }

  protected void closeConnection() {
    try {
      if (null != this.connection) {
        this.connection.close();
        this.connection = null;
      }
    } catch (SQLException sqlE) {
      LOG.debug("Exception on close", sqlE);
    }
  }
}
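
The key detail in ShardingDBInputSplit is that the shard's jdbcUrl travels with the split from the client (where getSplits runs) to the map task that processes it. A small, hypothetical round-trip check of that Writable serialization (not part of the original job) could look like this:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ShardingSplitRoundTrip {
    public static void main(String[] args) throws IOException {
        ShardingDBInputFormat.ShardingDBInputSplit out =
                new ShardingDBInputFormat.ShardingDBInputSplit(
                        "jdbc:mysql://192.168.1.39:3306/mydb1?tinyInt1isBit=false", 0, 50000);

        // serialize the split exactly as the framework would
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer));

        // deserialize it again on the "task" side
        ShardingDBInputFormat.ShardingDBInputSplit in =
                new ShardingDBInputFormat.ShardingDBInputSplit();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // prints: jdbc:mysql://192.168.1.39:3306/mydb1?tinyInt1isBit=false 0 50000
        System.out.println(in.getJdbcUrl() + " " + in.getStart() + " " + in.getEnd());
    }
}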

 

 

 

Flow diagram:
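
In brief: the driver reads sqoop.properties and configures the job; ShardingDBInputFormat.getSplits expands dbProcInstances into one JDBC URL per shard database, counts the rows behind each URL and cuts them into ShardingDBInputSplits of at most maxSplitRowsCount rows; each map task re-points its connection at the jdbcUrl carried in its split, reads the rows into CommonRecord objects, and writes every record to the HDFS output path as a \001-separated line.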

 



 

 

 
