Table of Contents

エージェントはRefと同じくミュータブルな状態の共有を提供する。 Refs協調した複数のもの への 同期的 な変更をサポートしているのに対し、エージェントは 独立した 個別のもの への 非同期 な変更を提供する。エージェントは単一のストレージに束縛され、アクションの結果としてそのストレージにだけ(新しい状態への)変更を許可する。アクションとはエージェントの状態に対して非同期的に適用される (任意で引数を追加できる) 関数であり、戻り値がエージェントの新しい状態となる。アクションは関数であるため、マルチメソッドを使うことができ、アクションをポリモーフィックにすることも出来る。更には関数の数には限りがなく、エージェントがサポートできるアクションにも限りがない点が他の言語によるパターンマッチによるメッセージの処理ループと対照的である。

Clojureのエージェントは リアクティブ であり、自律的ではない - 手続き的なメッセージループやブロッキングレシーブ等は存在しない。エージェントの状態自体はイミュータブル (Clojureの永続的なコレクションのインスタンスが好ましい) にするべきで、エージェントの状態はどのスレッドからでもメッセージ無しで即座に読み込める ( deref 関数か reader macro @ を使用する) 。要するに、観測においては協調や調停が不要である。

エージェントのアクションディスパッチは (send agent fn args*) の形式を取る。 send (及び send-off) は即座にリターンされ、後ほど別のスレッドで以下が起こる:

  1. 与えられた fn がエージェントの 状態 と任意の引数に対して適用される。

  2. エージェントに対してバリデーター関数がセットされている場合、 fn の戻り値がバリデーター関数に渡される。詳細については set-validator! を参照

  3. バリデーターが成功、もしくはバリデーターが指定されていない場合、与えられた fn の戻り値がエージェントの新しい状態となる。

  4. エージェントに対してウォッチャーが追加されている場合、それらが呼ばれる。詳細については add-watch を参照

  5. 関数の実行中に他のディスパッチが行われた場合(直接、もしくは関節的に)、エージェントの状態の 変更後 まで保留となる。

アクション関数によって例外が投げられた場合、ネストされたディスパッチは起こらず、例外はエージェント自身にキャッシュされる。エラーがキャッシュされたエージェントに対しては、エラーがクリアされるまで後に続く全ての操作が即座に例外となる。エージェントのエラー内容は agent-error で取得することが可能で、 restart-agent を使用してエージェントを再起動することが出来る。

全てのエージェントのアクションはスレッドプール内のスレッドにインターリーブされる。どの時点でも各エージェントに対して最大でも一つのアクションが実行される。別のエージェントやスレッドからエージェントへとディスパッチされたアクションは send が行われた順番で起こり、他のソースからディスパッチされたアクションと( 可能性として )インターリーブされる。 send はCPUバウンドな処理に使用するべきで、 send-off はIOでブロックする可能性があるアクションに適切なものとなっている。

エージェントはSTMと統合されている - トランザクションの中で行われる全てのディスパッチはコミットまで保留され、リトライや中断が発生した際にはディスパッチが廃棄される。

全てのClojureの並列サポートと同じく、ユーザーコードのロックは一切関与してこない。

エージェントを使用すると非デーモンバックグラウンドスレッドのプールが立ち上がり、JVMのシャットダウンを妨げる点には注意が必要になる。シャットダウンを行い場合は shutdown-agents を使用してこれらのスレッドを終了させる。

この例ではメッセージの巡回送信のテストを実装している。n個のエージェントのチェインの作成と、連続するm個のアクションのチェインの先頭へのディスパッチが行われ、チェインを辿っていく。

(defn relay [x i]
  (when (:next x)
    (send (:next x) relay i))
  (when (and (zero? i) (:report-queue x))
    (.put (:report-queue x) i))
  x)

(defn run [m n]
  (let [q (new java.util.concurrent.SynchronousQueue)
        hd (reduce (fn [next _] (agent {:next next}))
                   (agent {:report-queue q}) (range (dec m)))]
    (doseq [i (reverse (range n))]
      (send hd relay i))
    (.take q)))

; 1 million message sends:
(time (run 1000 1000))
->"Elapsed time: 2959.254 msecs"

関連する関数

エージェントを作成する: agent

エージェントの観察: deref ( @ reader マクロも参照) agent-error error-handler error-mode

エージェントの状態を変更する: send send-off restart-agent

エージェントに対してブロックを伴う待ちを行う: await await-for

Refバリデーター: set-validator! get-validator

ウォッチャー: add-watch remove-watch

エージェントのスレッドを管理する: shutdown-agents

エージェントのエラーマネジメント: agent-error restart-agent set-error-handler! error-handler set-error-mode! error-mode