本文共 7895 字,大约阅读时间需要 26 分钟。
-- Start
我们在 一节中讲了一个并发存钱和取钱的例子, 事实上这个例子有一个问题, 那就是当余额小于等于0时我们仍然可以取钱(也许是信用卡), 下面我们把这个例子修改一下, 增加卡类型, 如果是信用卡, 则当余额小于等于0时我们仍然可以取钱; 如果是储蓄卡, 当余额小于等于我们要取的钱时, 取钱的线程必须等待, 直到存钱的线程唤醒它或超时为止.
public class Test { public static void main(String[] args) throws Exception { Bank icbc = new Bank(); // 工商银行 Account account = new Account(icbc, "储蓄卡", 1); // 在工商银行开户, 并存入 1 块钱 // 路人甲在 ATM 1 给我转帐, 每次转帐 1 块钱, 连续转帐 10 次 new Thread(new ATM(account, 1, 10), "ATM 1").start(); // 路人乙在 ATM 2 给我转帐, 每次转帐 2 块钱, 连续转帐 5 次 new Thread(new ATM(account, 2, 5), "ATM 2").start(); // 我从 ATM 3 开始取钱, 每次取 3块, 连续取5次 new Thread(new ATM(account, -3, 5), "ATM 3").start(); // 路人丙在 ATM 4 给我转帐, 每次转帐 4 块钱, 连续转帐 5 次 new Thread(new ATM(account, 4, 5), "ATM 4").start(); // 我老婆从 ATM 5 开始取钱, 每次取5块, 连续取7次 new Thread(new ATM(account, -5, 5), "ATM 5").start(); }}class ATM implements Runnable { private Account account; // 账户 private int tradeAmount; // 交易额 private int tradeTimes; // 交易次数 // 构造方法 public ATM(Account account, int tradeAmount, int tradeTimes) { this.account = account; this.tradeAmount = tradeAmount; this.tradeTimes = tradeTimes; } public void run() { for (int i = 0; i < tradeTimes; i++) { try { account.getBank().tradeAmount(account, tradeAmount); Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { e.printStackTrace(); return; } } }}class Account { private Bank bank; private String cardType; private int amount; public Account(Bank bank, String cardType, int amount) { this.bank = bank; this.amount = amount; this.cardType = cardType; } // Getter and Setter public Bank getBank() { return bank; } public String getCardType() { return cardType; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; }}class Bank { // 同步方法 public synchronized void tradeAmount(Account account, int amount) throws Exception { while ("储蓄卡".equals(account.getCardType()) // 储蓄卡 && amount < 0 // 取钱 && account.getAmount() < Math.abs(amount)) { // 余额小于要取的钱 try { // 当取钱线程获得锁进入同步方法后, 却发现它必须等到余额大于或等于要取的钱后才能执行 // 如果不能满足条件, 它必须等待并放弃持有的锁, 直到某个存钱线程存入一笔钱后通知它 // 它得到通知后将再次尝试获得锁并检查是否满足条件, 如果此时满足条件, 它将执行取钱操作 // 如果仍然不能满足条件, 它将继续等待直到满足条件或超时 // 由于取钱线程可能需要多次检查条件, 所以此处用的是 while, 而不是 if wait(60000); // 等待一分钟 } catch (InterruptedException e) { throw new Exception("您的余额不足", e); } } account.setAmount(amount + account.getAmount()); System.out.println("您在 " + Thread.currentThread().getName() + " 交易了 " + amount + ", 您账户的账户余额是 " + account.getAmount()); // 通知所有等待的线程, 得到通知的线程将再次检查条件是否满足 notifyAll(); }}
从 JDK 1.5 引入锁之后, 我们又多个一种实现线程交互的方式, 下面是一个简单的例子.
class Bank { final Lock lock; final Condition enoughCash; public Bank() { lock = new ReentrantLock(); enoughCash = lock.newCondition(); } public void tradeAmount(Account account, int amount) throws Exception { lock.lock(); try { while ("储蓄卡".equals(account.getCardType()) // 储蓄卡 && amount < 0 // 取钱 && account.getAmount() < Math.abs(amount)) { // 余额小于要取的钱 try { // 当取钱线程获得锁进入同步方法后, 却发现它必须等到余额大于或等于要取的钱后才能执行 // 如果不能满足条件, 它必须等待并放弃持有的锁, 直到某个存钱线程存入一笔钱后通知它 // 它得到通知后将再次尝试获得锁并检查是否满足条件, 如果此时满足条件, 它将执行取钱操作 // 如果仍然不能满足条件, 它将继续等待直到满足条件或超时 // 由于取钱线程可能需要多次检查条件, 所以此处用的是 while, 而不是 if enoughCash.await(60, TimeUnit.SECONDS); // 等待一分钟 } catch (InterruptedException e) { throw new Exception("您的余额不足", e); } } account.setAmount(amount + account.getAmount()); System.out.println("您在 " + Thread.currentThread().getName() + " 交易了 " + amount + ", 您账户的账户余额是 " + account.getAmount()); // 通知所有等待的线程, 得到通知的线程将再次检查条件是否满足 enoughCash.signalAll(); } finally { lock.unlock(); } }}
JDK 1.5 新加入了一个称为 CyclicBarrier 的类, 它能帮助我们实现线程之间的交互. 如: 下面的例子演示了4个工人盖一栋二层楼的房子, 每人负责东西南北四面墙中的一面, 由于每个工人的进度不同, 当一个工人盖好第一层的一面墙后(此时他到达障碍点), 他必须等待其他墙盖好后才能盖屋顶.
import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class Test { public static void main(String[] args) throws Exception { // 四个工人合作盖一栋房子 int workerNum = 4; // 定义屏障点, 由于每个工人盖房的速度不同, 只有当四个工人分别盖好四面墙时, 才能盖屋顶 CyclicBarrier cyclic = new CyclicBarrier(workerNum, new Runnable() { public void run() { System.out.println("盖屋顶"); } }); // 四个工人开始同时干活 for (int i = 0; i < workerNum; i++) { new Thread(new Worker(cyclic)).start(); } }}class Worker implements Runnable { private final CyclicBarrier cyclic; public Worker(CyclicBarrier cyclic) { this.cyclic = cyclic; } public void run() { try { building(); // 盖第一层四面墙中的一面 cyclic.await(); // 盖好一面墙后暂停, 等待其他三面墙盖好后盖屋顶 building(); // 继续盖第二层 cyclic.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } private void building() { System.out.println("盖墙"); }}
JDK 1.5 新加入了一个称为 CountDownLatch 的类, 它能帮助我们实现线程之间的交互. 如: 下面的例子演示了一个程序运行之前需要先初始化数据库, 缓存, 文件等.
import java.util.concurrent.CountDownLatch;public class Test { public static void main(String[] args) throws Exception { // 定义一个有三个门栓的门 final CountDownLatch doneSignal = new CountDownLatch(3); // 初始化数据库 new Thread(new Runnable() { public void run() { System.out.println("初始化数据库"); doneSignal.countDown(); // 打开一个门栓 } }).start(); // 初始化缓存 new Thread(new Runnable() { public void run() { System.out.println("初始化缓存"); doneSignal.countDown(); // 打开一个门栓 } }).start(); // 初始化文件 new Thread(new Runnable() { public void run() { System.out.println("初始化文件"); doneSignal.countDown(); // 打开一个门栓 } }).start(); doneSignal.await(); // 等待三个门栓全部打开 doWork(); // 三个门栓全部打开后就可以运行我们的任务了 } private static void doWork() { System.out.println("生成报告."); }}
JDK 1.5 新加入了一个称为 Exchanger 的类, 它能帮助我们实现两个线程之间的数据交换. 如: 下面的例子演示了一个线程从一个文件中读取数据到一个List中, 另一个线程从另个List中读取数据, 两个线程使用了不同的List, 运行一段时间后我们交换这两个List.
import java.io.File;import java.io.FileNotFoundException;import java.util.ArrayList;import java.util.List;import java.util.Scanner;import java.util.concurrent.Exchanger;public class Test { public static void main(String[] args) throws Exception { Exchanger---更多参见: -- 声 明:转载请注明出处 -- Last Updated on 2012-07-06 -- Written by ShangBo on 2012-06-22 -- End
> exchanger = new Exchanger
>(); new Thread(new WriteTask(exchanger)).start(); new Thread(new ReadTask(exchanger)).start(); }}class WriteTask implements Runnable { private int bufferSize = 3; private List writeDataBuffer = new ArrayList (bufferSize); private Exchanger
> exchanger; public WriteTask(Exchanger
> exchanger) { this.exchanger = exchanger; } public void run() { try { Scanner s = new Scanner(new File("C:\\workspace\\test.txt")); while (s.hasNextLine()) { if (writeDataBuffer.size() >= bufferSize) { writeDataBuffer = exchanger.exchange(writeDataBuffer); // 交换读缓冲区和写缓冲区 } writeDataBuffer.add(s.nextLine()); // 从文件中读取一行数据到写缓冲区 Thread.sleep(1000); } writeDataBuffer.add("EOF"); exchanger.exchange(writeDataBuffer); } catch (InterruptedException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } }}class ReadTask implements Runnable { private Exchanger
> exchanger; private List readDataBuffer = new ArrayList (3); public ReadTask(Exchanger
> exchanger) { this.exchanger = exchanger; } public void run() { try { String line = ""; while (!"EOF".equals(line)) { if (readDataBuffer.size() == 0) { readDataBuffer = exchanger.exchange(readDataBuffer); // 交换读缓冲区和入缓冲区 } line = readDataBuffer.remove(0); System.out.println(line); // 从读缓冲区得到数据 Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } }}