The DataImportHandler (DIH) is an effective way to do simple data imports. For a simple database table in particular, DIH lets you merge the full import and the delta import into a single query, which is very convenient. Here is how I use it.
1. Register the handler in solrconfig.xml
<requestHandler name="/dataimport"
                class="org.apache.solr.handler.dataimport.DataImportHandler">
  <lst name="defaults">
    <str name="config">/home/tomcat/bin/solr/conf/data-config.xml</str>
  </lst>
</requestHandler>
2. Add the data-config file
data-config.xml
<dataConfig>
  <dataSource type="JdbcDataSource"
              driver="com.mysql.jdbc.Driver"
              url="jdbc:mysql://127.0.0.1/db"
              user="root"
              password="pass"
              batchSize="-1"/>
  <document>
    <entity name="id" pk="id"
            query="select id,username,text,cat from hot
                   where '${dataimporter.request.clean}' != 'false'
                      OR timestamp > '${dataimporter.last_index_time}'">
      <field column="id" name="id"/>
      <field column="text" name="text"/>
      <field column="username" name="username_s"/>
      <field column="cat" name="cat_t"/>
    </entity>
  </document>
</dataConfig>
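The where clause is what lets one query serve both modes: on a full-import with clean=true (or with clean omitted), the condition '${dataimporter.request.clean}' != 'false' holds for every row, so the whole table is re-indexed; when the same command is issued with clean=false, that condition fails and only rows whose timestamp is newer than ${dataimporter.last_index_time} are picked up, which behaves like a delta import without a separate deltaQuery. Before setting up the scheduler, the handler can be triggered by hand to check the configuration. A minimal sketch, assuming Solr runs at 127.0.0.1:8983 under the webapp name solr (as in the dataimport.properties example further down); the class name ManualImportTest is only illustrative:

import java.net.HttpURLConnection;
import java.net.URL;

/** Fire the combined import once by hand (equivalent to what the scheduler will request later). */
public class ManualImportTest {
    public static void main(String[] args) throws Exception {
        // clean=false: keep the existing index and re-index only rows newer than last_index_time;
        // use clean=true (or drop the parameter) to wipe the index and rebuild it completely.
        URL url = new URL("http://127.0.0.1:8983/solr/dataimport"
                + "?command=full-import&clean=false&commit=true&optimize=false");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        System.out.println("Response: " + conn.getResponseCode() + " " + conn.getResponseMessage());
        conn.disconnect();
    }
}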
3. Run DIH periodically
Edit the dataimport.properties file (it is generated automatically and also lives under solr/conf) and add the following parameters:
interval — the interval between runs, in minutes
syncEnabled=1 — turns the periodic run on
params — simply the URL to be called; the "periodic run" just means requesting that URL at a fixed interval (see the sketch after the example file below)
#Wed Dec 28 09:29:42 UTC 2011
port=8983
interval=5
last_index_time=2011-12-28 09\:29\:26
syncEnabled=1
webapp=solr
id.last_index_time=2011-12-28 09\:29\:26
server=127.0.0.1
params=/select?qt\=/dataimport&command\=full-import&clean\=false&commit\=true&optimize\=false
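The scheduler assembles its request as http://<server>:<port>/<webapp> followed by params, so with the values above it will POST to http://127.0.0.1:8983/solr/select?qt=/dataimport&command=full-import&clean=false&commit=true&optimize=false every 5 minutes. A minimal sketch for printing the assembled URL before deploying anything, assuming dataimport.properties is in the working directory (ShowScheduledUrl is only an illustrative name, and the fallback values are illustrative defaults):

import java.io.FileInputStream;
import java.util.Properties;

/** Read dataimport.properties and show the URL the scheduler would request. */
public class ShowScheduledUrl {
    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        try (FileInputStream fis = new FileInputStream("dataimport.properties")) {
            p.load(fis); // load() also removes the \: and \= escapes
        }
        String url = "http://" + p.getProperty("server", "localhost")
                + ":" + p.getProperty("port", "8080")
                + "/" + p.getProperty("webapp", "solr")
                + p.getProperty("params", "");
        System.out.println("Every " + p.getProperty("interval", "30")
                + " minute(s) the scheduler will POST to: " + url);
    }
}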
At this point the import still won't run periodically. The Solr wiki has a piece of code that implements this feature, but it was never included in a Solr release, so we need to compile that code ourselves, package it into a jar, and put the jar under webapp/solr/WEB-INF/lib. The scheduler's listener also has to be registered in the webapp's web.xml:
<web-app>
  <listener>
    <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
  </listener>
  ...
</web-app>
Below is the periodic-run code from the Solr wiki. I have already built and packaged it; the jar is attached at the end of this post.
package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;

import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrDataImportProperties {
    private Properties properties;

    public static final String SYNC_ENABLED = "syncEnabled";
    public static final String SYNC_CORES   = "syncCores";
    public static final String SERVER       = "server";
    public static final String PORT         = "port";
    public static final String WEBAPP       = "webapp";
    public static final String PARAMS       = "params";
    public static final String INTERVAL     = "interval";

    private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

    public SolrDataImportProperties() {
        // loadProperties(true);
    }

    public void loadProperties(boolean force) {
        try {
            SolrResourceLoader loader = new SolrResourceLoader(null);
            logger.info("Instance dir = " + loader.getInstanceDir());
            String configDir = loader.getConfigDir();
            configDir = SolrResourceLoader.normalizeDir(configDir);
            if (force || properties == null) {
                properties = new Properties();
                // normalizeDir() guarantees a trailing separator, so the file name can be
                // appended directly (the wiki version hard-coded "\\" here, which breaks on Linux)
                String dataImportPropertiesPath = configDir + "dataimport.properties";
                FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
                properties.load(fis);
            }
        } catch (FileNotFoundException fnfe) {
            logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
        } catch (IOException ioe) {
            logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
        } catch (Exception e) {
            logger.error("Error loading DataImportScheduler properties", e);
        }
    }

    public String getProperty(String key) {
        return properties.getProperty(key);
    }
}
package org.apache.solr.handler.dataimport.scheduler;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationListener implements ServletContextListener {

    private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = servletContextEvent.getServletContext();

        // get our timer from the context
        Timer timer = (Timer) servletContext.getAttribute("timer");

        // cancel all active tasks in the timer's queue
        if (timer != null)
            timer.cancel();

        // remove the timer from the context
        servletContext.removeAttribute("timer");
    }

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = servletContextEvent.getServletContext();
        try {
            // create the timer and timer task objects
            Timer timer = new Timer();
            HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);

            // get our interval from HTTPPostScheduler
            int interval = task.getIntervalInt();

            // get a calendar to set the start time (first run)
            Calendar calendar = Calendar.getInstance();

            // set the first run to now + interval (to avoid firing while the app/server is starting)
            calendar.add(Calendar.MINUTE, interval);
            Date startTime = calendar.getTime();

            // schedule the task
            timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);

            // save the timer in context
            servletContext.setAttribute("timer", timer);

        } catch (Exception e) {
            if (e.getMessage().endsWith("disabled")) {
                logger.info("Schedule disabled");
            } else {
                logger.error("Problem initializing the scheduled task: ", e);
            }
        }
    }
}
package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTPPostScheduler extends TimerTask {

    private String syncEnabled;
    private String[] syncCores;
    private String server;
    private String port;
    private String webapp;
    private String params;
    private String interval;
    private String cores;
    private SolrDataImportProperties p;
    private boolean singleCore;

    private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);

    public HTTPPostScheduler(String webAppName, Timer t) throws Exception {
        // load properties from global dataimport.properties
        p = new SolrDataImportProperties();
        reloadParams();
        fixParams(webAppName);

        if (!syncEnabled.equals("1"))
            throw new Exception("Schedule disabled");

        if (syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())) {
            singleCore = true;
            logger.info("<index update process> Single core identified in dataimport.properties");
        } else {
            singleCore = false;
            logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores);
        }
    }

    private void reloadParams() {
        p.loadProperties(true);
        syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
        cores       = p.getProperty(SolrDataImportProperties.SYNC_CORES);
        server      = p.getProperty(SolrDataImportProperties.SERVER);
        port        = p.getProperty(SolrDataImportProperties.PORT);
        webapp      = p.getProperty(SolrDataImportProperties.WEBAPP);
        params      = p.getProperty(SolrDataImportProperties.PARAMS);
        interval    = p.getProperty(SolrDataImportProperties.INTERVAL);
        syncCores   = cores != null ? cores.split(",") : null;
    }

    private void fixParams(String webAppName) {
        if (server == null || server.isEmpty())  server = "localhost";
        if (port == null || port.isEmpty())      port = "8080";
        if (webapp == null || webapp.isEmpty())  webapp = webAppName;
        if (interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";
    }

    public void run() {
        try {
            // check mandatory params
            if (server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()) {
                logger.warn("<index update process> Insufficient info provided for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();

            // single-core
            } else if (singleCore) {
                prepUrlSendHttpPost();

            // multi-core
            } else if (syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())) {
                logger.warn("<index update process> No cores scheduled for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();

            } else {
                for (String core : syncCores) {
                    prepUrlSendHttpPost(core);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to prepare for sendHttpPost", e);
            reloadParams();
        }
    }

    private void prepUrlSendHttpPost() {
        String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;
        sendHttpPost(coreUrl, null);
    }

    private void prepUrlSendHttpPost(String coreName) {
        String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;
        sendHttpPost(coreUrl, coreName);
    }

    private void sendHttpPost(String completeUrl, String coreName) {
        DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
        Date startTime = new Date();

        // prepare the core var
        String core = coreName == null ? "" : "[" + coreName + "] ";

        logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));

        try {
            URL url = new URL(completeUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();

            conn.setRequestMethod("POST");
            conn.setRequestProperty("type", "submit");
            conn.setDoOutput(true);

            // Send HTTP POST
            conn.connect();

            logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());
            logger.info(core + "<index update process> Successfully connected to server\t" + server);
            logger.info(core + "<index update process> Using port\t\t\t" + port);
            logger.info(core + "<index update process> Application name\t\t\t" + webapp);
            logger.info(core + "<index update process> URL params\t\t\t" + params);
            logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
            logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
            logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());

            // listen for change in properties file if an error occurs
            if (conn.getResponseCode() != 200) {
                reloadParams();
            }

            conn.disconnect();
            logger.info(core + "<index update process> Disconnected from server\t\t" + server);
            Date endTime = new Date();
            logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));

        } catch (MalformedURLException mue) {
            logger.error("Failed to assemble URL for HTTP POST", mue);
        } catch (IOException ioe) {
            logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
        } catch (Exception e) {
            logger.error("Failed to send HTTP POST", e);
        }
    }

    public int getIntervalInt() {
        try {
            return Integer.parseInt(interval);
        } catch (NumberFormatException e) {
            logger.warn("Unable to convert 'interval' to number. Using default value (30) instead", e);
            return 30; // return default in case of error
        }
    }
}
- apache-solr-dataimporthandler-scheduler.jar (7.5 KB)
Comments
#5
overflow_exception
2018-02-26
mark.
#4
tfkd丶
2016-07-15
mark.
#3
ethan_shan
2015-09-25
Mark.
#2
sunny8zhou
2014-03-18
The requestHandler is configured in solrconfig.xml, isn't it?
#1
GraysonHK
2012-12-11
Could you share the steps for compiling the scheduler? Many thanks!