`

hsqldb源码分析系列2 解析器分析

 
阅读更多

 

  在server类的handleConnection方法中处理客户端的输入,

  首先调用allowConnection方法检测下客户端ip是否合法,ServerAcl类中可以查看,在server启动输入参数中可以指定acl 黑白名单ip文件,检测代码很简单,这里就不说明了,有兴趣的看下ServerAcl类。

  

    protected boolean allowConnection(Socket socket) {

        if (isShuttingDown) {
            return false;
        }

        return (acl == null) ? true
                             : acl.permitAccess(
                                 socket.getInetAddress().getAddress());
    }

   然后通过参数判断server启动模式,是hsql模式还是http模式,其实就是启动端口不一样,然后格式不一样而已。

        if (serverProtocol == ServerConstants.SC_PROTOCOL_HSQL) {
            r   = new ServerConnection(s, this);
            ctn = ((ServerConnection) r).getConnectionThreadName();
        } else {
            r   = new WebServerConnection(s, (WebServer) this);
            ctn = ((WebServerConnection) r).getConnectionThreadName();
        }

        t = new Thread(serverConnectionThreadGroup, r, ctn);

        t.start();

 

启动一个新线程处理客户端连接,我们先看看ServerConnection的处理,每个ServerCOnnection会保存一个线程递增的id,然后往server添加当前连接

        mThread     = mCurrentThread.getAndIncrement();

        synchronized (server.serverConnSet) {
            server.serverConnSet.add(this);
        }

 

  看看run方法,init是处理第一次连接的请求,后面while循环处理该连接的下一个请求,最后close方法关闭连接。

 

  public void run() {

        int msgType;

        init();

        if (session != null) {
            try {
                while (keepAlive) {
                    msgType = dataInput.readByte();

                    if (msgType < ResultConstants.MODE_UPPER_LIMIT) {
                        receiveResult(msgType);
                    } else {
                        receiveOdbcPacket((char) msgType);
                    }
                }
   ......
close();
}
 init方法,handshake 方法是查下当前连接请求时哪种类型的连接

 

 

 private void init() {

        runnerThread = Thread.currentThread();
        keepAlive    = true;

        try {
            socket.setTcpNoDelay(true);

            dataInput = new DataInputStream(
                new BufferedInputStream(socket.getInputStream()));
            dataOutput = new DataOutputStream(socket.getOutputStream());

            int firstInt = handshake();

            switch (streamProtocol) {

                case HSQL_STREAM_PROTOCOL :
                    if (firstInt
                            != ClientConnection
                                .NETWORK_COMPATIBILITY_VERSION_INT) {
                        if (firstInt == -1900000) {
                            firstInt = -2000000;
                        }

                        String verString =
                            ClientConnection.toNetCompVersionString(firstInt);

                        throw Error.error(
                            null, ErrorCode.SERVER_VERSIONS_INCOMPATIBLE, 0,
                            new String[] {
                            verString, HsqlDatabaseProperties.hsqldb_version
                        });
                    }

                    Result resultIn = Result.newResult(dataInput, rowIn);

                    resultIn.readAdditionalResults(session, dataInput, rowIn);

                    Result resultOut;

                    resultOut = setDatabase(resultIn);

                    resultOut.write(session, dataOutput, rowOut);
                    break;

                case ODBC_STREAM_PROTOCOL :
                    odbcConnect(firstInt);
                    break;

                default :

                    // Protocol detection failures should already have been
                    // handled.
                    keepAlive = false;
            }
        } catch (Exception e) {
 前四个字节的最高位表示连接类型,这个整数还表示版本号,

 

 

   int firstInt = dataInput.readInt();

        switch (firstInt >> 24) {

            case 80 :    // Empirically
                server.print(
                    "Rejected attempt from client using hsql HTTP protocol");

                return 0;

            case 0 :

                // For ODBC protocol, this is the first byte of a 4-byte int
                // size.  The size can never be large enough that the first
                // byte will be non-zero.
                streamProtocol = ODBC_STREAM_PROTOCOL;
                break;

            default :
                streamProtocol = HSQL_STREAM_PROTOCOL;

            // HSQL protocol client
        }
 最重要的是中间的输入解析,查询和结果封装部分。具体看看

 

 第一次的处理session为null,后面会赋值,看看Result resultIn = Result.newResult(dataInput, rowIn);的核心部分,第一个字节=LARGE_OBJECT_OP表示是查询大数据对象内容,后面再看具体字段

  

public static Result newResult(DataInput dataInput,
                                   RowInputBinary in)
                                   throws IOException, HsqlException {
        return newResult(null, dataInput.readByte(), dataInput, in);
    }

    public static Result newResult(Session session, int mode,
                                   DataInput dataInput,
                                   RowInputBinary in)
                                   throws IOException, HsqlException {

        try {
            if (mode == ResultConstants.LARGE_OBJECT_OP) {
                return ResultLob.newLob(dataInput, false);
            }

            Result result = newResult(session, dataInput, in, mode);

            return result;
        } catch (IOException e) {
            throw Error.error(ErrorCode.X_08000);
        }
    }
 newResult 先根据不同的请求类型或者各种类型需要的一些字段信息,前4个字节是请求长度,把整个长度内容copy到in中,然后可以用封装好的RowInputBinary读取字符串,整数等信息。 
 private static Result newResult(Session session, DataInput dataInput,
                                    RowInputBinary in,
                                    int mode)
                                    throws IOException, HsqlException {

        Result result = newResult(mode);
        int    length = dataInput.readInt();

        in.resetRow(0, length);

        byte[]    byteArray = in.getBuffer();
        final int offset    = 4;

        dataInput.readFully(byteArray, offset, length - offset);

        switch (mode) {

            case ResultConstants.GETSESSIONATTR :
                result.statementReturnType = in.readByte();
                break;

            case ResultConstants.DISCONNECT :
            case ResultConstants.RESETSESSION :
            case ResultConstants.STARTTRAN :
                break;

            case ResultConstants.PREPARE :
                result.setStatementType(in.readByte());

                result.mainString   = in.readString();
                result.rsProperties = in.readByte();
                result.generateKeys = in.readByte();

                if (result.generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_NAMES || result
                        .generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_INDEXES) {
                    result.generatedMetaData = new ResultMetaData(in);
                }
                break;

            case ResultConstants.CLOSE_RESULT :
                result.id = in.readLong();
                break;

            case ResultConstants.FREESTMT :
                result.statementID = in.readLong();
                break;

            case ResultConstants.EXECDIRECT :
                result.updateCount         = in.readInt();
                result.fetchSize           = in.readInt();
                result.statementReturnType = in.readByte();
                result.mainString          = in.readString();
                result.rsProperties        = in.readByte();
                result.queryTimeout        = in.readShort();
                result.generateKeys        = in.readByte();

                if (result.generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_NAMES || result
                        .generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_INDEXES) {
                    result.generatedMetaData = new ResultMetaData(in);
                }
                break;

            case ResultConstants.CONNECT :
                result.databaseName = in.readString();
                result.mainString   = in.readString();
                result.subString    = in.readString();
                result.zoneString   = in.readString();
                result.updateCount  = in.readInt();
                break;

            case ResultConstants.ERROR :
            case ResultConstants.WARNING :
                result.mainString = in.readString();
                result.subString  = in.readString();
                result.errorCode  = in.readInt();
                break;

            case ResultConstants.CONNECTACKNOWLEDGE :
                result.databaseID   = in.readInt();
                result.sessionID    = in.readLong();
                result.databaseName = in.readString();
                result.mainString   = in.readString();
                break;

            case ResultConstants.UPDATECOUNT :
                result.updateCount = in.readInt();
                break;

            case ResultConstants.ENDTRAN : {
                int type = in.readInt();

                result.setActionType(type);                     // endtran type

                switch (type) {

                    case ResultConstants.TX_SAVEPOINT_NAME_RELEASE :
                    case ResultConstants.TX_SAVEPOINT_NAME_ROLLBACK :
                        result.mainString = in.readString();    // savepoint name
                        break;

                    case ResultConstants.TX_COMMIT :
                    case ResultConstants.TX_ROLLBACK :
                    case ResultConstants.TX_COMMIT_AND_CHAIN :
                    case ResultConstants.TX_ROLLBACK_AND_CHAIN :
                        break;

                    default :
                        throw Error.runtimeError(ErrorCode.U_S0500, "Result");
                }

                break;
            }
            case ResultConstants.SETCONNECTATTR : {
                int type = in.readInt();                        // attr type

                result.setConnectionAttrType(type);

                switch (type) {

                    case ResultConstants.SQL_ATTR_SAVEPOINT_NAME :
                        result.mainString = in.readString();    // savepoint name
                        break;

                    //  case ResultConstants.SQL_ATTR_AUTO_IPD :
                    //      - always true
                    //  default: throw - case never happens
                    default :
                        throw Error.runtimeError(ErrorCode.U_S0500, "Result");
                }

                break;
            }
            case ResultConstants.PREPARE_ACK :
                result.statementReturnType = in.readByte();
                result.statementID         = in.readLong();
                result.rsProperties        = in.readByte();
                result.metaData            = new ResultMetaData(in);
                result.parameterMetaData   = new ResultMetaData(in);
                break;

            case ResultConstants.CALL_RESPONSE :
                result.updateCount         = in.readInt();
                result.fetchSize           = in.readInt();
                result.statementID         = in.readLong();
                result.statementReturnType = in.readByte();
                result.rsProperties        = in.readByte();
                result.metaData            = new ResultMetaData(in);
                result.valueData           = readSimple(in, result.metaData);
                break;

            case ResultConstants.EXECUTE :
                result.updateCount  = in.readInt();
                result.fetchSize    = in.readInt();
                result.statementID  = in.readLong();
                result.rsProperties = in.readByte();
                result.queryTimeout = in.readShort();

                Statement statement =
                    session.statementManager.getStatement(session,
                        result.statementID);

                if (statement == null) {

                    // invalid statement
                    result.mode      = ResultConstants.EXECUTE_INVALID;
                    result.valueData = ValuePool.emptyObjectArray;

                    break;
                }

                result.statement = statement;
                result.metaData  = result.statement.getParametersMetaData();
                result.valueData = readSimple(in, result.metaData);
                break;

            case ResultConstants.UPDATE_RESULT : {
                result.id = in.readLong();

                int type = in.readInt();

                result.setActionType(type);

                result.metaData  = new ResultMetaData(in);
                result.valueData = readSimple(in, result.metaData);

                break;
            }
            case ResultConstants.BATCHEXECRESPONSE :
            case ResultConstants.BATCHEXECUTE :
            case ResultConstants.BATCHEXECDIRECT :
            case ResultConstants.SETSESSIONATTR : {
                result.updateCount  = in.readInt();
                result.fetchSize    = in.readInt();
                result.statementID  = in.readLong();
                result.queryTimeout = in.readShort();
                result.metaData     = new ResultMetaData(in);

                result.navigator.readSimple(in, result.metaData);

                break;
            }
            case ResultConstants.PARAM_METADATA : {
                result.metaData = new ResultMetaData(in);

                result.navigator.read(in, result.metaData);

                break;
            }
            case ResultConstants.REQUESTDATA : {
                result.id          = in.readLong();
                result.updateCount = in.readInt();
                result.fetchSize   = in.readInt();

                break;
            }
            case ResultConstants.DATAHEAD :
            case ResultConstants.DATA :
            case ResultConstants.GENERATED : {
                result.id           = in.readLong();
                result.updateCount  = in.readInt();
                result.fetchSize    = in.readInt();
                result.rsProperties = in.readByte();
                result.metaData     = new ResultMetaData(in);
                result.navigator    = new RowSetNavigatorClient();

                result.navigator.read(in, result.metaData);

                break;
            }
            case ResultConstants.DATAROWS : {
                result.metaData  = new ResultMetaData(in);
                result.navigator = new RowSetNavigatorClient();

                result.navigator.read(in, result.metaData);

                break;
            }
            default :
                throw Error.runtimeError(ErrorCode.U_S0500, "Result");
        }

        return result;
    }
 readAdditionalResults 是读取预取数据,取下一部分数据的处理;

 

  第一次是connect,则会调用setdatabse设置连接的session信息(数据库等)

  

   private Result setDatabase(Result resultIn) {

        try {
            String databaseName = resultIn.getDatabaseName();

            dbIndex = server.getDBIndex(databaseName);
            dbID    = server.dbID[dbIndex];
            user    = resultIn.getMainString();

            if (!server.isSilent()) {
                server.printWithThread(mThread + ":Trying to connect user '"
                                       + user + "' to DB (" + databaseName
                                       + ')');
            }

            session = DatabaseManager.newSession(dbID, user,
                                                 resultIn.getSubString(),
                                                 resultIn.getZoneString(),
                                                 resultIn.getUpdateCount());

            if (!server.isSilent()) {
                server.printWithThread(mThread + ":Connected user '" + user
                                       + "'");
            }

            return Result.newConnectionAcknowledgeResponse(
                session.getDatabase(), session.getId(),
                session.getDatabase().getDatabaseID());
        } catch (HsqlException e) {
            session = null;

            return Result.newErrorResult(e);
        } catch (RuntimeException e) {
            session = null;

            return Result.newErrorResult(e);
        }
    }
 DataBaseManager的newsession

 

  

 public static Session newSession(int dbID, String user, String password,
                                     String zoneString, int timeZoneSeconds) {

        Database db = null;

        synchronized (databaseIDMap) {
            db = (Database) databaseIDMap.get(dbID);
        }

        if (db == null) {
            return null;
        }

        Session session = db.connect(user, password, zoneString,
                                     timeZoneSeconds);

        session.isNetwork = true;

        return session;
    }
 最终调用Session类的new Session,默认事务是自动提交的。readonly是对应数据库设置是否为只读
  Session(Database db, User user, boolean autocommit, boolean readonly,
            long id, String zoneString, int timeZoneSeconds) {

        sessionId                   = id;
        database                    = db;
        this.user                   = user;
        this.sessionUser            = user;
        this.zoneString             = zoneString;
        this.sessionTimeZoneSeconds = timeZoneSeconds;
        this.timeZoneSeconds        = timeZoneSeconds;
        rowActionList               = new HsqlArrayList(32, true);
        waitedSessions              = new OrderedHashSet();
        waitingSessions             = new OrderedHashSet();
        tempSet                     = new OrderedHashSet();
        isolationLevelDefault       = database.defaultIsolationLevel;
        isolationLevel              = isolationLevelDefault;
        txConflictRollback          = database.txConflictRollback;
        isReadOnlyDefault           = readonly;
        isReadOnlyIsolation = isolationLevel
                              == SessionInterface.TX_READ_UNCOMMITTED;
        sessionContext              = new SessionContext(this);
        sessionContext.isAutoCommit = autocommit ? Boolean.TRUE
                                                 : Boolean.FALSE;
        sessionContext.isReadOnly   = isReadOnlyDefault ? Boolean.TRUE
                                                        : Boolean.FALSE;
        parser                      = new ParserCommand(this, new Scanner());

        setResultMemoryRowCount(database.getResultMaxMemoryRows());
        resetSchema();

        sessionData      = new SessionData(database, this);
        statementManager = new StatementManager(database);
    }
 sessionManager当然会保存当前的session和session连接数

 

  

        Session s = new Session(db, user, autoCommit, readonly,
                                sessionIdCount, zoneString, timeZoneSeconds);

        sessionMap.put(sessionIdCount, s);

        sessionIdCount++;
 最后write

 

  

    public void write(SessionInterface session, DataOutputStream dataOut,
                      RowOutputInterface rowOut)
                      throws IOException, HsqlException {

        rowOut.reset();
        rowOut.writeByte(mode);

        int startPos = rowOut.size();

        rowOut.writeSize(0);

        switch (mode) {

            case ResultConstants.GETSESSIONATTR :
                rowOut.writeByte(statementReturnType);
                break;

            case ResultConstants.DISCONNECT :
            case ResultConstants.RESETSESSION :
            case ResultConstants.STARTTRAN :
                break;

            case ResultConstants.PREPARE :
                rowOut.writeByte(statementReturnType);
                rowOut.writeString(mainString);
                rowOut.writeByte(rsProperties);
                rowOut.writeByte(generateKeys);

                if (generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_NAMES || generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_INDEXES) {
                    generatedMetaData.write(rowOut);
                }
                break;

            case ResultConstants.FREESTMT :
                rowOut.writeLong(statementID);
                break;

            case ResultConstants.CLOSE_RESULT :
                rowOut.writeLong(id);
                break;

            case ResultConstants.EXECDIRECT :
                rowOut.writeInt(updateCount);
                rowOut.writeInt(fetchSize);
                rowOut.writeByte(statementReturnType);
                rowOut.writeString(mainString);
                rowOut.writeByte(rsProperties);
                rowOut.writeShort(queryTimeout);
                rowOut.writeByte(generateKeys);

                if (generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_NAMES || generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_INDEXES) {
                    generatedMetaData.write(rowOut);
                }
                break;

            case ResultConstants.CONNECT :
                rowOut.writeString(databaseName);
                rowOut.writeString(mainString);
                rowOut.writeString(subString);
                rowOut.writeString(zoneString);
                rowOut.writeInt(updateCount);
                break;

            case ResultConstants.ERROR :
            case ResultConstants.WARNING :
                rowOut.writeString(mainString);
                rowOut.writeString(subString);
                rowOut.writeInt(errorCode);
                break;

            case ResultConstants.CONNECTACKNOWLEDGE :
                rowOut.writeInt(databaseID);
                rowOut.writeLong(sessionID);
                rowOut.writeString(databaseName);
                rowOut.writeString(mainString);
                break;

            case ResultConstants.UPDATECOUNT :
                rowOut.writeInt(updateCount);
                break;

            case ResultConstants.ENDTRAN : {
                int type = getActionType();

                rowOut.writeInt(type);                     // endtran type

                switch (type) {

                    case ResultConstants.TX_SAVEPOINT_NAME_RELEASE :
                    case ResultConstants.TX_SAVEPOINT_NAME_ROLLBACK :
                        rowOut.writeString(mainString);    // savepoint name
                        break;

                    case ResultConstants.TX_COMMIT :
                    case ResultConstants.TX_ROLLBACK :
                    case ResultConstants.TX_COMMIT_AND_CHAIN :
                    case ResultConstants.TX_ROLLBACK_AND_CHAIN :
                        break;

                    default :
                        throw Error.runtimeError(ErrorCode.U_S0500, "Result");
                }

                break;
            }
            case ResultConstants.PREPARE_ACK :
                rowOut.writeByte(statementReturnType);
                rowOut.writeLong(statementID);
                rowOut.writeByte(rsProperties);
                metaData.write(rowOut);
                parameterMetaData.write(rowOut);
                break;

            case ResultConstants.CALL_RESPONSE :
                rowOut.writeInt(updateCount);
                rowOut.writeInt(fetchSize);
                rowOut.writeLong(statementID);
                rowOut.writeByte(statementReturnType);
                rowOut.writeByte(rsProperties);
                metaData.write(rowOut);
                writeSimple(rowOut, metaData, (Object[]) valueData);
                break;

            case ResultConstants.EXECUTE :
                rowOut.writeInt(updateCount);
                rowOut.writeInt(fetchSize);
                rowOut.writeLong(statementID);
                rowOut.writeByte(rsProperties);
                rowOut.writeShort(queryTimeout);
                writeSimple(rowOut, metaData, (Object[]) valueData);
                break;

            case ResultConstants.UPDATE_RESULT :
                rowOut.writeLong(id);
                rowOut.writeInt(getActionType());
                metaData.write(rowOut);
                writeSimple(rowOut, metaData, (Object[]) valueData);
                break;

            case ResultConstants.BATCHEXECRESPONSE :
            case ResultConstants.BATCHEXECUTE :
            case ResultConstants.BATCHEXECDIRECT :
            case ResultConstants.SETSESSIONATTR : {
                rowOut.writeInt(updateCount);
                rowOut.writeInt(fetchSize);
                rowOut.writeLong(statementID);
                rowOut.writeShort(queryTimeout);
                metaData.write(rowOut);
                navigator.writeSimple(rowOut, metaData);

                break;
            }
            case ResultConstants.PARAM_METADATA : {
                metaData.write(rowOut);
                navigator.write(rowOut, metaData);

                break;
            }
            case ResultConstants.SETCONNECTATTR : {
                int type = getConnectionAttrType();

                rowOut.writeInt(type);                     // attr type / updateCount

                switch (type) {

                    case ResultConstants.SQL_ATTR_SAVEPOINT_NAME :
                        rowOut.writeString(mainString);    // savepoint name
                        break;

                    // case ResultConstants.SQL_ATTR_AUTO_IPD // always true
                    // default: // throw, but case never happens
                    default :
                        throw Error.runtimeError(ErrorCode.U_S0500, "Result");
                }

                break;
            }
            case ResultConstants.REQUESTDATA : {
                rowOut.writeLong(id);
                rowOut.writeInt(updateCount);
                rowOut.writeInt(fetchSize);

                break;
            }
            case ResultConstants.DATAROWS :
                metaData.write(rowOut);
                navigator.write(rowOut, metaData);
                break;

            case ResultConstants.DATAHEAD :
            case ResultConstants.DATA :
            case ResultConstants.GENERATED :
                rowOut.writeLong(id);
                rowOut.writeInt(updateCount);
                rowOut.writeInt(fetchSize);
                rowOut.writeByte(rsProperties);
                metaData.write(rowOut);
                navigator.write(rowOut, metaData);
                break;

            default :
                throw Error.runtimeError(ErrorCode.U_S0500, "Result");
        }

        rowOut.writeIntData(rowOut.size() - startPos, startPos);
        dataOut.write(rowOut.getOutputStream().getBuffer(), 0, rowOut.size());

        int    count   = getLobCount();
        Result current = this;

        for (int i = 0; i < count; i++) {
            ResultLob lob = current.lobResults;

            lob.writeBody(session, dataOut);

            current = current.lobResults;
        }

        if (chainedResult == null) {
            dataOut.writeByte(ResultConstants.NONE);
        } else {
            chainedResult.write(session, dataOut, rowOut);
        }

        dataOut.flush();
    }
 

 

然后在server的run方法中循环中处理完连接请求之后就可以执行数据库操作了。msgType大于 MODE_UPPER_LIMIT表示不是hsqldb的包类型而是odbc包类型

  

                while (keepAlive) {
                    msgType = dataInput.readByte();

                    if (msgType < ResultConstants.MODE_UPPER_LIMIT) {
                        receiveResult(msgType);
                    } else {
                        receiveOdbcPacket((char) msgType);
                    }
                }
 
 private void receiveResult(int resultMode) throws CleanExit, IOException {

        boolean terminate = false;
        Result resultIn = Result.newResult(session, resultMode, dataInput,
                                           rowIn);

        resultIn.readLobResults(session, dataInput, rowIn);
        server.printRequest(mThread, resultIn);

        Result resultOut = null;

        switch (resultIn.getType()) {

            case ResultConstants.CONNECT : {
                resultOut = setDatabase(resultIn);

                break;
            }
            case ResultConstants.DISCONNECT : {
                resultOut = Result.updateZeroResult;
                terminate = true;

                break;
            }
            case ResultConstants.RESETSESSION : {
                session.resetSession();

                resultOut = Result.updateZeroResult;

                break;
            }
            case ResultConstants.EXECUTE_INVALID : {
                resultOut =
                    Result.newErrorResult(Error.error(ErrorCode.X_07502));

                break;
            }
            default : {
                resultOut = session.execute(resultIn);

                break;
            }
        }

        resultOut.write(session, dataOutput, rowOut);
        rowOut.reset(mainBuffer);
        rowIn.resetRow(mainBuffer.length);

        if (terminate) {
            throw cleanExit;
        }
    }
 session的excute方法执行的主流程

 

   

public synchronized Result execute(Result cmd) {

        if (isClosed) {
            return Result.newErrorResult(Error.error(ErrorCode.X_08503));
        }

        sessionContext.currentMaxRows = 0;
        isBatch                       = false;

        JavaSystem.gc();

        switch (cmd.mode) {

            case ResultConstants.LARGE_OBJECT_OP : {
                return performLOBOperation((ResultLob) cmd);
            }
            case ResultConstants.EXECUTE : {
                int maxRows = cmd.getUpdateCount();

                if (maxRows == -1) {
                    sessionContext.currentMaxRows = 0;
                } else {
                    sessionContext.currentMaxRows = maxRows;
                }

                Statement cs = cmd.statement;

                if (cs == null
                        || cs.compileTimestamp
                           < database.schemaManager.schemaChangeTimestamp) {
                    long csid = cmd.getStatementID();

                    cs = statementManager.getStatement(this, csid);

                    cmd.setStatement(cs);

                    if (cs == null) {

                        // invalid sql has been removed already
                        return Result.newErrorResult(
                            Error.error(ErrorCode.X_07502));
                    }
                }

                Object[] pvals  = (Object[]) cmd.valueData;
                Result   result = executeCompiledStatement(cs, pvals);

                result = performPostExecute(cmd, result);

                return result;
            }
            case ResultConstants.BATCHEXECUTE : {
                isBatch = true;

                Result result = executeCompiledBatchStatement(cmd);

                result = performPostExecute(cmd, result);

                return result;
            }
            case ResultConstants.EXECDIRECT : {
                Result result = executeDirectStatement(cmd);

                result = performPostExecute(cmd, result);

                return result;
            }
            case ResultConstants.BATCHEXECDIRECT : {
                isBatch = true;

                Result result = executeDirectBatchStatement(cmd);

                result = performPostExecute(cmd, result);

                return result;
            }
            case ResultConstants.PREPARE : {
                Statement cs;

                try {
                    cs = statementManager.compile(this, cmd);
                } catch (Throwable t) {
                    String errorString = cmd.getMainString();

                    if (database.getProperties().getErrorLevel()
                            == HsqlDatabaseProperties.NO_MESSAGE) {
                        errorString = null;
                    }

                    return Result.newErrorResult(t, errorString);
                }

                Result result = Result.newPrepareResponse(cs);

                if (cs.getType() == StatementTypes.SELECT_CURSOR
                        || cs.getType() == StatementTypes.CALL) {
                    sessionData.setResultSetProperties(cmd, result);
                }

                result = performPostExecute(cmd, result);

                return result;
            }
            case ResultConstants.CLOSE_RESULT : {
                closeNavigator(cmd.getResultId());

                return Result.updateZeroResult;
            }
            case ResultConstants.UPDATE_RESULT : {
                Result result = this.executeResultUpdate(cmd);

                result = performPostExecute(cmd, result);

                return result;
            }
            case ResultConstants.FREESTMT : {
                statementManager.freeStatement(cmd.getStatementID());

                return Result.updateZeroResult;
            }
            case ResultConstants.GETSESSIONATTR : {
                int id = cmd.getStatementType();

                return getAttributesResult(id);
            }
            case ResultConstants.SETSESSIONATTR : {
                return setAttributes(cmd);
            }
            case ResultConstants.ENDTRAN : {
                switch (cmd.getActionType()) {

                    case ResultConstants.TX_COMMIT :
                        try {
                            commit(false);
                        } catch (Throwable t) {
                            return Result.newErrorResult(t);
                        }
                        break;

                    case ResultConstants.TX_COMMIT_AND_CHAIN :
                        try {
                            commit(true);
                        } catch (Throwable t) {
                            return Result.newErrorResult(t);
                        }
                        break;

                    case ResultConstants.TX_ROLLBACK :
                        rollback(false);
                        break;

                    case ResultConstants.TX_ROLLBACK_AND_CHAIN :
                        rollback(true);
                        break;

                    case ResultConstants.TX_SAVEPOINT_NAME_RELEASE :
                        try {
                            String name = cmd.getMainString();

                            releaseSavepoint(name);
                        } catch (Throwable t) {
                            return Result.newErrorResult(t);
                        }
                        break;

                    case ResultConstants.TX_SAVEPOINT_NAME_ROLLBACK :
                        try {
                            rollbackToSavepoint(cmd.getMainString());
                        } catch (Throwable t) {
                            return Result.newErrorResult(t);
                        }
                        break;
                    case ResultConstants.PREPARECOMMIT :
                        try {
                            prepareCommit();
                        } catch (Throwable t) {
                            return Result.newErrorResult(t);
                        }
                        break;
                }

                return Result.updateZeroResult;
            }
            case ResultConstants.SETCONNECTATTR : {
                switch (cmd.getConnectionAttrType()) {

                    case ResultConstants.SQL_ATTR_SAVEPOINT_NAME :
                        try {
                            savepoint(cmd.getMainString());
                        } catch (Throwable t) {
                            return Result.newErrorResult(t);
                        }

                    // case ResultConstants.SQL_ATTR_AUTO_IPD
                    //   - always true
                    // default: throw - case never happens
                }

                return Result.updateZeroResult;
            }
            case ResultConstants.REQUESTDATA : {
                return sessionData.getDataResultSlice(cmd.getResultId(),
                                                      cmd.getUpdateCount(),
                                                      cmd.getFetchSize());
            }
            case ResultConstants.DISCONNECT : {
                close();

                return Result.updateZeroResult;
            }
            default : {
                return Result.newErrorResult(
                    Error.runtimeError(ErrorCode.U_S0500, "Session"));
            }
        }
    }
 

 

比如使用下面方式执行:

   

String ddl0 =
            "DROP TABLE ADDRESSBOOK IF EXISTS; DROP TABLE ADDRESSBOOK_CATEGORY IF EXISTS; DROP TABLE USER IF EXISTS;";
        String ddl1 =
            "CREATE TABLE USER(USER_ID INTEGER NOT NULL PRIMARY KEY,LOGIN_ID VARCHAR(128) NOT NULL,USER_NAME VARCHAR(254) DEFAULT ' ' NOT NULL,CREATE_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,UPDATE_DATE TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,LAST_ACCESS_DATE TIMESTAMP,CONSTRAINT IXUQ_LOGIN_ID0 UNIQUE(LOGIN_ID))";
  
        String result1 = "1";
        String result2 = "2";

            stmnt.execute(ddl0);
            stmnt.execute(ddl1);
 则Result内容如下:

 

   

   case ResultConstants.EXECDIRECT :
                result.updateCount         = in.readInt();
                result.fetchSize           = in.readInt();
                result.statementReturnType = in.readByte();
                result.mainString          = in.readString();
                result.rsProperties        = in.readByte();
                result.queryTimeout        = in.readShort();
                result.generateKeys        = in.readByte();

                if (result.generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_NAMES || result
                        .generateKeys == ResultConstants
                        .RETURN_GENERATED_KEYS_COL_INDEXES) {
                    result.generatedMetaData = new ResultMetaData(in);
                }
                break;
 mainString就是sql语句,然后就是执行过程了

 

   sesseion的execute方法执行片段为

   

          case ResultConstants.EXECDIRECT : {
                Result result = executeDirectStatement(cmd);

                result = performPostExecute(cmd, result);

                return result;
            }
 来到executeDirectStatement了,
    public Result executeDirectStatement(Result cmd) {

        String        sql = cmd.getMainString();
        HsqlArrayList list;
        int           maxRows = cmd.getUpdateCount();

        if (maxRows == -1) {
            sessionContext.currentMaxRows = 0;
        } else if (sessionMaxRows == 0) {
            sessionContext.currentMaxRows = maxRows;
        } else {
            sessionContext.currentMaxRows = sessionMaxRows;
            sessionMaxRows                = 0;
        }

        try {
            list = parser.compileStatements(sql, cmd);
        } catch (Throwable e) {
            return Result.newErrorResult(e);
        }

        Result result = null;

        for (int i = 0; i < list.size(); i++) {
            Statement cs = (Statement) list.get(i);

            cs.setGeneratedColumnInfo(cmd.getGeneratedResultType(),
                                      cmd.getGeneratedResultMetaData());

            result = executeCompiledStatement(cs, ValuePool.emptyObjectArray);

            if (result.mode == ResultConstants.ERROR) {
                break;
            }
        }

        return result;
    }
 终于看到parser.compileStatements(sql, cmd) 解析sql的语法分析过程了,parser是在new session中初始化的,

 

   ParserCommand类的compileStatements方法

   

 HsqlArrayList compileStatements(String sql, Result cmd) {

        HsqlArrayList list = new HsqlArrayList();
        Statement     cs   = null;

        reset(sql);

        while (true) {
            if (token.tokenType == Tokens.X_ENDPARSE) {
                break;
            }

            try {
                lastError = null;
                cs        = compilePart(cmd.getExecuteProperties());
            } catch (HsqlException e) {
                if (lastError != null && lastError.getLevel() > e.getLevel()) {
                    throw lastError;
                }

                throw e;
            }

            if (!cs.isExplain
                    && cs.getParametersMetaData().getColumnCount() > 0) {
                throw Error.error(ErrorCode.X_42575);
            }

            cs.setCompileTimestamp(
                database.txManager.getGlobalChangeTimestamp());
            list.add(cs);
        }

        int returnType = cmd.getStatementType();

        if (returnType != StatementTypes.RETURN_ANY) {
            int group = cs.getGroup();

            if (group == StatementTypes.X_SQL_DATA) {
                if (returnType == StatementTypes.RETURN_COUNT) {
                    throw Error.error(ErrorCode.X_07503);
                }
            } else if (returnType == StatementTypes.RETURN_RESULT) {
                throw Error.error(ErrorCode.X_07504);
            }
        }

        return list;
    }
 好吧,compilePart里面支持的所有的DDL数据库操作语言
  private Statement compilePart(int props) {

        Statement cs;

        compileContext.reset();
        setParsePosition(getPosition());

        if (token.tokenType == Tokens.X_STARTPARSE) {
            read();
        }

        switch (token.tokenType) {

            // DQL
            case Tokens.WITH :
            case Tokens.OPENBRACKET :
            case Tokens.SELECT :
            case Tokens.TABLE : {
                cs = compileCursorSpecification(RangeGroup.emptyArray, props,
                                                false);

                break;
            }
            case Tokens.VALUES : {
                RangeGroup[] ranges = new RangeGroup[]{
                    new RangeGroupSimple(
                        session.sessionContext.sessionVariablesRange) };

                compileContext.setOuterRanges(ranges);

                cs = compileShortCursorSpecification(props);

                break;
            }

            // DML
            case Tokens.INSERT : {
                cs = compileInsertStatement(RangeGroup.emptyArray);

                break;
            }
            case Tokens.UPDATE : {
                cs = compileUpdateStatement(RangeGroup.emptyArray);

                break;
            }
            case Tokens.MERGE : {
                cs = compileMergeStatement(RangeGroup.emptyArray);

                break;
            }
            case Tokens.DELETE : {
                cs = compileDeleteStatement(RangeGroup.emptyArray);

                break;
            }
            case Tokens.TRUNCATE : {
                cs = compileTruncateStatement();

                break;
            }

            // PROCEDURE
            case Tokens.CALL : {
                cs = compileCallStatement(
                    new RangeGroup[]{ new RangeGroupSimple(
                        session.sessionContext
                            .sessionVariablesRange) }, false);

                break;
            }

            // SQL SESSION
            case Tokens.SET :
                cs = compileSet();
                break;

            // diagnostic
            case Tokens.GET :
                cs = compileGetStatement(
                    session.sessionContext.sessionVariablesRange);
                break;

            case Tokens.START :
                cs = compileStartTransaction();
                break;

            case Tokens.COMMIT :
                cs = compileCommit();
                break;

            case Tokens.ROLLBACK :
                cs = compileRollback();
                break;

            case Tokens.SAVEPOINT :
                cs = compileSavepoint();
                break;

            case Tokens.RELEASE :
                cs = compileReleaseSavepoint();
                break;

            // DDL
            case Tokens.CREATE :
                cs = compileCreate();
                break;

            case Tokens.ALTER :
                cs = compileAlter();
                break;

            case Tokens.DROP :
                cs = compileDrop();
                break;

            case Tokens.GRANT :
            case Tokens.REVOKE :
                cs = compileGrantOrRevoke();
                break;

            case Tokens.COMMENT :
                cs = compileComment();
                break;

            // HSQL SESSION
            case Tokens.LOCK :
                cs = compileLock();
                break;

            case Tokens.CONNECT :
                cs = compileConnect();
                break;

            case Tokens.DISCONNECT :
                cs = compileDisconnect();
                break;

            // HSQL COMMAND
            case Tokens.SCRIPT :
                cs = compileScript();
                break;

            case Tokens.SHUTDOWN :
                cs = compileShutdown();
                break;

            case Tokens.BACKUP :
                cs = compileBackup();
                break;

            case Tokens.CHECKPOINT :
                cs = compileCheckpoint();
                break;

            case Tokens.EXPLAIN : {
                int position = getPosition();

                cs = compileExplainPlan();

                cs.setSQL(getLastPart(position));

                break;
            }
            case Tokens.DECLARE :
                cs = compileDeclare();
                break;

            default :
                throw unexpectedToken();
        }

        // SET_SESSION_AUTHORIZATION is translated dynamically at runtime for logging
        switch (cs.type) {

            // these are set at compile time for logging
            case StatementTypes.COMMIT_WORK :
            case StatementTypes.ROLLBACK_WORK :
            case StatementTypes.SET_USER_PASSWORD :
            case StatementTypes.EXPLAIN_PLAN :
                break;

            default :
                cs.setSQL(getLastPart());
        }

        if (token.tokenType == Tokens.SEMICOLON) {
            read();
        } else if (token.tokenType == Tokens.X_ENDPARSE) {}

        return cs;
    }
 我们的语句是create的,我们看看compileCreate方法

 

   

StatementSchema compileCreate() {

        int     tableType   = TableBase.MEMORY_TABLE;
        boolean isTable     = false;
        boolean isOrReplace = false;

        read();

        switch (token.tokenType) {

            case Tokens.GLOBAL :
                read();
                readThis(Tokens.TEMPORARY);
                readIfThis(Tokens.MEMORY);
                readThis(Tokens.TABLE);

                isTable   = true;
                tableType = TableBase.TEMP_TABLE;
                break;

            case Tokens.TEMP :
                read();
                readThis(Tokens.TABLE);

                isTable   = true;
                tableType = TableBase.TEMP_TABLE;
                break;

            case Tokens.TEMPORARY :
                read();
                readThis(Tokens.TABLE);

                isTable   = true;
                tableType = TableBase.TEMP_TABLE;
                break;

            case Tokens.MEMORY :
                read();
                readThis(Tokens.TABLE);

                isTable = true;
                break;

            case Tokens.CACHED :
                read();
                readThis(Tokens.TABLE);

                isTable   = true;
                tableType = TableBase.CACHED_TABLE;
                break;

            case Tokens.TEXT :
                read();
                readThis(Tokens.TABLE);

                isTable   = true;
                tableType = TableBase.TEXT_TABLE;
                break;

            case Tokens.TABLE :
                read();

                isTable   = true;
                tableType = database.schemaManager.getDefaultTableType();
                break;

            case Tokens.OR :
                if (database.sqlSyntaxOra) {
                    read();
                    readThis(Tokens.REPLACE);

                    switch (token.tokenType) {

                        case Tokens.FUNCTION :
                        case Tokens.PROCEDURE :
                        case Tokens.TRIGGER :
                        case Tokens.TYPE :
                        case Tokens.VIEW :
                            break;

                        default :
                            throw unexpectedToken(Tokens.T_OR);
                    }

                    isOrReplace = true;
                }
            default :
        }

        if (isTable) {
            return compileCreateTable(tableType);
        }

        switch (token.tokenType) {

            // other objects
            case Tokens.ALIAS :
                return compileCreateAlias();

            case Tokens.SEQUENCE :
                return compileCreateSequence();

            case Tokens.SCHEMA :
                return compileCreateSchema();

            case Tokens.TRIGGER :
                return compileCreateTrigger(isOrReplace);

            case Tokens.USER :
                return compileCreateUser();

            case Tokens.ROLE :
                return compileCreateRole();

            case Tokens.VIEW :
                return compileCreateView(false, isOrReplace);

            case Tokens.DOMAIN :
                return compileCreateDomain();

            case Tokens.TYPE :
                return compileCreateType(isOrReplace);

            case Tokens.CHARACTER :
                return compileCreateCharacterSet();

            case Tokens.COLLATION :
                return compileCreateCollation();

            // index
            case Tokens.UNIQUE :
                read();
                checkIsThis(Tokens.INDEX);

                return compileCreateIndex(true);

            case Tokens.INDEX :
                return compileCreateIndex(false);

            case Tokens.AGGREGATE :
            case Tokens.FUNCTION :
            case Tokens.PROCEDURE :
                return compileCreateProcedureOrFunction(isOrReplace);

            default : {
                throw unexpectedToken();
            }
        }
    }
 
    StatementSchema compileCreateTable(int type) {

        boolean ifNot = false;

        if (token.tokenType == Tokens.IF) {
            int position = getPosition();

            read();

            if (token.tokenType == Tokens.NOT) {
                read();
                readThis(Tokens.EXISTS);

                ifNot = true;
            } else {
                rewind(position);
            }
        }

        HsqlName name = readNewSchemaObjectName(SchemaObject.TABLE, false);

        name.setSchemaIfNull(session.getCurrentSchemaHsqlName());

        Table table;

        switch (type) {

            case TableBase.TEMP_TEXT_TABLE :
            case TableBase.TEXT_TABLE : {
                table = new TextTable(database, name, type);

                break;
            }
            default : {
                table = new Table(database, name, type);
            }
        }

        return compileCreateTableBody(table, ifNot);
    }
 我们进入了Table类,database数据库对象,name表名,type表类型compileCreateTableBody编译表字段内容,

 

    StatementSchema compileCreateTableBody(Table table, boolean ifNot) {

        HsqlArrayList tempConstraints = new HsqlArrayList();

        if (token.tokenType == Tokens.AS) {
            return readTableAsSubqueryDefinition(table);
        }

        int position = getPosition();

        readThis(Tokens.OPENBRACKET);

        {
            Constraint c = new Constraint(null, null,
                                          SchemaObject.ConstraintTypes.TEMP);

            tempConstraints.add(c);
        }

        boolean start     = true;
        boolean startPart = true;
        boolean end       = false;

        while (!end) {
            switch (token.tokenType) {

                case Tokens.LIKE : {
                    ColumnSchema[] likeColumns = readLikeTable(table);

                    for (int i = 0; i < likeColumns.length; i++) {
                        table.addColumn(likeColumns[i]);
                    }

                    start     = false;
                    startPart = false;

                    break;
                }
                case Tokens.CONSTRAINT :
                case Tokens.PRIMARY :
                case Tokens.FOREIGN :
                case Tokens.UNIQUE :
                case Tokens.CHECK :
                    if (!startPart) {
                        throw unexpectedToken();
                    }

                    readConstraint(table, tempConstraints);

                    start     = false;
                    startPart = false;
                    break;

                case Tokens.COMMA :
                    if (startPart) {
                        throw unexpectedToken();
                    }

                    read();

                    startPart = true;
                    break;

                case Tokens.CLOSEBRACKET :
                    read();

                    end = true;
                    break;

                default :
                    if (!startPart) {
                        throw unexpectedToken();
                    }

                    checkIsSchemaObjectName();

                    HsqlName hsqlName =
                        database.nameManager.newColumnHsqlName(table.getName(),
                            token.tokenString, isDelimitedIdentifier());

                    read();

                    ColumnSchema newcolumn = readColumnDefinitionOrNull(table,
                        hsqlName, tempConstraints);

                    if (newcolumn == null) {
                        if (start) {
                            rewind(position);

                            return readTableAsSubqueryDefinition(table);
                        } else {
                            throw Error.error(ErrorCode.X_42000);
                        }
                    }

                    table.addColumn(newcolumn);

                    start     = false;
                    startPart = false;
            }
        }

        if (token.tokenType == Tokens.ON) {
            if (!table.isTemp()) {
                throw unexpectedToken();
            }

            read();
            readThis(Tokens.COMMIT);

            if (token.tokenType == Tokens.DELETE) {}
            else if (token.tokenType == Tokens.PRESERVE) {
                table.persistenceScope = TableBase.SCOPE_SESSION;
            }

            read();
            readThis(Tokens.ROWS);
        }

        OrderedHashSet names = new OrderedHashSet();

        names.add(database.getCatalogName());

        for (int i = 0; i < tempConstraints.size(); i++) {
            Constraint c    = (Constraint) tempConstraints.get(i);
            HsqlName   name = c.getMainTableName();

            if (name != null) {
                Table t = database.schemaManager.findUserTable(null,
                    name.name, name.schema.name);

                if (t != null && !t.isTemp()) {
                    names.add(table.getName());
                }
            }
        }

        String     sql            = getLastPart();
        Object[]   args           = new Object[] {
            table, tempConstraints, null, Boolean.valueOf(ifNot)
        };
        HsqlName[] writeLockNames = new HsqlName[names.size()];

        names.toArray(writeLockNames);

        return new StatementSchema(sql, StatementTypes.CREATE_TABLE, args,
                                   null, writeLockNames);
    }
 hsqlName是每个字段的名称,ColumnSchema是每个字段的类型还有约束条件,
 我们看看login_id字段的ColumnSchema,这里isnullable还是为1是有问题的,这是hsqldb的一个bug,不过说新版本已经修复了,http://sourceforge.net/projects/hsqldb/forums/forum/73674/topic/4647209 。


 
 我们看下这条sql最终解析之后的结果,这是Statement对象的Table对象的内容,这里有8个字段,其中user_id字段为NUMBER_TYPE类型,is_primary_key为true,


 
  bug主要在ParseDDL.java类的,不知道为什么要判断database.sqlSyntaxMys,这个not null不只是mysql的语法啊,而且这个值默认是false,无法通过参数设置为true,所以直接忽略not null了。
  
  if (!isGenerated && !isIdentity) {
            if (database.sqlSyntaxMys) {
                switch (token.tokenType) {

                    case Tokens.NULL :
                        read();
                        break;

                    case Tokens.NOT :
                        read();
                        readThis(Tokens.NULL);

                        isNullable = false;
                        break;

                    default :
                }
            }
   这个参数是各个数据库的语法有特殊处理,可以参考http://hsqldb.org/doc/2.0/guide/dbproperties-chapt.html。但是这里没有提到对于not null的处理。
 约束的话有下面这几种
                case Tokens.CONSTRAINT :
                case Tokens.PRIMARY :
                case Tokens.FOREIGN :
                case Tokens.UNIQUE :
                case Tokens.CHECK :
                    if (!startPart) {
                        throw unexpectedToken();
                    }

                    readConstraint(table, tempConstraints);

                    start     = false;
                    startPart = false;
                    break;
 Constraint对象包含 哪一列上的哪种约束类型
                Constraint c =
                    new Constraint(constName, set,
                                   SchemaObject.ConstraintTypes.UNIQUE);
 当然还有
        if (token.tokenType == Tokens.ON) {
            if (!table.isTemp()) {
                throw unexpectedToken();
            }

            read();
            readThis(Tokens.COMMIT);

            if (token.tokenType == Tokens.DELETE) {}
            else if (token.tokenType == Tokens.PRESERVE) {
                table.persistenceScope = TableBase.SCOPE_SESSION;
            }

            read();
            readThis(Tokens.ROWS);
        }
 
接下来就调试执行DDL的查询过程了
  result = executeCompiledStatement(cs, ValuePool.emptyObjectArray);
  • 大小: 24.6 KB
  • 大小: 184 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics