当前位置: 安卓之星 -> Java开发 -> Java7并发编程实战手册

Java7并发编程实战手册

作者:网络 发表于: 2017-03-22 点击: 350 次

线程管理

Thread/Runnable/Thread.State
线程的信息获取和设置
线程的中断
sleep/yield
join
daemon
UncaughtExceptionHandler
Thread#setDefaultUncaughtExceptionHandler
ThreadLocal/InheritableThreadLocal
ThreadGroup
uncaughtException
ThreadFactory

线程同步基础

synchronized 同步方法 this
synchronized 属性对象 object
同步代码块中使用条件
Object#wait/notify/notifyAll
虚假唤醒(while)
Lock
ReentrantLock
try/finally
ReadWriteLock
公平性 fair
Condition
while
lock/unlock之间

线程同步辅助类

Semaphore
内部计数器
acquire/release
acquireUninterruptibly
忽略线程中断且不会抛出任何异常
CountDownLatch
内部计数器被初始化之后就不能被再次初始化,唯一能改值的是countDown
CylicBarrier
BrokenBarrierException
reset
Phaser
在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步
Phaser(3) 指定参与阶段同步的线程数是3个
被Phaser类置于休眠的线程不会响应中断事件
awaitAdvanceInterruptibly(int phaser),被中断会抛出InterruptedException
onAdavance,阶段改变时被自动执行
Exchanger
只能同步两个线程
exchange调用后将休眠直到其他的线程到达

线程执行器

ThreadPoolExecutor/Executors
shutdown,当执行完所有待运行的任务后,它将结束执行,调用完毕立即返回,结合awaitTermination判断是否线程池是否关闭
awaitTermination(long timeout, TimeUnit unit)
提供很多方法获取自身状态的信息
newCachedThreadPool/newFixedThreadPool
Future/Callbale
submit
call() throws Exception
ThreadPoolExecutor#invokeAny
返回第一个完成任务且没有抛出异常的任务的执行结果
ThreadPoolExecutor#invokeAll
等待所有任务的完成
landon:是否可以用这个方法做场景心跳
   while(!isDone)
   {
    – long startTime = getCurrent()
    – 场景调度器提交场景任务,等待所有场景任务执行完毕(invokeAll),这样亦可以保证潜在的顺序问题,因为每次都是将当前的tick执行完毕
     – 而非之前submit,如果一次tick执行超过50ms则下次循环又会向线程池提交任务,会出现同一个任务多个线程执行的潜在情况
    – long endTime = getCurrent()
    – long sleepTime = startTime + TickInterval – endTime
    – if(sleepTime > 0) sleep(sleepTime)
    – else println(“busy,oneTick executeTime:” + sleepTime)
   }
ScheduledThreadPoolExecutor
schedule
scheduleAtFixedRate
scheduleWithFixDelay
Future
cancel
get
FutureTask
done,允许在执行器中的任务执行结束之后还可以执行一些代码
FutureTask implements RunnableFuture
FutureTask(Callable callable)
CompletionService
submit,提交任务
poll/take,获取任务已经完成的Future对象
即任务完成后将Future对象放到一个完成的阻塞队列中
RejectedExecutionHandler
Fork/Join框架
分治
fork-将一个任务拆分成更小的多个任务
join-等待子任务的完成执行
工作窃取算法-work-stealing algorithm
ForkJoinPool

ForkJoinTask
RecursiveAction 任务无返回结果
RecursiveTask 任务有返回结果
递归
if(problem size > default size)
{
    tasks = divide(task)
    execute(tasks)
 }
 else {resolve problem using another algorithm}
Task extends RecursiveAction // 无返回结果
compute
  if(…) // divide
  {
   Task t1 = new Task(…)
   Task t2 = new Task(…)
   invokeAll(t1,t2) // 同步调用,执行创建的多个子任务
  }
  else {…}
ForkJoinPool#execute(task) –默认创建一个线程数等于计算机cpu数目的线程池
合并任务的结果
if(problem size > default size)
  tasks = divide(task)
  execute(tasks)
  groupResults
  return result
 else
  resolve problem
  return result
ForkJoinTask#get 等待返回任务计算结果

异步运行任务

同步方式如invokeAll,任务被挂起,直到任务被发送到fork/join线程池中执行。该方式允许ForkJoinPool采用工作窃取算法
异步方式如fork时(立即返回),无法使用该算法
ForkJoinTask#V join()
get和join有区别
任务中抛出异常
ForkJoinTask#isCompletedAbnormally 检查主任务或者子任务是否抛出了异常
getException 获取异常信息
任务抛出运行时异常,会影响其父任务…父任务..
取消任务
在任务开始前可以取消
例:在数字数组中寻找一组数字,拆分为更小的问题,但仅关心数字的一次出现。当我们找到他时,就会取消其他子任务
可存储发送到线程池中的所有任务,当发现当前任务找到数字后,取消非当前任务的所有任务
如果任务正在运行或者已经执行结束,则不能取消,cancel返回false。因此可以尝试去取消所有的任务而不用担心可能带来的间接影响

并发集合

