java线程同步机制wait方法(JAVA线程之间如何通信?)java基础 / Java线程同步与通信...

wufei123 发布于 2024-06-22 阅读(5)

#挑战30天在头条写日记#A线程异步调用B线程,整个过程链路是怎样的?比如dubbo调用netty的过程代码是怎样的?比如发送异步MQ消息的代码如何实现?一、一图总结线程交互逻辑:

二、调用方A线程的视角A线程调用B线程之前,A线程把自己存入一个全局的ConcurrentHashMap中,并把Map的key作为参数传给线程B,然后A线程调用Thread的wait()方法进入WAITTING状态,,等到B线程处理完后利用A的Key从Map中取出对应的线程,进行唤醒notify()

三、接收放B线程的视角B线程本身也是一个异步处理,当接收到A的请求后,把A的参数存入处理队列,然后通知A接收成功(A接收到成功的消息后调用Thread的wait()方法进入WAITTING状);B的处理多线程按规则处理队列里面的任务,完成相应的任务后从全局的ConcurrentHashMap中取出A的线程,调用A的notify(),然后A开始运行后告诉B自己成功了,则B完成一次任务

四、线程状态机(供参考用)

五、具体解析5.1 发送同步转异步在网络通信框架,都会面临一个很经典的问题,上游业务调用外部方法是同步的,但是网络请求会设计成异步,当服务端处理完成以后告诉客户端,这个过程如何实现? 举例:业务调用本地的netty客户端发起请求以后阻塞,netty客户端异步发送请求给服务端,当服务端处理完成以后告诉客户端,当客户端收到响应以后,唤醒之前阻塞的业务方调用。

这个过程如何实现?

而rocketMq 在这个地方的实现上代码是比较简洁的(dubbo调用netty的过程也超级简直,有兴趣可以了解)代码如下:代码片段一,同步发送消息的入口:作者:随风 链接:https://www.zhihu.com/question/623539711/answer/3224195091。

来源:知乎 著作权归作者所有商业转载请联系作者获得授权,非商业转载请注明出处 public RemotingCommand invokeSyncImpl(final Channel channel, 。

final RemotingCommand request, finallong timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException

{ finalint opaque = request.getOpaque(); try { final ResponseFuture responseFuture =

new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture);

//------------①final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(

new ChannelFutureListener() { @OverridepublicvoidoperationComplete(ChannelFuture f)throws

Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(

true); return; } else { responseFuture.setSendRequestOK(

false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(

null); log.warn("send a request command to channel failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

//------②if (null == responseCommand) { if (responseFuture.isSendRequestOK()) {

thrownew RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); }

else { thrownew RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } }

return responseCommand; } finally { this.responseTable.remove(opaque); } }

上面代码①处,相当于每次请求的时候,先生成了一个响应对象ResponseFuture(里面的响应内容是空的),然后将其放置responseTable(ConcurrentHashMap)中,并且有个唯一标识opaque,这个东西会传到服务端,后面服务端再带过来。

上面代码②出,相当于要从响应里面拿结果,拿不到就会阻塞到超时下面看下②处里面具体的代码:代码片段二:public RemotingCommand waitResponse(finallong timeoutMillis)。

throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);

returnthis.responseCommand; }可以理解为responseFuture中有一个responseCommand对象,它放置的就是真正的响应结果,但是它只有在countDownLatch.countdown()以后,才不会阻塞在上面的this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS) 这里,所以可以想到,还有另外一个线程在收到服务端响应后会调用countDownLatch.countdown,继续看下文另外一个线程怎么在接收到服务端响应后来处理。

下面看下当客户端收到服务端响应是否会这样呢?代码如下:代码片段三,收到服务端响应入口:作者:随风 链接:https://www.zhihu.com/question/623539711/answer/3224195091

来源:知乎 著作权归作者所有商业转载请联系作者获得授权,非商业转载请注明出处 publicvoidprocessResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd)。

{ finalint opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque);

//---①if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque);

if (responseFuture.getInvokeCallback() != null) { executeInvokeCallback(responseFuture); }

else { responseFuture.putResponse(cmd); //----② responseFuture.release(); } }

else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

log.warn(cmd.toString()); } }代码片段四:publicvoidputResponse(final RemotingCommand responseCommand)

{ this.responseCommand = responseCommand; this.countDownLatch.countDown(); //----------③

}上面的① 就是客户端拿到响应后,根据全文唯一的标识opaque拿到之前代码片段一放concurrenthashMap中的响应对象上面的② 就是要把结果给放在我们响应对象里面上面的③ 就是在放结果的时候调用countDownLatch.countDown(),那么上

文代码片段二,阻塞的线程得以正常执行5.2. 回调采用支持独立线程池作者:随风 链接:https://www.zhihu.com/question/623539711/answer/3224195091

来源:知乎 著作权归作者所有商业转载请联系作者获得授权,非商业转载请注明出处 privatevoidexecuteInvokeCallback(final ResponseFuture responseFuture)。

{ boolean runInThisThread = false; ExecutorService executor = this.getCallbackExecutor();

//------------1if (executor != null) { try { executor.submit(new Runnable() {

@Overridepublicvoidrun(){ try { responseFuture.executeInvokeCallback();

//------2 } catch (Throwable e) { log.warn("execute callback in executor exception, and callback throw"

, e); } finally { responseFuture.release(); } } }); }

catch (Exception e) { runInThisThread = true; log.warn("execute callback in executor exception, maybe executor busy"

, e); } } else { runInThisThread = true; } if (runInThisThread) {

try { responseFuture.executeInvokeCallback(); } catch (Throwable e) { log.warn(

"executeInvokeCallback Exception", e); } finally { responseFuture.release(); } } }

上面的这段代码是消息发送成功后触发业务方回调的方法,设计上采用:ExecutorService executor = this.getCallbackExecutor(); 这里采用线程池异步处理,可以防止回

调方法的不确定性(超时等)阻碍消息接收的主线程。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

河南中青旅行社综合资讯 奇遇综合资讯 盛世蓟州综合资讯 综合资讯 游戏百科综合资讯 新闻11980