Clojure

core.async 簡介

入門

core.async 函式庫透過使用通道支援非同步程式設計。

若要使用 core.async,請宣告對 Clojure 1.10.0 或更高版本以及最新 core.async 函式庫的相依性

{:deps
 {org.clojure/clojure {:mvn/version "1.11.2"}
  org.clojure/core.async {:mvn/version "1.6.673"}}}

若要開始使用 core.async,請在 REPL 中載入 clojure.core.async 名稱空間

(require '[clojure.core.async :as a :refer [<!! >!! <! >!]])

或將其包含在您的名稱空間中

(ns my.ns
  (:require [clojure.core.async :as a :refer [<!! >!! <! >!]]))

通道

值傳遞在類似的佇列通道上。預設情況下,通道是未緩衝的(長度為 0) - 它們需要生產者和消費者在通道中會合才能傳遞值。

使用 chan 建立未緩衝的通道

(a/chan)

傳遞一個數字來建立一個具有固定緩衝區大小的通道

(a/chan 10)

close! 一個通道以停止接受放入。剩餘的值仍然可用於取出。已排空的通道在取出時會傳回 nil。不能明確地透過通道傳送 Nils!

(let [c (a/chan)]
  (a/close! c))

通道也可以使用自訂緩衝區,它們對「已滿」的情況有不同的政策。API 中提供了兩個有用的範例。

;; Use `dropping-buffer` to drop newest values when the buffer is full:
(a/chan (a/dropping-buffer 10))

;; Use `sliding-buffer` to drop oldest values when the buffer is full:
(a/chan (a/sliding-buffer 10))

執行緒

在一般執行緒中,我們使用 >!!(封鎖放入)和 <!!(封鎖取出)透過通道溝通。

(let [c (a/chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (a/close! c))

由於這些是封鎖呼叫,如果我們嘗試放入一個未緩衝的通道,我們會封鎖主執行緒。我們可以使用 thread(例如 future)在池執行緒中執行一個主體並傳回一個包含結果的通道。這裡我們啟動一個背景工作以將「hello」放入一個通道,然後在目前的執行緒中讀取該值。

(let [c (a/chan)]
  (a/thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (a/close! c))

Go 封鎖和 IOC 執行緒

go 巨集非同步地在一個特殊執行緒池中執行其主體。會封鎖的通道操作會暫停執行,而不是封鎖任何執行緒。此機制封裝了事件/回呼系統中外部的控制反轉。在 go 封鎖中,我們使用 >!(放入)和 <!(取出)。

這裡我們將我們先前的通道範例轉換為使用 go 封鎖

(let [c (a/chan)]
  (a/go (>! c "hello"))
  (assert (= "hello" (<!! (a/go (<! c)))))
  (a/close! c))

我們使用一個 go 封鎖作為產生器,而不是明確的執行緒和封鎖呼叫。消費者使用一個 go 封鎖來取出,然後傳回一個結果通道,我們從中進行封鎖取出。

Alts

通道相較於佇列的一個殺手級功能是能夠同時等待多個通道(例如 socket select)。這透過 alts!!(一般執行緒)或 go 封鎖中的 alts! 來完成。

我們可以建立一個具有 alts 的背景執行緒,它會結合兩個通道中的輸入。alts!! 會採用一組要執行的操作 - 要取出的通道或要放入的 [通道值],並傳回成功的值(放入為 nil)和通道

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/thread (while true
              (let [[v ch] (a/alts!! [c1 c2])]
                (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

列印(在 stdout 上,在您的 repl 中可能不可見)

Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]

我們可以使用 alts! 在 go 封鎖中執行相同的事情

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/go (while true
          (let [[v ch] (a/alts! [c1 c2])]
            (println "Read" v "from" ch))))
  (a/go (>! c1 "hi"))
  (a/go (>! c2 "there")))

由於 go 封鎖是不受執行緒約束的輕量級處理程序,我們可以擁有大量的 go 封鎖!這裡我們建立 1000 個 go 封鎖,它們會在 1000 個通道上說嗨。我們使用 alts!! 在它們準備好時讀取它們。

(let [n 1000
      cs (repeatedly n a/chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (a/go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (a/alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 建立一個等待指定毫秒數,然後關閉的通道

(let [t (a/timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

我們可以將 timeout 與 alts! 結合,以執行計時通道等待。在此,我們等待 100 毫秒,直到通道上出現一個值,然後放棄

(let [c (a/chan)
      begin (System/currentTimeMillis)]
  (a/alts!! [c (a/timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

更多資訊

請參閱以下內容,以取得更多資訊

原始作者:Alex Miller