我们的项目,是 MySQL+ElasticSearch 做的一个数据库和搜索引擎,项目经理提出需要做一个用于重建 ES 搜索数据的接口,这个任务很光荣的交给了我。
在功能的编写过程当中,我突然思考这样一个问题,因为我们 Web 项目本身是多线程的,那如果在同一时间段,有多个请求同时发起,那同时发起 ES 的重建,对于 ES 来说,可能会产生一些莫名其妙的问题。
所以我感到非常高兴,因为这个问题,似乎不是听起来的那么简单。于是乎我想到了,要加入同步锁了。
最开始我只是很简单的想,直接在对应的 Service 层写一个方法,然后直接加一个
synchronized(this)
在整个方法体上。
@Override public synchronized int rebuiltBountyData() throws Exception { ... }可是问题来了:
但是这个方法很快就联想到了另一个问题:
我们是希望不要多线程同时重建数据,但是如果排队重建呢?好像也不是我们想要的结果。我希望的是当一个线程在执行重建任务的时候,另一个线程要被拒绝开始任务,而不是等待上一个任务做好后再开始。因为我们 tomcat 是采用线程池的概念,如果所有线程都执行这个方法,最后每个线程都会处于等待状态,结果其他请求就会因为没有空闲的线程可用,而无法正常执行。
so,我们修改了一下思路:
在 Service 的这个实现类中,添加一个私有类成员对象 flag = false,当线程进入时,判断 flag 是否为 true,是,则直接抛出异常,结束线程。否,则修改 flag 的值为 true,然后开始执行线程任务,并且,我们对这个 flag 加上一个同步锁,例如:我们在代码中使用时,加入这样一段
synchronized(flag)
由于 SPRing 默认是单例模式,所以这个flag 在多个线程中是共享的,这样就不需要将这个flag 设置为 static 了,因为它在这个局部当中实现了类似 static 的作用。但是这个时候,flag 不能是基础类型,必须是 Boolean 包装类型。那就会产生另一个隐患:包装类的对象仅仅是一个引用,引用是可以被更换了,比如使用了这个 flag 的 set 方法来修改值,但是同步锁取得是引用的锁,而不是引用对应那个实例的锁,锁了引用却没锁实例,但我们实际上却要根据实例的状态来判断,这就会造成一个隐患,可能会使得同步锁失效。
那使用 this 来获得整个 Service 类的同步锁,貌似可以解决问题(如下面这段代码具体实现),但是如果万一以后这个 Service 还有其他需要用到同步锁的需求怎么办呢?这样就会让两个不想干的业务逻辑因为同步锁的问题产生互相的影响。添加同步锁,要尽可能的缩小同步锁的获取范围,和锁内代码的代码量,这样才能减少冲突和线程获取锁时等待的时间,提高软件的安全性和执行效率。
而且,我们的需求在这个时候,又有了变化,项目经理说,有两张表都需要做这种功能。就是说两个业务内容,都需要进行ES 的数据重建。所以如果每次增加一个,我就要单独写一个类似下面这段代码,不仅代码的可复用性降低了,而且以后换别人来维护的时候,说不定会写错这些内容。
@Override public int rebuiltBountyData() throws Exception { //锁住资源防止多线程重复发起任务 synchronized (this) { if (hasThread) { throw new RebuiltBountyEsException("搜索引擎重建任务已经在执行,请勿重复发起!"); } else { hasThread = true; } } //获取总数 int count = bountyMapper.countNum(); int pageTotal, pageSize = 1000; if (count % pageSize != 0) { //若不能整除,则页数加1 pageTotal = count / pageSize + 1; } else { pageTotal = count / pageSize; } try { for (int pageNum = 1; pageNum <= pageTotal; pageNum++) { //分页查询数据库的数据 PageHelper.startPage(pageNum, pageSize); List<Bounty> bountyList = bountyMapper.selectForRebuiltES(); //添加到 ES 引擎 bountyDao.add4List(bountyList); } } catch (Exception e) { throw e; } finally { hasThread = false; } return count; }那我们该怎么办好呢?
最好的办法,就是把这个需要“加锁”的逻辑,单独赋予一个对象,让这个锁的范围能够缩小到只针对这个逻辑,这个功能,而不要跟其他的功能混在一起。 然后我们需要对这个功能,进行进一步的抽象。
我们来好好观察上面这段代码,上面这段代码,算是已经实现了整个功能,从头到尾分解一下这段代码的功能,可以看得出如下:
单线程检查 分页处理 获取数据 写入 ESSo,我们可以看到,其实不同业务场景下,线程检查是一模一样的代码,而分页处理中,获取数据总条数会根据不同业务场景而不同,其他代码也都是相同的,至于写入 ES 的部分,如果数据结构跟从数据库中获取的实体对象没有区别的话,这个也是可以看做是相同的而不需要特别的处理,但是我们公司的项目中,因为种种原因,ES 中的数据结构和实体对象是不同的(尽管数据字段都是相同,我表示我不知道怎么跟你们说这个历史遗留的奇葩问题...)。在这里,我们要应用一个设计模式,是模板模式,将固定的流程代码封装起来。再将可变的部分,留给子类实现。
/** * 类说明:从 JDBC 中获取重建 ES 的数据 */@Servicepublic abstract class JdbcRebuiltEsService<E> extends BaseService{ protected Logger log = LoggerFactory.getLogger(getClass()); private boolean threadLock = false;//线程锁 @Value("1") private int startPage;//开始页码 @Value("1000") private int pageSize;//页面容量 protected abstract int countTotalData() throws Exception; protected abstract Collection<E> loadDataSource(int pageSize, int pageNum) throws Exception; protected abstract void writeToElasticSearch(Collection<E> collection) throws Exception; /** * 检查线程锁 * * @throws Exception */ private void checkLock() throws Exception { //这段代码需要保证线程安全 synchronized (this) { if (threadLock) { //如果已经有线程占用,后续线程进入则抛出异常,因为本接口只允许单线程执行 throw new RebuiltEsTaskExistException("已经有重建任务正在执行,请等待结束后再发起新任务!"); } else { //如果没有线程占用,则新线程进入后将改成线程占用状态 threadLock = true; log.info("用户[{}]发起 ES 重建任务!其他重建任务请求将被拒绝!", getUserJid()); } } } /** * 数据重建 * * @return * @throws Exception */ public int rebuild() throws Exception { checkLock(); log.info("#=== ES 重建任务开始执行"); int totalNum = countTotalData(); log.info("本次重建预计总记录数{}", totalNum); int pageTotal; int pageNum = this.startPage; int pageSize = this.pageSize; //根据条目总数计算总页数 if (totalNum % pageSize != 0) { //若不能整除,则页数加1 pageTotal = totalNum / pageSize + 1; } else { pageTotal = totalNum / pageSize; } long startTime = System.currentTimeMillis();//任务开始计时 try { while (pageNum <= pageTotal) { //分页查询数据库的数据并同时发送到 ES writeToElasticSearch(loadDataSource(pageSize, pageNum)); pageNum++; } } catch (Exception e) { Double progress = (Double) (pageNum * 1.0) / (Double) (pageSize * 1.0); DecimalFormat decimalFormat = new DecimalFormat("##.00%"); log.info("重建异常中断,当前已重建进度为:{}", decimalFormat.format(progress)); throw e; } finally { threadLock = false;//不论是否成功,当线程退出时,都需要将线程状态改为非占用 long endTime = System.currentTimeMillis();//任务结束计时 log.info("#=== ES 重建任务执行结束,耗时:{}毫秒", endTime - startTime); } return totalNum; } public int getStartPage() { return startPage; } public void setStartPage(int startPage) { this.startPage = startPage; } public int getPageSize() { return pageSize; } public void setPageSize(int pageSize) { this.pageSize = pageSize; }}OK,这样就解决了。复写三个容易跟随应用场景不同,而改变的方法,分别是,获取数据源,获取数据总条目,写入 ES。然后暴露 rebuild 方法给外部调用,在 rebuild 方法内部,实现整个运作流程,这样也可以避免以后有人需要做新的实现的时候,修改到这部分有涉及到同步锁的代码,以避免安全隐患。
实际使用的时候可以这样用,创建一个子类继承这个 JdbcRebuiltEsService
/** * 类说明:商品信息 ES 重建所需要实现的具体方法 */@Servicepublic class GoodsRebuiltEsServiceImpl extends JdbcRebuiltEsService<Goods> { @Autowired private GoodsMapper goodsMapper; @Autowired private DrawingDAO drawingDAO; @Override public int countTotalData() throws Exception { return goodsMapper.countNum(); } @Override public Collection<Goods> loadDataSource(int pageSize, int pageNum) throws Exception { PageHelper.startPage(pageNum, pageSize); return goodsMapper.selectForRebuiltES(); } @Override public void writeToElasticSearch(Collection<Goods> collection) throws Exception { drawingDAO.addBatch(collection); }}这样以后每次使用,都只需要实现一个新的子类,然后这样调用:
@Autowired @Qualifier("goodsRebuiltEsServiceImpl") private JdbcRebuiltEsService<Goods> jdbcRebuiltEsService; /** * 数据重建 * * @return * @throws Exception */ @Override public int rebuiltEsGoodsData() throws Exception { return jdbcRebuiltEsService.rebuiltd(); } 这样 rebuilt 方法就很安全的被调用,将程序中不希望被修改的部分,用父类写好,只留下希望被复写的部分,这样就可以很好的保护比较关键的部位,当然了,public 方法也是可以重写的,不过这就超出了我们“以防万一,不小心写错”的初衷了,如果需要重写,那就重写呗。
新闻热点
疑难解答