一、最常用的同步工具
1.synchronized是Java中的关键字,是一种同步锁。可以修饰一段代码,一个方法...这个不展开了,初级使用。

我的理解:一个对象或者类天然有一个锁,用法:synchronized(某个对象)。这个对象也许是自己this,也可以是其它对象:private byte[] lock = new byte[0];据说这个开销比较少。有时候标识在方法前面的方法,可以理解为使用this的锁。

这个锁的锁定与释放都由系统控制,不用自己管理。这具锁对象有wait、notify 和 notifyAll方法,用于线程之间的通讯。常用于代码结构如:for(;;){如果不满足条件就等,如果满足就执行,并通知其它等待的线程},这里注重留意一下interrupt概念,要求中断与可被中断以及中断后执行什么。暂时不展开,回头补充一下。
最好记住一个例子,比如《thinking in java》中的一个厨师放入空盘子与顾客拿走食物的这个例子。

后面开始是复杂的并发包中的内容。

二、并发包中常用的高级工具

2.ReentrantLock是java.util.concurrent包中的,拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。lock后必须在finally块中释放。

3.ReentrantLock与条件的配合使用
要记住这个例子,一个篮子可以放苹果,可以有多个线程生产苹果,多个线程拿走苹果。篮子对象有个容量,而且有一个锁与两个条件。生产线程放的时候调用篮子的put方法,如果篮子满了就等待,如果不满就放个苹果,同时通知等待在【空了条件】上的拿苹果的线程可以拿了,也许通知的时候,拿苹果的线程没有等待,而是正常运行着。

Java代码
  1. Locklock=newReentrantLock();
  2. //条件锁与Lock是相关的
  3. ConditionnoEmptity=lock.newCondition();
  4. ConditionisFull=lock.newCondition();


如果是拿走get方法,就是先判断,篮子空了就等待,如果不空就拿一个,再通知可能的等待在【满了条件】上的线程可以运行了。

应用实例:之前在看阿里的数据库源工具druid中发现了使用这个条件锁的情况。上面的例子用于把握原理的代码框架,而真实的使用例子可以帮助你做自己代码时考虑的更全面。
druid中的使用是连接池对象,它持有两个守护进程(主线程结束就可以退出JVM,不用考虑守护进程存在)。其中一个线程是产生新的连接,一个线程是删除连接。连接池有一定的容量,如果不够了就需要多的线程就产生,如果不使用的连接多了,就删除掉,维持一个最小池子,但又可以动态扩容的。

Condition 的方法与wait、notify和notifyAll方法类似,分别命名为await、signal和 signalAll,因为它们不能覆盖Object上的对应方法。

4.ReentrantReadWriteLock
看名字就知道是读写锁。这个用的应该蛮多的,不过估计都封闭在缓存工具里了。
比如一个cache,持有一个map。那可以用此锁来控制对map的读写,读取数据的方法用读锁,修改数据的方法用写锁。读锁可以多个线程都获取,如果有其它线程有写的锁的时候就不行。写要等待没有读的锁,也没有其它写的锁,才能写。

Java代码
  1. staticMap<String,Object>map=newHashMap<String,Object>();
  2. staticReentrantReadWriteLockrwl=newReentrantReadWriteLock();
  3. staticLockr=rwl.readLock();
  4. staticLockw=rwl.writeLock();


每个读锁或者写锁获取后都要在finally中释放。更深入的可以了解锁降级,主要是有了写锁,自己写好后,释放前再获取读锁。防止中间被其它写锁钻空子,造成自己写的内容,立马读出来又不对了的情况。

5.CountDownLatch。Java并发包中有三个类用于同步一批线程的行为,分别是CountDownLatch、 Semaphore和CyclicBarrier,这个就是其中之一。
从名字上看就是一个倒计数的控制。有多个线程在上面等等待着,另外有多个线程会让计数减少。当减为0后,所有等待的线程就开始动起来了。(关于等待的线程怎么动,后面有提到更底层AQS里的队列。)
多对多就不讲了,记住一对多与多对一例子。比如主线程同时启动一组线程时,主线程先持有一个CountDownLatch(1),再可以循环new出一组线程,他们会在CountDownLatch上等待。主线程sleep一定时间后(等一组都进行等待中),突然让CountDownLatch来一个countDown()。这时候一组线程就都可以动起来了。另一个情况是主线程等一组线程做完了再接着做事。主线程new一个CountDownLatch(5)后启动一组5个线程开始运行,自己在CountDownLatch上等着,每个子线程最后来一个countDown()操作,那最后计数为0时,就激活等待的主线程继续运行。

