{:deps
{org.clojure/clojure {:mvn/version "1.11.2"}
org.clojure/core.async {:mvn/version "1.6.673"}}}
core.async 函式庫透過使用通道支援非同步程式設計。
若要使用 core.async,請宣告對 Clojure 1.10.0 或更高版本以及最新 core.async 函式庫的相依性
{:deps
{org.clojure/clojure {:mvn/version "1.11.2"}
org.clojure/core.async {:mvn/version "1.6.673"}}}
若要開始使用 core.async,請在 REPL 中載入 clojure.core.async
名稱空間
(require '[clojure.core.async :as a :refer [<!! >!! <! >!]])
或將其包含在您的名稱空間中
(ns my.ns
(:require [clojure.core.async :as a :refer [<!! >!! <! >!]]))
值傳遞在類似的佇列通道上。預設情況下,通道是未緩衝的(長度為 0) - 它們需要生產者和消費者在通道中會合才能傳遞值。
使用 chan
建立未緩衝的通道
(a/chan)
傳遞一個數字來建立一個具有固定緩衝區大小的通道
(a/chan 10)
close!
一個通道以停止接受放入。剩餘的值仍然可用於取出。已排空的通道在取出時會傳回 nil。不能明確地透過通道傳送 Nils!
(let [c (a/chan)]
(a/close! c))
通道也可以使用自訂緩衝區,它們對「已滿」的情況有不同的政策。API 中提供了兩個有用的範例。
;; Use `dropping-buffer` to drop newest values when the buffer is full:
(a/chan (a/dropping-buffer 10))
;; Use `sliding-buffer` to drop oldest values when the buffer is full:
(a/chan (a/sliding-buffer 10))
在一般執行緒中,我們使用 >!!
(封鎖放入)和 <!!
(封鎖取出)透過通道溝通。
(let [c (a/chan 10)]
(>!! c "hello")
(assert (= "hello" (<!! c)))
(a/close! c))
由於這些是封鎖呼叫,如果我們嘗試放入一個未緩衝的通道,我們會封鎖主執行緒。我們可以使用 thread
(例如 future
)在池執行緒中執行一個主體並傳回一個包含結果的通道。這裡我們啟動一個背景工作以將「hello」放入一個通道,然後在目前的執行緒中讀取該值。
(let [c (a/chan)]
(a/thread (>!! c "hello"))
(assert (= "hello" (<!! c)))
(a/close! c))
go
巨集非同步地在一個特殊執行緒池中執行其主體。會封鎖的通道操作會暫停執行,而不是封鎖任何執行緒。此機制封裝了事件/回呼系統中外部的控制反轉。在 go
封鎖中,我們使用 >!
(放入)和 <!
(取出)。
這裡我們將我們先前的通道範例轉換為使用 go 封鎖
(let [c (a/chan)]
(a/go (>! c "hello"))
(assert (= "hello" (<!! (a/go (<! c)))))
(a/close! c))
我們使用一個 go 封鎖作為產生器,而不是明確的執行緒和封鎖呼叫。消費者使用一個 go 封鎖來取出,然後傳回一個結果通道,我們從中進行封鎖取出。
通道相較於佇列的一個殺手級功能是能夠同時等待多個通道(例如 socket select)。這透過 alts!!
(一般執行緒)或 go 封鎖中的 alts!
來完成。
我們可以建立一個具有 alts 的背景執行緒,它會結合兩個通道中的輸入。alts!!
會採用一組要執行的操作 - 要取出的通道或要放入的 [通道值],並傳回成功的值(放入為 nil)和通道
(let [c1 (a/chan)
c2 (a/chan)]
(a/thread (while true
(let [[v ch] (a/alts!! [c1 c2])]
(println "Read" v "from" ch))))
(>!! c1 "hi")
(>!! c2 "there"))
列印(在 stdout 上,在您的 repl 中可能不可見)
Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
我們可以使用 alts! 在 go 封鎖中執行相同的事情
(let [c1 (a/chan)
c2 (a/chan)]
(a/go (while true
(let [[v ch] (a/alts! [c1 c2])]
(println "Read" v "from" ch))))
(a/go (>! c1 "hi"))
(a/go (>! c2 "there")))
由於 go 封鎖是不受執行緒約束的輕量級處理程序,我們可以擁有大量的 go 封鎖!這裡我們建立 1000 個 go 封鎖,它們會在 1000 個通道上說嗨。我們使用 alts!! 在它們準備好時讀取它們。
(let [n 1000
cs (repeatedly n a/chan)
begin (System/currentTimeMillis)]
(doseq [c cs] (a/go (>! c "hi")))
(dotimes [i n]
(let [[v c] (a/alts!! cs)]
(assert (= "hi" v))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
timeout
建立一個等待指定毫秒數,然後關閉的通道
(let [t (a/timeout 100)
begin (System/currentTimeMillis)]
(<!! t)
(println "Waited" (- (System/currentTimeMillis) begin)))
我們可以將 timeout 與 alts!
結合,以執行計時通道等待。在此,我們等待 100 毫秒,直到通道上出現一個值,然後放棄
(let [c (a/chan)
begin (System/currentTimeMillis)]
(a/alts!! [c (a/timeout 100)])
(println "Gave up after" (- (System/currentTimeMillis) begin)))
原始作者:Alex Miller