`

使用 Kettle 从阿里云 OSS 下载日志文件

 
阅读更多

在转换中拖入 User Defined Java Class 控件，并填入以下代码：

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.*;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.RepositoryPluginType;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoriesMeta;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.RepositoryElementMetaInterface;
import org.pentaho.di.repository.RepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.aliyun.openservices.oss.OSSClient;
import com.aliyun.openservices.oss.model.GetObjectRequest;
import com.aliyun.openservices.oss.model.OSSObjectSummary;
import com.aliyun.openservices.oss.model.ObjectListing;

// Positions of the configuration fields inside the input row. Each one is
// resolved once, on the first row handled by processRow, from the field name
// supplied through the step parameter named in the comment beside it.
private int accessIdIndex;   // field named by parameter ACCESS_ID
private int accessKeyIndex;  // field named by parameter ACCESS_KEY
private int bucketNameIndex; // field named by parameter BUCKET_NAME
private int prefixIndex;     // field named by parameter PREFIX

/**
 * Handles one input row: reads the access id, access key, bucket name and key
 * prefix from the row, triggers the log download for them, and forwards the
 * row to the next step.
 *
 * On the first row the positions of the four fields are looked up from the
 * step parameters ACCESS_ID, ACCESS_KEY, BUCKET_NAME and PREFIX and cached in
 * the corresponding index fields.
 *
 * @param smi step metadata supplied by the caller (not used here)
 * @param sdi step data supplied by the caller (not used here)
 * @return false once getRow() yields no more rows, true otherwise
 * @throws KettleException if one of the configured fields is missing from the
 *         input row
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  Object[] r = getRow();
  if (r == null) {
    // No more input: signal completion and stop being called.
    setOutputDone();
    return false;
  }

  if (first) {
    accessIdIndex = findParameterFieldIndex("ACCESS_ID");
    accessKeyIndex = findParameterFieldIndex("ACCESS_KEY");
    bucketNameIndex = findParameterFieldIndex("BUCKET_NAME");
    prefixIndex = findParameterFieldIndex("PREFIX");
    // Only cleared after every lookup succeeded.
    first = false;
  }

  // The row is passed through unchanged apart from being resized to the
  // width of the output row layout.
  Object[] outputRowData = RowDataUtil.resizeArray(r, data.outputRowMeta.size());

  String accessId = getInputRowMeta().getString(r, accessIdIndex);
  String accessKey = getInputRowMeta().getString(r, accessKeyIndex);
  String bucketName = getInputRowMeta().getString(r, bucketNameIndex);
  String prefix = getInputRowMeta().getString(r, prefixIndex);
  downAliyunLog(accessId, accessKey, bucketName, prefix);

  putRow(data.outputRowMeta, outputRowData);

  return true;
}

/**
 * Resolves the input-row position of the field whose name is given by a step
 * parameter.
 *
 * @param parameterName name of the step parameter holding the field name
 * @return index of that field in the input row
 * @throws KettleException if the input row has no such field; the message
 *         names the real parameter so the user knows which one to fix
 */
private int findParameterFieldIndex(String parameterName) throws KettleException
{
  String fieldName = getParameter(parameterName);
  int index = getInputRowMeta().indexOfValue(fieldName);
  if (index < 0) {
    throw new KettleException("Field '" + fieldName + "' not found in the input row, check parameter '" + parameterName + "'!");
  }
  return index;
}

	/**
	 * Downloads yesterday's and today's objects from an OSS bucket into the
	 * local log folder.
	 *
	 * Objects are selected by key prefix: {@code prefix + "yyyy-MM-dd"} for each
	 * of the two days. An object is skipped when a file with the same name as
	 * its key is already listed in the folder given by system property
	 * LOG_FOLDER or in the one given by DOWN_LOG_FOLDER; otherwise it is saved
	 * into the LOG_FOLDER folder under its key. Both folders are created when
	 * missing. The number of listed objects is printed to standard output.
	 *
	 * @param accessId   OSS access id passed to the client
	 * @param accessKey  OSS access key passed to the client
	 * @param bucketName bucket to list and download from
	 * @param prefix     key prefix placed before the date; null is treated as empty
	 * @throws IllegalStateException if a folder property is not set, or a folder
	 *         cannot be created or listed
	 */
	public void downAliyunLog(String accessId, String accessKey, String bucketName, String prefix) {
	    OSSClient client = new OSSClient(accessId, accessKey);
	    String keyPrefix = (prefix == null) ? "" : prefix;

	    // Derive both days from a single instant so they are always consecutive.
	    Date now = new Date();
	    Calendar calendar = Calendar.getInstance();
	    calendar.setTime(now);
	    // add() steps back one day. set(DAY_OF_YEAR, -1) would instead select a
	    // day in late December of the previous year.
	    calendar.add(Calendar.DAY_OF_YEAR, -1);

	    SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");
	    String yesterday = dayFormat.format(calendar.getTime());
	    String today = dayFormat.format(now);

	    // NOTE(review): raw collection types are kept as in the original;
	    // presumably the compiler used for this step may not accept generics -
	    // confirm before modernizing.
	    // NOTE(review): only the first result page of each listing is read; if
	    // a day can hold more objects than one page returns, the rest are not
	    // downloaded - confirm against the bucket's volume.
	    List list = new ArrayList();
	    list.addAll(client.listObjects(bucketName, keyPrefix + yesterday).getObjectSummaries());
	    list.addAll(client.listObjects(bucketName, keyPrefix + today).getObjectSummaries());

	    File downloadPath = requireLogFolder("LOG_FOLDER");
	    File downloadPathBak = requireLogFolder("DOWN_LOG_FOLDER");

	    // Names already present in either folder, snapshotted before downloading.
	    Set existingNames = new HashSet();
	    existingNames.addAll(listFileNames(downloadPath));
	    existingNames.addAll(listFileNames(downloadPathBak));

	    for (int i = 0; i < list.size(); i++) {
	        String key = ((OSSObjectSummary) list.get(i)).getKey();
	        if (!existingNames.contains(key)) {
	            client.getObject(new GetObjectRequest(bucketName, key), new File(downloadPath, key));
	        }
	    }
	    System.out.println(list.size());
	}

	/**
	 * Returns the folder named by a system property, creating it when absent.
	 *
	 * @param propertyName system property holding the folder path
	 * @return the existing or newly created folder
	 * @throws IllegalStateException if the property is unset or the folder
	 *         cannot be created
	 */
	private File requireLogFolder(String propertyName) {
	    String location = System.getProperty(propertyName);
	    if (location == null || location.length() == 0) {
	        throw new IllegalStateException("System property '" + propertyName + "' is not set");
	    }
	    File folder = new File(location);
	    if (!folder.exists() && !folder.mkdirs() && !folder.isDirectory()) {
	        throw new IllegalStateException("Cannot create folder " + folder.getAbsolutePath());
	    }
	    return folder;
	}

	/**
	 * Lists the entry names of a folder.
	 *
	 * @param folder folder to list
	 * @return the names as a fixed-size list
	 * @throws IllegalStateException if the folder cannot be listed
	 */
	private List listFileNames(File folder) {
	    String[] names = folder.list();
	    if (names == null) {
	        // File.list() yields null when the path is not a directory or an I/O error occurs.
	        throw new IllegalStateException("Cannot list folder " + folder.getAbsolutePath());
	    }
	    return Arrays.asList(names);
	}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics