`

hsqldb源码分析系列3 执行引擎分析 插入过程分析

 
阅读更多

  

   我们看看insert语句的执行过程,

  如果cs.isTransactionStatement()是true,则表示启动事务执行,

      

 public Result executeCompiledStatement(Statement cs, Object[] pvals) {

        Result r;

        if (abortTransaction) {
            rollback(false);

            return Result.newErrorResult(Error.error(ErrorCode.X_40001));
        }

        if (sessionContext.depth > 0) {
            if (sessionContext.noSQL.booleanValue()
                    || cs.isAutoCommitStatement()) {
                return Result.newErrorResult(Error.error(ErrorCode.X_46000));
            }
        }

        if (cs.isAutoCommitStatement()) {
            if (isReadOnly()) {
                return Result.newErrorResult(Error.error(ErrorCode.X_25006));
            }

            try {

                /** special autocommit for backward compatibility */
                commit(false);
            } catch (HsqlException e) {
                database.logger.logInfoEvent("Exception at commit");
            }
        }

        sessionContext.currentStatement = cs;

        boolean isTX = cs.isTransactionStatement();

        if (!isTX) {
            if (database.logger.getSqlEventLogLevel()
                    >= SimpleLog.LOG_NORMAL) {
                sessionContext.setDynamicArguments(pvals);
                database.logger.logStatementEvent(this, cs, pvals,
                                                  SimpleLog.LOG_NORMAL);
            }

            r                               = cs.execute(this);
            sessionContext.currentStatement = null;

            return r;
        }

        while (true) {
            actionIndex = rowActionList.size();

            database.txManager.beginAction(this, cs);

            cs = sessionContext.currentStatement;

            if (cs == null) {
                return Result.newErrorResult(Error.error(ErrorCode.X_07502));
            }

            if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001));
            }

            try {
                latch.await();
            } catch (InterruptedException e) {
                abortTransaction = true;
            }

            if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001));
            }

            database.txManager.beginActionResume(this);

            //        tempActionHistory.add("sql execute " + cs.sql + " " + actionTimestamp + " " + rowActionList.size());
            sessionContext.setDynamicArguments(pvals);

            if (database.logger.getSqlEventLogLevel()
                    >= SimpleLog.LOG_NORMAL) {
                database.logger.logStatementEvent(this, cs, pvals,
                                                  SimpleLog.LOG_NORMAL);
            }

            r             = cs.execute(this);
            lockStatement = sessionContext.currentStatement;

            //        tempActionHistory.add("sql execute end " + actionTimestamp + " " + rowActionList.size());
            endAction(r);

            if (abortTransaction) {
                rollback(false);

                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(r.getException(),
                        ErrorCode.X_40001, null));
            }

            if (redoAction) {
                redoAction = false;

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    abortTransaction = true;
                }
            } else {
                break;
            }
        }

        if (sessionContext.depth == 0
                && (sessionContext.isAutoCommit.booleanValue()
                    || cs.isAutoCommitStatement())) {
            try {
                if (r.mode == ResultConstants.ERROR) {
                    rollback(false);
                } else {
                    commit(false);
                }
            } catch (Exception e) {
                sessionContext.currentStatement = null;

                return Result.newErrorResult(Error.error(ErrorCode.X_40001,
                        e));
            }
        }

        sessionContext.currentStatement = null;

        return r;
    }

   database.txManager.beginAction(this, cs); 事务执行开始阶段, 这里事务处理用简单的方式进行处理,

   事务处理有几种实现,MVCC,2PL两阶段提交,

   

   /**
     * add session to the end of queue when a transaction starts
     * (depending on isolation mode)
     */
    public void beginAction(Session session, Statement cs) {

        if (session.isTransaction) {
            return;
        }

        if (cs == null) {
            return;
        }

        writeLock.lock();

        try {
            if (cs.getCompileTimestamp()
                    < database.schemaManager.getSchemaChangeTimestamp()) {
                cs = session.statementManager.getStatement(session, cs);
                session.sessionContext.currentStatement = cs;

                if (cs == null) {
                    return;
                }
            }

            session.isPreTransaction = true;

            if (!isLockedMode && !cs.isCatalogLock()) {
                return;
            }

            beginActionTPL(session, cs);
        } finally {
            writeLock.unlock();
        }
    }

 锁整个表其实就是在当前的TractionManager对象中保存每个表的session对象,也就是每个表操作的每个session对象,这样就是session当前操作需要对这个表加锁,有读锁,写锁。

    void lockTablesTPL(Session session, Statement cs) {

        if (cs == null || session.abortTransaction) {
            return;
        }

        HsqlName[] nameList = cs.getTableNamesForWrite();

        for (int i = 0; i < nameList.length; i++) {
            HsqlName name = nameList[i];

            if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
                continue;
            }

            tableWriteLocks.put(name, session);
        }

        nameList = cs.getTableNamesForRead();

        for (int i = 0; i < nameList.length; i++) {
            HsqlName name = nameList[i];

            if (name.schema == SqlInvariants.SYSTEM_SCHEMA_HSQLNAME) {
                continue;
            }

            tableReadLocks.put(name, session);
        }
    }

  执行过程 r             = cs.execute(this);

   中间主要部分是StatementInsert类的这个方法

  

 Result getResult(Session session) {

        Result          resultOut          = null;
        RowSetNavigator generatedNavigator = null;
        PersistentStore store              = baseTable.getRowStore(session);
        int             count;

        if (generatedIndexes != null) {
            resultOut = Result.newUpdateCountResult(generatedResultMetaData,
                    0);
            generatedNavigator = resultOut.getChainedResult().getNavigator();
        }

        if (isSimpleInsert) {
            Type[] colTypes = baseTable.getColumnTypes();
            Object[] data = getInsertData(session, colTypes,
                                          insertExpression.nodes[0].nodes);

            return insertSingleRow(session, store, data);
        }

        RowSetNavigator newDataNavigator = queryExpression == null
                                           ? getInsertValuesNavigator(session)
                                           : getInsertSelectNavigator(session);

        count = newDataNavigator.getSize();

        if (count > 0) {
            insertRowSet(session, generatedNavigator, newDataNavigator);
        }

        if (baseTable.triggerLists[Trigger.INSERT_AFTER].length > 0) {
            baseTable.fireTriggers(session, Trigger.INSERT_AFTER,
                                   newDataNavigator);
        }

        if (resultOut == null) {
            resultOut = new Result(ResultConstants.UPDATECOUNT, count);
        } else {
            resultOut.setUpdateCount(count);
        }

        if (count == 0) {
            session.addWarning(HsqlException.noDataCondition);
        }

        return resultOut;
    }

 PersistentStore store              = baseTable.getRowStore(session);就是获取这个表对应的内存存储

    我们用的是内存数据库,则这里store是org.hsqldb.persist.RowStoreAVLMemory@681070这个对象,待会我们再分析,

     终于来到 insertSingleRow插入一条记录操作了

      

 

  好吧,插入一条记录没那么简单,需要触发触发器操作和外键约束检测

   

 Result insertSingleRow(Session session, PersistentStore store,
                           Object[] data) {

        if (baseTable.triggerLists[Trigger.INSERT_BEFORE_ROW].length > 0) {
            baseTable.fireTriggers(session, Trigger.INSERT_BEFORE_ROW, null,
                                   data, null);
        }

        baseTable.insertSingleRow(session, store, data, null);
        performIntegrityChecks(session, baseTable, null, data, null);

        if (session.database.isReferentialIntegrity()) {
            for (int i = 0, size = baseTable.fkConstraints.length; i < size;
                    i++) {
                baseTable.fkConstraints[i].checkInsert(session, baseTable,
                                                       data, true);
            }
        }

        if (baseTable.triggerLists[Trigger.INSERT_AFTER_ROW].length > 0) {
            baseTable.fireTriggers(session, Trigger.INSERT_AFTER_ROW, null,
                                   data, null);
        }

        if (baseTable.triggerLists[Trigger.INSERT_AFTER].length > 0) {
            baseTable.fireTriggers(session, Trigger.INSERT_AFTER,
                                   (RowSetNavigator) null);
        }

        return Result.updateOneResult;
    }

 

  /**
     *  Mid level method for inserting single rows. Performs constraint checks and
     *  fires row level triggers.
     */
    Row insertSingleRow(Session session, PersistentStore store, Object[] data,
                        int[] changedCols) {

        if (identityColumn != -1) {
            setIdentityColumn(session, data);
        }

        if (hasGeneratedValues) {
            setGeneratedColumns(session, data);
        }

        if (hasDomainColumns || hasNotNullColumns) {
            enforceRowConstraints(session, data);
        }

        if (isView) {

            // may have domain column
            return null;
        }

        Row row = (Row) store.getNewCachedObject(session, data, true);

        session.addInsertAction(this, store, row, changedCols);

        return row;
    }

 如果有递增列,则要处理加1操作

  

    /**
     * If there is an identity column in the table, sets
     * the value and/or adjusts the identiy value for the table.
     */
    protected void setIdentityColumn(Session session, Object[] data) {

        if (identityColumn != -1) {
            Number id = (Number) data[identityColumn];

            if (identitySequence.getName() == null) {
                if (id == null) {
                    id = (Number) identitySequence.getValueObject();
                    data[identityColumn] = id;
                } else {
                    identitySequence.userUpdate(id.longValue());
                }
            } else {
                if (id == null) {
                    id = (Number) session.sessionData.getSequenceValue(
                        identitySequence);
                    data[identityColumn] = id;
                }
            }

            if (session != null) {
                session.setLastIdentity(id);
            }
        }
    }

 setGeneratedColumns 处理有表达式的列数据生成规则,

    public void setGeneratedColumns(Session session, Object[] data) {

        if (hasGeneratedValues) {
            for (int i = 0; i < colGenerated.length; i++) {
                if (colGenerated[i]) {
                    Expression e = getColumn(i).getGeneratingExpression();
                    RangeIteratorBase range =
                        session.sessionContext.getCheckIterator(
                            getDefaultRanges()[0]);

                    range.currentData = data;
                    data[i]           = e.getValue(session, colTypes[i]);
                }
            }
        }
    }

 enforceRowConstraints(session, data); 处理列的约束,如果整数是否超过限制等

  

    /**
     *  Enforce max field sizes according to SQL column definition.
     *  SQL92 13.8
     */
    public void enforceRowConstraints(Session session, Object[] data) {

        for (int i = 0; i < columnCount; i++) {
            Type         type = colTypes[i];
            ColumnSchema column;

            if (hasDomainColumns && type.isDomainType()) {
                Constraint[] constraints =
                    type.userTypeModifier.getConstraints();

                column = getColumn(i);

                for (int j = 0; j < constraints.length; j++) {
                    constraints[j].checkCheckConstraint(session, this, column,
                                                        (Object) data[i]);
                }
            }

            if (colNotNull[i] && data[i] == null) {
                String     constraintName;
                Constraint c = getNotNullConstraintForColumn(i);

                if (c == null) {
                    if (ArrayUtil.find(this.primaryKeyCols, i) > -1) {
                        c = this.getPrimaryConstraint();
                    }
                }

                constraintName = c == null ? ""
                                           : c.getName().name;
                column         = getColumn(i);

                String[] info = new String[] {
                    constraintName, tableName.statementName,
                    column.getName().statementName
                };

                throw Error.error(null, ErrorCode.X_23502,
                                  ErrorCode.COLUMN_CONSTRAINT, info);
            }
        }
    }

 

Row row = (Row) store.getNewCachedObject(session, data, true); 申请内存空间

  

    public CachedObject getNewCachedObject(Session session, Object object,
                                           boolean tx) {

        int id;

        synchronized (this) {
            id = rowIdSequence++;
        }

        Row row = new RowAVL(table, (Object[]) object, id, this);

        if (tx) {
            RowAction action = new RowAction(session, table,
                                             RowAction.ACTION_INSERT, row,
                                             null);

            row.rowAction = action;
        }

        return row;
    }

             RowAction action = new RowAction(session, table,

                                             RowAction.ACTION_INSERT, row,

                                             null);

    表操作Represents the chain of insert / delete / rollback / commit actions on a row.

 

    插入之前做的事情太多了,session.addInsertAction(this, store, row, changedCols); 终于来到操作插入的过程了。

     

    public void addInsertAction(Session session, Table table,
                                PersistentStore store, Row row,
                                int[] changedColumns) {

        RowAction action = row.rowAction;

        if (action == null) {
/*
            System.out.println("null insert action " + session + " "
                               + session.actionTimestamp);
*/
            throw Error.runtimeError(ErrorCode.GENERAL_ERROR,
                                     "null insert action ");
        }

        store.indexRow(session, row);
        session.rowActionList.add(action);

        row.rowAction = null;
    }

      用AVL tree保存表的数据信息,索引而且每个节点保存数据,普通的AVL数操作,插入过程还加了写锁

     

  /**
     * Insert a node into the index
     */
    public void insert(Session session, PersistentStore store, Row row) {

        NodeAVL        n;
        NodeAVL        x;
        boolean        isleft        = true;
        int            compare       = -1;
        final Object[] rowData       = row.getData();
        boolean        compareRowId  = !isUnique || hasNulls(session, rowData);
        boolean        compareSimple = isSimple;

        writeLock.lock();

        try {
            n = getAccessor(store);
            x = n;

            if (n == null) {
                store.setAccessor(this, ((RowAVL) row).getNode(position));

                return;
            }

            while (true) {
                Row currentRow = n.row;

                compare = 0;

                if (compareSimple) {
                    compare =
                        colTypes[0].compare(session, rowData[colIndex[0]],
                                            currentRow.getData()[colIndex[0]]);

                    if (compare == 0 && compareRowId) {
                        compare = compareRowForInsertOrDelete(session, row,
                                                              currentRow,
                                                              compareRowId, 1);
                    }
                } else {
                    compare = compareRowForInsertOrDelete(session, row,
                                                          currentRow,
                                                          compareRowId, 0);
                }

                // after the first match and check, all compares are with row id
                if (compare == 0 && session != null && !compareRowId
                        && session.database.txManager.isMVRows()) {
                    if (!isEqualReadable(session, store, n)) {
                        compareRowId = true;
                        compare = compareRowForInsertOrDelete(session, row,
                                                              currentRow,
                                                              compareRowId,
                                                              colIndex.length);
                    }
                }

                if (compare == 0) {
                    if (isConstraint) {
                        Constraint c =
                            ((Table) table).getUniqueConstraintForIndex(this);

                        throw c.getException(row.getData());
                    } else {
                        throw Error.error(ErrorCode.X_23505,
                                          name.statementName);
                    }
                }

                isleft = compare < 0;
                x      = n;
                n      = isleft ? x.nLeft
                                : x.nRight;

                if (n == null) {
                    break;
                }
            }

            x = x.set(store, isleft, ((RowAVL) row).getNode(position));

            balance(store, x, isleft);
        } finally {
            writeLock.unlock();
        }
    }

 

    

 /**
     * Balances part of the tree after an alteration to the index.
     */
    void balance(PersistentStore store, NodeAVL x, boolean isleft) {

        while (true) {
            int sign = isleft ? 1
                              : -1;

            switch (x.iBalance * sign) {

                case 1 :
                    x.iBalance = 0;

                    return;

                case 0 :
                    x.iBalance = -sign;
                    break;

                case -1 :
                    NodeAVL l = isleft ? x.nLeft
                                       : x.nRight;

                    if (l.iBalance == -sign) {
                        x.replace(store, this, l);
                        x.set(store, isleft, l.child(store, !isleft));
                        l.set(store, !isleft, x);

                        x.iBalance = 0;
                        l.iBalance = 0;
                    } else {
                        NodeAVL r = !isleft ? l.nLeft
                                            : l.nRight;

                        x.replace(store, this, r);
                        l.set(store, !isleft, r.child(store, isleft));
                        r.set(store, isleft, l);
                        x.set(store, isleft, r.child(store, !isleft));
                        r.set(store, !isleft, x);

                        int rb = r.iBalance;

                        x.iBalance = (rb == -sign) ? sign
                                                   : 0;
                        l.iBalance = (rb == sign) ? -sign
                                                  : 0;
                        r.iBalance = 0;
                    }

                    return;
            }

            if (x.nParent == null) {
                return;
            }

            isleft = x.nParent == null || x == x.nParent.nLeft;
            x      = x.nParent;
        }
    }

 

   我们看下数据,当前有两条记录的数据

   

 

 

     执行完之后   endAction(r); 事务处理结束,出错则回滚,成功则提交

    

    public void endAction(Result result) {

//        tempActionHistory.add("endAction " + actionTimestamp);
        sessionData.persistentStoreCollection.clearStatementTables();

        if (result.mode == ResultConstants.ERROR) {
            sessionData.persistentStoreCollection.clearResultTables(
                actionTimestamp);
            database.txManager.rollbackAction(this);
        } else {
            sessionContext
                .diagnosticsVariables[ExpressionColumn.idx_row_count] =
                    result.mode == ResultConstants.UPDATECOUNT
                    ? Integer.valueOf(result.getUpdateCount())
                    : ValuePool.INTEGER_0;

            database.txManager.completeActions(this);
        }

//        tempActionHistory.add("endAction ends " + actionTimestamp);
    }

 

    成功的话当前就是清楚transationManager保存的session需要加锁的表信息了

    

void endActionTPL(Session session) {

        if (session.isolationLevel == SessionInterface.TX_REPEATABLE_READ
                || session.isolationLevel
                   == SessionInterface.TX_SERIALIZABLE) {
            return;
        }

        if (session.sessionContext.currentStatement == null) {

            // after java function / proc with db access
            return;
        }

        if (session.sessionContext.depth > 0) {

            // routine or trigger
            return;
        }

        HsqlName[] readLocks =
            session.sessionContext.currentStatement.getTableNamesForRead();

        if (readLocks.length == 0) {
            return;
        }

        writeLock.lock();

        try {
            unlockReadTablesTPL(session, readLocks);

            final int waitingCount = session.waitingSessions.size();

            if (waitingCount == 0) {
                return;
            }

            boolean canUnlock = false;

            // if write lock was used for read lock
            for (int i = 0; i < readLocks.length; i++) {
                if (tableWriteLocks.get(readLocks[i]) != session) {
                    canUnlock = true;

                    break;
                }
            }

            if (!canUnlock) {
                return;
            }

            canUnlock = false;

            for (int i = 0; i < waitingCount; i++) {
                Session current = (Session) session.waitingSessions.get(i);

                if (current.abortTransaction) {
                    canUnlock = true;

                    break;
                }

                Statement currentStatement =
                    current.sessionContext.currentStatement;

                if (currentStatement == null) {
                    canUnlock = true;

                    break;
                }

                if (ArrayUtil.containsAny(
                        readLocks, currentStatement.getTableNamesForWrite())) {
                    canUnlock = true;

                    break;
                }
            }

            if (!canUnlock) {
                return;
            }

            resetLocks(session);
            resetLatchesMidTransaction(session);
        } finally {
            writeLock.unlock();
        }
    }

 

  • 大小: 210.3 KB
  • 大小: 140.4 KB
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics