トランスデューサーは合成可能な、演算的変換である。トランスデューサーは入出力ソースから独立しており、個別の要素に対する変換のエッセンスのみを指定する。トランスデューサーは入出力ソースとは分離されているため、様々な処理で使用することが可能である - コレクション、ストリーム、チャネル、オブザーバブル等。トランスデューサーは入力を意識することも中間集計結果を生成することもなく、直接的に合成する。

導入として次のブログポストおよびビデオを参照: blog post video

用語

reducing関数 とは reduce に渡すような関数 - 累積結果と新しい入力を受け取り、新しい累積結果を返す関数だ:

;; reducing関数のシグネチャ
whatever, input -> whatever

トランスデューサー (xformもしくはxfと書かれることもある) はreducing関数を別のreducing関数に変換する:

;; トランスデューサーのシグネチャ
(whatever, input -> whatever) -> (whatever, input -> whatever)

トランスデューサーで変換を定義する

Clojureに含まれるほとんどのシーケンス関数は、トランスデューサーを返すアリティを持つ。そのアリティは入力のコレクションを省いたものになっている; 入力のコレクションはトランスデューサーを適用する処理によって与えられる。 注: この個数の減ったアリティはカリー化や部分適用ではない。

例:

(filter odd?) ;; 奇数をフィルタするトランスデューサーを返す
(map inc)     ;; incによるマッピングを行うトランスデューサーを返す
(take 5)      ;; 最初の5つの値を取得するトランスデューサーを返す

トランスデューサーは通常の関数合成によって合成される。トランスデューサーは、ラップしているトランスデューサーを呼び出すか否か、何回呼び出すかを判断する前に自分自身の処理を行う。トランスデューサーの合成には既存の comp 関数の使用が推奨される:

(def xf
  (comp
    (filter odd?)
    (map inc)
    (take 5)))

トランスデューサーxfは一連の入力要素に対する処理によって適用される変換のまとまりである。まとまりの中の各関数はラップしている操作の 前に 実行される。変換器の合成は右から左に流れるが、変換の処理は左から右に流れる変換のまとまりとして組み立てられる (この例ではマッピングの前にフィルタが行われる)。

記憶の助けとして、 comp のトランスデューサーの順序は ->> によるシーケンスの変換と同じであることを覚えておくこと。上記の変換は下記のシーケンス変換と同等である:

(->> coll
     (filter odd?)
     (map inc)
     (take 5))

次の関数は入力コレクションの引数が省略されている場合トランスデューサーを返す: map cat mapcat filter remove take take-while take-nth drop drop-while replace partition-by partition-all keep keep-indexed map-indexed distinct interpose dedupe random-sample

トランスデューサーを使用する

トランスデューサーは様々なコンテキストで使用することができる (新しいコンテキストの作り方は下記を参照)。.

transduce

最も一般的なトランスデューサーの適用方法は transduce 関数で、標準のreduce関数と似ている:

(transduce xform f coll)
(transduce xform f init coll)

transducecoll を即時に (遅延評価せずに) reducing関数 f にトランスデューサー xform を適用してreduceを行う。initが与えられている場合はそれを初期値とし、それ以外の場合は(f)を初期値とする。f は(ステートフルな場合もある) reduceのコンテキストにおいて値の集約方法を指定する。

(def xf (comp (filter odd?) (map inc)))
(transduce xf + (range 5))
;; => 6
(transduce xf + 100 (range 5))
;; => 106

合成されたxfトランスデューサーは最終的なreducing関数fへの呼び出しとともに左から右へと呼び出される。直前の例では、入力値がフィルタ、インクリメント、合計される。

ネストした変換

eduction

トランスデューサーをcollに適用する際の処理を捕捉するには eduction 関数を使用する。この関数は任意の数の xform と最後に coll を受け取り、reduce可能でイテラブルな、collの各要素へのトランスデューサーの適用を返す。これらの適用はreduce/iteratorが呼び出される度に行われる。

(def iter (eduction xf (range 5)))
(reduce + 0 iter)
;; => 6

into

トランスデューサーを入力コレクションに適用し、出力コレクションを新たに構築する場合は into を使用する (内部で効率的にreduceを行い、可能であればtransientを使用する):

(into [] xf (range 1000))

sequence

入力コレクションに対するトランスデューサーの適用からシーケンスを作成する場合には sequence を使用する:

(sequence xf (range 1000))

結果のシーケンスの要素は一つずつ計算される。このようなシーケンスは入力を必要に応じて一つずつ消費し、かつ中間処理を全て実行する。この挙動は遅延シーケンスに対する同様の処理とは異なる。

