首页 > 编程 > Java > 正文

JAVA 线程同步

2019-11-10 17:48:48
字体:
来源:转载
供稿:网友

典型场景,银行存取钱: 模拟银行账户:

public class Account { PRivate double balance; public Account() { } public Account(double balance) { this.balance = balance; } public double getBalance() { return balance; } public void reduceBalance(double count) { this.balance -= count; } public boolean draw(double money) { if (getBalance() > money) { System.out.println( "The amount of "+Thread.currentThread().getName() +" draw is : "+ money + ". Withdrawal is successful !" ); try { Thread.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } balance -= money; System.out.println("/t" + "The Balance after "+Thread.currentThread().getName()+" successful withdrawal is : " + getBalance()); return true; } else { System.out.println("Insufficient balance !"); System.out.println("/t" + "The balance after "+ Thread.currentThread().getName()+" withdrawal failed is :" + getBalance()); return false; } }}

模拟对该账户进行存、取操作(不设置线程同步)

public class DrawMoney { public static void main(String[] args) { Runnable r = new DrawRunnable(new Account(800), 300); new Thread(r, "A").start(); new Thread(r, "B").start(); } private static class DrawRunnable implements Runnable { private final Account account; private double money; public DrawRunnable(Account account, double money) { this.account = account; this.money = money; } public void run() { while (account.draw(money)) { } } }}

执行结果:

The amount of A draw is : 300.0. Withdrawal is successful !The amount of B draw is : 300.0. Withdrawal is successful ! The Balance after B successful withdrawal is : 500.0The amount of B draw is : 300.0. Withdrawal is successful ! The Balance after B successful withdrawal is : 200.0Insufficient balance ! The balance after B withdrawal failed is :200.0 The Balance after A successful withdrawal is : -100.0Insufficient balance ! The balance after A withdrawal failed is :-100.0 最终,出现了存款为-100的情况,显然这是银行不愿意看到的,也是现实生活中不可能存在的,说明我们的程序出错了。

synchronized

任何时刻只能有一个线程获得对同步监视器的锁定,当同步代码执行完后,该线程会自动释放对同步监视器的锁定. 修改Account类中的draw函数: public synchronized boolean draw(double money) { if (getBalance() > money) { System.out.println( "The amount of "+Thread.currentThread().getName() +" draw is : "+ money + ". Withdrawal is successful !" ); try { Thread.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } balance -= money; System.out.println("/t" + "The Balance after "+Thread.currentThread().getName()+" successful withdrawal is : " + getBalance()); return true; } else { System.out.println("Insufficient balance !"); System.out.println("/t" + "The balance after "+ Thread.currentThread().getName()+" withdrawal failed is :" + getBalance()); return false; } }

执行结果:

The amount of B draw is : 300.0. Withdrawal is successful ! The Balance after B successful withdrawal is : 500.0The amount of B draw is : 300.0. Withdrawal is successful ! The Balance after B successful withdrawal is : 200.0Insufficient balance ! The balance after B withdrawal failed is :200.0Insufficient balance ! The balance after A withdrawal failed is :200.0当然,synchronized这可以修饰代码块,对线程的run()函数作如下修改,结果一样: public void run() { while (true) { synchronized (account) { if (account.getBalance() > money) { System.out.println(Thread.currentThread().getName() + "取钱" + money + "成功"); try { Thread.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } account.reduceBalance(money); System.out.println("/t" + Thread.currentThread().getName() + "成功后的余额: " + account.getBalance()); } else { System.out.println(Thread.currentThread().getName() + "取钱失败"); System.out.println("/t" + Thread.currentThread().getName() + "失败后的余额: " + account.getBalance()); break; } } } }synchronized可以修饰方法,也可以修改代码块,但不能修饰构造器,成员变量等.

同步监视器释放

释放同步监视器锁定:当前线程的同步方法/同步代码块执行结束, 释放同步监视器;当前线程在同步代码块/同步方法中遇到break/return终止该代码块/方法的执行, 释放同步监视器.当前线程在同步代码块/同步方法中出现了未处理的Error/Exception, 导致异常结束, 释放同步监视器.当前线程调用了同步对象的wait()方法,当前线程暂停,并释放同步监视器. 不释放同步监视器:程序调用Thread.sleep()/Thread.yield()方法暂停当前线程执行.其他线程调用当前线程的suspend()方法将线程挂起.

wait/notify

