`
duming115
  • 浏览: 113359 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBase(0.89.20100726) 关于RowLock

阅读更多
看了http://blog.csdn.net/TREND_CDC_SPN/archive/2010/05/04/5557626.aspx 这篇文章后才较仔细的看了下HBase的RowLock这一块,非常感谢趋势科技中国研发中心的热心者.

  HBase(0.89.20100726)中的RowLock使用及实现
  HBase只实现了基于row-key的锁机制。
1.客户端代码
  RowLock rl = table.lockRow ("test".getBytes());
  Put p = new Put(rowkey, rl );
  ....处理
  table.unlockRow (rl);
  //这里应该放在try{..}finally{..}中

2.服务器端是由HRegionServer来实现加锁的过程
a. public long lockRow(byte [] regionName, byte [] row) throws IOException 为实现的方法
核心代码为:
HRegion region = getRegion(regionName);
      Integer r = region.obtainRowLock(row);
      long lockId = addRowLock(r,region);
      return lockId;
  b. 调用的HRegion -->  public Integer obtainRowLock(final byte [] row) throws IOException
  --> private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
  throws IOException 方法
  核心代码为
  private final Set<byte[]> lockedRows =
    new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    ....
  synchronized (lockedRows) {
        while (lockedRows.contains(row)) {//如果要锁定的行已经处于锁定的状态
          if (!waitForLock) { //如果不等待,那么直接返回
            return null;
          }
          try {
            lockedRows.wait(); //线程等待
          } catch (InterruptedException ie) {
            // Empty
          }
        }
        // generate a new lockid. Attempt to insert the new [lockid, row].
        // if this lockid already exists in the map then revert and retry
        // We could have first done a lockIds.get, and if it does not exist only
        // then do a lockIds.put, but the hope is that the lockIds.put will
        // mostly return null the first time itself because there won't be
        // too many lockId collisions.
        byte [] prev = null;
        Integer lockId = null;
        do {
          lockId = new Integer(lockIdGenerator++);
          prev = lockIds.put(lockId, row);
          if (prev != null) { //如果lockId之前已经存在,那么还原成之前的状态。
            lockIds.put(lockId, prev);    // revert old value
            lockIdGenerator = rand.nextInt(); // generate new start point
          }
        } while (prev != null); //直到产生一个随机的值,这个值之前没有对应任何的row

        lockedRows.add(row);
        lockedRows.notifyAll(); //这里为会要notifyAll(),原因不明
        return lockId;
      }
c. HRegionServer中的protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException方法
{
long lockId = -1L;
   lockId = rand.nextLong(); //这里产生一个随机的值
   String lockName = String.valueOf(lockId);
   rowlocks.put(lockName, r);//直接把这个值放到rowlocks(ConcurrentHashMap<String, Integer>)中应该会有冲突的存在。
   //如果有冲突的话,那么会不会导致操作B的releaseLock()也把之前操作A的锁也释放掉了?
   this.leases.
     createLease(lockName, new RowLockListener(lockName, region));//这里是通过leases来注册一个定时的操作
   return lockId;
   }
   实际上这里返回的是由HRegionServer产生的lockId,这里一直不是很清楚为什么HRegionServer来生成一个随机数(而且还可能有冲突,要解决冲突还要使用锁来解决竞争的问题),如果只是因为只通过HRegion产生的lockId不能保证在HRegionServer范围内的唯一性,再者传递给leases要有一个名称来对应Leases对象,那么使用Region与Region产生的lockId来作为唯一键不是更好?
d. org.apache.hadoop.hbase.Leases.createLease(String leaseName, LeaseListener listener) throws LeaseStillHeldException
Lease lease = new Lease(leaseName, listener,
       System.currentTimeMillis() + leasePeriod);//Lease实现了Delayed接口,leasePeriod就指明这个定时的时间是在多少ms后
       //可以通过hbase.regionserver.lease.period配置,默认为60ms
   synchronized (leaseQueue) {
     if (leases.containsKey(leaseName)) {
       throw new LeaseStillHeldException(leaseName);//这里如果名称冲突会抛出异常
     }
     leases.put(leaseName, lease);
     leaseQueue.add(lease);
   }
  //Leases extends Thread 每间隔leaseCheckFrequency (由hbase.server.thread.wakefrequency定义)扫描一次,调用的细节如下:
   if (lease.getListener() == null) {
        LOG.error("lease listener is null for lease " + lease.getLeaseName());
      } else {
        lease.getListener().leaseExpired();//调用listener的方法,在本例中指RowLockListener
      }
      synchronized (leaseQueue) {
        leases.remove(lease.getLeaseName());
      }
e. org.apache.hadoop.hbase.regionserver.HRegionServer.RowLockListener 的具体实现为
public void leaseExpired() {
     Integer r = rowlocks.remove(this.lockName);//从映射中取那个lockName对应的region中的int lock值,
     if(r != null) {
     //这个值还有不为空的时候?也许unlockRow(byte [] regionName, long lockId)方法调用时
     //rowlocks.remove(lockName);方法调用后还没来的及调用leases.cancelLease(lockName);时,leases开始了扫描
       region.releaseRowLock(r);//如果这个值不为空,释放锁.
     }
   }   
  以上只是获得锁的部分代码.
3.获得锁后的操作中,比如put,get操作。
为了确保锁的持有者对row的锁定,HRegionServer的
public Result get(byte [] regionName, Get get) throws IOException
public void put(final byte [] regionName, final Put put) 方法都调用了
Integer org.apache.hadoop.hbase.regionserver.HRegionServer.getLockFromId(long lockId) 方法
{
String lockName = String.valueOf(lockId);
    Integer rl = rowlocks.get(lockName);
    if (rl == null) {
      throw new IOException("Invalid row lock");
    }
    this.leases.renewLease(lockName);//这里把定时重新刷新一下
  }
  只是HRegion对get()操作忽略了lock锁,HRegion的put操作(包括批量)都调用了
  Integer getLock(Integer lockid, byte[] row, boolean waitForLock) throws IOException方法
  {
  Integer lid = null;
    if (lockid == null) {
      lid = internalObtainRowLock(row, waitForLock);//如果这个操作之前没有先去锁定row,那么这次操作中要先去获得一个锁.
      //这个锁会在操作完成后在finally中清除掉
    } else {
      if (!isRowLocked(lockid)) {//如果之前获得的锁已经过期了,或者这个锁由客户端自己随意写的,那么这里要抛出异常.
        throw new IOException("Invalid row lock");
      }
      lid = lockid;//这里返回用户的锁id,这个锁在本次操作中不会被清除,清除操作只发生在客户端释放锁,
      //或者由HRegionServer中注册的RowLockListener因为超时来释放锁.
    }
    return lid;
  }

1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics