首页 > 编程 > Java > 正文

Spring @Async 的使用与实现的示例代码

2019-11-26 11:34:44
字体:
来源:转载
供稿:网友

首先Spring AOP有两个重要的基础接口,Advisor和PointcutAdvisor,接口声明如下:

Advisor接口声明:

public interface Advisor {  Advice getAdvice();  boolean isPerInstance();}

PointcutAdvisor的接口声明:

public interface PointcutAdvisor extends Advisor {  /**   * Get the Pointcut that drives this advisor.   */  Pointcut getPointcut();}

PointcutAdvisor用来获取一个切点以及这个切点的处理器(Advise)。

@Async注解使用后置处理器BeanPostProcessor的子类AsyncAnnotationBeanPostProcessor来实现bean处理 :

AsyncAnnotationAdvisor继承了PointcutAdvisor接口。并且在AsyncAnnotationBeanPostProcessor实现了其父类接口的BeanFactoryAware中的setBeanFactory初始化。Spring一旦创建beanFactory回调成功,就会回调这个方法。保证Advisor对象最先被初始化。

  @Override  public void setBeanFactory(BeanFactory beanFactory) {    super.setBeanFactory(beanFactory);    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);    if (this.asyncAnnotationType != null) {      advisor.setAsyncAnnotationType(this.asyncAnnotationType);    }    advisor.setBeanFactory(beanFactory);    this.advisor = advisor;  }}

具体的后置处理是通过AsyncAnnotationBeanPostProcessor的后置bean处理是通过其父类AbstractAdvisingBeanPostProcessor来实现的。AbstractAdvisingBeanPostProcessor提供的后置bean处理方法对所有的自定义注解的bean处理方法时通用的。其具体的代码如下:

