源码分析Dubbo tps过滤器器实现原理
本文将重点分析一下dubbo限流的另外一个方式,tps过滤器。 @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
- 过滤器作用 服务调用tps过滤器
- 使用场景 对Dubbo服务提供者实现限流(tps)。
- 阻断条件 当服务调用者超过其TPS时,直接返回rpc exception。 接下来从源码的角度分析Tps过滤器的实现机制。
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " + invoker.getInterface().getName() + "." + invocation.getMethodName() + " because exceed max service tps.");
}
return invoker.invoke(invocation);
}
}
tps limit 生效的条件是,服务提供者的url中包含了tps=""这个属性,还可以通过<dubbo:parameter="tps.interval" value="60000"/>来设置TPS的统计时长,默认为1分钟,表示如果在1分钟之内的调用次数超过配置的tps,则阻断本次RPC服务调用。 其TPS控制代码主要由DefaultTPSLimiter实现。
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1); // @1
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey(); // @2
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable(); // @3
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
代码@1:获取服务提供者url中的参数tps、tps.interval属性。 代码@2:获取服务key,并创建或获取对应的StatItem。 代码@3:调用StatItem的isAllowable()方法来判断是否可用。 StatItem#isAllowable
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) { // @1
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) { // @2
flag = token.compareAndSet(value, value - 1);
value = token.get();
}
return flag;
}
该类的核心思想:是漏桶算法。 代码@1:如果当前时间大于(上一次刷新时间+统计间隔),重新复位token为rate,表示重新生成一批token。 代码@2:每使用一次,消耗一个token,如果能成功消耗一个token则返回true,如果没有可消耗的token,则直接返回false。
Tps过滤器的实现原理其实比较简单,大家可以从这里体会到ConcurrentHashMap、漏桶算法的简易实现。
