Class: Cod::Beanstalk::Channel
- Defined in: lib/cod/beanstalk/channel.rb
Overview
A channel based on a beanstalkd tube. A #put inserts a message into the tube, and a #get fetches the next message pending on the tube.
Beanstalk channels cannot currently be used in Cod.select. This is due to limitations inherent in the beanstalkd protocol. We'll probably try to get a patch into beanstalkd to change this.
If you embed a beanstalk channel in one of your messages, the receiving end gets a channel that connects to the same server and the same tube. This behaviour is useful for Service.
Defined Under Namespace
Classes: Control
Constant Summary
- JOB_PRIORITY = 0
  All messages are inserted into the beanstalk queue at this priority.
Instance Attribute Summary
- #server_url ⇒ String (readonly)
  Beanstalkd server this channel is connected to.
- #tube_name ⇒ String (readonly)
  Which tube this channel is connected to.
Class Method Summary
- ._load(str) ⇒ Object
Instance Method Summary
- #_dump(level) ⇒ Object
  Serialization.
- #bs_bury(msg_id) ⇒ Object
- #bs_delete(msg_id) ⇒ Object
  Beanstalk commands.
- #bs_release(msg_id) ⇒ Object
- #bs_release_with_delay(msg_id, seconds) ⇒ Object
- #client(answers_to) ⇒ Object
- #close ⇒ Object
- #get ⇒ Object
  Reads a job from the tube and decodes it as a message.
- #initialize(tube_name, server_url) ⇒ Channel (constructor)
  A new instance of Channel.
- #initialize_copy(other) ⇒ Object
  Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.
- #put(msg) ⇒ void
  Puts a job on the tube after serializing.
- #service ⇒ Object
- #to_read_fds ⇒ Object
- #try_get {|Object, Cod::Beanstalk::Channel::Control| ... } ⇒ Object
  Like #get, read next message from the channel but reserve the right to put it back.
Methods inherited from Channel
Constructor Details
#initialize(tube_name, server_url) ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/cod/beanstalk/channel.rb', line 26
    def initialize(tube_name, server_url)
      super()
      @tube_name, @server_url = tube_name, server_url

      @body_serializer = Cod::SimpleSerializer.new
      @transport = connection(server_url, tube_name)
    end
Instance Attribute Details
#server_url ⇒ String (readonly)
Beanstalkd server this channel is connected to
    # File 'lib/cod/beanstalk/channel.rb', line 24
    def server_url
      @server_url
    end
#tube_name ⇒ String (readonly)
Which tube this channel is connected to
    # File 'lib/cod/beanstalk/channel.rb', line 21
    def tube_name
      @tube_name
    end
Class Method Details
._load(str) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 186
    def self._load(str) # :nodoc:
      tube_name, server_url = Marshal.load(str)
      Cod.beanstalk(tube_name, server_url)
    end
Instance Method Details
#_dump(level) ⇒ Object
Serialization.
    # File 'lib/cod/beanstalk/channel.rb', line 181
    def _dump(level) # :nodoc:
      Marshal.dump(
        [@tube_name, @server_url])
    end
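The #_dump / ._load pair is Ruby's custom Marshal hook: only the tube name and the server URL travel with a message, and the receiving side builds a fresh channel from them. The following is a minimal, self-contained sketch of that round trip. `TubeHandle` is a hypothetical stand-in that opens no connection (the real ._load calls Cod.beanstalk), and the sketch assumes messages are serialized with Marshal.

```ruby
# Hypothetical stand-in for a channel; it holds no real connection.
class TubeHandle
  attr_reader :tube_name, :server_url

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
  end

  # Called by Marshal.dump: serialize only the connection parameters.
  def _dump(_level)
    Marshal.dump([@tube_name, @server_url])
  end

  # Called by Marshal.load: rebuild a fresh handle from those parameters.
  def self._load(str)
    tube_name, server_url = Marshal.load(str)
    new(tube_name, server_url)
  end
end

original = TubeHandle.new('replies', 'localhost:11300')

# Embed the handle in a message, as a service request might do.
message  = { request: 'ping', answer_to: original }
received = Marshal.load(Marshal.dump(message))

# The copy is a different object that points at the same tube and server.
copy = received[:answer_to]
```

Because only two strings are dumped, the receiving process never shares a socket with the sender; it simply reconnects to the same place.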
#bs_bury(msg_id) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 205
    def bs_bury(msg_id)
      # NOTE: Why I need to assign a priority when burying I fail to
      # understand. Like a priority for rapture?
      bs_command([:bury, msg_id, JOB_PRIORITY], :buried)
    end
#bs_delete(msg_id) ⇒ Object
Beanstalk commands.
    # File 'lib/cod/beanstalk/channel.rb', line 193
    def bs_delete(msg_id)
      bs_command([:delete, msg_id], :deleted)
    end
#bs_release(msg_id) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 197
    def bs_release(msg_id)
      bs_command([:release, msg_id, JOB_PRIORITY, 0], :released)
    end
#bs_release_with_delay(msg_id, seconds) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 201
    def bs_release_with_delay(msg_id, seconds)
      bs_command([:release, msg_id, JOB_PRIORITY, seconds], :released)
    end
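The bs_* methods pass a symbolic command array plus the expected answer to bs_command. How bs_command and the transport encode these internally is not shown on this page. As a rough illustration, the sketch below maps such arrays onto the request lines described by the beanstalkd protocol (`delete <id>`, `release <id> <pri> <delay>`, `bury <id> <pri>`). The helpers `format_command` and `parse_answer` are hypothetical names, not part of Cod.

```ruby
# Hypothetical helper: turn a symbolic command into a beanstalkd request line.
def format_command(command)
  command.join(' ') + "\r\n"
end

# Hypothetical helper: turn a reply line such as "RELEASED\r\n" into a symbol.
def parse_answer(line)
  line.split(' ').first.downcase.to_sym
end

JOB_PRIORITY = 0 # same value as the constant documented above

delete_line  = format_command([:delete, 42])
release_line = format_command([:release, 42, JOB_PRIORITY, 30])
bury_line    = format_command([:bury, 42, JOB_PRIORITY])

# A successful release is answered with RELEASED by the server.
answer = parse_answer("RELEASED\r\n")
```

The third field of `release` is the delay in seconds, which is the only difference between #bs_release (delay 0) and #bs_release_with_delay.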
#client(answers_to) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 86
    def client(answers_to)
      Service::Client.new(self, answers_to)
    end
#close ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 70
    def close
      @transport.close
    end
#get ⇒ Object
Reads a job from the tube and decodes it as a message.
    # File 'lib/cod/beanstalk/channel.rb', line 61
    def get
      id, msg = bs_reserve

      # We delete the job immediately, since #get should be definitive.
      bs_delete(id)

      deserialize(msg)
    end
#initialize_copy(other) ⇒ Object
Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.
    # File 'lib/cod/beanstalk/channel.rb', line 37
    def initialize_copy(other)
      super(other)
      initialize(other.tube_name, other.server_url)
    end
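Ruby calls #initialize_copy on the new object after #dup or #clone. Re-running #initialize there means the copy does not share the original's transport. The sketch below reproduces that pattern in isolation. `FakeConnection` and `DupableChannel` are hypothetical stand-ins, and no beanstalkd server is contacted.

```ruby
# Hypothetical stand-in for a transport; each instance models one connection.
class FakeConnection
  attr_reader :server_url, :tube_name

  def initialize(server_url, tube_name)
    @server_url, @tube_name = server_url, tube_name
  end
end

# Hypothetical channel-like class that mirrors the #initialize_copy pattern.
class DupableChannel
  attr_reader :tube_name, :server_url, :transport

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
    @transport = FakeConnection.new(server_url, tube_name)
  end

  # Invoked on the copy by #dup and #clone; re-running #initialize replaces
  # the shared transport reference with a freshly built one.
  def initialize_copy(other)
    super(other)
    initialize(other.tube_name, other.server_url)
  end
end

original = DupableChannel.new('jobs', 'localhost:11300')
copy     = original.dup
```

Without the override, #dup would copy the instance variables shallowly and both objects would talk over the same connection.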
#put(msg) ⇒ void
This method returns an undefined value.
Puts a job on the tube after serializing.
    # File 'lib/cod/beanstalk/channel.rb', line 47
    def put(msg)
      pri = JOB_PRIORITY
      delay = 0
      ttr = 120
      body = @body_serializer.en(msg)
      answer, *rest = @transport.interact([:put, pri, delay, ttr, body])

      fail "#put fails, #{answer.inspect}" unless answer == :inserted
    end
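The three numbers passed alongside the body correspond to the arguments of beanstalkd's `put` command. As a hedged illustration of what they mean, the sketch below builds a request in the form the beanstalkd protocol describes (`put <pri> <delay> <ttr> <bytes>`, followed by the body). `build_put_request` is a hypothetical helper; Cod's transport performs its own framing, which may differ in detail.

```ruby
# Hypothetical helper: build the request for beanstalkd's `put` command.
# pri:   priority; lower numbers are served first (0 is most urgent)
# delay: seconds to wait before the job becomes ready
# ttr:   "time to run", seconds a worker may hold the job once reserved
def build_put_request(body, pri: 0, delay: 0, ttr: 120)
  "put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n"
end

# Defaults mirror the values used in #put: priority 0, no delay, 120 s ttr.
request = build_put_request('hello')
```

The byte count uses `bytesize` rather than `length`, since a serialized body is binary data and the protocol counts bytes, not characters.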
#service ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 83
    def service
      Service.new(self)
    end
#to_read_fds ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 75
    def to_read_fds
      fail "Cod.select not supported with beanstalkd channels.\n"+
        "To support this, we will have to extend the beanstalkd protocol."
    end
#try_get {|Object, Cod::Beanstalk::Channel::Control| ... } ⇒ Object
Like #get, reads the next message from the channel but reserves the right to put it back. This uses beanstalkd's flow control features to control message flow in the case of exceptions and the like.
If the block given to this method raises an exception, the message is released unless a control command has been given. This means that other workers on the same tube will get the chance to see the message.
If the block exits without specifying a fate for the message, the message is deleted from the tube.
    # File 'lib/cod/beanstalk/channel.rb', line 125
    def try_get
      fail "No block given to #try_get" unless block_given?

      id, msg = bs_reserve
      control = Control.new(id, self)

      begin
        retval = yield(deserialize(msg), control)
      rescue Exception
        control.release unless control.command_given?
        raise
      ensure
        control.delete unless control.command_given?
      end

      return retval
    end
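The three outcomes of #try_get (normal exit, exception, explicit control command) can be traced without a beanstalkd server. The sketch below copies the rescue/ensure shape of the method and replaces the control object with `FakeControl`, a hypothetical recorder. Its `bury` method is an assumption made by analogy with #bs_bury; the real Control class is not documented on this page.

```ruby
# Hypothetical in-memory stand-in that records what happens to a job.
class FakeControl
  attr_reader :fate

  def initialize
    @fate = nil
  end

  def command_given?
    !@fate.nil?
  end

  def release; @fate = :released; end
  def delete;  @fate = :deleted;  end
  def bury;    @fate = :buried;   end # assumed by analogy with #bs_bury
end

# Same rescue/ensure shape as #try_get, minus the beanstalkd calls.
def try_get_sketch(msg, control)
  yield(msg, control)
rescue Exception
  control.release unless control.command_given?
  raise
ensure
  control.delete unless control.command_given?
end

# 1. Block returns normally: the job is deleted.
ok = FakeControl.new
result = try_get_sketch('job', ok) { |msg, _control| msg.upcase }

# 2. Block raises: the job is released and the error propagates.
failed = FakeControl.new
begin
  try_get_sketch('job', failed) { |_msg, _control| raise 'boom' }
rescue RuntimeError
  # the exception still reaches the caller
end

# 3. Block decides explicitly: no automatic delete or release happens.
buried = FakeControl.new
try_get_sketch('job', buried) { |_msg, control| control.bury }
```

In the failing case the rescue clause runs before the ensure clause, so by the time ensure checks `command_given?` the release has already been recorded and the delete is skipped.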