併發編程-深入淺出AQS

AQS是併發編程中非常重要的概念,它是juc包下的許多併發工具類,如CountdownLatch,CyclicBarrier,Semaphore 和鎖, 如ReentrantLock, ReaderWriterLock的實現基礎,提供了一個基於int狀態碼和隊列來實現的併發框架。本文將對AQS框架的幾個重要組成進行簡要介紹,讀完本文你將get到以下幾個點:

  1. AQS進行併發控制的機制是什麼

  2. AQS獨佔和共享模式是如何實現的

  3. 同步隊列和條件等待隊列的區別,和數據出入隊原則

一,AQS基本概念

AQS(AbstractQueuedSynchronizer)是用來構建鎖或者其他同步組件的基礎框架,它使用了一個int成員變量來表示狀態,通過內置的FIFO(first in,first out)隊列來完成資源獲取線程的排隊工作。

隊列可分為兩種,一種是同步隊列,是程序執行入口出處的等待隊列;而另一種則是條件等待隊列,隊列中的元素是在程序執行時在某個條件上發生等待。

1.1 獨佔or共享模式

AQS支持兩種獲取同步狀態的模式既獨佔式和共享式。顧名思義,獨佔式模式同一時刻只允許一個線程獲取同步狀態,而共享模式則允許多個線程同時獲取。

1.2 同步隊列

當一個線程嘗試獲取同步狀態失敗時,同步器會將這個線程以及等待狀態等信息構造成一個節點加入到等待隊列中,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試重複獲取同步隊列。

1.3 條件隊列

AQS內部類ConditionObject來實現的條件隊列,當一個線程獲取到同步狀態,但是卻通過Condition調用了await相關的方法時,會將該線程封裝成Node節點並加入到條件隊列中,它的結構和同步隊列相同。

二,獨佔or共享模式

AQS框架中,通過維護一個int類型的狀態,來進行併發控制,線程通常通過修改此狀態信息來表明當前線程持有此同步狀態。AQS則是通過保存修改狀態線程的引用來實現獨佔和共享模式的。

/**
 * 獲取同步狀態
 */
public final void acquire(int arg) {
    //嘗試獲取同步狀態, 如果嘗試獲取到同步狀態失敗,則加入到同步隊列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
 * 嘗試獲取同步狀態【子類中實現】,因為aqs基於模板模式,僅提供基於狀態和同步隊列的實 
 * 現思路,具體的實現由子類決定
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果當前狀態值為0,並且等待隊列中沒有元素,執行修改狀態值操作
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 修改狀態值成功,記錄當前持有同步狀態的線程信息
            setExclusiveOwnerThread(current);
            return true;
        }
        // 如果當前線程已經持有同步狀態,繼續修改同步狀態【重入鎖實現原理】
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

/**
 * 根據傳入的模式以及當前線程信息創建一個隊列的節點並加入到同步隊列尾部
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
/**
 * 同步隊列中節點,嘗試獲取同步狀態
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋(死循環)
        for (;;) {
            // 只有當前節點的前驅節點是頭節點時才會嘗試執行獲取同步狀態操作
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

獨佔式是如何控製得?

當修改狀態信息成功后,如果執行的是獨佔式操作,AQS的具體實現類中會保存當前線程的信息來聲明同步狀態已被當前線程佔用,此時其他線程再嘗試獲取同步狀態會返回false。

三,同步隊列

3.1 隊列中保存那些信息?

同步隊列節點中主要保存着線程的信息以及模式(共享or獨佔)。

3.2 何時執行入隊操作?

/**
 * 獲取同步狀態
 */
public final void acquire(int arg) {
    //嘗試獲取同步狀態, 如果嘗試獲取到同步狀態失敗,則加入到同步隊列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

復用上文中的代碼,不難看出再獲取同步狀態失敗后,會執行入隊操作。

3.3 何時執行出隊操作?

當線程獲取同步狀態失敗時,會被封裝成Node節點加入到等待隊列中,此時所有節點都回進入自旋過程,首先判斷自己prev是否時頭節點,如果是則嘗試獲取同步狀態。
被阻塞線程的喚醒主要以靠前驅節點的出隊或阻塞線程被中斷來實現。

/**
 * 同步隊列中節點,嘗試獲取同步狀態
 * 
 * 1. 當一個線程獲取到同步狀態時,會將當前線程構造程Node並設置為頭節點
 * 2. 並將原始的head節點設置為null,以便於垃圾回收
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

四,條件等待隊列

條件變量(ConidtionObject)是AQS中的一個內部類,用來實現同步隊列機制。同步隊列復用了等待隊列中Node節點,所以同步隊列到等待隊列中不需要進行額外的轉換。

4.1 什麼時候執行入隊操作?

當線程獲取到同步狀態,但是在臨界區中調用了await()方法,此時該線程會被加入到對應的條件隊列匯總。
ps: 臨界區,加鎖和釋放鎖之間的代碼區域

/**
 * ConditionObject中的await方法,調用后使得當前執行線程加入條件等待隊列
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    // -----省略代碼------
}
/**
 * 添加等待線程
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // -----省略代碼------
    // 將當前線程構造程條件隊列節點,並加入到隊列中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

4.2 什麼時候執行出隊操作?

當對應的Conditioni調用signial/signalAll()方法時回選擇從條件隊列中出隊列,同步隊列是通過自旋的方式獲取同步狀態,而條件隊列中的節點則通過通知的方式出隊。條件隊列中的節點被喚醒後會加入到入口等待隊列中。

/**
 * 喚醒當前條件等到隊列中的所有等待線程
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
/**
 * 遍歷隊列,將元素從條件隊列 加入到 同步隊列
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
final boolean transferForSignal(Node node) {
    // -----省略代碼------
    // 執行入隊操作,將node添加到同步隊列中
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

五,總結

  1. 使用Node實現的FIFO隊列,可以用於構建鎖或者其他同步裝置的基礎框架
  2. 利用一個int類型的屬性表示狀態
  3. 使用模板方法模式,子類可以通過繼承它來管理狀態實現各種併發工具
  4. 可以同時實現獨佔和共享模式

本文對AQS的基本原理進行的簡要的描述,對於子類的公平性和非公平行實現,中斷,隊列中節點的等待狀態,cas等操作沒有進行探討,感興趣的小夥伴可以進行源碼閱讀或者查閱相關資料。

六,Q&A

Question1: 在java中通常使用synchronized來實現方法同步,AQS中通過CAS保證了修改同步狀態的一致性問題,那麼對比synchronized,cas有什麼優勢不同與優勢呢?你還知道其他無鎖併發的策略嗎?

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

大陸寄台灣空運注意事項

大陸海運台灣交貨時間多久?

※避免吃悶虧無故遭抬價!台中搬家公司免費估價,有契約讓您安心有保障!

您可能也會喜歡…