Table of Contents

AgentはRefと同じくミュータブルな状態の共有を提供する。 Ref協調した複数のもの への 同期的 な変更をサポートしているのに対し、Agentは 独立した 個別のもの への 非同期 な変更を提供する。Agentは単一のストレージに束縛され、アクションの結果としてそのストレージにだけ(新しい状態への)変更を許可する。アクションとはAgentの状態に対して非同期的に適用される (任意で引数を追加できる) 関数であり、戻り値がAgentの新しい状態となる。アクションは関数であるため、マルチメソッドを使うことができ、アクションをポリモーフィックにすることもできる。更には関数の数には限りがなく、Agentがサポートできるアクションにも限りがない点が他の言語によるパターンマッチによるメッセージの処理ループと対照的である。

ClojureのAgentは リアクティブ であり、自律的ではない - 手続き的なメッセージループやブロッキングレシーブ等は存在しない。Agentの状態自体はイミュータブル (Clojureの永続的なコレクションのインスタンスが好ましい) にするべきで、Agentの状態はどのスレッドからでもメッセージなしで即座に読み込める ( deref 関数か リーダー マクロ @ を使用する) 。要するに、観測においては協調や調停が不要である。

Agentのアクションディスパッチは (send agent fn args*) の形式を取る。 send (および send-off) は即座にリターンされ、後ほど別のスレッドで以下が起こる:

  1. 与えられた fn がAgentの 状態 と任意の引数に対して適用される。

  2. Agentに対してバリデーター関数がセットされている場合、 fn の戻り値がバリデーター関数に渡される。詳細については set-validator! を参照

  3. バリデーターが成功、もしくはバリデーターが指定されていない場合、与えられた fn の戻り値がAgentの新しい状態となる。

  4. Agentに対してウォッチャーが追加されている場合、それらが呼ばれる。詳細については add-watch を参照

  5. 関数の実行中に他のディスパッチが行われた場合(直接、もしくは関節的に)、Agentの状態の 変更後 まで保留となる。

アクション関数によって例外が投げられた場合、ネストされたディスパッチは起こらず、例外はAgent自身にキャッシュされる。エラーがキャッシュされたAgentに対しては、エラーがクリアされるまで後に続く全ての操作が即座に例外となる。Agentのエラー内容は agent-error で取得することが可能で、 restart-agent を使用してAgentを再起動することができる。

全てのAgentのアクションはスレッドプール内のスレッドにインターリーブされる。どの時点でも各Agentに対して最大でも一つのアクションが実行される。別のAgentやスレッドからAgentへとディスパッチされたアクションは send が行われた順番で起こり、他のソースからディスパッチされたアクションと( 可能性として )インターリーブされる。 send はCPUバウンドな処理に使用するべきで、 send-off はIOでブロックする可能性があるアクションに適切なものとなっている。

AgentはSTMと統合されている - トランザクションの中で行われる全てのディスパッチはコミットまで保留され、リトライや中断が発生した際にはディスパッチが廃棄される。

全てのClojureの並列サポートと同じく、ユーザーコードのロックは一切関与してこない。

Agentを使用すると非デーモンバックグラウンドスレッドのプールが立ち上がり、JVMのシャットダウンを妨げる点には注意が必要になる。シャットダウンを行い場合は shutdown-agents を使用してこれらのスレッドを終了させる。

この例ではメッセージの巡回送信のテストを実装している。n個のAgentのチェーンの作成と、連続するm個のアクションのチェインの先頭へのディスパッチが行われ、チェーンを辿っていく。

(defn relay [x i]
  (when (:next x)
    (send (:next x) relay i))
  (when (and (zero? i) (:report-queue x))
    (.put (:report-queue x) i))
  x)

(defn run [m n]
  (let [q (new java.util.concurrent.SynchronousQueue)
        hd (reduce (fn [next _] (agent {:next next}))
                   (agent {:report-queue q}) (range (dec m)))]
    (doseq [i (reverse (range n))]
      (send hd relay i))
    (.take q)))

; 1 million message sends:
(time (run 1000 1000))
->"Elapsed time: 2959.254 msecs"

関連する関数

Agentを作成する: agent

Agentを観察する: deref ( @ リーダー マクロも参照) agent-error error-handler error-mode

Agentの状態を変更する: send send-off restart-agent

Agentに対してブロックを伴う待ちを行う: await await-for

Refバリデーター: set-validator! get-validator

ウォッチャー: add-watch remove-watch

Agentのスレッドを管理する: shutdown-agents