I ran into a strange problem while using clojure.core.async: a timeout channel stopped working, and the blocking/parking functions returned immediately instead of blocking/parking. [1]

I use a REPL plugin that reloads a file whenever it changes [2], and I wrote the code below. The code misbehaves when the file is reloaded: the ‘to-ch’ channel no longer waits for the remaining time.

(defstate session
  :start (let [close-ch (a/chan)]
           {:close-ch close-ch
            :other-ch other-ch
            :xxx (a/go-loop []
                   (let [remaining-time (calc-remaining-time)
                         to-ch          (a/timeout remaining-time)]
                     (a/alt!
                       close-ch (do
                                  (a/close! other-ch)
                                  (a/close! to-ch)) ; closing the timeout channel is the bug
                       to-ch    (do
                                  ;; ...
                                  (recur)))))})
  :stop  (a/close! (:close-ch session)))

I figured out the problem:

  • First, timeout channels are cached: timeout channels whose deadlines fall within the same time resolution share one channel. [3]
  • Second, core.async does not check whether the cached timeout channel is open or closed.
  • Third, the blocking/parking functions do not wait on a closed channel; they return immediately.
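The first two points can be pictured with a toy model of the cache lookup. This is only a sketch and not the library's code: `resolution-ms`, `cached-or-new`, and the sorted map standing in for the real cache are hypothetical names chosen for illustration, and the 10 ms resolution is an assumption.

```clojure
(ns async-debugging.cache-model)

;; Assumption: the cache resolution is 10 ms.
(def resolution-ms 10)

(defn cached-or-new
  "Toy model of a deadline cache. `cache` is a sorted map of
  deadline -> value. Returns [new-cache value reused?]."
  [cache deadline make-value]
  ;; Smallest entry whose key is >= deadline (like a ceiling lookup).
  (let [[k v] (first (subseq cache >= deadline))]
    (if (and k (< k (+ deadline resolution-ms)))
      ;; An entry exists within the resolution window: reuse it.
      ;; Note that nothing here checks whether the value is still usable.
      [cache v true]
      (let [v (make-value)]
        [(assoc cache deadline v) v false]))))

(comment
  (let [[c1 v1 r1] (cached-or-new (sorted-map) 5000 #(Object.))
        [c2 v2 r2] (cached-or-new c1 4995 #(Object.))  ; 5000 lies in [4995, 5005)
        [_  _  r3] (cached-or-new c2 5001 #(Object.))] ; no entry >= 5001
    [r1 r2 (identical? v1 v2) r3])
  ;; => [false true true false]
  )
```

In this model a value that was "closed" by one caller would still be handed to the next caller whose deadline lands in the same window, which is the situation described above.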

The following code demonstrates the problem:

(ns async-debugging.core
  (:require [clojure.core.async :as a]
            [clojure.core.async.impl.timers :as timer]))

;; The timeout channel uses the cache.
(= (a/timeout 5000) (a/timeout 5000))
#_=> true

;; Taking from a closed channel does not wait.
(time (a/<!! (doto (a/timeout 5000) (a/close!))))
#_=> "Elapsed time: 0.14014 msecs"

;; So a later call that receives the cached (closed) channel returns immediately.
(time
  (let [delay 5000
        to-ch (a/timeout delay)]
    (a/close! to-ch)
    (a/<!! (a/timeout delay))))
#_=> "Elapsed time: 0.135586 msecs"

;; A look inside the `timeout` function.
(let [delay        5000
      _            (a/timeout delay)
      timeout      (+ (System/currentTimeMillis) delay)
      timeouts-map @#'timer/timeouts-map
      me           (.ceilingEntry timeouts-map timeout)]
  (println timeouts-map)
  (println me)
  (println (+ timeout timer/TIMEOUT_RESOLUTION_MS))
  (if (and me (< (.getKey me) (+ timeout timer/TIMEOUT_RESOLUTION_MS)))
    (println "Cached channel.")
    (println "New channel.")))
#_=> #object[java.util.concurrent.ConcurrentSkipListMap 0x722dca12 {1489161795712=...}]
#_=> #object[java.util.AbstractMap$SimpleImmutableEntry 0x7b78bff9 1489161795712=...]
#_=> 1489161795722
#_=> Cached channel.

So, do not close a timeout channel.
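One way to follow this advice is sketched below. Both helpers, `wait-or-cancel` and `private-timeout`, are hypothetical names introduced here for illustration and are not part of core.async. The first simply leaves the timeout channel alone when the wait is cancelled; the second wraps the shared timeout in a fresh channel for cases where a channel really must be closed early.

```clojure
(ns async-debugging.safe-timeout
  (:require [clojure.core.async :as a]))

(defn wait-or-cancel
  "Hypothetical helper: blocks until `close-ch` yields a value or is closed,
  or until `ms` milliseconds pass. The timeout channel is never closed by hand."
  [close-ch ms]
  (let [to-ch (a/timeout ms)]
    (a/alt!!
      close-ch :cancelled
      to-ch    :timed-out)))

(defn private-timeout
  "Hypothetical helper: returns a fresh, unshared channel that closes after
  `ms` milliseconds. Closing it early does not touch the shared timeout."
  [ms]
  (let [ch (a/chan)]
    (a/go
      (a/<! (a/timeout ms))   ; wait on the shared timeout, but never close it
      (a/close! ch))          ; closing an already closed channel is a no-op
    ch))

(comment
  (let [close-ch (a/chan)]
    (a/close! close-ch)
    (wait-or-cancel close-ch 5000))
  ;; => :cancelled
  )
```

The trade-off in `private-timeout` is one extra go block and channel per timer, in exchange for a channel that is safe to close.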


[1] I found a related issue: http://dev.clojure.org/jira/browse/ASYNC-109

[2] /src/lein_dev_helper/plugin.clj:70

[3] src/main/clojure/clojure/core/async/impl/timers.clj:48