应用实例:还是在阿里的数据库源工具druid中看到过这个,就是init的时候,主线程产生了CountDownLatch(2),而生产连接的守护线程启动后countDown(),删除连接的守护线程countDown(),表明两个需要的线程都启动了,接着做其它的事情了。生活实例:比如汽车启动时要做5个自查,每完成一个减少一个,都检查完了就正式可以开了,否则可能报警。再比如大家去吃饭,每到一个人就报数,人数够了,等待中的上菜主线程就可以启动了。而几个炒菜师傅都等着命令呢,突然一个命令,几个师傅都开始干活了。前者是主线程等子线程的条件满足后开工,后者是主线程下命令后子线程开工。

项目实例:之前那个多规则线程校验一批文档时,最后一步都校验后要置文档已经被校验过了,正好需要些功能。其它人写的原代码比较老,这里有同时运行的线程数限制,于是先是循环检测所有线程的状态,如果有State.NEW的,并且没超过限制就start它,如果没有State.NEW了就跳出循环。再并一个循环所有的线程,就把没有State.TERMINATED的都join到主线程中来。最后又出现了与主线程的串行。如要用上面的工具就非常简单了。另外,还可以用线程池来做,当线程池shutdown后,主线程可以循环(中间sleep一会)检查线程池的isTerminated(),如果OK就可以再做后面的工作了。


6.Semaphore
Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。相对来说他也有两个主要的方法:

  • 用于获取权限的acquire(),其底层实现与CountDownLatch.countdown()类似;
  • 用于释放权限的release(),其底层实现与acquire()是一个互逆的过程。


这个没有见到过例子,只能与生活中的餐馆举例。比如有5个桌子,多了顾客只能等待,少了就可以进去吃饭。有限流的功能,如果碰到类似的需要再来研究。

7.CyclicBarrier
CyclicBarrier是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作,它里面最重要的方法是await()方法。
即每个线程执行完后调用await(),然后在await()里,线程先将计数器减1,如果计数器为0,则执行定义好的操作,然后再继续执行原线程的内容。
代码中就是先new一个CyclicBarrier(计数,统一操作)。前一个参数是多少个线程等待了就可以启动了,后一个是启动前做些其它的统一操作。生活中的场景:比如警--察抓行人闯红灯,抓住10个人(行人等待凑够数)后现场开班学习交通法规(统一操作),之后这批行人再出发。

上面一些东西用起来也不难,关键是记住使用模型是什么样的。

三、更深入的理解java共享锁模型

8.AQS
在java5提供的并发包下,有一个AbstractQueuedSynchronizer抽象类,也叫AQS,此类根据大部分并发共性作了一些抽象,便于开发者实现如排他锁,共享锁,条件等待等更高级的业务功能。它通过使用CAS(compare and swap,比较和交换,更底层的)和队列模型,出色的完成了抽象任务。
仔细想想上面的那些工具,有些什么共性呢?

9.cas。compare and swap的缩写,中文翻译成比较并交换。
CAS指令在Intel CPU上称为CMPXCHG指令,它的作用是将指定内存地址的内容与所给的某个值相比,如果相等,则将其内容替换为指令中提供的新值,如果不相等,则更新失败。这一比较并交换的操作是原子的,不可以被中断。
这不是java特有的,而是操作系统需要保证的。利用CPU的CAS指令,同时借助JNI来完成Java的非阻塞算法。允许算法执行读-修改-写操作,而无需害怕其他线程同时 修改变量,因为如果其他线程修改变量,那么 CAS 会检测它(并失败),算法可以对该操作重新计算。
作为乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,无限循环执行(称为自旋),直到成功为止,这个自旋的过程算是线程在等待吗?。AtomicInteger也是用乐观锁cas原子操作实现的。
所以上面很多工具首先是有一个计数,而这个计数是共享的变量。比如countDown中的计数,比如Semaphore中的闸门数,比如CyclicBarrier中的阻隔数。线程调用共享变量正好用到cas。

10.等待线程队列
另外发现上面的工具都有很多线程处于等待状态,这些线程信息必然要存下来,应该是按顺序存,而且可能如餐厅限流一样不断的产生等待和启动,所以一定用的队列这种结构。
在countDown中通过CAS成功置为0的那个线程将会同时承担起唤醒等待线程队列中第一个节点线程的任务,而第一个节点任务又会发现自身为通知状态,又会把队列中的head指向后一个等待线程的节点,然后删除自身节点,并唤醒它。一个线程在阻塞之前,就会把它前面的节点设置为通知状态,这样便可以实现链式唤醒机制了。

