`
yuren1hao
  • 浏览: 2210 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

一种应用限流方法实例

阅读更多

整体思路说明:

    对于并发限制请求,会统计当前的并发数,并发数统计原理:1次请求进入到限流模块incr 1;等请求结束退出时decr 1,当前正在处理的请求数就是并发数

 

对于QPS限制请求,统计QPS不能按照秒统计(第1s系统就可能就被打挂了),所以QPS得按照毫秒级别去统计,统计的级别越小,性能损耗越大,所以定在10ms-100ms的级别去统计,基本逻辑如下1s中切成10份,每一份100ms,一个请求进来肯定会落在某一份上,这一份的计数值++,计算当前的QPS,只需要将当前时间所在份的计数和前面9份的技数相加;内存里面需要维护当前秒和前面2秒的数据,数据结构以环形数组为基础EndFragment

 

简单实例:

 

初始化上下文(第一次进入的时候才有效,后续都已经有了返回)

ContextUtil.enter("xxxxxx", this.getRequestPlatForm());

初始化

Entry entry = EntryUtil.entry("xxxx);

获取对应的value,然后根据调用链一次执行,修改计数。

    @Override

    public Entry entry(ResourceWrapper resourceWrapper) throws BlockException {

        Context context = ContextUtil.getContext();

        if(ContextUtil.isNullContext(context)) {

            //空 不需要判断

            return new CtEntry(null , context);

        }

        if(context == null) {

            //创建默认

            context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", this);

        }

        if(!Constants.ON) {

            return new CtEntry(null, context); // 开关不生效

        }

 

        IProcessorValve valves = getProcessorValves(resourceWrapper);

        if(valves == null) {

            return new CtEntry(null, context);

        }

        Entry entry = new CtEntry(valves, context);

        try{

            //规则判断,计数修改

            valves.entry(resourceWrapper, context, null);

        }catch(BlockException e){

            //阻塞的总计数修改

            Tracer.trace(BlockException.BLOCK);

            entry.exit();

            throw e;

        }catch(Throwable e) {

            //理论上不会有这个异常

            log.error("unknow exception", e);

        }

 

        return entry;

    }

最后一步执行统计流程:

@Override

public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {

// 设置originDom

if (!context.getOrigin().equals("")) {

Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());

context.getCurEntry().setOriginDom(originDom);

}

try {

// 先执行其他的Valve

super.fireEntry(resourceWrapper, context, dom);

dom.increasePassRequest();

dom.increaseThreadNum();

dom.add();//增加对应的统计数据

// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increasePassRequest();

context.getCurEntry().getOriginDom().increaseThreadNum();

context.getCurEntry().getOriginDom().add();

}

} catch (Throwable ex) {

if (ex instanceof BlockException) {

dom.increaseBlockedRequest();

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increaseBlockedRequest();

}

}

context.getCurEntry().setError(ex);

throw ex;

}

}

 

 

数据增加,最终算法:

 public void add(int value) {

 

        if (value > Constants.MAX_TIME_VALUE){

            value = Constants.MAX_TIME_VALUE;

        }

        //如 xxx1111毫秒

        long currentTime = TickUtil.currentTimeMillis();

        //通过进度计算,对应到具体的精度值。如精度为100ms,xxx11为进度值

        long timeGranularity = currentTime / precision;

        //根据具体的时间节点值,映射到具体的时间环分片。30个时间分片(保留3s的数据),对应到的分片为11分片。

        int index = (int) (timeGranularity % time.length);

 

        do{

            int recordPassCnt = passCnt[index].get();

            int recordBlockCnt = blockCnt[index].get();

            int recordRt = rt[index].get();

            long recordTime = time[index].get();

 

             if ( timeGranularity == recordTime ){

             //对应分片的统计数据增加

                if (value < 0){

                    if (blockCnt[index].compareAndSet(recordBlockCnt,recordBlockCnt + 1)){

                        break;

                    }

                }else{

                    boolean result = rt[index].compareAndSet(recordRt, recordRt +  value);

                    result = result && passCnt[index].compareAndSet(recordPassCnt, recordPassCnt + 1);

                    if (result || time[index].get() != timeGranularity){

                        break;

                    }

                }

            } else if(timeGranularity > recordTime){//如果超过时间环一圈,如,41对应的分片也为11,需要先清空分片数据,然后再重新统计。

                synchronized (time[index]) {

                    if (timeGranularity > time[index].get()) {

                        time[index].set(-1);

                        passCnt[index].set(-1);

                        blockCnt[index].set(-1);

                        rt[index].set(-1);

 

                        time[index].set(timeGranularity);

 

                        if (value < 0) {

                            passCnt[index].addAndGet(1);

                            blockCnt[index].addAndGet(2);

                            rt[index].addAndGet(1);

                        } else {

                            passCnt[index].addAndGet(2);

                            blockCnt[index].addAndGet(1);

                            rt[index].addAndGet(1 + value);

                        }

                        break;

                    }

                }

            }else {

                 break;

            }

            Thread.yield();

        }while(true);

    }

 

 

数据统计算法:

 

    @Override

    public int[] getAvgQpsAndRt() {

        long currentTime = TickUtil.currentTimeMillis();

        long endTimeGranularity = currentTime / precision;

        int index = (int) (endTimeGranularity % time.length);

        long startTimeGranularity = endTimeGranularity - sampleCnt;

        long totalPassCnt = 0;

        long totalBlockCnt = 0;

        long totalRt = 0;

        //向前统计N个时间片的数据,比如可以统计1s的数据,精度为100ms,则samplCnt值为10,共取10个分片数据,计算qps和rt

        for (int i = 0; i < sampleCnt; i++){

            long recordTime = time[index].get();

            if (recordTime <= endTimeGranularity && recordTime > startTimeGranularity){

                int recordPass =  passCnt[index].get();

                int recordBlock = blockCnt[index].get();

                int recordRt = rt[index].get();

                if (recordTime == time[index].get()) {

                    totalPassCnt += recordPass;

                    totalBlockCnt += recordBlock;

                    totalRt += recordRt;

                }else{

                    startTimeGranularity = recordTime - 1;

                    break;

                }

            } else if (recordTime > endTimeGranularity){

                startTimeGranularity = recordTime - 1;

                break;

            }

            index = (index -1 + time.length) % time.length;

        }

        long duration = precision * sampleCnt;

        int[] avgResult = new int[]{0,0,0}; //passCnt, blockCnt, rt

        if (duration != 0){

            avgResult[0] = (int) (totalPassCnt * 1000 / duration);

            avgResult[1] = (int) (totalBlockCnt * 1000 / duration);

            avgResult[2] = (int) Math.ceil((double)totalRt / (totalPassCnt == 0 ? 1 : totalPassCnt));

        }

        return avgResult;

 

数据限流算法:

 

执行最终的计数算法之前,首先要执行规则检查:

    @Override

public void entry(ResourceWrapper resourceWrapper, Context context, DefaultDom dom) throws Throwable {

// 设置originDom

if (!context.getOrigin().equals("")) {

Dom originDom = dom.getResourceDom().getOriginDom(context.getOrigin());

context.getCurEntry().setOriginDom(originDom);

}

try {

// 先执行其他的Valve,限制性:qps、并发限流等规则检查(详见下:)

super.fireEntry(resourceWrapper, context, dom);

dom.increasePassRequest();

dom.increaseThreadNum();

dom.add();

// 一条链路,或者说一个context对应一个originDom,有可能originDom不存在(比如:origin为"")

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increasePassRequest();

context.getCurEntry().getOriginDom().increaseThreadNum();

context.getCurEntry().getOriginDom().add();

}

} catch (Throwable ex) {

if (ex instanceof BlockException) {

dom.increaseBlockedRequest();

if (context.getCurEntry().getOriginDom() != null) {

context.getCurEntry().getOriginDom().increaseBlockedRequest();

}

}

context.getCurEntry().setError(ex);

throw ex;

}

}

 

//该方法为对应的检查规则的方法

@Override

    public boolean checkRule(Context context, DefaultDom dom) {

        // 获取此规则的受限应用(可能是来源,本身,或者去向)

        String limitApp = this.getLimitApp();

        // 如果此规则无限制应用,立即通过

        if (limitApp == null) {

            return true;

        }

        // 统计值

        long count = -1L;

        // 来源限流

        String origin = context.getOrigin();

        // 按照流向计算

        switch (type) {

            case FlowConstant.TYPE_RESOURCE:

                count = getResourceCount(dom);

                break;// 此资源本身的总限流

            case FlowConstant.TYPE_ORIGIN:

                count = getOriginCount(limitApp, origin, context, dom);

                break;// 此资源来源的限流

            case FlowConstant.TYPE_DESTINATION:

                count = getDestinationCount(limitApp, context, dom);// 此资源去向的限流

        }

        // 如果当前的值已经=或>阀值,则return false

        if (count >= threshold) {

            return false;

        }

        return true;

    }

 

 

//检查是否超过了qps限流值

private long getOriginCount(final String limitApp, final String origin, final Context context, final DefaultDom dom) {

        long count = -1L;

        if (limitApp.equals(origin)) {// limitApp与来源相同的限流

            count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();

        } else if (limitApp.equals(FlowConstant.APP_DEFAULT)) {// 所有的来源,即资源的总限流(也可以设置type字段为resource的类型获得相同结果)

            count = strategy == FlowConstant.STRATEGY_QPS ? dom.getResourceDom().passReqQps() : dom.getResourceDom().curThreadNum();

        } else if (limitApp.equals(FlowConstant.APP_OTHER) && FlowManager.isOtherOrigin(getIdentity(), origin)) {// 其他的来源限流

            count = strategy == FlowConstant.STRATEGY_QPS ? context.getOriginPassedReqQps() : context.getOriginCurThreadNum();

        }

        return count;

    }

分享到:
评论

相关推荐

    分享9种实用开关电源应用案例

    简单的单颗TL431限流恒压控制方法 ●当电流增大时TL431-1的电位被太高,从而起到现在电流的功能,因为R3的存在对输出电压进行了补偿.所以基本上可以做到限流稳压功能为一体, 具有相对的成本优势. 应用实例(3) ...

    C#开发实例大全(基础卷).软件开发技术联盟(带详细书签) PDF 下载

    《C#开发实例大全(基础卷)》筛选、汇集了C#开发从基础知识到高级应用各个层面约600个实例及源代码,每个实例都按实例说明、关键技术、设计过程、详尽注释、秘笈心法的顺序进行了分析解读。全书分6篇共25章,主要...

    H桥为LED照明铺新路

    本设计实例是该技术的一种全新的实现方法,能以全波限流模式,从交流电源直接驱动白光LED阵列,从而实现了一种无闪烁、高能效的固态照明灯。电路会在激励电压的正、负偏移期间,采用交替电开关工作方式,将激励电压...

    超级有影响力霸气的Java面试题大全文档

     继承是一种联结类的层次模型,并且允许和鼓励类的重用,它提供了一种明确表述共性的方法。对象的一个新类可以从现有的类中派生,这个过程称为类继承。新类继承了原始类的特性,新类称为原始类的派生类(子类),而...

    Java开发实战1200例(第1卷).(清华出版.李钟尉.陈丹丹).part3

    实例087 构造方法的应用 108 5.2 修饰符的使用 109 实例088 单例模式的应用 109 实例089 祖先的止痒药方 110 实例090 统计图书的销售量 111 实例091 汉诺塔问题求解 112 实例092 不能重写的方法 113 5.3 包装类的...

    JAVA上百实例源码以及开源项目源代码

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

    limiter:简单的Ruby速率限制机制

    这个gem实现了一种简单的机制来限制或限制Ruby中的操作。 安装 将此行添加到您的应用程序的Gemfile中: gem 'ruby-limiter' 然后执行: $ bundle 或将其自己安装为: $ gem install ruby-limiter 用法 基本...

    java源码包---java 源码 大量 实例

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

    java 面试题 总结

    继承是一种联结类的层次模型,并且允许和鼓励类的重用,它提供了一种明确表述共性的方法。对象的一个新类可以从现有的类中派生,这个过程称为类继承。新类继承了原始类的特性,新类称为原始类的派生类(子类),而...

    什么是VLAN

     IP 组播实际上也是一种VLAN的定义,即认为一个组播组就是一个VLAN,这种划分的方法将VLAN扩大到了广域网,因此这种方法具有更大的灵活性,而且也很容易通过路由器进行扩展,当然这种方法不适合局域网,主要是效率...

    CLR.via.C#.(中文第3版)(自制详细书签)Part2

    《CLR via C#(第3版) 》针对.NET Framework 4.0和多核编程进行了全面更新和修订,是帮助读者深入探索和掌握公共语言运行时、C#和.NET开发的重要参考,同时也是帮助开发人员构建任何一种应用程序(如Microsoft ...

    node-js-design-patterns:13 种最流行的面向对象设计模式应用于 Node.js

    单例模式是一种将类的实例化限制为一个对象的设计模式。 原型模式 使用原型实例指定要创建的对象类型,并通过复制此原型来创建新对象。 工厂模式 定义用于创建对象的接口,但让子类决定实例化哪个类。 工厂方法让一...

    JAVA上百实例源码以及开源项目

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

Global site tag (gtag.js) - Google Analytics