有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步, 认准 https://blog.zysicyj.top
全网最细面试题手册,支持艾宾浩斯记忆法。这是一份最全面、最详细、最高质量的 java 面试题,不建议你死记硬背,只要每天复习一遍,有个大概印象就行了。https://store.amazingmemo.com/chapterDetail/1685324709017001`
Dubbo provider 执行超时释放执行线程
背景
支持 provider 根据超时时间进行业务打断
适用场景:对于一个 provider,如果某个操作执行超时,则打断 (释放) 该执行线程,而不是仅仅打印超时日志。
核心处理逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class AllChannelHandler2 extends AllChannelHandler {
public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-server-future-timeout", true), 30, TimeUnit.MILLISECONDS);
public AllChannelHandler2(ChannelHandler handler, URL url) { super(handler, url); }
@Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { Future<?> future = executor.submit(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); long timeout = this.url.getParameter("timeout", 1000) + 90; TIME_OUT_TIMER.newTimeout(t -> { if (!future.isDone() && (!future.isCancelled())) { try { future.cancel(true); } catch (Throwable ex) { } } }, timeout, TimeUnit.MILLISECONDS);
} catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException) { sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
}
|
示例
- 设置 Dubbo ProtocolConfig 线程分发策略为 “all2”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Bean public ProtocolConfig protocolConfig() { ProtocolConfig protocolConfig = new ProtocolConfig(); protocolConfig.setName("dubbo"); protocolConfig.setPort(-1); protocolConfig.setTransporter("netty4"); protocolConfig.setThreadpool("fixed"); protocolConfig.setDispatcher("all2"); protocolConfig.setThreads(200); return protocolConfig; }
|
执行超时,直接对业务线程进行打断。即如果 provider 不能及时返回给 counsumer 执行结果,则对执行线程进行打断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Service(interfaceClass = TestService.class,timeout = 1000) public class TestServiceImpl implements TestService { @Override public Integer sum(int a, int b) { CountDownLatch latch = new CountDownLatch(2); AtomicInteger i = new AtomicInteger();
new Thread(()->{ i.incrementAndGet(); latch.countDown(); }).start();
new Thread(()->{ try { TimeUnit.MILLISECONDS.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); } i.incrementAndGet(); latch.countDown(); }).start();
try { latch.await(); return i.get(); }catch (InterruptedException e){ throw new RuntimeException("call sum timeout"); } } }
|
即对于上述 provider demo,执行最后一个 try catch 时,如果业务线程被超时释放,则捕获 InterruptedException 异常进入 catch 块,返回 “call sum timeout”。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Reference(check = false,interfaceClass = TestService.class,timeout = 3000) private TestService testService;
@GetMapping("/sum") public String consumeSum(){ Integer ret = 0; try{ ret = testService.sum(1,1); }catch (Exception e){ return e.getMessage(); } return String.valueOf(ret); }
|
1 2 3
| curl http://localhost:8081/sum >call sum timeout
|