一、 Cassandra数据失效机制
现在使用Cassandra的地方很多,由于Cassandra的写的性能很好,所以有一部分使用Cassandra做为类似于日志功能来使用,所
以一个需求就提出来了,那就是希望Cassandra能提供一个自动失效功能,希望Cassandra能保留一定天数后,能自动删除数据。
这种需求的确很常见,但是遗憾的是Cassandra目前仍然不能满足这个需求,虽然Cassandra已经提供了实现这个功能的基础,下面详细看一下Cassandra是怎么删除数据的:在我写的另一篇文章中也介绍了Cassandra删除数据的规则《Cassandra分布式数据库详解》系列文档。
Cassandra判断数据是否有效有三个地方,分别是下面代码片段:
第一段代码
long maxChange = column.mostRecentLiveChangeAt();
return (!column.isMarkedForDelete() || column.getLocalDeletionTime() >
gcBefore || maxChange > column.getMarkedForDeleteAt()) // (1)
&& (!container.isMarkedForDelete() || maxChange >
container.getMarkedForDeleteAt()); //(2)
这段代码判断这个列是否应该被关联,关联有两个条件
(1) 列没有被删除或者删除的时间在有效期以内或者删除的时间在最后修改数据的数据之前
(2) 列所在的容器没有被删除或者列的修改时间在容器删除时间之后
第二段代码
for (byte[] cname : cf.getColumnNames())
{
IColumn c = cf.getColumnsMap().get(cname);
long minTimestamp = Math.max(c.getMarkedForDeleteAt(),
cf.getMarkedForDeleteAt());
for (IColumn subColumn : c.getSubColumns())
{
if (subColumn.timestamp() <= minTimestamp
|| (subColumn.isMarkedForDelete() && subColumn.getLocalDeletionTime()<= gcBefore)){
((SuperColumn)c).remove(subColumn.name());
}}
if (c.getSubColumns().isEmpty() && c.getLocalDeletionTime() <=gcBefore){
cf.remove(c.name());
}}
或
for (byte[] cname : cf.getColumnNames())
{IColumn c = cf.getColumnsMap().get(cname);
if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)|| c.timestamp() <= cf.getMarkedForDeleteAt()){
cf.remove(cname);
}}
if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <=gcBefore){
return null;
}
第二段代码是判断数据是否应该被删除,也有两个或条件
(1) 列已经被删除了并且数据已经过了时效期
(2) 数据的修改时间在容器删除时间之前
当所有列都删除并且容器已失效,这个key就会被删除,key的删除是在SSTable合并的时候完成的
第三段代码是在客户端中
for (IColumn column : columns)
{
if (column.isMarkedForDelete())
{
continue;
}
Column thrift_column = new Column(column.name(), column.value(),
column.timestamp());
thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));
}
这段代码清楚的说明只要列被删除,客户端将取不到数据
关于gcBefore是在配置文件中设置的864000,很多人以为这个时间就是数据的失效时间,以为在这个时间段内数据可以被使用,从上面的三段代码来看,Cassandra在设计数据失效机制,在实际应用中几乎没有利用价值,也就是数据失效对使用者来说没有任何好处
二、 改造Cassandra实现真正的自动失效功能
从上分析Cassandra判断数据是否失效主要根据三个标识
(1) Column是否被删除标识:isMarkedForDelete
(2)Column的getLocalDeletionTime(),这个时间就是个失效时间GCGraceSeconds比较,判断该Column是否已经失效,这个和前面的条件是并集
(3)Column和所在ColumnFamily的删除时间markedForDeleteAt比较如果小于这个时间,改Column就会被删除
所以我们要实现自动失效数据,不用通过调用remove接口,就能实现。也就是当数据在写到数据库之前,就标识这个数据在GCGraceSeconds时间内有效,一旦超过这个时间,数据被被自动删除,而且是物理删除。
需要改造的地方如下:
A.改造org.apache.cassandra.thrift. ColumnPath类,增加一个
public boolean is_delete;
属性,当我们在调用insert接口是标识这个Column数据是自动失效的。如下所示:
ColumnPath col = new
ColumnPath(columnFamily,superColumn,column.getBytes(“UTF-8″),true);
client.insert(keyspace,key,col,value.getBytes(),System.currentTimeMillis(),
ConsistencyLevel.ONE);
B.改造org.apache.cassandra.thrift. Column类
也增加同样增加is_delete属性,当我们调用batch_insert或batch_mutate接口是同样可以设置Column时支持自动失效的。
A和B是客户端接口需要修改的地方,下面是服务器端要修改的地方:
C.org.apache.cassandra.db.filter. QueryPath类
也增加同样增加is_delete属性,用来保存客户端传过来的is_delete值。并增加一个结构体:
public QueryPath(String columnFamilyName, byte[] superColumnName, byte[] columnName,boolean is_delete) {
this.columnFamilyName = columnFamilyName;
this.superColumnName = superColumnName;
this.columnName = columnName;
this.is_delete = is_delete;
}
在insert、batch_insert和batch_mutate三个接口创建QueryPath对象的地方改成上面这个结构体创建对象
D. org.apache.cassandra.db. ColumnFamily类
修改addColumn方法,将false改为path.is_delete
public void addColumn(QueryPath path, byte[] value, long timestamp)
{
addColumn(path, value, timestamp, path.is_delete);
//addColumn(path, value, timestamp, false);
}
E. org.apache.cassandra.db. Column类
修改getLocalDeletionTime方法,直接去timestamp时间
public int getLocalDeletionTime()
{
assert isMarkedForDelete;
//return ByteBuffer.wrap(value).getInt();
return (int)(timestamp/1000);
}
同时修改comparePriority方法,改变Column替换规则
public long comparePriority(Column o)
{
if(o.timestamp == -1){
return -1;
}
if(this.timestamp == -1){
return 1;
}
if(isMarkedForDelete)
{
// tombstone always wins ties.
return timestamp < o.timestamp ? -1 : 1;
}
return timestamp – o.timestamp;
}
F.
org.apache.cassandra.db. RowMutation类
修改delete方法
public void delete(QueryPath path, long timestamp)
{
assert path.columnFamilyName != null;
String cfName = path.columnFamilyName;
int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
ColumnFamily columnFamily = modifications_.get(cfName);
if (columnFamily == null)
columnFamily = ColumnFamily.create(table_, cfName);
if (path.superColumnName ==
null && path.columnName == null)
{
columnFamily.delete(localDeleteTime, timestamp);
}
else
if (path.columnName == null)
{
SuperColumn sc = new SuperColumn(path.superColumnName, DatabaseDescriptor.getSubComparator(table_, cfName));
sc.markForDeleteAt(localDeleteTime, timestamp);
columnFamily.addColumn(sc);
}
else
{
ByteBuffer bytes = ByteBuffer.allocate(4);
bytes.putInt(localDeleteTime);
long deleteTime = -1;
//columnFamily.addColumn(path, bytes.array(), timestamp, true);
columnFamily.addColumn(path,
bytes.array(), deleteTime, true);
}
modifications_.put(cfName, columnFamily);
}
调用删除接口时将删除时间设为-1,这个和前面的修改的comparePriority方法相适应。
G. 修改CassandraServer类的thriftifyColumns和thriftifySubColumns
却掉isMarkedForDelete检查
public List<ColumnOrSuperColumn>
thriftifyColumns(Collection<IColumn> columns, boolean reverseOrder)
{
ArrayList<ColumnOrSuperColumn> thriftColumns = new
ArrayList<ColumnOrSuperColumn>(columns.size());
for
(IColumn column : columns)
{
/*if (column.isMarkedForDelete())
{
continue;
}*/
Column thrift_column = new Column(column.name(), column.value(), column.timestamp());
thriftColumns.add(createColumnOrSuperColumn_Column(thrift_column));
}
//
we have to do the reversing here, since internally we pass results around in
ColumnFamily
//
objects, which always sort their columns in the “natural” order
//
TODO this is inconvenient for direct users of StorageProxy
if (reverseOrder)
Collections.reverse(thriftColumns);
return thriftColumns;
}
数据有效时间可以通过GCGraceSeconds配置项来设置,这样超过gcBefore时间,客户端就会取不到数据,并且Cassandra 在执行SSTable合并的时候会执行物理删除,如果想立即删除数据可以调用remove接口,数据将会立即被删除,但是在有效时间内被删除的数据客户端仍然能够取到数据。这样就真正实现了数据自动失效和删除的功能。
分享到:
相关推荐
java导出cassandra数据
关于Cassandra数据模型的简单介绍
解Cassandra数据模型
Cassandra对象数据模型的安装包中文安装说明 以及使用例子一步到位哦
NULL 博文链接:https://cjcrobin.iteye.com/blog/1955984
存储数据(cassandra)
Cassandra是一个混合型的非关系的数据库,类似于Google的BigTable。其主要功能比Dynomite(分布式的Key-Value存储系统)更丰富,但支持度却不如文档存储MongoDB(介于关系数据库和非关系数据库之间的开源产品,是非...
主要讲解facebook开发的cassandra的论文。对互联网企业实现key value存储很重要。
它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩展性,被Digg、Twitter...
2.Cassandra ⼀一致性实现 2.1 CAS 2.2 Quorum读写 2.3 不不⼀一致产⽣生原因 2.4 Hinted handoff 2.5 Read repair 2.6 Manual repair 3.Cassandra应⽤用场景 4.总结 视频是mp4格式,配套文档下载地址...
pillar, Pillar管理你的Cassandra数据存储的迁移 柱用于你的数据存储的支柱管理迁移。Pillar希望自动管理Cassandra模式的愿望。 作为代码管理模式能够实现自动化构建和部署,这是组织努力实现持续交付过程的基本实践...
cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战cassandra 实战...
它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩展性,被Digg、Twitter...
Cassandra 的数据rollbock机制
此外,Cassandra 还具有范围查询、列表数据结构、分布式写操作等功能,应用客户包括 Twitter、Facebook 等。 安装 Cassandra 需要进行以下步骤: 1. 下载 Cassandra:从 Apache 官方网站下载 Cassandra 安装包。 2...
nosql 操作,cassandra添加删除操作代码
如果要通过JDBC查询Cassandra数据,但要使用Spark SQL的功能进行数据处理,则需要此应用程序。 此应用程序(CSJB)是Spark应用程序,它将在Spark SQL中自动将所有Cassandra表注册为架构RDD,并启动嵌入式Apache ...
它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra 开源,此后,由于Cassandra良好的可扩展性,被Digg、Twitter...
本文是Cassandra数据模型设计第一篇(全两篇),该系列文章包含了eBay使用Cassandra数据模型设计的一些实践。其中一些最佳实践我们是通过社区学到的,有些对我们来说也是新知识,还有一些仍然具有争议性,可能在要...