Class: Cod::WorkQueue
- Inherits:
-
Object
- Object
- Cod::WorkQueue
- Defined in:
- lib/cod/work_queue.rb
Overview
Describes a queue that stores work items given through #schedule and works through those items in order if #predicate is true.
Synopsis:
queue = WorkQueue.new
queue.predicate { true }
queue.schedule {
# some work
}
# Will try to work through items right now.
queue.try_work
# Will cleanly shutdown background threads, but not finish work.
queue.shutdown
Instance Attribute Summary collapse
-
#thread ⇒ Object
readonly
The internal thread that is used to work on scheduled items in the background.
Instance Method Summary collapse
- #clear_thread_semaphore ⇒ Object
-
#exclusive ⇒ Object
Allows to perform other work mutually exclusive with the work performed in the queue.
-
#initialize ⇒ WorkQueue
constructor
:nodoc:.
-
#predicate(&predicate) ⇒ Object
Before any kind of work is attempted, this predicate must evaluate to true.
-
#predicate? ⇒ Boolean
Evaluates the predicate.
-
#schedule(&work) ⇒ Object
Schedules a piece of work.
-
#shutdown ⇒ Object
Shuts down the queue properly, without waiting for work to be completed.
-
#size ⇒ Object
Returns the size of the queue.
- #thread_semaphore_set? ⇒ Boolean
- #try_work ⇒ Object
Constructor Details
#initialize ⇒ WorkQueue
:nodoc:
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/cod/work_queue.rb', line 21 def initialize # NOTE: This is an array that is protected by careful coding, rather # than a mutex. Queue would be right, but Rubys GIL will interfere with # that producing more deadlocks than I would like. @queue = Array.new @try_work_exclusive_section = ExclusiveSection.new @thread = Thread.start(&method(:thread_main)) @thread.priority = Thread.current.priority - 10 end |
Instance Attribute Details
#thread ⇒ Object (readonly)
The internal thread that is used to work on scheduled items in the background.
35 36 37 |
# File 'lib/cod/work_queue.rb', line 35 def thread @thread end |
Instance Method Details
#clear_thread_semaphore ⇒ Object
99 100 101 |
# File 'lib/cod/work_queue.rb', line 99 def clear_thread_semaphore @one_turn = false end |
#exclusive ⇒ Object
Allows to perform other work mutually exclusive with the work performed in the queue. (see ExclusiveSection)
51 52 53 |
# File 'lib/cod/work_queue.rb', line 51 def exclusive @try_work_exclusive_section.enter { yield } end |
#predicate(&predicate) ⇒ Object
Before any kind of work is attempted, this predicate must evaluate to true. It is tested repeatedly.
Example:
work_queue.predicate { connection.established? }
67 68 69 |
# File 'lib/cod/work_queue.rb', line 67 def predicate(&predicate) @predicate = predicate end |
#predicate? ⇒ Boolean
Evaluates the predicate. If the predicate is not set, this returns nil.
57 58 59 |
# File 'lib/cod/work_queue.rb', line 57 def predicate? @predicate && @predicate.call end |
#schedule(&work) ⇒ Object
Schedules a piece of work. Example:
work_queue.schedule { a_piece_of_work }
75 76 77 78 79 |
# File 'lib/cod/work_queue.rb', line 75 def schedule(&work) exclusive { @queue << work } end |
#shutdown ⇒ Object
Shuts down the queue properly, without waiting for work to be completed.
83 84 85 86 87 88 89 |
# File 'lib/cod/work_queue.rb', line 83 def shutdown return unless @thread @shutdown_requested = true @thread.join @thread = nil end |
#size ⇒ Object
Returns the size of the queue.
93 94 95 96 97 |
# File 'lib/cod/work_queue.rb', line 93 def size exclusive { @queue.size } end |
#thread_semaphore_set? ⇒ Boolean
102 103 104 |
# File 'lib/cod/work_queue.rb', line 102 def thread_semaphore_set? @one_turn end |
#try_work ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/cod/work_queue.rb', line 37 def try_work exclusive { # NOTE if predicate is nil or not set, no work will be accomplished. # This is the way I need it. while !@queue.empty? && predicate? wi = @queue.shift wi.call end } end |