第一版是針對 Lua 5.0 編寫的。雖然在很大程度上仍然適用於後續版本,但有一些差異。
第四版針對 Lua 5.3,可在 Amazon 和其他書店購買。
購買這本書,您也可以幫助支援 Lua 專案


9.4 – 非搶先式多執行緒

正如我們先前所見,協同程式是一種協作式多執行緒。每個協同程式等同於一個執行緒。一對 yield-resume 切換控制權從一個執行緒到另一個執行緒。然而,與「真正的」多執行緒不同,協同程式是非搶先式的。當一個協同程式正在執行時,它無法從外部停止。它只會在明確要求時暫停執行(透過呼叫 yield)。對於許多應用程式而言,這不是問題,而是相反。在沒有搶先式的情況下,程式設計容易得多。您不需要擔心同步錯誤,因為執行緒之間的所有同步在程式中都是明確的。您只需要確保協同程式只在它位於臨界區外時才讓出。

然而,使用非搶先式多執行緒,每當任何執行緒呼叫封鎖操作時,整個程式會封鎖直到操作完成。對於大多數應用程式而言,這是不可接受的行為,這導致許多程式設計師不將協同程式視為傳統多執行緒的真正替代方案。正如我們將在此處看到的,這個問題有一個有趣的(而且事後看來很明顯的)解決方案。

讓我們假設一個典型的多執行緒情況:我們想要透過 HTTP 下載多個遠端檔案。當然,要下載多個遠端檔案,我們必須知道如何下載一個遠端檔案。在此範例中,我們將使用由 Diego Nehab 開發的 LuaSocket 函式庫。若要下載檔案,我們必須開啟與其網站的連線,傳送請求至檔案,接收檔案(以區塊為單位),並關閉連線。在 Lua 中,我們可以撰寫此任務如下。首先,我們載入 LuaSocket 函式庫

    require "luasocket"

接著,我們定義主機和我們要下載的檔案。在此範例中,我們將從萬維網聯盟網站下載 HTML 3.2 參考規格

    host = "www.w3.org"
    file = "/TR/REC-html32.html"

接著,我們開啟至該網站的 80 埠(HTTP 連線的標準埠)的 TCP 連線

    c = assert(socket.connect(host, 80))
此操作傳回一個連線物件,我們使用它來傳送檔案請求
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

receive 方法總是傳回一個字串,其中包含它讀取的內容以及另一個包含操作狀態的字串。當主機關閉連線時,我們會中斷接收迴圈。

最後,我們關閉連線

    c:close()

現在我們知道如何下載一個檔案,讓我們回到下載多個檔案的問題。最簡單的方法是一次下載一個。然而,這種順序方法(我們只在完成前一個檔案後才開始讀取檔案)太慢了。當讀取遠端檔案時,程式會花費大部分時間等待資料到達。更具體地說,它會花費大部分時間封鎖在對 receive 的呼叫中。因此,如果程式同時下載所有檔案,它可以執行得更快。然後,當一個連線沒有可用資料時,程式可以從另一個連線讀取。顯然,協程提供了一個方便的方法來建構這些同時下載。我們為每個下載任務建立一個新的執行緒。當一個執行緒沒有可用資料時,它會將控制權讓給一個簡單的調度器,而調度器會呼叫另一個執行緒。

若要使用協程重新撰寫程式,讓我們首先將先前的下載程式碼重新撰寫為一個函式

    function download (host, file)
      local c = assert(socket.connect(host, 80))
      local count = 0    -- counts number of bytes read
      c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
      while true do
        local s, status = receive(c)
        count = count + string.len(s)
        if status == "closed" then break end
      end
      c:close()
      print(file, count)
    end
由於我們對遠端檔案內容不感興趣,因此此函式只計算檔案大小,而不是將檔案寫入標準輸出。(如果有多個執行緒讀取多個檔案,輸出會將所有檔案混在一起。)在此新程式碼中,我們使用一個輔助函式(receive)從連線接收資料。在順序方法中,其程式碼如下所示
    function receive (connection)
      return connection:receive(2^10)
    end
對於並發實作,此函式必須在不封鎖的情況下接收資料。相反地,如果沒有足夠的可用資料,它會讓步。新程式碼如下所示
    function receive (connection)
      connection:timeout(0)   -- do not block
      local s, status = connection:receive(2^10)
      if status == "timeout" then
        coroutine.yield(connection)
      end
      return s, status
    end
timeout(0) 的呼叫使透過連線的任何操作都成為非封鎖操作。當操作狀態為 "timeout" 時,表示操作已傳回但未完成。在這種情況下,執行緒會讓步。傳遞給 yield 的非假參數會向調度器發出訊號,表示執行緒仍在執行其任務。(稍後我們將看到調度器需要計時連線的另一個版本。)請注意,即使在逾時的情況下,連線也會傳回它在逾時之前讀取的內容,因此 receive 始終會將 s 傳回給呼叫者。

下一個函式確保每個下載在個別執行緒中執行

    threads = {}    -- list of all live threads
    function get (host, file)
      -- create coroutine
      local co = coroutine.create(function ()
        download(host, file)
      end)
      -- insert it in the list
      table.insert(threads, co)
    end
表格 threads 為調度器保留所有執行中執行緒的清單。

調度器很簡單。它主要是個迴圈,逐一呼叫所有執行緒。它還必須從清單中移除完成其任務的執行緒。當沒有更多執行緒要執行時,它會停止迴圈

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          end
        end
      end
    end

最後,主程式建立它需要的執行緒並呼叫調度器。例如,若要從 W3C 網站下載四個文件,主程式可以像這樣

    host = "www.w3.org"
    
    get(host, "/TR/html401/html40.txt")
    get(host,"/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
    get(host,"/TR/REC-html32.html")
    get(host,
        "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
    
    dispatcher()   -- main loop
我的機器使用協程下載這四個檔案需要六秒。使用順序實作,需要超過兩倍的時間 (15 秒)。

儘管速度加快,這個最後的實作遠非最佳。只要至少有一個執行緒有東西要讀,一切都很好。然而,當沒有執行緒有資料要讀時,調度器會進行忙碌等待,逐一檢查執行緒以確認它們仍然沒有資料。因此,這個協程實作使用的 CPU 比順序解法多將近 30 倍。

若要避免這種行為,我們可以使用 LuaSocket 中的 select 函式。它允許程式在等待一組 socket 中的狀態變更時進行封鎖。我們實作中的變更很小。我們只需要變更調度器。新版本如下

    function dispatcher ()
      while true do
        local n = table.getn(threads)
        if n == 0 then break end   -- no more threads to run
        local connections = {}
        for i=1,n do
          local status, res = coroutine.resume(threads[i])
          if not res then    -- thread finished its task?
            table.remove(threads, i)
            break
          else    -- timeout
            table.insert(connections, res)
          end
        end
        if table.getn(connections) == n then
          socket.select(connections)
        end
      end
    end
在此內部迴圈中,這個新的調度器在表格 connections 中收集逾時連線。請記住,receive 會將此類連線傳遞給 yield;因此 resume 會傳回它們。當所有連線逾時時,調度器會呼叫 select 以等待這些連線中的任何一個變更狀態。這個最終實作執行得和第一個使用協程的實作一樣快。此外,由於它沒有進行忙碌等待,因此使用的 CPU 僅比順序實作多一點點。