java并发包使用Executor框架来进行线程的管理,Executor将任务的提交与执行过程分开,直接使用Runnable表示任务。future获取返回值。ExecutorService 继承了Executor接口,提供生命周器的管理,包括运行,关闭,终止三种状态。
ThreadPoolExecutor ThreadPoolExecutor 是ExecutorService的一个实现类。使用几个线程池来执行task,通常使用Executors工厂方法配置。 ThreadPoolExecutor 允许提供一个BlockingQueue来保存正在等待执行的task,队列一般有三种:无界,有界,和同步移交(synchronous handoff)。
newFixedThreadPool和newSingleThreadExecutor默认情况使用LinkedBlockingQueue。当任务增加的速度超过线程处理任务的速度时,队列大小会无限增加。会造成资源耗尽,内存溢出等问题。
所以使用有界队列比较稳妥,但是引入了新的问题,队列满了后,新的任务如何处理。这种情况引入了饱和策略,JDK提供了几种不同的饱和策略。
Abort(中止) 会扔出一个RejectedExecution Exception,开发者根据此处理自己的业务代码
CallerRunsPolicy 不会抛弃任务,也不会抛出异常。而是将某些task回退给调用者,降低新任务的流量。
Executors 同时Executor也提供了线程池管理方法。可以调用Executors的静态工厂方法来创建一个线程池
newFixedThreadPool 固定大小的线程池,没达到最大线程数目时,提交一个任务创建一个线程,达到最大数目后,不再变化。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);}newCachedThreadPool 可缓存的线程池,没有线程最大数目限制。如果线程池当前规模超过了请求,就回收空闲线程,请求任务增加时,就添加新的线程。
newSingleThreadExecutor 单线程的Executor,如果线程异常结束,会创建另外一个线程来替代。确保任务在按队列中的顺序来串行执行。
newScheduledThreadPool 固定长度的线程池,而且以延迟或定时的方式来执行任务
一个sample code
ExecutorService executor = Executors.newSingleThreadExecutor(); Callable<List<String>> callable; callable = new Callable<List<String>>(){ @Override public List<String> call() throws Exception { return readFile("src/concurrent/test.txt"); } }; Future<List<String>> future = executor.submit(callable); try { List<String> lines = future.get(5, TimeUnit.SECONDS); for(String line: lines) { System.out.PRintln(line); } } catch (InterruptedException | ExecutionException | TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); }完整源码stoneFang的github
https://github.com/google/guava/wiki/ListenableFutureExplained
JDK中Future通过异步的方式计算返回结果,当并发操作时,在任务结束或者没结束的时候都会返回一个结果。Future是异步操作的一个引用句柄,确保在服务执行返回一个结果。
ListenableFuture允许注册回调方法。可以一个小小的改进会支持更多的操作。 对应JDK中的 ExecutorService.submit(Callable) 提交多线程异步运算的方式,Guava 提供了ListeningExecutorService 接口, 该接口返回 ListenableFuture 而相应的 ExecutorService 返回普通的 Future。将 ExecutorService 转为 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)进行装饰。
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() { public Explosion call() { return pushBigRedButton(); }});Futures.addCallback(explosion, new FutureCallback<Explosion>() { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! }});与JDK的并发处理写了个对比的guava并发处理
```ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); Callable<List<String>> callable; callable = new Callable<List<String>>(){ @Override public List<String> call() throws Exception { return readFile("src/concurrent/test.txt"); } }; ListenableFuture<List<String>> future = executor.submit(callable); Futures.addCallback(future, new FutureCallback<List<String>>() { public void onFailure(Throwable thrown) { System.out.println("error"); } @Override public void onSuccess(List<String> result) { for(String line: result) { System.out.println(line); } } });```源码在My Github
cassandra在jdk的concurrent包上封装了自己的并发处理,同时也在各处调用原生的jdk并发包以及google的guava并发处理包
Figure1——cassandra并发实现
cassandra各个Stage是通过StageManger来进行管理的,StageManager 有个内部类ExecuteOnlyExecutor。
ExecuteOnlyExecutor继承了ThreadPoolExecutor,实现了cassandra的LocalAwareExecutorSerivce接口
LocalAwareExecutorService继承了Java的ExecutorService,构建了基本的任务模型。添加了两个自己的方法. execute方法用于trace跟踪。
public void execute(Runnable command, ExecutorLocals locals);public void maybeExecuteImmediately(Runnable command);对于Executor中的默认execute方法,和LocalAwareExecutorSerive中的execute方法都是new 一个task,然后将task添加到queue中。而maybeExecuteImmedicatly方法则是判断下是否有正在执行的task或者work,如果没有则直接执行,而不添加到队列中。
public void maybeExecuteImmediately(Runnable command){ //comment1 FutureTask<?> ft = newTaskFor(command, null); if (!takeWorkPermit(false)) { addTask(ft); } else { try { ft.run(); } finally { returnWorkPermit(); maybeSchedule(); } }}AbstractLocalAwareExecutorService实现LocalAwareExecutorSerive接口,提供了executor的实现以及ExecutorServie接口中的关于生命周期管理的方法实现,如submit,shoudown等方法。添加了addTask,和任务完成的方法onCompletion。
SEPExecutor实现了LocalAwareExecutorService类,提供了addTask,onCompletion,maybeExecuteImmediately等方法的实现。同时负责队列的管理
SharedExecutorPool,线程池管理,用来管理Executor
data 就是Memtables,以及在磁盘上的SSTables。需要使用synchronize来确保隔离性。在CF类初始化的时候会进行加载
public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, int generation, CFMetaData metadata, Directories directories, boolean loadSSTables, boolean registerBookkeeping) {
data = new Tracker(this, loadSSTables); if (data.loadsstables) { Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata); data.addInitialSSTables(sstables); } }postFlushExecutor.execute(task);调用的就是ThreadPoolExecutor
private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(1, StageManager.KEEPALIVE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("MemtableFlushWriter"), "internal");stoneFang的github
https://wizardforcel.gitbooks.io/guava-tutorial/content/16.html
新闻热点
疑难解答