Class: Cod::TcpClient
Overview
Acts as a channel that connects to a tcp listening socket on the other end.
Connection negotiation has three phases:
1) Connection is establishing. Sent messages are buffered and only sent down the wire once the connection stands. Reading from the channel will block the client forever.
2) Connection is established. Sending and receiving are immediate and no buffering is done.
3) Connection is down because of an interruption or exception. Sending and receiving no longer work; instead, a ConnectionLost error is raised.
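The three phases can be pictured as a small state machine. The sketch below is illustrative only: PhasedChannel, its state symbols, and the connection_up! and connection_down! methods are hypothetical names that are not part of Cod, and the blocking read of phase 1 is left out.

```ruby
# Hypothetical illustration of the three phases; not Cod's implementation.
class ConnectionLost < StandardError; end

class PhasedChannel
  attr_reader :wire

  def initialize
    @state  = :connecting # phase 1
    @buffer = []          # messages waiting for the connection
    @wire   = []          # stands in for the real socket
  end

  # Phase 1 -> phase 2: flush everything that was buffered.
  def connection_up!
    @state = :established
    @wire.concat(@buffer)
    @buffer.clear
  end

  # Phase 2 -> phase 3.
  def connection_down!
    @state = :down
  end

  def put(obj)
    case @state
    when :connecting  then @buffer << obj # buffered until the connection stands
    when :established then @wire << obj   # sent immediately
    when :down        then raise ConnectionLost
    end
  end
end

channel = PhasedChannel.new
channel.put :a          # buffered (phase 1)
channel.put :b          # buffered (phase 1)
channel.connection_up!  # buffered messages are flushed in order
channel.put :c          # sent immediately (phase 2)
channel.connection_down!
```

After connection_down!, any further put on this toy channel raises ConnectionLost, mirroring phase 3.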
Defined Under Namespace
Classes: Connection, OtherEnd, RobustConnection
Class Method Summary
- ._load(params) ⇒ Object
  :nodoc:
Instance Method Summary
- #_dump(level) ⇒ Object
  :nodoc:
- #client ⇒ Object
- #close ⇒ Object
  Closes all underlying connections.
- #get(opts = {}) ⇒ Object
  Receives a message.
- #initialize(destination, serializer) ⇒ TcpClient (constructor)
  Constructs a tcp client channel.
- #put(obj) ⇒ Object
  Sends an object to the other end of the channel, if it is connected.
- #service ⇒ Object
  Always fails: a tcp client cannot be a service.
- #to_read_fds ⇒ Object
Methods inherited from Channel
Constructor Details
#initialize(destination, serializer) ⇒ TcpClient
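Constructs a tcp client channel. destination may either be an already established connection (an object that responds to established?), in which case phase 1) of connection negotiation is skipped, or a string that contains an ‘address:port’ part. Passing a raw TCPSocket is deprecated and fails with "Deprecated API".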
# File 'lib/cod/tcp_client.rb', line 24
def initialize(destination, serializer)
  @serializer = serializer
  @destination = destination

  # TODO remove this soon
  fail "Deprecated API" if destination.kind_of?(TCPSocket)

  # TcpClient handles two cases: Construction via an url (destination is a
  # string) and construction via a connection that has been
  # preestablished (destination is a socket):
  if destination.respond_to?(:established?)
    # Should be a connection already.
    @connection = destination
  else
    @connection = RobustConnection.new(destination)
  end

  @work_queue = WorkQueue.new

  # The predicate for allowing sends: Is the connection up?
  @work_queue.predicate { cached_connection_established? }
end
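The constructor distinguishes its two cases by duck typing rather than by class. The following is a minimal sketch of that dispatch, plus one possible way to split an 'address:port' string. Both connection_kind and split_destination are hypothetical helpers written for this illustration; how Cod itself parses the string is not shown in this page.

```ruby
# Hypothetical helpers; Cod's own handling may differ.

# Mirrors the respond_to?(:established?) check in the constructor.
def connection_kind(destination)
  destination.respond_to?(:established?) ? :preestablished : :by_address
end

# Splits 'address:port' at the last colon and validates the port.
def split_destination(destination)
  host, _, port = destination.rpartition(':')
  if host.empty? || port !~ /\A\d+\z/
    raise ArgumentError, "expected 'address:port', got #{destination.inspect}"
  end
  [host, Integer(port, 10)]
end

# A stand-in for an object that is already a connection.
FakeConnection = Struct.new(:up) do
  def established?
    up
  end
end

kind_for_string     = connection_kind('localhost:12345')
kind_for_connection = connection_kind(FakeConnection.new(true))
host, port          = split_destination('localhost:12345')
```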
Class Method Details
._load(params) ⇒ Object
:nodoc:
# File 'lib/cod/tcp_client.rb', line 128
def self._load(params) # :nodoc:
  # Instead of a tcp client (no way to construct one at this point), we'll
  # insert a kind of marker in the object stream that will be replaced
  # with a valid client later on. (hopefully)
  OtherEnd.new(params)
end
Instance Method Details
#_dump(level) ⇒ Object
:nodoc:
# File 'lib/cod/tcp_client.rb', line 125
def _dump(level) # :nodoc:
  @destination
end
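Together, _dump and ._load are the hooks Ruby's Marshal uses for custom serialization: _dump must return a String, and ._load receives that String and may return any object, including one of a different class. The sketch below shows the round trip with hypothetical Endpoint and Placeholder classes; it is not Cod's code, and Placeholder only plays the role that OtherEnd plays above.

```ruby
# Hypothetical classes illustrating Ruby's Marshal hooks; not Cod's code.
Placeholder = Struct.new(:destination)

class Endpoint
  def initialize(destination)
    @destination = destination
  end

  # Called by Marshal.dump; must return a String.
  def _dump(_level)
    @destination
  end

  # Called by Marshal.load with the String produced by _dump.
  # Returns a marker object instead of a live Endpoint.
  def self._load(destination)
    Placeholder.new(destination)
  end
end

restored = Marshal.load(Marshal.dump(Endpoint.new('localhost:12345')))
```

Here restored is a Placeholder carrying the original destination string, so the receiving side can decide later how to turn it into a usable client.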
#client ⇒ Object
# File 'lib/cod/tcp_client.rb', line 104
def client
  # NOTE: Normally, it doesn't make sense to ask the client channel for
  # something for a service connection, since the service needs to know
  # where to send requests in addition to knowing where to receive
  # answers. In the case of sockets, this is different: The service will
  # send its answers back the same way it got the requests from, so this
  # is really ok:
  #
  Service::Client.new(self, self)
end
#close ⇒ Object
Closes all underlying connections. You should only call this if you don’t want to use the channel again, since it will also stop reconnection attempts.
# File 'lib/cod/tcp_client.rb', line 53
def close
  @work_queue.shutdown if @work_queue
  @connection.close
end
#get(opts = {}) ⇒ Object
Receives a message. opts may contain various options.
# File 'lib/cod/tcp_client.rb', line 86
def get(opts={})
  while @work_queue.size > 0
    @work_queue.try_work
  end

  check_connection_state

  @connection.read(@serializer)
rescue Errno::ECONNRESET, EOFError
  # Connection reset by peer
  raise ConnectionLost
end
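The rescue clause in #get translates low-level I/O errors into a single ConnectionLost error, so callers only need to handle one exception. A minimal sketch of that pattern follows, using a StringIO in place of a real connection; read_message and the locally defined ConnectionLost are assumptions made for this example.

```ruby
require 'stringio'

# Defined locally for the example; Cod has its own ConnectionLost.
class ConnectionLost < StandardError; end

# Hypothetical reader; `io` stands in for the real connection.
def read_message(io)
  io.readline.chomp
rescue Errno::ECONNRESET, EOFError
  # The peer reset the connection or closed it: report one uniform error.
  raise ConnectionLost
end

io = StringIO.new("hello\n")
first = read_message(io) # reads the only line available
```

A second read_message(io) call hits end of file, which surfaces to the caller as ConnectionLost rather than EOFError.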
#put(obj) ⇒ Object
Sends an object to the other end of the channel, if it is connected. If it is not connected, objects sent will queue up; once the internal storage reaches the high watermark, they will be dropped silently. (In the source shown for this method, the high watermark check is still marked as a TODO.)
Example:
channel.put :object
# Really, any Ruby object that the current serializer can turn into
# a string!
# File 'lib/cod/tcp_client.rb', line 67
def put(obj)
  # TODO high watermark check
  # NOTE: predicate will call #try_connect
  @work_queue.schedule {
    send(obj)
  }

  @work_queue.exclusive {
    # If we're connected, shut down the worker thread, do all the work
    # here.
    check_connection_state
  }

  @work_queue.try_work
end
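The queueing behaviour described for #put can be sketched as a queue whose jobs only run while a predicate holds. GatedQueue below is a hypothetical, single-threaded stand-in and not Cod's WorkQueue. The high watermark policy (dropping newly scheduled jobs once the limit is reached) is an assumption based on the description above, since the real check is not implemented in the listing.

```ruby
# Hypothetical sketch of a predicate-gated queue; Cod's WorkQueue differs.
class GatedQueue
  def initialize(high_watermark, &predicate)
    @high_watermark = high_watermark
    @predicate = predicate
    @jobs = []
  end

  def size
    @jobs.size
  end

  # Queue a job; silently drop it once the high watermark is reached
  # (assumed policy, see the lead-in).
  def schedule(&job)
    @jobs << job if @jobs.size < @high_watermark
  end

  # Run queued jobs in order for as long as the predicate holds.
  def try_work
    @jobs.shift.call while @jobs.any? && @predicate.call
  end
end

connected = false
sent = []
queue = GatedQueue.new(2) { connected }

# While disconnected, jobs pile up; the third one exceeds the watermark.
[:a, :b, :c].each do |obj|
  queue.schedule { sent << obj }
  queue.try_work
end

# Once the predicate turns true, the backlog is worked off in order.
connected = true
queue.try_work
```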
#service ⇒ Object
Always fails with the message "A tcp client cannot be a service."
# File 'lib/cod/tcp_client.rb', line 101
def service
  fail "A tcp client cannot be a service."
end
#to_read_fds ⇒ Object
# File 'lib/cod/tcp_client.rb', line 137
def to_read_fds
  if @connection
    [@connection.socket]
  else
    []
  end
end
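A method like #to_read_fds returns the IO objects a caller can wait on, for example with IO.select. The sketch below shows that usage with a pipe; FakeChannel is a hypothetical stand-in for a channel, and how Cod itself consumes these descriptors is not shown in this page.

```ruby
# Hypothetical stand-in for a channel that exposes its readable descriptors.
FakeChannel = Struct.new(:socket) do
  def to_read_fds
    socket ? [socket] : []
  end
end

reader, writer = IO.pipe
channel = FakeChannel.new(reader)

# Nothing written yet: with a zero timeout, select returns nil.
before = IO.select(channel.to_read_fds, nil, nil, 0)

writer.write("ping")

# Data is pending: select returns the readable descriptors first.
ready, = IO.select(channel.to_read_fds, nil, nil, 1)
```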