The following notes are a subset of the Clojure documentation and the clojure.core API reference, plus some examples.

Clojure Concurrency API

Atoms

An atom manages a single shared value. Updates to an atom are synchronous (they take effect immediately) and uncoordinated (each atom changes independently of any other).

Function Description
(atom x & options) Creates and returns an atom with an initial value of x. options can be:

:validator validate-fn

validate-fn must be nil or a side-effect-free function of one argument, which will be passed the intended new state on any state change. If the new state is unacceptable, validate-fn should return a false value or throw an exception.

(deref a) Returns the current state of atom a. Equivalent to: @a
(reset! a x) Sets the value of atom a to x, ignoring its previous value. Returns x.
(swap! a f & args) Atomically swaps the value of atom a to: (apply f @a args). Note that f may be called multiple times, and thus should be free of side effects. Returns the value that was swapped in.

Examples:

(def my-atom (atom 5))

(deref my-atom)               ⇒ 5
@my-atom                      ⇒ 5
(reset! my-atom 0)            ⇒ 0
@my-atom                      ⇒ 0
(swap! my-atom inc)           ⇒ 1
@my-atom                      ⇒ 1

;;; This atom will only accept numeric values.
(def my-other-atom (atom 42 :validator number?)) 

@my-other-atom                ⇒ 42
(reset! my-other-atom 0)      ⇒ 0
(swap! my-other-atom inc)     ⇒ 1
(reset! my-other-atom "hi")   ⇒ IllegalStateException
@my-other-atom                ⇒ 1
(swap! my-other-atom number?) ⇒ IllegalStateException
@my-other-atom                ⇒ 1
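
Because swap! retries f whenever another thread changes the atom first, concurrent updates are not lost. The following is a minimal sketch of that behavior. The names hits and workers are illustrative only and do not come from the original notes; future is described in the Futures section.

```clojure
;; Hypothetical names: hits and workers are for illustration only.
(def hits (atom 0))

;; Start 10 futures, each incrementing the atom 1000 times.
;; doall forces the lazy seq so every future starts right away.
(def workers
  (doall (for [_ (range 10)]
           (future (dotimes [_ 1000] (swap! hits inc))))))

;; Block until every worker has finished.
(doseq [w workers] @w)

@hits                         ; ⇒ 10000
```

Since inc is free of side effects, it does not matter how many times swap! had to retry it along the way.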

Refs

A ref is a managed reference that allows for synchronous (immediate) and coordinated (dependent on other refs) changes to mutable data.

Refs ensure safe shared use of mutable storage locations via a software transactional memory (STM) system. Refs are bound to a single storage location for their lifetime, and only allow mutation of that location to occur within a transaction.

Clojure transactions ensure that all actions on refs are atomic, consistent, and isolated. Atomic means that either every change to refs made within a transaction occurs or none does. Consistent means that each new value can be checked with a validator function before allowing the transaction to commit. Isolated means that no transaction sees the effects of any other transaction while it is running. Another feature common to STMs is that a transaction that runs into a conflict is automatically retried.

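The Clojure STM uses multiversion concurrency control (MVCC), so every read inside a transaction sees a consistent snapshot of the refs as of the point at which the transaction started.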

Function/Macro Description
(alter r f & args) Sets the in-transaction-value of ref r to: (apply f @r args). Returns the in-transaction-value of r. Must be called in a transaction.
(deref r) Within a transaction, returns the in-transaction-value of ref r, else returns the most recently committed value of r. Equivalent to: @r
(dosync & exprs) Runs the exprs (in an implicit do) in a transaction. Starts a transaction if none is already running on this thread. Any uncaught exception will abort the transaction and flow out of dosync. The exprs may be run more than once, but any effects on refs will be atomic.
(ref x & options) Creates and returns a ref with an initial value of x. options can be:

:validator validate-fn

validate-fn must be nil or a side-effect-free function of one argument, which will be passed the intended new state on any state change. If the new state is unacceptable, validate-fn should return a false value or throw an exception.

(ref-set r val) Sets the value of ref r to val. Returns val. Must be called in a transaction.

Examples:

(def x (ref 10))

;;; This ref will only accept numeric values.
(def y (ref 16 :validator number?))

(defn atomic-multiply [a b factor]
  (dosync
    (alter a * factor)
    (alter b * factor)))

(defn swap-ref-values [a b]
  (dosync 
    (let [temp @a]
      (ref-set a @b)
      (ref-set b temp))))

@x                         ⇒ 10
@y                         ⇒ 16      
(atomic-multiply x y 2)    ⇒ 32
@x                         ⇒ 20
@y                         ⇒ 32
(swap-ref-values x y)      ⇒ 20
@x                         ⇒ 32
@y                         ⇒ 20
(dosync (alter y number?)) ⇒ IllegalStateException
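
The atomicity guarantee described above can be sketched with a transfer between two refs. This is only an illustration under stated assumptions: checking, savings, and transfer! are hypothetical names, and the validators simply reject negative balances. When a validator rejects a new value, the whole transaction is abandoned and neither ref changes.

```clojure
;; Hypothetical example: two account balances that may never go negative.
(def checking (ref 100 :validator #(>= % 0)))
(def savings  (ref 50  :validator #(>= % 0)))

(defn transfer!
  "Moves amount from one ref to another in a single transaction."
  [from to amount]
  (dosync
    (alter to + amount)
    (alter from - amount)))

(transfer! checking savings 30)
[@checking @savings]          ; ⇒ [70 80]

;; Overdrawing fails validation, so the deposit into savings is undone too.
(def outcome
  (try
    (transfer! checking savings 500)
    (catch IllegalStateException _ :rejected)))

outcome                       ; ⇒ :rejected
[@checking @savings]          ; ⇒ [70 80]
```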

Agents

Agents allow asynchronous and independent changes to shared mutable data. Mutation occurs as a result of an action. Actions are functions (with, optionally, additional arguments) that are asynchronously applied to an agent's state and whose return value becomes the agent's new state. The state of an agent is always immediately available for reading by any thread (using the deref function) and it doesn't require any special cooperation or coordination.

If an exception is thrown by an action function, the exception will be cached in the agent itself. When an agent has errors cached, any subsequent interactions will immediately throw an exception, until the agent's errors are cleared.

The actions of all agents get interleaved amongst threads in a thread pool. At any point in time, at most one action for each agent is being executed. Actions dispatched to an agent from a single other agent or thread will occur in the order they were sent, potentially interleaved with actions dispatched to the same agent from other sources.
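
The ordering guarantee for actions sent from one thread can be sketched as follows. The agent name events is hypothetical and used only for illustration.

```clojure
;; Hypothetical name: events is for illustration only.
(def events (agent []))

;; All five actions are sent from the same thread...
(doseq [i (range 5)]
  (send events conj i))

;; ...so they are applied in the order they were sent.
(await events)
@events                       ; ⇒ [0 1 2 3 4]
```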

Agents are integrated with the STM — any dispatches made in a transaction are held until it commits, and are discarded if it is retried or aborted. As with all of Clojure's concurrency support, no user-code locking is involved.
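
A minimal sketch of this integration, assuming a hypothetical ref named balance and a hypothetical agent named audit-log: the action sent inside a committed transaction is applied, while the one sent inside an aborted transaction never reaches the agent.

```clojure
;; Hypothetical names: balance and audit-log are for illustration only.
(def balance (ref 10))
(def audit-log (agent []))

;; The send is held until the transaction commits, then dispatched.
(dosync
  (alter balance + 5)
  (send audit-log conj [:deposit 5]))
(await audit-log)
@audit-log                    ; ⇒ [[:deposit 5]]

;; The exception aborts this transaction, so its send is discarded.
(try
  (dosync
    (send audit-log conj [:deposit 99])
    (throw (RuntimeException. "abort")))
  (catch RuntimeException _ nil))
(await audit-log)
@audit-log                    ; ⇒ [[:deposit 5]]
@balance                      ; ⇒ 15
```

Sending from inside the transaction, rather than performing the side effect directly, is what keeps the effect from happening more than once if the transaction is retried.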

Note that use of agents starts a pool of non-daemon background threads that will prevent shutdown of the JVM. Use shutdown-agents to terminate these threads and allow shutdown.

Function Description
(agent x & options) Creates and returns an agent with an initial value of x. options can be:

:validator validate-fn

validate-fn must be nil or a side-effect-free function of one argument, which will be passed the intended new state on any state change. If the new state is unacceptable, validate-fn should return a false value or throw an exception.

(agent-error a) Returns the exception thrown during an asynchronous action of agent a, if it is failed. Returns nil if the agent is not failed.
(await & agents) Blocks the current thread until all actions sent so far from this thread to the given agents have completed. It has no timeout, so be careful: it will wait forever if necessary. It also blocks indefinitely on failed agents.
(await-for timeout & agents) Like await but with a timeout (in milliseconds). Returns false if returning due to timeout, or true otherwise.
(deref a) Returns the current state of agent a. Equivalent to: @a
(restart-agent a x & options) Resets the value of the agent a to x and takes away the agent's failure state so the agent can accept new actions. If the :clear-actions true option is provided, the agent's action queue is cleared; otherwise, pending actions will be called sequentially. Returns the new state of the agent.
(send a f & args) Dispatch an action to the agent a. Returns the agent immediately. Subsequently, in a thread from a fixed size thread pool, the state of the agent will be set to the value of: (apply f @a args). Use this function for actions that are CPU bound and non-blocking.
(send-off a f & args) Dispatch a potentially blocking action to the agent a. Returns the agent immediately. Subsequently, in a separate thread, the state of the agent will be set to the value of: (apply f @a args). Use this function for actions that may block on IO.
(shutdown-agents) Initiates a shutdown of the thread pools that back the agent system. Running actions will complete, but no new actions will be accepted.

Examples:

(def a (agent 100))
@a                                        ⇒ 100
(send a inc)                              ⇒ #<Agent@1114460: 101>
(await a)                                 ⇒ nil
(await-for 2000 a)                        ⇒ true
@a                                        ⇒ 101

;;; This action will take 30 seconds to execute in a different thread.
(send a (fn [x] (Thread/sleep 30000) x))  ⇒ #<Agent@1114460: 101>
(await-for 2000 a)                        ⇒ false
(send a / 0)                              ⇒ #<Agent@1114460: 101>
(send a inc)                              ⇒ #<Agent@1114460: 101>
(send a inc)                              ⇒ #<Agent@1114460: 101>
;;; Wait for more than 30 seconds...
(send a inc)                              ⇒ RuntimeException
(agent-error a)                           ⇒ ArithmeticException
(restart-agent a 100)                     ⇒ 100
(await a)                                 ⇒ nil
@a                                        ⇒ 102

(def b (agent 86))                       
;;; This action will take 30 seconds to execute in a different thread.
(send b (fn [x] (Thread/sleep 30000) x))  ⇒ #<Agent@1114460: 86>
(send b / 0)                              ⇒ #<Agent@1114460: 86> 
(send b inc)                              ⇒ #<Agent@1114460: 86>
(send b inc)                              ⇒ #<Agent@1114460: 86>
;;; Wait for more than 30 seconds...
(send b inc)                              ⇒ RuntimeException
(agent-error b)                           ⇒ ArithmeticException
(restart-agent b 86 :clear-actions true)  ⇒ 86
(await b)                                 ⇒ nil
@b                                        ⇒ 86

(def c (agent 1))
(send-off c (fn [x] (println x) x))       ⇒ #<Agent@1f99eea: 1>
                                          ;;; Prints:
                                          1
(await c)                                 ⇒ nil
@c                                        ⇒ 1

Futures

Futures represent asynchronous computations. They are a way to get code to run in another thread and obtain the result at a later moment.

Function Description
(deref f) Returns the result of the future f, blocking if the computation has not yet completed. Equivalent to: @f
(future & body) Takes a body of expressions and immediately yields a future object that will invoke the body in another thread, cache the result, and return it on all subsequent calls to deref. If the computation has not yet finished, calls to deref will block.
(future? x) Returns true if x is a future, otherwise returns false.
(future-done? f) Returns true if future f has finished its computations, otherwise returns false.
(future-cancel f) If possible, cancels the future f. Returns true if the future got cancelled, or false otherwise.
(future-cancelled? f) Returns true if future f has been cancelled, otherwise returns false.

Examples:

(def f (future (+ 1 1)))
(future? f)           ⇒ true
(future-done? f)      ⇒ true
(future-cancelled? f) ⇒ false
(future-cancel f)     ⇒ false
@f                    ⇒ 2

;;; This future will take one minute to execute.
(def x (future (Thread/sleep 60000) 42))
(future? x)           ⇒ true
(future-done? x)      ⇒ false
(future-cancelled? x) ⇒ false
(future-cancel x)     ⇒ true
(future-cancelled? x) ⇒ true
@x                    ⇒ CancellationException
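
A common use of futures is to start several slow computations at once and combine their results afterwards. The sketch below uses hypothetical names (slow-square, fa, fb, slow). Its second part assumes Clojure 1.3 or later, where deref also accepts a timeout in milliseconds and a fallback value.

```clojure
;; Hypothetical helper that simulates a slow computation.
(defn slow-square [n]
  (Thread/sleep 200)
  (* n n))

;; Both computations start immediately, each in its own thread.
(def fa (future (slow-square 3)))
(def fb (future (slow-square 4)))

;; Dereferencing blocks until each result is ready.
(def total (+ @fa @fb))
total                         ; ⇒ 25

;; Assumes Clojure 1.3+: give up waiting after 100 ms and use a fallback.
(def slow (future (Thread/sleep 5000) :done))
(def answer (deref slow 100 :timed-out))
answer                        ; ⇒ :timed-out
(future-cancel slow)          ; ⇒ true
```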

Parallel Functions

Function/Macro Description Examples
(pcalls & fns) Executes the no-argument fns in parallel, returning a lazy sequence of their values.

Example:

(pcalls #(+ 1 2) #(* 3 4) #(/ 5 6))
⇒ (3 12 5/6)

(pmap f coll & colls) Like map, except f is applied in parallel. Semi-lazy in that the parallel computation stays ahead of the consumption, but doesn't realize the entire result unless required. Only useful for computationally intensive functions where the time of f dominates the coordination overhead.

Example:

(pmap
  #(Math/pow %1 %2)
  [2 3 4 5] [3 2 1 0])
⇒ (8.0 9.0 4.0 1.0)

(pvalues & exprs) Returns a lazy sequence of the values of the exprs, which are evaluated in parallel.

Example:

(pvalues (+ 1 2) (* 3 4) (/ 5 6))
⇒ (3 12 5/6)
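
To see when pmap pays off, one can time it against map using a function that is slow on purpose. This is only a sketch: slow-inc is a hypothetical helper, and the elapsed times printed by time depend on the machine, so none are shown here.

```clojure
;; Hypothetical helper: each call takes about 100 ms.
(defn slow-inc [n]
  (Thread/sleep 100)
  (inc n))

;; Both versions produce the same values in the same order.
(def same? (= (map slow-inc (range 8))
              (pmap slow-inc (range 8))))
same?                         ; ⇒ true

;; doall forces the whole sequence so that time measures all of the work.
(time (doall (map slow-inc (range 8))))
(time (doall (pmap slow-inc (range 8))))
```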

Sequential Operations

These functions are not related directly to concurrency, yet they might come in handy when writing programs that need to exploit concurrency/parallelism.

Function/Macro/Special Form Description Examples
(do & exprs) Evaluates exprs in order, most likely for their side effects, and returns the value of the last expression. If no exprs are supplied, returns nil.

Example:

(do (println 1) (println 2) (* 6 7))
;;; Prints:
1
2
⇒ 42

(doall s) When lazy sequences are produced via functions that have side effects, any effects other than those needed to produce the first element in the seq s do not occur until the seq is consumed. doall can be used to force any effects. Walks through the successive nexts of the seq, retains the head and returns it, thus causing the entire seq to reside in memory at one time.

Example:

(def m (map #(println %) (range 5)))
(doall m)
;;; Prints:
0
1
2
3
4
⇒ (nil nil nil nil nil)

(dorun s) When lazy sequences are produced via functions that have side effects, any effects other than those needed to produce the first element in the seq s do not occur until the seq is consumed. dorun can be used to force any effects. Walks through the successive nexts of the seq, does not retain the head and returns nil.

Example:

(def m (map #(println %) (range 5)))
(dorun m)
;;; Prints:
0
1
2
3
4
⇒ nil

(doseq seq-exprs & body) Repeatedly executes body (presumably for side effects) with bindings and filtering as provided by the for macro. Does not retain the head of the seq. Returns nil.

Example:

(doseq [i (range 5)] (println i))
;;; Prints:
0
1
2
3
4
⇒ nil
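
Laziness matters when these functions are combined with futures: a future created inside a lazy map does not start until the sequence is realized. The sketch below, with the hypothetical names started, lazy-futures, and squares, uses doall to start every future before waiting on any of them.

```clojure
;; Hypothetical names: started, lazy-futures and squares are for illustration.
(def started (atom 0))

;; map is lazy, so no future has been created yet.
(def lazy-futures
  (map (fn [n] (future (swap! started inc) (* n n)))
       (range 4)))
@started                      ; ⇒ 0

;; The inner doall starts all futures; the outer one waits for each result.
(def squares (doall (map deref (doall lazy-futures))))
squares                       ; ⇒ (0 1 4 9)
@started                      ; ⇒ 4
```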
© 1996-2011 by Ariel Ortiz (ariel.ortiz@itesm.mx)
Licensed under Creative Commons