トランスデューサーの作成

トランスデューサーを返すほとんどの関数は次のような形態をとる (custom code in "…​"):

(fn [xf]
  (fn ([] ...)
      ([result] ...)
      ([result input] ...)))

コアのシーケンス関数の多く (map、 filter、etc) は処理特有の引数を受け取り、(述語、関数、個数、 etc) それらの引数を包含するトランスデューサーを返す。中には catそれ自体が トランスデューサーであり、 xf を受け取らない関数もある。

内部の関数は異なる用途のために3種類のアリティで定義されている:

  • Init (アリティ 0) - ネストしている変換 xf をinit アリティ(引数0個)で呼び出し、最終的にはトランスデューサー処理を呼び出す。

  • Step (アリティ 2) - これは通常のreducing関数だが、トランスデューサーによって xf の stepアリティ(引数2個)を0回以上呼び出すことが期待される。例えば、filterは述語によって xf を呼び出すかどうかを決定する。mapは常に一度のみ呼び出す。catは入力によっては複数回呼び出す可能性がある。

  • Completion (アリティ 1) - 完了しない処理は存在するが、完了するものは (例えば transduce )、 完了引数が使用され、最終的な値を作成/状態をクリアする。この引数は xf の完了引数を一度のみ呼び出す必要がある。

completion の利用例は partition-all で、この関数は入力の最後に残っている要素をフラッシュする必要がある。 completing 関数を使用すると デフォルトのcompletionアリティを追加することによってreducing関数をトランスデューサー関数に変換することができる。

早期終了

Clojureにはreduceの早期終了を指定するメカニズムがある:

  • reduced

    • 値を受け取り reduceの終了すべきことを示す、 reduced(reduce済みの) 値を返す。

  • reduced?

    • reduced を使用して作成された値に対して true を返す。

  • deref もしくは @ を使用して reduced の中の値を取得することができる。

トランスデューサーを使用するプロセスは、step関数がreducedな値を返すかをチェックし、reducedを返す場合は停止する必要がある。 (transduce可能な処理の作成で後述する)。加えて、ネストしたreduceを使用するトランスデューサーのstep関数もreducedな値かどうかをチェックして、見つけたらそれを引き渡す必要がある。(catの実装を例として参照)

reduce処理の状態を持つトランスデューサー

いくつかのトランスデューサー( takepartition 等)はreduce処理に状態が必要となる。この状態はtransduce可能な処理がトランスデューサーを適用する度に作成される。例として連続する重複した値を一つにまとめるdedupeトランスデューサーを考える。このトランスデューサーは現在の値を引き渡すかどうかを決めるにあたって、一つ前の値を記憶する必要がある:

(defn dedupe []
  (fn [xf]
    (let [prev (volatile! ::none)]
      (fn
        ([] (xf))
        ([result] (xf result))
        ([result input]
          (let [prior @prev]
            (vreset! prev input)
              (if (= prior input)
                result
                (xf result input))))))))

dedupeでは prev はreduce処理の間に、一つ前の値を保持するステートフルなコンテナである。prevの値はパフォーマンスのためvolatileだが、atomとしても定義できる。prevの値はトランスデューサー処理が開始するまで初期化されない(例: transduce への呼び出し)。よって、ステートフルなやり取りはトランスデューサー処理のコンテキストに内包される。

完了ステップでは、reduce処理の状態を持つトランスデューサーは、ネストしているステップからredecedな値を観測していない限り(観測した場合は保留にしている状態を破棄するべき)、ネストしている変換器の完了関数を呼び出す前に状態をフラッシュするべきだ。

transduce可能な処理の作成

トランスデューサーは様々な処理で使用できるように設計されている。トランスデューサー処理は各ステップが一つの入力を消化する、一連のステップとして定義される。入力ソースは各処理に特有のものであり (コレクション、イテレータ, ストリーム等)、 同様に、各ステップの出力の処理方法も処理によって選択する必要がある。

トランスデューサーを新たなコンテキストに適用する場合、いくつかの一般的な注意点がある。:

  • step関数が reduced (reduce済み) の値を返す場合、トランスデューサー処理はそれ以上の入力をステップ関数に与えてはならない。reduce済みの値はderefを使用して完了前に展開する必要がある。

  • completion操作は、最終的な集計値に対して完了処置を一度のみ呼び出す必要がある。

  • transducing処理はトランスデューサーを呼び出した際に返される関数への参照を内包する必要がある - これらはステートフルで、スレッド間での使用が安全でない場合もある。