11.引伸知识简单了解
java的CAS同时具有 volatile 读和volatile写的内存语义,Java的CAS会使用现代处理器上提供的高效机器级别原子指令,这些原子指令以原子方式对内存执行读-改-写操作,这是在多处理器中实现同步的关键。同时,volatile变量的读/写和CAS可以实现线程之间的通信(CAS中的不相等时的自旋看做等待?)。把这些特性整合在一起,就形成了整个concurrent包得以实现的基石。如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式:

  • 首先,声明共享变量为volatile;
  • 然后,使用CAS的原子条件更新来实现线程之间的同步;
  • 同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。


AQS,非阻塞数据结构和原子变量类(java.util.concurrent.atomic包中的类),这些concurrent包中的基础类都是使用这种模式来实现的,而concurrent包中的高层类又是依赖于这些基础类来实现的。

12.最后有一个疑问
CountDownLatch、 Semaphore和CyclicBarrier的设计中,是不是可以让调用工具方法的线程由于条件不足时,都锁定在同一个对象中。而工具持有一个atomInteger的数字,而另外的线程调用工具的调整数字的方法时,如果满足条件再通知那些锁定的线程启动。这样当然不是用更底层的AQS方式来做,等于绕了一个弯而已。不过,这样可以/不可以吗?

前面有提出等待是否底层就是自旋的问题?有文章说:
“自旋和阻塞:像同步模式会分为两个阵营,他们的实现也会分为两个阵营:他们都采用自旋或者阻塞。自旋是一个简单的例子。比如条件同步,他采用一个一般的循环。自旋的明显缺点就是它浪费了cpu的执行周期。在一个多应用程序系统,经常会使用阻塞——让处理器去执行其他的可执行的线程。之前的线程也许会不久之后又会执行。阻塞不用不停的去查看条件和锁的状态,但是他会在来回切换程序的时候有性能花费。如果线程等待的平均时间小上下文切换时间的两倍,则可以优先考虑轮询。当每个cpu核上之后一个线程在执行的时候,轮训也是不错的选择,这通常是发生生在嵌入式或者高性能的系统中。最终我们会发现,阻塞(基于调度的同步)一定是基于轮询(自旋)实现的,因为调度器使用的数据结构本身也需要同步。”


四、线程池
13.差点漏了这个重要内容了,开始写的内容比较多,考虑到本文的目的,进行精简。
如同连接池一样,如果是比较大的开销进行生成与销毁,就要考虑一个池子。如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

JDK中强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)它们均为大多数使用场景预定义了设置。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。这个在业务中用的也比较多,比如定时交换数据,同步数据。有时候用spring的定时任务,也可以启动容器时用线程池,估计底层都差不多。

14.线程池配置的主要的参数
重点是要理解核心线程数量corePoolSize 与最大线程数量maximumPoolSize的关系。线程超过核心了,先进队列。再多了就扩大线程数。如果无界队列,就永远不会用到最大数量。keepAliveTime是大于核心时,超过此时间的空闲线程要杀死。线程池实在来不及处理的时候用到handler来。
提交给池子的线程可能放队列中缓存,有界的ArrayBlockingQueue放满了就扩大核心数。无界就一直放,不会扩大核心数。说想SynchronousQueue,你一定是碰到了假队列,它是管理直接在线程间移交信息的机制,它会直接提交给空线程做事,没有空的就建新线程,所以最大数量要设置为integer.maxvalue了吧。

15.线程池任务监控
线程池里有一些属性可以直接看到,比较任务总数,完成的任务数之类的。但到ThreadPoolExecutor时,我们是无法知道这些任务是在什么时候才真正的执行的,为了实现这个需求,我们需要扩展ThreadPoolExecutor,重写beforeExecute和afterExecute,在这两个方法里分别做一些任务执行前和任务执行后的相关监控逻辑,还有个terminated方法,是在线程池关闭后回调(这个是否可以把主线程传进去回调整呢?),另外,我们可以通过getLargestPoolSize()和getCompletedTaskCount()来分别获取线程池数的峰值和线程池已完成的任务数。重写就是继承老的,具体查资料不展开,知道可以这样就行了。

16.线程池来不及处理的策略
一共有四种策略为,AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy. 第一个AbortPolicy是默认策略,拒绝策略就是抛异常。这个在构造ThreadPoolExecutor时,传入一个策略对象就行了。如果传CallerRunsPolicy对象,它本身构造时还要有一个线程对象,来执行来不及处理时,你想做的事情。

优质内容筛选与推荐>>
1、意见整理
2、UVA-1347 Tour
3、安卓(android)之实现断点下载功能
4、Java常见面试问题: equals()与hashCode()的使用
5、PCB 敷铜间距规则(转)


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号





    联系我们

    欢迎来到TinyMind。

    关于TinyMind的内容或商务合作、网站建议,举报不良信息等均可联系我们。

    TinyMind客服邮箱:support@tinymind.net.cn