import com.google.common.util.concurrent.AbstractFuture;
public class ResponseFuture<JsonPRotocol>extends AbstractFuture<JsonProtocol>
{
private final Executorexecutor;
public ResponseFuture()
{
if (ThreadLocalUtil.get("isServer") ==null)
{
//TODO 这里是错误的,把这行代码移到一个单例的全局共享中取,避免每次new。如果是服务端,那么所有的服务端都共享一个线程池
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new RpcThreadFacotry("CallBack"));
}
else
{
//单线程执行器
executor = MoreExecutors.directExecutor();
}
}
/**
* 当响应回来的时候,结果被设置到future中,因此从future中可以获得一个异步的响应结果
* @param responseProtocol
*/
public void onResponse(JsonProtocolresponseProtocol)
{
//向future中设置值
super.set(responseProtocol);
}
/**
* 当响应有结果时候可以直接runnable的方法
* @param runnable
*/
public void addCallBack(Runnablerunnable)
{
super.addListener(runnable,executor);
}
}
2、等待线程获得响应结果(使用future.get阻塞等待异步线程的响应)
ResponseFuture<JsonProtocol>reponseFuture = client.futureInvoke(JsonProtocolReqeust);
JsonProtocol JsonProtocolResponse =null;
try
{
JsonProtocolResponse =reponseFuture.get(JsonProtocolReqeust.getRpcMetadata().getTimeOut(),
TimeUnit.MILLISECONDS);
}
catch (ExecutionException e){}
catch (TimeoutException e)
{
throw new RuntimeException("调用远程服务响应超时",e);
}
新闻热点
疑难解答