@Override  public Object postProcessAfterInitialization(Object bean, String beanName) {    if (bean instanceof AopInfrastructureBean) {      // Ignore AOP infrastructure such as scoped proxies.      return bean;    }      /*      * bean对象如果是一个ProxyFactory对象。ProxyFactory继承了AdvisedSupport,而    AdvisedSupport又继承了Advised接口。这个时候就把不同的Advisor添加起来。   *    if (bean instanceof Advised) {      Advised advised = (Advised) bean;      if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {        // Add our local Advisor to the existing proxy's Advisor chain...        if (this.beforeExistingAdvisors) {          advised.addAdvisor(0, this.advisor);        }        else {          advised.addAdvisor(this.advisor);        }        return bean;      }    }    if (isEligible(bean, beanName)) {      ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);      if (!proxyFactory.isProxyTargetClass()) {        evaluateProxyInterfaces(bean.getClass(), proxyFactory);      }      proxyFactory.addAdvisor(this.advisor);      customizeProxyFactory(proxyFactory);      return proxyFactory.getProxy(getProxyClassLoader());    }

可以看得出来,isEligible用于判断这个类或者这个类中的某个方法是否含有注解。这个方法最终进入到AopUtils的canApply方法中间:

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {    if (advisor instanceof IntroductionAdvisor) {      return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);    }    else if (advisor instanceof PointcutAdvisor) {      PointcutAdvisor pca = (PointcutAdvisor) advisor;      return canApply(pca.getPointcut(), targetClass, hasIntroductions);    }    else {      // It doesn't have a pointcut so we assume it applies.      return true;    }  }

这里的advisor就是AsyncAnnotationAdvisor对象。然后调用AsyncAnnotationAdvisor对象的getPointcut()方法,得到了Pointcut对象。在AOP规范中间,表示一个具体的切点。那么在方法上注释@Async注解,就意味着声明了一个切点。

然后再根据Pointcut判断是否含有指定的注解。

切点的执行

由于生成了JDK动态代理对象,那么每一个方法的执行必然进入到JdkDynamicAopProxy中的invoke方法中间去执行:

@Override  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {    MethodInvocation invocation;    Object oldProxy = null;    boolean setProxyContext = false;    TargetSource targetSource = this.advised.targetSource;    Class<?> targetClass = null;    Object target = null;    try {      if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {        // The target does not implement the equals(Object) method itself.        return equals(args[0]);      }      else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {        // The target does not implement the hashCode() method itself.        return hashCode();      }      else if (method.getDeclaringClass() == DecoratingProxy.class) {        // There is only getDecoratedClass() declared -> dispatch to proxy config.        return AopProxyUtils.ultimateTargetClass(this.advised);      }      else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&          method.getDeclaringClass().isAssignableFrom(Advised.class)) {        // Service invocations on ProxyConfig with the proxy config...        return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);      }      Object retVal;      if (this.advised.exposeProxy) {        // Make invocation available if necessary.        oldProxy = AopContext.setCurrentProxy(proxy);        setProxyContext = true;      }      // May be null. Get as late as possible to minimize the time we "own" the target,      // in case it comes from a pool.      target = targetSource.getTarget();      if (target != null) {        targetClass = target.getClass();      }      // Get the interception chain for this method.      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);      // Check whether we have any advice. If we don't, we can fallback on direct      // reflective invocation of the target, and avoid creating a MethodInvocation.      if (chain.isEmpty()) {        // We can skip creating a MethodInvocation: just invoke the target directly        // Note that the final invoker must be an InvokerInterceptor so we know it does        // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);        retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);      }      else {        // We need to create a method invocation...        invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);        // Proceed to the joinpoint through the interceptor chain.        retVal = invocation.proceed();      }      // Massage return value if necessary.      Class<?> returnType = method.getReturnType();      if (retVal != null && retVal == target && returnType.isInstance(proxy) &&          !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {        // Special case: it returned "this" and the return type of the method        // is type-compatible. Note that we can't help if the target sets        // a reference to itself in another returned object.        retVal = proxy;      }      else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {        throw new AopInvocationException(            "Null return value from advice does not match primitive return type for: " + method);      }      return retVal;    }    finally {      if (target != null && !targetSource.isStatic()) {        // Must have come from TargetSource.        targetSource.releaseTarget(target);      }      if (setProxyContext) {        // Restore old proxy.        AopContext.setCurrentProxy(oldProxy);      }    }  }

重点的执行语句:

// 获取拦截器      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);      // Check whether we have any advice. If we don't, we can fallback on direct      // reflective invocation of the target, and avoid creating a MethodInvocation.      if (chain.isEmpty()) {        // We can skip creating a MethodInvocation: just invoke the target directly        // Note that the final invoker must be an InvokerInterceptor so we know it does        // nothing but a reflective operation on the target, and no hot swapping or fancy proxying.        Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);        retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);      }      else {              // 根据拦截器来执行        invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);        // Proceed to the joinpoint through the interceptor chain.        retVal = invocation.proceed();      }

@Async注解的拦截器是AsyncExecutionInterceptor,它继承了MethodInterceptor接口。而MethodInterceptor就是AOP规范中的Advice(切点的处理器)。

自定义注解

由于其bean处理器是通用的,所以只要实现PointcutAdvisor和具体的处理器就好了。首先自定义一个注解,只要方法加入了这个注解,就可以输出这个方法的开始时间和截止时间,注解的名字叫做@Log:

@Target({ElementType.METHOD, ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)@Documentedpublic @interface Log {}

定义一个简单的方法用于测试:

public interface IDemoService {  void add(int a, int b);  String getName();}@Servicepublic class DemoServiceImpl implements IDemoService {    @Log    public void add(int a, int b) {    System.out.println(Thread.currentThread().getName());    System.out.println(a + b);  }  @Override  public String getName() {    System.out.println("DemoServiceImpl.getName");    return "DemoServiceImpl";  }}

定义Advisor:

public class LogAnnotationAdvisor extends AbstractPointcutAdvisor {  private Advice advice;  private Pointcut pointcut;  public LogAnnotationAdvisor() {    this.advice = new LogAnnotationInterceptor();  }  @Override  public Advice getAdvice() {    return this.advice;  }  @Override  public boolean isPerInstance() {    return false;  }  @Override  public Pointcut getPointcut() {    return this.pointcut;  }  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");    Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<Class<? extends Annotation>>();    asyncAnnotationTypes.add(asyncAnnotationType);    this.pointcut = buildPointcut(asyncAnnotationTypes);  }  protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {    ComposablePointcut result = null;    for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {      Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);      Pointcut mpc = AnnotationMatchingPointcut.forMethodAnnotation(asyncAnnotationType);      if (result == null) {        result = new ComposablePointcut(cpc).union(mpc);      } else {        result.union(cpc).union(mpc);      }    }    return result;  }}

定义具体的处理器:

public class LogAnnotationInterceptor implements MethodInterceptor, Ordered {  @Override  public int getOrder() {    return Ordered.HIGHEST_PRECEDENCE;  }  @Override  public Object invoke(MethodInvocation invocation) throws Throwable {    System.out.println("开始执行");    Object result = invocation.proceed();    System.out.println("结束执行");    return result;  }}

定义@Log专属的BeanPostProcesser对象:

@SuppressWarnings("serial")@Servicepublic class LogAnnotationBeanPostProcesser extends AbstractBeanFactoryAwareAdvisingPostProcessor {  @Override  public void setBeanFactory(BeanFactory beanFactory) {    super.setBeanFactory(beanFactory);    LogAnnotationAdvisor advisor = new LogAnnotationAdvisor();    advisor.setAsyncAnnotationType(Log.class);    this.advisor = advisor;  }}

对bean的后置处理方法直接沿用其父类的方法。当然也可以自定义其后置处理方法,那么就需要自己判断这个对象的方法是否含有注解,并且生成代理对象:

@Override  public Object postProcessAfterInitialization(Object bean, String beanName) {    Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());    for (Method method : methods) {      if (method.isAnnotationPresent(Log.class)) {        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);        System.out.println(proxyFactory);        if (!proxyFactory.isProxyTargetClass()) {          evaluateProxyInterfaces(bean.getClass(), proxyFactory);        }        proxyFactory.addAdvisor(this.advisor);          customizeProxyFactory(proxyFactory);        return proxyFactory.getProxy(getProxyClassLoader());      }    }    return bean;  }

测试注解是否是正常运行的:

public class Main {  public static void main(String[] args) {    @SuppressWarnings("resource")    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("application-context.xml");      IDemoService demoService = context.getBean(IDemoService.class);        demoService.add(1, 2);     demoService.getName();////     AsyncAnnotationAdvisor//    AsyncAnnotationBeanPostProcessor           }}

输出:

开始执行main3结束执行DemoServiceImpl.getName

功能一切正常。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表