`

Neo4j批量插入性能优化

阅读更多

 

一. 原生的远程操作接口

Neo4j原生的直接可用的插入方式有以下几种:

1.    Rest api执行Cypher语句

   public boolean flush()  {
        if (sqlCache.size() != 0) {
            long start = System.currentTimeMillis();
            restAPI.executeBatch(new Process(sqlCache));
            logger.info("Size:{},cost:{}ms", sqlCache.size(), System.currentTimeMillis() - start);
            sqlCache.clear();
        }
        return true;
    }

 

2. Neo4j Jdbc 执行cypher语句

 public boolean flush() {
        if (sqlCache.size() != 0) {
            long start = System.currentTimeMillis();
            try {
                Statement statement = conn.createStatement();
                for(String sql:sqlCache){
                    statement.execute(sql);
                }
            } catch (SQLException e) {
                logger.error("failed to execute sqls",e);
            }
            logger.info("Size:{},cost:{}ms",sqlCache.size(),System.currentTimeMillis() - start);
            sqlCache.clear();
        }
        return true;
    }

 

上面两种方式执行效率差不多,都在500tps左右,其中Rest Api支持批量提交,调用了Neo4j的批量提交接口。如果不做批量提交,大概每次请求消耗在50ms左右。

 

二.嵌入式方式操作数据库

Neo4j还提供了一种嵌入式数据库操作,就是直接操作数据库文件。这种方式处理效率非常高,TPS1W以上。但是如果用这种方法的话,就不能分布式进行处理了。

1.创建开始数据库
String DB_PATH = "e:/zztt.db";
GraphDatabaseService graphDB = new GraphDatabaseFactory().newEmbeddedDatabase(DB_PATH);

//停止数据库
graphDB。shutdown();

 

三.插件方式操作数据库

为了结合以上的两种方式,想到了使用插件来操作数据库。插件方式是指编写插件代码,然后部署到服务器上。这样我们可以自定义请求路径,然后以嵌入式的方式在服务端直接处理数据库。代码如下所示,这种方法的TPS也在1W左右,并且可以分布式提交数据变更请求

 */
@Path("/")
public class UpdateResource {
    @POST
    @Path("/execute")
    public String execute( @Context GraphDatabaseService db,@FormParam("sqls") String sqls) throws IOException {
        if(sqls == null){
            return "error argument";
        }
       Object o =  JSONObject.parse(sqls);
        if(o instanceof List){
            UpdateServer.executeBatch(db,( List<Map<String, Object>>)o);
        }else{
            UpdateServer.execute(db,(Map<String, Object>) o);
        }
        return "success";
    }
}

 

 

 总结:原有的Cypher语句的批量执行的语句过慢,可能还是因为语法解析器效率不高的原因。上次查看过源码,批量提交实际上就是将请求打包,然后在服务端遍历所有请求,每个请求再直接调用jetty.handle方法,和非批量提交的区别就是少了一些网络传输交互时间

  while ((token = jp.nextToken()) != null)
        {
            if (token == JsonToken.START_OBJECT)
            {
                String jobMethod="", jobPath="", jobBody="";
                Integer jobId = null;
                while ((token = jp.nextToken()) != JsonToken.END_OBJECT && token != null )
                {
                    String field = jp.getText();
                    jp.nextToken();
                    switch ( field )
                    {
                    case METHOD_KEY:
                        jobMethod = jp.getText().toUpperCase();
                        break;
                    case TO_KEY:
                        jobPath = jp.getText();
                        break;
                    case ID_KEY:
                        jobId = jp.getIntValue();
                        break;
                    case BODY_KEY:
                        jobBody = readBody( jp );
                        break;
                    }
                }
                // Read one job description. Execute it.
                performRequest( uriInfo, jobMethod, jobPath, jobBody,
                        jobId, httpHeaders, locations, req );
            }
        }

    @Override
    public void invokeDirectly( String targetPath, HttpServletRequest request, HttpServletResponse response )
            throws IOException, ServletException
    {
        jetty.handle( targetPath, (Request) request, request, response );
    }

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics