I am learning Clojure and trying out its concurrency features with a producer-consumer example.
I got it working, but it felt pretty awkward to have to use ref and deref, and also add-watch and remove-watch.
I looked at other code snippets, but is there a better way of refactoring this, other than using the Java Condition await() and signal() methods along with a Java Lock? I did not want to use anything from Java.
Here is the code; I guess I have made many mistakes in how I use these functions...
; a simple producer class
(ns my.clojure.producer
  (:use my.clojure.consumer)
  (:gen-class))

(def tasklist (ref (list))) ; declared as a global variable; to make it
                            ; mutable we need to wrap it with the fn ref

(defn gettasklist []
  (deref tasklist)) ; we need the deref fn to return the task list

(def testagent (agent 0)) ; create an agent

(defn emptytasklist [akey aref old-val new-val]
  (doseq [item (gettasklist)]
    (println (str "item is") item)
    (send testagent consume item)
    (send testagent increment item))
  (. java.lang.Thread sleep 1000)
  (dosync ; a transaction is needed for ref-set
    (remove-watch tasklist "key123") ; remove the watch on the tasklist so that it does not
                                     ; go into a recursive call
    (ref-set tasklist (list)) ; it has to be a ref so that we can reassign it
    (println (str "The number of tasks now remaining is=") (count (gettasklist))))
  (add-watch tasklist "key123" emptytasklist))

(add-watch tasklist "key123" emptytasklist)

(defn addtask [task]
  (dosync ; a transaction is needed for ref-set
    ;(println (str "The number of tasks before") (count (gettasklist)))
    (println (str "Adding a task") task)
    (ref-set tasklist (conj (gettasklist) task)) ; it has to be a ref so that we can reassign it
    ;(println (str "The number of tasks after") (count (gettasklist)))
    ))
Here is the consumer code:
(ns my.clojure.consumer)

(defn consume [c item]
  (println "In the consume method: Item is " c item)
  item)

(defn increment [c n]
  (println "parameters are" c n)
  (+ c n))
And here is the test code (I used Maven to run the Clojure code and NetBeans to edit it, as this is more familiar to me coming from Java; the folder structure and pom are at https://github.com/alexcpn/clojure-evolve):
(ns my.clojure.Testproducer
  (:use my.clojure.producer)
  (:use clojure.test)
  (:gen-class))

(deftest test-addandcheck
  (addtask 1)
  (addtask 2)
  (is (= 0 (count (gettasklist))))
  (println (str "The number of tasks are") (count (gettasklist))))
If anybody can refactor this lightly so that I can read and understand the code, that would be great; otherwise I guess I will have to learn more.
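For comparison, here is a minimal sketch of how the ref handling in addtask and gettasklist might be shortened, assuming the same global tasklist ref. It uses alter in place of the deref plus ref-set pair, and the @ reader shorthand for deref. The namespace name my.clojure.sketch is hypothetical, and the watch and agent parts are deliberately left out.

```clojure
(ns my.clojure.sketch) ; hypothetical namespace, not part of the project above

(def tasklist (ref (list))) ; same global ref as in the producer

(defn gettasklist []
  @tasklist) ; @ is reader shorthand for (deref tasklist)

(defn addtask [task]
  ;; alter applies conj to the ref's in-transaction value,
  ;; so no explicit deref or ref-set is needed
  (dosync (alter tasklist conj task)))

(addtask 1)
(addtask 2)
(println (gettasklist)) ; prints (2 1), because conj adds to the front of a list
```

This only trims the ref boilerplate; it does not change how the tasks reach the consumer.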
Edit 1
I guess that using a global task list, making it available to other functions by dereferencing it (deref), and making it mutable with ref is not the Clojure way.
So I changed the addtask function to send the incoming tasks directly to an agent:
(defn addtask [task]
  (dosync ; a transaction is needed for ref-set
    (println (str "Adding a task") task)
    ;(ref-set tasklist (conj (gettasklist) task)) ; it has to be a ref so that we can reassign it
    (def testagent (agent 0)) ; create an agent
    (send testagent consume task)
    (send testagent increment task)))
However, when I tested it
(deftest test-addandcheck
  (loop [task 0]
    (when (< task 100)
      (addtask task)
      (recur (inc task))))
  (is (= 0 (count (gettasklist))))
  (println (str "The number of tasks are") (count (gettasklist))))
after some time I get a Java RejectedExecutionException. This would be fine with plain Java threads, because there you take full control. But from Clojure this looks odd, especially since you are not selecting the thread pool strategy yourself.
Adding a task 85
Exception in thread "pool-1-thread-4" java.util.concurrent.RejectedExecutionException
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1759)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
        at clojure.lang.Agent$Action.execute(Agent.java:56)
        at clojure.lang.Agent$Action.doRun(Agent.java:95)
        at clojure.lang.Agent$Action.run(Agent.java:106)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
Adding a task 86
Adding a task 87
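A sketch of one thing that might be worth trying, under the unverified assumption that the exception appears because the agent thread pool is shut down while actions are still queued (for example, by the test runner once the test function returns). It creates the agent once at the top level instead of inside addtask, and uses await to block until the queued actions have run. The namespace name is hypothetical, and consume and increment repeat the consumer logic above without the printing.

```clojure
(ns my.clojure.await-sketch) ; hypothetical namespace

(def testagent (agent 0)) ; one agent, created once at load time

;; same logic as in my.clojure.consumer, minus the println calls
(defn consume [c item] item)
(defn increment [c n] (+ c n))

(defn addtask [task]
  ;; send queues the action and returns immediately
  (send testagent consume task)
  (send testagent increment task))

(dotimes [task 100]
  (addtask task))

;; block until every action sent so far from this thread has run
(await testagent)

(println "final agent state:" @testagent)

;; release the agent thread pool so the JVM can exit cleanly
(shutdown-agents)
```

Whether this removes the exception in the Maven setup above is not confirmed; it only shows the await pattern in isolation.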