ConcurrentLinkedDeque 非阻塞
getFirst…/peekFirst…/removeFirst…/pollFirst… – 双端队列
LinkedBlockingDeque 阻塞
put/take/poll… 双端队列
PriorityBlockingQueue
队列的元素必须实现Comparable接口
按照排序结果决定插入元素的位置
DelayQueue
元素必须实现Delayed接口
public interface Delayed extends Comparable
两个待实现方法
compareTo(Delayed o)
getDelay(Timeunit unit)
从队列取元素时,到期的元素会返回(未来的元素等待到期_延迟时间)
landon:是否可以用于游戏服务器中的计时器如果有多个timer按照到期时间排队_
ConcurrentSkipListMap
根据键值排序所有元素
内部机制-Skip List
# firstEntry/lastEntry/subMap/…
landon:可以和TreeMap做比较,一个线程不安全,一个线程安全
ThreadLocalRandom
# current,该方法是一个静态方法,如果调用线程还未关联随机数对象,就会初始化一个(localInit)
Atomic Variable
compareAndSet,这是是最主要的方法
判断内存变量的值是否是expect,如果是说明未被其他线程改过,可以直接用update新值更新
否则说明被其他线程改过,进而可以选择下一步处理方式
AtomicReference#public final boolean compareAndSet(V expect, V update)
CAS
AtomicIntegerArray -原子数组
LinkedTransferQueue
AtomicReference–实现单例
 public class Singleton {   
    private static final AtomicReference<Singleton> INSTANCE = new AtomicReference<Singleton>();  
    private Singleton (){}   
    public static  Singleton getInstance() {            
    for (;;) {            
        Singleton current = INSTANCE.get();                 
        if (current != null) {                
        return current;            
        }            
        current = new Singleton();            
        if (INSTANCE.compareAndSet(null, current)) {                
        return current;           
        }       
    }    
    }
}

定制并发类

定制ThreadPoolExecutor
继承该类
覆写_记得调用super
shutdown
shutdownNow
输出如等待执行的任务数目 getQueue().size…
getCompletedTaskCount,获得已执行过的任务数
getActiveCount,获得正在执行的任务数
beforeExecute
afterExecute
实现基于优先级的Executor类
ThreadPoolExector参数传入PriorityBlockingQueue
如果是fixed两个线程,那么前2个任务是被2个线程执行的;后面的排队任务按照优先级顺序执行
实现ThreadFactory接口生成定制线程

覆写newThread方法

返回的Thread可以是定制的Thread对象(MyThread extends Thread)
可外部直接调用Threadfactory#newThread返回线程
在Executor对象中使用ThreadFactory
线程池参数中指定线程工厂
Executors内部有一个DefaultThreadFactory
Executors$DefaultThreadFactory
定制运行在定时线程池中的任务
继承ScheduledThreadPoolExecutor
覆写protected RunnableScheduledFuture decorateTask(
Runnable runnable, RunnableScheduledFuture task)
可自定义一个调度类extends FutureTask implements RunnableScheduledFuture
参考ScheduledThreadPoolExecutor$ScheduledFutureTask
通过实现ThreadFactory接口为Fork/Join框架生成定制线程
ForkJoinPool内部实现
一个任务队列,存放等待被执行的任务
一个执行这些任务的线程池
ForkJoinWorkerThread持有一个ForkJoinPool.WorkQueue workQueue
work-stealing mechanics
MyWorkerThread extends ForkJoinWorkerThread
onStart
onTermination
调用super
MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory
newThread

定制运行在Fork/Join框架中的任务

MyWorkTask extends ForkJoinTask(Void)
getRawResult
exec
调用compute抽象方法
实现定制Lock类
ReentrantLock内部有一个很重要的类Sync(AQS)
abstract static class Sync extends AbstractQueuedSynchronizer
static final class FairSync extends Sync
static final class NonfairSync extends Sync
自定义实现一个MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
tryActuire
tryRelease
自定义实现MyLock implements Lock
lock
unlock
tryLock
newCondition
实现基于优先级的传输队列
MyPriorityTransferQueue extends PriorityBlockingQueue implements TransferQueue
tryTransfer
transfer
hasWaitingConsumer
getWaitingConsumerCount
take
实现自己的原子对象
MyCounter extends AtomicInteger
   for(;;)
     {
      int value = get();
      if(value == 10) return false;
      else
      {
       int newValue = value + 1;
       boolean changed = compareAndSet(value,newValue);
       if(changed) return true;
      }
     }

测试并发应用程序

监控Lock接口
ReentrantLock内部方法都是protected的,所以可以继承
MyLock extends ReentrantLock
调用Thread getOwner()
调用Collection getQueuedThreads()

监控Phaser类
getPhase
getRegisteredParties
getArrivedParties

监控执行器框架
ThreadPoolExecutor
getPoolSize
getCorePoolSize
getActiveCount
getTaskCount

监控Fork/Join池
getPoolSize
getParallelism
getActiveThreadCount
getStealCount

输出高效的日志信息
输出必要的信息
为消息设定恰当的级别
使用FindBugs分析并发代码
配置Eclipse调试并发代码
可选择Default suspend policy for new breakpoints的值为Suspend VM
默认为Suspend Thread
配置NetBeans调试并发代码
使用MultithreadedTC测试并发代码
MultithreadedTestCase
waitForTick

相关文章

相关文章

赶快留言冒泡

  • 评论 (0)
  • 引用通告 (0)
目前还没有任何评论.
目前还没有任何Trackbacks和Pingbacks.
吐个泡浮上去.