Module: KafkaSyrup::Broker::Communications
- Defined in:
- lib/kafka_syrup/broker.rb
Instance Method Summary
Instance Method Details
#send_request(req, opts = {}, &block) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/kafka_syrup/broker.rb', line 25
#
# Writes the encoded request to the broker socket and builds the matching
# response object from what the broker sends back.
#
# The response class is looked up by name: the request's class name with a
# trailing "Request" replaced by "Response".
#
# @param req [#encode] request object; its class name selects the response class
# @param opts [Hash] options; a truthy +:close+ closes the socket once the
#   exchange is over, whether it succeeded or raised
# @param block [Proc] optional block forwarded to the response constructor
# @return [Object] the instantiated response
# @raise [KafkaSyrup::KafkaResponseError] re-raised untouched
# @raise [KafkaSyrup::SocketReadError] wraps any other StandardError raised
#   while writing the request or building the response
def send_request(req, opts = {}, &block)
  socket.write(req.encode)
  response_class = self.class.const_get(req.class.to_s.sub(/Request\z/, 'Response'))
  response_class.new(socket, &block)
rescue KafkaSyrup::KafkaResponseError
  raise
rescue StandardError => e
  raise KafkaSyrup::SocketReadError.new(e)
ensure
  # Honour :close on the error path too, so a failed exchange does not leave
  # the connection open. @socket is inspected directly because calling
  # #socket here could open a brand-new connection just to close it.
  @socket.close if opts[:close] && @socket.respond_to?(:closed?) && !@socket.closed?
end
#socket ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/kafka_syrup/broker.rb', line 12
#
# Returns the cached socket, first opening a new connection when @socket is
# missing or reports itself closed.
#
# A new socket is created as an INET stream socket, has SO_SNDBUF set from
# KafkaSyrup.config.so_sndbuf, and is connected to +host+ / +port+.
#
# @return [Socket] the cached or newly connected socket
# @raise [StandardError] any error raised while creating, configuring or
#   connecting the socket; re-raised after the cached socket is discarded
def socket
  unless @socket.respond_to?(:closed?) && !@socket.closed?
    @socket = Socket.new(:INET, :SOCK_STREAM)
    @socket.setsockopt(:SOCKET, :SO_SNDBUF, KafkaSyrup.config.so_sndbuf)
    @socket.connect(Socket.pack_sockaddr_in(port, host))
  end
  @socket
rescue => e
  # Best-effort close: @socket may be nil (creation failed) or closing may
  # itself raise; neither should mask the error being reported.
  begin
    @socket.close if @socket.respond_to?(:close)
  rescue StandardError
    nil
  end
  # Reset unconditionally. Written as `@socket.close rescue @socket = nil`,
  # the assignment would only run when close itself raised.
  @socket = nil
  raise e
end