首页 > 编程 > Java > 正文

spring异步service中处理线程数限制详解

2019-11-26 08:37:23
字体:
来源:转载
供稿:网友

情况简介

spring项目,controller异步调用service的方法,产生大量并发。

具体业务:

前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。

处理方式:

controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。

本文主要知识点:

多线程同时(异步)调用方法后,开启新线程,并限制线程数量。

代码如下:

@Servicepublic class LgtsAsyncServiceImpl { /** logger日志. */ public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class); private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻译的队列 private final AtomicInteger threadCnt = new AtomicInteger(0);// 当前翻译中的线程数 private final Vector<String> existsKey = new Vector<>();// 保存已入队列的数据 private final int maxThreadCnt = 2;// 允许同时执行的翻译线程数 private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻译条数 private static final String translationFrom = "zh"; @Async public void saveAsync(Lgts t) {  if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {   return;  }  offer(t);  save();  return; } private boolean offer(Lgts t) {  String key = t.getGco() + "-" + t.getCode();  if (!existsKey.contains(key)) {   existsKey.add(key);   boolean result = que.offer(t);   // LOGGER.trace("待翻译文字[" + t.getGco() + ":" + t.getCode() + "]加入队列结果[" + result   // + "],队列中数据总个数:" + que.size());   return result;  }  return false; } @Autowired private LgtsService lgtsService; private void save() {  int cnt = threadCnt.incrementAndGet();// 当前线程数+1  if (cnt > maxThreadCnt) {   // 已启动的线程大于设置的最大线程数直接丢弃   threadCnt.decrementAndGet();// +1的线程数再-回去   return;  }  GwallUser user = UserUtils.getUser();  Thread thr = new Thread() {   public void run() {    long sleepTime = 30000l;    UserUtils.setUser(user);    boolean continueFlag = true;    int maxContinueCnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁    int continueCnt = 0;// 连续休眠次数    while (continueFlag) {// 队列不为空时执行     if (Objects.isNull(que.peek())) {      try {       if (continueCnt > maxContinueCnt) {        // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。        continueFlag = false;        continue;       }       // 队列为空,准备休眠       Thread.sleep(sleepTime);       continueCnt++;       continue;      } catch (InterruptedException e) {       // 休眠失败,无需处理       e.printStackTrace();      }     }     continueCnt = 0;// 重置连续休眠次数为0     List<Lgts> params = new ArrayList<>();     int totalCnt = que.size();     que.drainTo(params, NUM_OF_EVERY_TIME);     StringBuilder utf8q = new StringBuilder();     String code = "";     List<Lgts> needRemove = new ArrayList<>();     for (Lgts lgts : params) {      if (StringUtils.isAnyBlank(code)) {       code = lgts.getCode();      }      // 移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去      String key = lgts.getGco() + "-" + lgts.getCode();      existsKey.remove(key);      if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻译的目标语言与当前列表中的第一个不一致       offer(lgts);// 重新将待翻译的语言放回队列       needRemove.add(lgts);       continue;      }      utf8q.append(lgts.getGco()).append("/n");     }     params.removeAll(needRemove);     LOGGER.debug("队列中共" + totalCnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code);     String to = "en";     if (StringUtils.isAnyBlank(utf8q, to)) {      LOGGER.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。");      continue;     }     Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);     if (Objects.isNull(result) || result.isEmpty()) {// 把没有获取到翻译结果的重新放回队列      for (Lgts lgts : params) {       offer(lgts);      }      LOGGER.debug("本次翻译结果为空。");      continue;     }     int sucessCnt = 0, ignoreCnt = 0;     for (Lgts lgts : params) {      lgts.setBdcode(to);      String gna = result.get(lgts.getGco());      if (StringUtils.isAnyBlank(gna)) {       offer(lgts);// 重新将待翻译的语言放回队列       continue;      }      lgts.setStat(1);      lgts.setGna(gna);      int saveResult = lgtsService.saveIgnore(lgts);      if (0 == saveResult) {       ignoreCnt++;      } else {       sucessCnt++;      }     }     LOGGER.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucessCnt + ",已存在并忽略个数:" + ignoreCnt);    }    threadCnt.decrementAndGet();// 运行中的线程数-1    distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改   }   /**    * 如果是最后一个线程,清空队列和existsKey中的数据    */   private void distory() {    if (0 == threadCnt.get()) {     // 最后一个线程退出时,执行清理操作     existsKey.clear();     que.clear();    }   }  };  thr.setDaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁  thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999));  thr.start();// 启动插入线程 } /**  * 百度翻译  *   * @param utf8q  *   待翻译的字符串,需要utf8格式的  * @param from  *   百度翻译语言列表中的代码  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList  * @param to  *   百度翻译语言列表中的代码  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList  * @return 翻译结果  */ private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {  Map<String, String> result = new HashMap<>();  String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate";  if (StringUtils.isAnyBlank(baiduurlStr)) {   LOGGER.warn("百度翻译API接口URL相关参数为空!");   return result;  }  Map<String, String> params = buildParams(utf8q, from, to);  if (params.isEmpty()) {   return result;  }  String sendUrl = getUrlWithQueryString(baiduurlStr, params);  try {   HttpClient httpClient = new HttpClient();   httpClient.setMethod("GET");   String remoteResult = httpClient.pub(sendUrl, "");   result = convertRemote(remoteResult);  } catch (Exception e) {   LOGGER.info("百度翻译API返回结果异常!", e);  }  return result; } private Map<String, String> convertRemote(String remoteResult) {  Map<String, String> result = new HashMap<>();  if (StringUtils.isBlank(remoteResult)) {   return result;  }  JSONObject jsonObject = JSONObject.parseObject(remoteResult);  JSONArray trans_result = jsonObject.getJSONArray("trans_result");  if (Objects.isNull(trans_result) || trans_result.isEmpty()) {   return result;  }  for (Object object : trans_result) {   JSONObject trans = (JSONObject) object;   result.put(trans.getString("src"), trans.getString("dst"));  }  return result; } private Map<String, String> buildParams(String utf8q, String from, String to) {  if (StringUtils.isBlank(from)) {   from = "auto";  }  Map<String, String> params = new HashMap<String, String>();  String skStr = "sk";  String appidStr = "appid";  if (StringUtils.isAnyBlank(skStr, appidStr)) {   LOGGER.warn("百度翻译API接口相关参数为空!");   return params;  }  params.put("q", utf8q);  params.put("from", from);  params.put("to", to);  params.put("appid", appidStr);  // 随机数  String salt = String.valueOf(System.currentTimeMillis());  params.put("salt", salt);  // 签名  String src = appidStr + utf8q + salt + skStr; // 加密前的原文  params.put("sign", MD5Util.md5Encrypt(src).toLowerCase());  return params; } public static String getUrlWithQueryString(String url, Map<String, String> params) {  if (params == null) {   return url;  }  StringBuilder builder = new StringBuilder(url);  if (url.contains("?")) {   builder.append("&");  } else {   builder.append("?");  }  int i = 0;  for (String key : params.keySet()) {   String value = params.get(key);   if (value == null) { // 过滤空的key    continue;   }   if (i != 0) {    builder.append('&');   }   builder.append(key);   builder.append('=');   builder.append(encode(value));   i++;  }  return builder.toString(); } /**  * 对输入的字符串进行URL编码, 即转换为%20这种形式  *   * @param input  *   原文  * @return URL编码. 如果编码失败, 则返回原文  */ public static String encode(String input) {  if (input == null) {   return "";  }  try {   return URLEncoder.encode(input, "utf-8");  } catch (UnsupportedEncodingException e) {   e.printStackTrace();  }  return input; }}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对武林网的支持。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表