整体思路说明:
对于并发限制请求,会统计当前的并发数,并发数统计原理:1次请求进入到限流模块incr 1;等请求结束退出时decr 1,当前正在处理的请求数就是并发数
对于QPS限制请求,统计QPS不能按照秒统计(第1s系统就可能就被打挂了),所以QPS得按照毫秒级别去统计,统计的级别越小,性能损耗越大,所以定在10ms-100ms的级别去统计,基本逻辑如下将1s中切成10份,每一份100ms,一个请求进来肯定会落在某一份上,这一份的计数值++,计算当前的QPS,只需要将当前时间所在份的计数和前面9份的技数相加;内存里面需要维护当前秒和前面2秒的数据,数据结构以环形数组为基础EndFragment
简单实例:
初始化上下文(第一次进入的时候才有效,后续都已经有了返回)
ContextUtil.enter("xxxxxx", this.getRequestPlatForm());
初始化
Entry entry = EntryUtil.entry("xxxx);
获取对应的value,然后根据调用链一次执行,修改计数。
@Override
public Entry entry(ResourceWrapper resourceWrapper) throws BlockException {
Context context = ContextUtil.getContext();
if(ContextUtil.isNullContext(context)) {
//空 不需要判断
return new CtEntry(null , context);
}
if(context == null) {
//创建默认
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", this);
}
if(!Constants.ON) {
return new CtEntry(null, context); // 开关不生效
}
IProcessorValve valves = getProcessorValves(resourceWrapper);
if(valves == null) {
return new CtEntry(null, context);
}
Entry entry = new CtEntry(valves, context);
try{
//规则判断,计数修改
valves.entry(resourceWrapper, context, null);
}catch(BlockException e){
//阻塞的总计数修改
Tracer.trace(BlockException.BLOCK);
entry.exit();
throw e;
}catch(Throwable e) {
//理论上不会有这个异常
log.error("unknow exception", e);
}
return entry;
}
最后一步执行统计流程:
@Override
public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {
// 设置originDom
if (!context.getOrigin().equals("")) {
Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());
context.getCurEntry().setOriginDom(originDom);
}
try {
// 先执行其他的Valve
super.fireEntry(resourceWrapper, context, dom);
dom.increasePassRequest();
dom.increaseThreadNum();
dom.add();//增加对应的统计数据
// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increasePassRequest();
context.getCurEntry().getOriginDom().increaseThreadNum();
context.getCurEntry().getOriginDom().add();
}
} catch (Throwable ex) {
if (ex instanceof BlockException) {
dom.increaseBlockedRequest();
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increaseBlockedRequest();
}
}
context.getCurEntry().setError(ex);
throw ex;
}
}
数据增加,最终算法:
public void add(int value) {
if (value > Constants.MAX_TIME_VALUE){
value = Constants.MAX_TIME_VALUE;
}
//如 xxx1111毫秒
long currentTime = TickUtil.currentTimeMillis();
//通过进度计算,对应到具体的精度值。如精度为100ms,xxx11为进度值
long timeGranularity = currentTime / precision;
//根据具体的时间节点值,映射到具体的时间环分片。30个时间分片(保留3s的数据),对应到的分片为11分片。
int index = (int) (timeGranularity % time.length);
do{
int recordPassCnt = passCnt[index].get();
int recordBlockCnt = blockCnt[index].get();
int recordRt = rt[index].get();
long recordTime = time[index].get();
if ( timeGranularity == recordTime ){
//对应分片的统计数据增加
if (value < 0){
if (blockCnt[index].compareAndSet(recordBlockCnt,recordBlockCnt + 1)){
break;
}
}else{
boolean result = rt[index].compareAndSet(recordRt, recordRt + value);
result = result && passCnt[index].compareAndSet(recordPassCnt, recordPassCnt + 1);
if (result || time[index].get() != timeGranularity){
break;
}
}
} else if(timeGranularity > recordTime){//如果超过时间环一圈,如,41对应的分片也为11,需要先清空分片数据,然后再重新统计。
synchronized (time[index]) {
if (timeGranularity > time[index].get()) {
time[index].set(-1);
passCnt[index].set(-1);
blockCnt[index].set(-1);
rt[index].set(-1);
time[index].set(timeGranularity);
if (value < 0) {
passCnt[index].addAndGet(1);
blockCnt[index].addAndGet(2);
rt[index].addAndGet(1);
} else {
passCnt[index].addAndGet(2);
blockCnt[index].addAndGet(1);
rt[index].addAndGet(1 + value);
}
break;
}
}
}else {
break;
}
Thread.yield();
}while(true);
}
数据统计算法:
@Override
public int[] getAvgQpsAndRt() {
long currentTime = TickUtil.currentTimeMillis();
long endTimeGranularity = currentTime / precision;
int index = (int) (endTimeGranularity % time.length);
long startTimeGranularity = endTimeGranularity - sampleCnt;
long totalPassCnt = 0;
long totalBlockCnt = 0;
long totalRt = 0;
//向前统计N个时间片的数据,比如可以统计1s的数据,精度为100ms,则samplCnt值为10,共取10个分片数据,计算qps和rt
for (int i = 0; i < sampleCnt; i++){
long recordTime = time[index].get();
if (recordTime <= endTimeGranularity && recordTime > startTimeGranularity){
int recordPass = passCnt[index].get();
int recordBlock = blockCnt[index].get();
int recordRt = rt[index].get();
if (recordTime == time[index].get()) {
totalPassCnt += recordPass;
totalBlockCnt += recordBlock;
totalRt += recordRt;
}else{
startTimeGranularity = recordTime - 1;
break;
}
} else if (recordTime > endTimeGranularity){
startTimeGranularity = recordTime - 1;
break;
}
index = (index -1 + time.length) % time.length;
}
long duration = precision * sampleCnt;
int[] avgResult = new int[]{0,0,0}; //passCnt, blockCnt, rt
if (duration != 0){
avgResult[0] = (int) (totalPassCnt * 1000 / duration);
avgResult[1] = (int) (totalBlockCnt * 1000 / duration);
avgResult[2] = (int) Math.ceil((double)totalRt / (totalPassCnt == 0 ? 1 : totalPassCnt));
}
return avgResult;
数据限流算法:
执行最终的计数算法之前,首先要执行规则检查:
@Override
public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {
// 设置originDom
if (!context.getOrigin().equals("")) {
Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());
context.getCurEntry().setOriginDom(originDom);
}
try {
// 先执行其他的Valve,限制性:qps、并发限流等规则检查(详见下:)
super.fireEntry(resourceWrapper, context, dom);
dom.increasePassRequest();
dom.increaseThreadNum();
dom.add();
// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increasePassRequest();
context.getCurEntry().getOriginDom().increaseThreadNum();
context.getCurEntry().getOriginDom().add();
}
} catch (Throwable ex) {
if (ex instanceof BlockException) {
dom.increaseBlockedRequest();
if (context.getCurEntry().getOriginDom() != null) {
context.getCurEntry().getOriginDom().increaseBlockedRequest();
}
}
context.getCurEntry().setError(ex);
throw ex;
}
}
//该方法为对应的检查规则的方法
@Override
public boolean checkRule(Context context, DefaultDom dom) {
// 获取此规则的受限应用(可能是来源,本身,或者去向)
String limitApp = this.getLimitApp();
// 如果此规则无限制应用,立即通过
if (limitApp == null) {
return true;
}
// 统计值
long count = -1L;
// 来源限流
String origin = context.getOrigin();
// 按照流向计算
switch (type) {
case FlowConstant.TYPE_RESOURCE:
count = getResourceCount(dom);
break;// 此资源本身的总限流
case FlowConstant.TYPE_ORIGIN:
count = getOriginCount(limitApp, origin, context, dom);
break;// 此资源来源的限流
case FlowConstant.TYPE_DESTINATION:
count = getDestinationCount(limitApp, context, dom);// 此资源去向的限流
}
// 如果当前的值已经=或>阀值,则return false
if (count >= threshold) {
return false;
}
return true;
}
//检查是否超过了qps限流值
private long getOriginCount(final String limitApp, final String origin, final Context context, final DefaultDom dom) {
long count = -1L;
if (limitApp.equals(origin)) {// limitApp与来源相同的限流
count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();
} else if (limitApp.equals(FlowConstant.APP_DEFAULT)) {// 所有的来源,即资源的总限流(也可以设置type字段为resource的类型获得相同结果)
count = strategy == FlowConstant.STRATEGY_QPS ? dom.getResourceDom().passReqQps() : dom.getResourceDom().curThreadNum();
} else if (limitApp.equals(FlowConstant.APP_OTHER) && FlowManager.isOtherOrigin(getIdentity(), origin)) {// 其他的来源限流
count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();
}
return count;
}
相关推荐
简单的单颗TL431限流恒压控制方法 ●当电流增大时TL431-1的电位被太高,从而起到现在电流的功能,因为R3的存在对输出电压进行了补偿.所以基本上可以做到限流稳压功能为一体, 具有相对的成本优势. 应用实例(3) ...
《C#开发实例大全(基础卷)》筛选、汇集了C#开发从基础知识到高级应用各个层面约600个实例及源代码,每个实例都按实例说明、关键技术、设计过程、详尽注释、秘笈心法的顺序进行了分析解读。全书分6篇共25章,主要...
本设计实例是该技术的一种全新的实现方法,能以全波限流模式,从交流电源直接驱动白光LED阵列,从而实现了一种无闪烁、高能效的固态照明灯。电路会在激励电压的正、负偏移期间,采用交替电开关工作方式,将激励电压...
继承是一种联结类的层次模型,并且允许和鼓励类的重用,它提供了一种明确表述共性的方法。对象的一个新类可以从现有的类中派生,这个过程称为类继承。新类继承了原始类的特性,新类称为原始类的派生类(子类),而...
实例087 构造方法的应用 108 5.2 修饰符的使用 109 实例088 单例模式的应用 109 实例089 祖先的止痒药方 110 实例090 统计图书的销售量 111 实例091 汉诺塔问题求解 112 实例092 不能重写的方法 113 5.3 包装类的...
Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...
这个gem实现了一种简单的机制来限制或限制Ruby中的操作。 安装 将此行添加到您的应用程序的Gemfile中: gem 'ruby-limiter' 然后执行: $ bundle 或将其自己安装为: $ gem install ruby-limiter 用法 基本...
Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...
继承是一种联结类的层次模型,并且允许和鼓励类的重用,它提供了一种明确表述共性的方法。对象的一个新类可以从现有的类中派生,这个过程称为类继承。新类继承了原始类的特性,新类称为原始类的派生类(子类),而...
IP 组播实际上也是一种VLAN的定义,即认为一个组播组就是一个VLAN,这种划分的方法将VLAN扩大到了广域网,因此这种方法具有更大的灵活性,而且也很容易通过路由器进行扩展,当然这种方法不适合局域网,主要是效率...
《CLR via C#(第3版) 》针对.NET Framework 4.0和多核编程进行了全面更新和修订,是帮助读者深入探索和掌握公共语言运行时、C#和.NET开发的重要参考,同时也是帮助开发人员构建任何一种应用程序(如Microsoft ...
单例模式是一种将类的实例化限制为一个对象的设计模式。 原型模式 使用原型实例指定要创建的对象类型,并通过复制此原型来创建新对象。 工厂模式 定义用于创建对象的接口,但让子类决定实例化哪个类。 工厂方法让一...
Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...