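Clojure STM Notes, Part 2
Continuing from the previous post, these are further notes on "Software Transactional Memory". This post focuses on the four reference types Clojure uses to handle concurrency. In Clojure, every value other than a reference type is immutable; in essence, "Reference types are mutable references to immutable data". Clojure has four reference types: Var, Atom, Agent, and Ref.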
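To make the four reference types concrete, here is a minimal sketch that creates one of each and changes it with the matching update function. The names `*level*`, `counter`, `log-lines`, `balance`, and `var-result` are made up for illustration and do not come from the notes.

```clojure
;; Var: a root binding that can be rebound per thread with `binding`.
(def ^:dynamic *level* 1)

;; Atom: synchronous, uncoordinated updates to a single value.
(def counter (atom 0))

;; Agent: asynchronous updates, applied later on another thread.
(def log-lines (agent []))

;; Ref: synchronous, coordinated updates inside a transaction.
(def balance (ref 100))

;; The rebinding is visible only inside the body of `binding`.
(def var-result (binding [*level* 2] *level*))

(swap! counter inc)               ; atom: apply a function to the value
(send log-lines conj "started")   ; agent: queue an action
(await log-lines)                 ; block until the queued action has run
(dosync (alter balance - 30))     ; ref: must be changed inside dosync
```

After this runs, the root value of `*level*` is still 1, while the atom, agent, and ref each hold their updated value.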
user=> (def a (atom 101))
#'user/a
user=> @a
101
user=> (reset! a 23)
23
user=> @a
23
user=> (swap! a + 1024)
1047
user=> (def v (agent 123))
#'user/v
user=> @v
123
user=> (send v inc)
#<Agent@2e21712e: 124>
user=> (send v * 10)
#<Agent@2e21712e: 1240>
user=> (await v)
nil
user=> (await-for 1024 v)
true
user=> @v
1240
user=> (source await)
(defn await
  "Blocks the current thread (indefinitely!) until all actions
  dispatched thus far, from this thread or agent, to the agent(s) have
  occurred. Will block on failed agents. Will never return if
  a failed agent is restarted with :clear-actions true."
  {:added "1.0"
   :static true}
  [& agents]
  (io! "await in transaction"
    (when *agent*
      (throw (new Exception "Can't await in agent action")))
    (let [latch (new java.util.concurrent.CountDownLatch (count agents))
          count-down (fn [agent] (. latch (countDown)) agent)]
      (doseq [agent agents]
        (send agent count-down))
      (. latch (await)))))
nil
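The source of `await` shows that it queues a count-down action behind whatever is already pending and then blocks on a latch. A small sketch of what that means in practice, contrasting `await-for` (which gives up after a timeout) with `await`. The agent name `slow`, the 200 ms sleep, and the 10 ms timeout are arbitrary choices for illustration.

```clojure
(def slow (agent 0))

;; Queue an action that takes roughly 200 ms to finish.
(send-off slow (fn [n] (Thread/sleep 200) (inc n)))

;; await-for returns logical false when the timeout elapses first.
(def finished-in-time? (await-for 10 slow))

;; await blocks until every action dispatched so far has run.
(await slow)
(def final-value @slow)
```

Because actions on one agent run one at a time in dispatch order, the count-down action cannot run before the slow action completes, which is why the short timeout expires.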
Let's look at how the implementations of send and send-off differ:
send & send-off
user=> (source send)
(defn send
  "Dispatch an action to an agent. Returns the agent immediately.
  Subsequently, in a thread from a thread pool, the state of the agent
  will be set to the value of:
  (apply action-fn state-of-agent args)"
  {:added "1.0"
   :static true}
  [^clojure.lang.Agent a f & args]
  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false))
nil
user=> (source send-off)
(defn send-off
  "Dispatch a potentially blocking action to an agent. Returns the
  agent immediately. Subsequently, in a separate thread, the state of
  the agent will be set to the value of:
  (apply action-fn state-of-agent args)"
  {:added "1.0"
   :static true}
  [^clojure.lang.Agent a f & args]
  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true))
nil
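The two definitions differ only in the last argument passed to `.dispatch` (false for `send`, true for `send-off`). As the docstrings indicate, that flag decides whether the action runs on the shared thread pool or on a separate thread. A rough sketch of the usual way to choose between them, with hypothetical agents `cpu-agent` and `io-agent`, and `Thread/sleep` standing in for real blocking I/O:

```clojure
(def cpu-agent (agent 0))
(def io-agent (agent nil))

;; CPU-bound work: fine on the thread pool used by send.
(send cpu-agent (fn [acc] (reduce + acc (range 1000))))

;; Potentially blocking work: send-off runs it on a separate thread,
;; so it does not tie up a thread from that pool.
(send-off io-agent (fn [_] (Thread/sleep 50) :done))

;; Wait for both agents to finish their queued actions.
(await cpu-agent io-agent)
```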
Compare the implementations. First, alter:
;;;;;;;;; alter ;;;;;;;;;;;;;;;;;;
(defn alter
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:
  (apply fun in-transaction-value-of-ref args)
  and returns the in-transaction-value of ref."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref fun & args]
  (. ref (alter fun args)))

public Object alter(IFn fn, ISeq args) {
    LockingTransaction t = LockingTransaction.getEx();
    return t.doSet(this, fn.applyTo(RT.cons(t.doGet(this), args)));
}

Object doSet(Ref ref, Object val){
    if(!info.running())
        throw retryex;
    if(commutes.containsKey(ref))
        throw new IllegalStateException("Can't set after commute");
    if(!sets.contains(ref)) {
        sets.add(ref);
        lock(ref); // If another transaction has modified the same Ref then this transaction may retry (see the lock method).
    }
    vals.put(ref, val);
    return val;
}
ref-set:
;;;;;;;;; ref-set ;;;;;;;;;;;;;;;;;;
(defn ref-set
  "Must be called in a transaction. Sets the value of ref.
  Returns val."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref val]
  (. ref (set val)))

public Object set(Object val){
    return LockingTransaction.getEx().doSet(this, val);
}
Don't be afraid of passing functions to functions; in functional programming this is perfectly ordinary.
Clojure 1.4.0
user=> (source ref-set)
(defn ref-set
  "Must be called in a transaction. Sets the value of ref.
  Returns val."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref val]
  (. ref (set val)))
nil
user=> (source alter)
(defn alter
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:
  (apply fun in-transaction-value-of-ref args)
  and returns the in-transaction-value of ref."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref fun & args]
  (. ref (alter fun args)))
nil
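Both functions end up in `doSet`; the difference is that `ref-set` takes the new value directly, while `alter` computes it by applying a function to the in-transaction value. A minimal sketch, using a hypothetical ref named `account`:

```clojure
(def account (ref 100))

;; ref-set replaces the value outright.
(def after-set (dosync (ref-set account 500)))

;; alter derives the new value from the current in-transaction value:
;; (apply + 500 [25])
(def after-alter (dosync (alter account + 25)))

;; Outside a transaction, LockingTransaction.getEx() throws.
(def outside-result
  (try
    (ref-set account 0)
    (catch IllegalStateException e :no-transaction)))
```

The failed call outside `dosync` leaves the ref untouched, so `account` still holds the value produced by `alter`.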
;;;;; commute ;;;;;;;
(defn commute
  "Must be called in a transaction. Sets the in-transaction-value of
  ref to:
  (apply fun in-transaction-value-of-ref args)
  and returns the in-transaction-value of ref.
  At the commit point of the transaction, sets the value of ref to be:
  (apply fun most-recently-committed-value-of-ref args)
  Thus fun should be commutative, or, failing that, you must accept
  last-one-in-wins behavior. commute allows for more concurrency than
  ref-set."
  {:added "1.0"
   :static true}
  [^clojure.lang.Ref ref fun & args]
  (. ref (commute fun args)))

public Object commute(IFn fn, ISeq args) {
    return LockingTransaction.getEx().doCommute(this, fn, args);
}

Object doCommute(Ref ref, IFn fn, ISeq args) {
    if(!info.running())
        throw retryex;
    if(!vals.containsKey(ref)) {
        Object val = null;
        try {
            ref.lock.readLock().lock();
            val = ref.tvals == null ? null : ref.tvals.val;
        } finally {
            ref.lock.readLock().unlock();
        }
        vals.put(ref, val);
    }
    ArrayList<CFn> fns = commutes.get(ref);
    if(fns == null)
        commutes.put(ref, fns = new ArrayList<CFn>());
    fns.add(new CFn(fn, args));
    Object ret = fn.applyTo(RT.cons(vals.get(ref), args));
    vals.put(ref, ret);
    return ret;
}
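A typical fit for `commute` is a counter, since increments give the same result in any order. The sketch below assumes a hypothetical ref `hits` updated from ten futures. It also exercises the "Can't set after commute" check that appears in `doSet`.

```clojure
(def hits (ref 0))

;; Ten threads, each committing 100 small transactions.
(def workers
  (doall
    (repeatedly 10
      #(future
         (dotimes [_ 100]
           (dosync (commute hits inc)))))))

;; Wait for every worker to finish.
(doseq [w workers] @w)
(def total @hits)

;; Setting a ref after commuting it in the same transaction is rejected,
;; and the whole transaction is abandoned.
(def set-after-commute
  (try
    (dosync
      (commute hits inc)
      (ref-set hits 0))
    (catch IllegalStateException e :rejected)))
```

Because the function is re-applied to the most recently committed value at commit time, no increment is lost even though the transactions overlap.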
A summary table:
user=> (def my-ref (ref 100 :validator (fn [x] (> x 50))))
#'user/my-ref
user=> (dosync (ref-set my-ref 123))
123
user=> (dosync (ref-set my-ref 23))
IllegalStateException Invalid reference state  clojure.lang.ARef.validate (ARef.java:33)
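Validators are not specific to refs. As a small sketch, the same `:validator` option can be given to an atom, and a rejected update leaves the old value in place. The atom name `temperature` is invented for this example.

```clojure
(def temperature (atom 20 :validator number?))

;; The validator returns false for a string, so the update is rejected.
(def rejected?
  (try
    (reset! temperature "hot")
    false
    (catch IllegalStateException e true)))

;; The state is unchanged after the rejected update.
(def value-after-rejection @temperature)

;; Passing nil removes the validator again.
(set-validator! temperature nil)
(reset! temperature "hot")
```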
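There are two mechanisms for being notified when the value of a reference type changes: watch functions and watcher agents. They differ in what they subscribe to, that is, in the events they care about.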
user=> (def a (ref 123))
#'user/a
user=> (defn w-1 [key id old new]
         (println "w-1" "key" key "id" id "old" old "new" new))
#'user/w-1
user=> (defn w-2 [key id old new]
         (println "w-2" "key" key "id" id "old" old "new" new))
#'user/w-2
user=> (add-watch a "watch-1" w-1)
#<Ref@41f1f35b: 123>
user=> (add-watch a "watch-2" w-2)
#<Ref@41f1f35b: 123>
user=> (dosync (alter a inc))
w-1 key watch-1 id #<Ref@41f1f35b: 124> old 123 new 124
w-2 key watch-2 id #<Ref@41f1f35b: 124> old 123 new 124
124
As the example shows, a reference can have more than one watcher. Here is the source of add-watch; its docstring explains the details:
user=> (source add-watch)
(defn add-watch
  "Alpha - subject to change.
  Adds a watch function to an agent/atom/var/ref reference. The watch
  fn must be a fn of 4 args: a key, the reference, its old-state, its
  new-state. Whenever the reference's state might have been changed,
  any registered watches will have their functions called. The watch fn
  will be called synchronously, on the agent's thread if an agent,
  before any pending sends if agent or ref. Note that an atom's or
  ref's state may have changed again prior to the fn call, so use
  old/new-state rather than derefing the reference. Note also that watch
  fns may be called from multiple threads simultaneously. Var watchers
  are triggered only by root binding changes, not thread-local
  set!s. Keys must be unique per reference, and can be used to remove
  the watch with remove-watch, but are otherwise considered opaque by
  the watch mechanism."
  {:added "1.0"
   :static true}
  [^clojure.lang.IRef reference key fn]
  (.addWatch reference key fn))
nil
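The docstring mentions that the key is what `remove-watch` uses to find a watch. A minimal sketch of that round trip on an atom, recording each notification instead of printing it. The names `state`, `events`, and `:recorder` are illustrative only.

```clojure
(def state (atom 0))
(def events (atom []))

;; The watch fn receives: key, reference, old state, new state.
(add-watch state :recorder
  (fn [key reference old-state new-state]
    (swap! events conj [key old-state new-state])))

(swap! state inc)               ; the watch fires for this change

(remove-watch state :recorder)  ; the key identifies which watch to drop

(swap! state inc)               ; no watch is registered any more
```

Following the docstring's advice, the watch fn uses the old and new states it is handed rather than dereferencing the atom.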
A watcher agent subscribes to the event that an action has been sent to the agent. Note: the action does not carry the old value.
That's all for now. The next post continues the study of "Software Transactional Memory", focusing on how Clojure's basic concurrency primitives are implemented.
"MVCC uses timestamps or increasing transaction IDs to achieve serializability. MVCC ensures a transaction never has to wait for a [database] object by maintaining several versions of an object. Each version would have a write timestamp and it would let a transaction read the most recent version of an object which precedes the transaction timestamp."
"If a transaction (Ti) wants to write to an object, and if there is another transaction (Tk) (that also wants to write it), the timestamp of Ti must precede the timestamp of Tk for the object write operation to succeed. Which is to say a write cannot complete if there are outstanding transactions with an earlier timestamp."
"Every object would also have a read timestamp, and if a transaction Ti wanted to write to object P, and the timestamp of that transaction is earlier than the object's read timestamp, the transaction Ti is aborted and restarted. Otherwise, Ti creates a new version of P and sets the read/write timestamps of P to the timestamp of the transaction." (The Clojure STM implementation does not use read timestamps.)
"The obvious drawback to this system is the cost of storing multiple versions of objects [in the database]. On the other hand reads are never blocked, which can be important for workloads mostly involving reading values [from the database]. MVCC is particularly adept at implementing true snapshot isolation, something which other methods of concurrency control frequently do either incompletely or with high performance costs."
"A transaction executing under snapshot isolation appears to operate on a personal snapshot [of the database], taken at the start of the transaction. When the transaction concludes, it will successfully commit only if the values updated by the transaction have not been changed externally since the snapshot was taken."