现在系统中有两个线程,分别执行存钱/取钱,考虑这样一种特殊的需求:”要求存钱者和取钱着不断地重复存钱/取钱动作,同时规定不允许连续两次存钱, 也不允许两次连续取钱”.可以借助Object类提供的wait()/notify()/notifyAll()三个方法来完成这一需求,但这三个方法必须由同步监视器对象来调用,因此可以分为以下两种情况: - 使用synchronized修饰的同步方法, 由于this就是同步监视器,所以可以在同步方法里面直接调用这三个方法.- 使用synchronized修饰的同步代码块, 由于同步监视器是synchronized括号中的对象, 所以必须使用该对象调用这三个方法.
方法 释义
void wait() Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. (注意: 调用wait()方法的当前线程会释放对同步监视器的锁定)
void notify() Wakes up a single thread that is waiting on this object’s monitor.
void notifyAll() Wakes up all threads that are waiting on this object’s monitor.
public class Account { private double balance = 0.0; /*Whether current account have balance */ private boolean haveBalance = false; public double getBalance() { return balance; } public synchronized void draw(double amount) throws InterruptedException { // If there is no deposit, release the lock and continue to wait while (!haveBalance) { wait(); } System.out.printf("%sExecute the withdraw Operation ", Thread.currentThread().getName()); balance -= amount; System.out.printf(", current balance %f%n", balance); haveBalance = false; notifyAll(); } public synchronized void deposit(double amount) throws InterruptedException { // If there is a deposit, release the lock and continue to wait while (haveBalance) { wait(); } System.out.printf("%sPerform a save operation", Thread.currentThread().getName()); balance += amount; System.out.printf(", current balance%f%n", balance); haveBalance = true; notifyAll(); }}public class Depositor { public static void main(String[] args) { Account account = new Account(); new Thread(new DrawMethod(account, 100), "- draw ").start(); new Thread(new DepositMethod(account, 100), "+ deposit ").start(); } private static class DrawMethod implements Runnable { private Account account; private double amount; public DrawMethod(Account account, double amount) { this.account = account; this.amount = amount; } public void run() { while (true) { try { account.draw(amount); sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } } } } private static class DepositMethod implements Runnable { private Account account; private double amount; public DepositMethod(Account account, double amount) { this.account = account; this.amount = amount; } public void run() { while (true) { try { sleep(500); account.deposit(amount); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }}

Lock

从1.5开始,java提供了另外一种线程同步机制Lock,Lock提供比synchronized更广泛的锁定操作,并且支持多个相关的Condition.这些内容主要集中在 java.util.concurrent.locks 包下面,里面有三个重要的接口Condition、Lock、ReadWriteLock。 java.util.concurrent.locks.Lock和java.util.concurrent.locks.ReadWriteLock是Java提供的两类锁的根接口,并且为Lock提供了ReentrantLock/ReentrantReadWriteLock.ReadLock/ReentrantReadWriteLock.WriteLock实现, 为ReadWriteLock提供ReentrantReadWriteLock实现.
Condition Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set (wait-set)。
Lock Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。
ReadWriteLock ReadWriteLock 维护了一对相关的锁定,一个用于只读操作,另一个用于写入操作。
Lock很容易实现对共享资源的互斥访问:每次只能有一个线程对Lock加锁,线程在访问共享资源之前应先获得Lock对象并lock(), 在访问结束之后要unlock(). ReentrantLock表示可重入锁,也就是说一个线程可以对已被加锁的ReentrantLock锁再次加锁,ReentrantLock对象会维持一个计数器来追踪lock()方法的嵌套调用,所以一段被锁保护的代码可以调用另一个被相同锁保护的方法.

典型场景:生产者/消费者模式的Lock实现

仓库类,模拟入库和出库操作:

public class Repository { private final Lock mutex = new ReentrantLock(); private int count; private int limit; public Repository(int count, int limit) { this.count = count; this.limit = limit; } private boolean canWarehouse(int count) { return this.count + count <= limit; } private boolean canOut(int count) { return this.count - count >= 0; } public boolean warehouse(int count) { try { mutex.lock(); if (canWarehouse(count)) { try { sleep(80); } catch (InterruptedException e) { e.printStackTrace(); } this.count += count; return true; } } finally { mutex.unlock(); } return false; } public boolean out_warehouse(int count) { try { mutex.lock(); if (canOut(count)) { sleep(80); this.count -= count; return true; } } catch (InterruptedException e) { e.printStackTrace(); } finally { mutex.unlock(); } return false; } public int getCount() { return count; }}

生产者消费者:

import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * Created by aires on 2017/2/8. */public class ProducerConsumerWithLock { public static void main(String[] args) { ExecutorService pool = getExecutor(); Repository repository = new Repository(0, 1000); pool.submit(new Producer(repository, 400)); pool.submit(new Producer(repository, 500)); pool.submit(new Consumer(repository, 110)); pool.submit(new Consumer(repository, 120)); } private static ExecutorService getExecutor() { return new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); } private static class Producer implements Runnable { private Repository repository; private int produceCount; public Producer(Repository repository, int produceCount) { this.repository = repository; this.produceCount = produceCount; } public void run() { while (true) { try { Thread.sleep(700); } catch (InterruptedException e) { throw new RuntimeException(e); } if (repository.warehouse(produceCount)) { System.out.println("+ Producer" + Thread.currentThread().getName() + " produce<" + produceCount + ">products -- Successful "); } else { System.out.println("+ Producer" + Thread.currentThread().getName() + " produce<" + produceCount + ">products -- failed"); } System.out.println("The count of current repository: " + repository.getCount()); } } } private static class Consumer implements Runnable { private Repository repository; private int consumeCount; public Consumer(Repository repository, int consumeCount) { this.repository = repository; this.consumeCount = consumeCount; } public void run() { while (true) { try { Thread.sleep(300); } catch (InterruptedException e) { throw new RuntimeException(e); } if (repository.out_warehouse(consumeCount)) { System.out.println("- Consumer" + Thread.currentThread().getName() + " consume<" + consumeCount + ">products -- Successful "); } else { System.out.println("- Consumer" + Thread.currentThread().getName() + " consume<" + consumeCount + ">products -- failed "); } System.out.println("The count of current repository: " + repository.getCount()); } } }} 使用Lock对象进行线程同步, 当加锁/释放锁出现在不同的作用范围时, 通常建议使用finally块来确保一定会释放锁. 相对于Lock, synchronized则更加方便,可以避免很多锁的常见编程错误,但Lock却提供了synchronized所没有功能,比如用于tryLock()尝试加锁, 获取可中断锁的lockInterruptibly(), 还有在等待时间内获取锁的tryLock(long time, TimeUnit unit)方法.

Condition

使用Lock来保证互斥时,Java提供了一个java.util.concurrent.locks.Condition来保持协调,使用Condition可以让那些已经得到Lock的线程无法继续执行的而释放Lock,也可以唤醒其他等待在该Condition上的线程.Condition实例需要绑定在一个Lock对象上,使用Lock来保护Condition,因此调用Lock的newCondition()方法才能获得Condition实例. 他提供了如下方法:
方法 释义
void await() Causes the current thread to wait until it is signalled or interrupted(当前线程会释放对Lock的锁定).
void signal() Wakes up one waiting thread.
void signalAll() Wakes up all waiting threads.
下面使用Condition来模拟生产者/消费者模型(当仓库中有产品时才能消费, 当仓库未满时才能生产):import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * Created by aires on 2017/2/8. */public class RepositoryWithCondition { private final Lock mutex = new ReentrantLock(); private final Condition addCondition = mutex.newCondition(); private final Condition reduceCondition = mutex.newCondition(); private int count; private int limit; public RepositoryWithCondition(int count, int limit) { this.count = count; this.limit = limit; } private boolean canAdd(int count) { return this.count + count <= limit; } private boolean canReduce(int count) { return this.count - count >= 0; } public void add(int count) throws InterruptedException { try { mutex.lock(); while (!canAdd(count)) { System.out.printf("+... Producer %s is waiting...%n", Thread.currentThread().getName()); addCondition.await(); } this.count += count; System.out.printf("+ %Product successful, current products is %d%n ", Thread.currentThread().getName(), getCount()); reduceCondition.signalAll(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { mutex.unlock(); } Thread.sleep(80); } public void reduce(int count) throws InterruptedException { try { mutex.lock(); while (!canReduce(count)) { System.out.printf("-... Consumer %s is waiting...%n", Thread.currentThread().getName()); reduceCondition.await(); } this.count -= count; System.out.printf("- %sConsume successful, current products is %d%n ", Thread.currentThread().getName(), getCount()); addCondition.signalAll(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { mutex.unlock(); } Thread.sleep(80); } private int getCount() { return count; }}import java.util.concurrent.ExecutorService;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * Created by aires on 2017/2/8. */public class ProducerConsumerWithCondition { public static void main(String[] args) { ExecutorService pool = getExecutor(); RepositoryWithCondition repository = new RepositoryWithCondition(0, 1000); pool.submit(new Producer(repository, 400)); pool.submit(new Consumer(repository, 200)); } private static ExecutorService getExecutor() { return new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); } private static class Producer implements Runnable { private RepositoryWithCondition repository; private int produceCount; public Producer(RepositoryWithCondition repository, int produceCount) { this.repository = repository; this.produceCount = produceCount; } public void run() { while (true) { try { repository.add(produceCount); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static class Consumer implements Runnable { private RepositoryWithCondition repository; private int consumeCount; public Consumer(RepositoryWithCondition repository, int consumeCount) { this.repository = repository; this.consumeCount = consumeCount; } public void run() { while (true) { try { repository.reduce(consumeCount); } catch (InterruptedException e) { e.printStackTrace(); } } } }}
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表