In Files

  • rinda/ring.rb

Parent

Included Modules

Rinda::RingServer

A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. Default service location uses the following steps:

  1. A RingServer begins listening on the network broadcast UDP address.

  2. A RingFinger sends a UDP packet containing the DRb URI where it will listen for a reply.

  3. The RingServer receives the UDP packet and connects back to the provided DRb URI with the DRb service.

A RingServer requires a TupleSpace:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new

RingServer can also listen on multicast addresses for announcements. This allows multiple RingServers to run on the same host. To use network broadcast and multicast:

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new ts, %w[Socket::INADDR_ANY, 239.0.0.1 ff02::1]

Public Class Methods

new(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) click to toggle source

Advertises ts on the given addresses at port.

If addresses is omitted only the UDP broadcast address is used.

addresses can contain multiple addresses. If a multicast address is given in addresses then the RingServer will listen for multicast queries.

If you use IPv4 multicast you may need to set an address of the inbound interface which joins a multicast group.

ts = Rinda::TupleSpace.new
rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])

You can set addresses as an Array Object. The first element of the Array is a multicast address and the second is an inbound interface address. If the second is omitted then '0.0.0.0' is used.

If you use IPv6 multicast you may need to set both the local interface address and the inbound interface index:

rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])

The first element is a multicast address and the second is an inbound interface address. The third is an inbound interface index.

At this time there is no easy way to get an interface index by name.

If the second is omitted then '::1' is used. If the third is omitted then 0 (default interface) is used.

 
               # File rinda/ring.rb, line 93
def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
  @port = port

  if Integer === addresses then
    addresses, @port = [Socket::INADDR_ANY], addresses
  end

  @renewer = Renewer.new

  @ts = ts
  @sockets = []
  addresses.each do |address|
    if Array === address
      make_socket(*address)
    else
      make_socket(address)
    end
  end

  @w_services = write_services
  @r_service  = reply_service
end
            

Public Instance Methods

do_reply() click to toggle source

Pulls lookup tuples out of the TupleSpace and sends their DRb object the address of the local TupleSpace.

 
               # File rinda/ring.rb, line 217
def do_reply
  tuple = @ts.take([:lookup_ring, nil], @renewer)
  Thread.new { tuple[1].call(@ts) rescue nil}
rescue
end
            
do_write(msg) click to toggle source

Extracts the response URI from msg and adds it to TupleSpace where it will be picked up by reply_service for notification.

 
               # File rinda/ring.rb, line 192
def do_write(msg)
  Thread.new do
    begin
      tuple, sec = Marshal.load(msg)
      @ts.write(tuple, sec)
    rescue
    end
  end
end
            
make_socket(address, interface_address=nil, multicast_interface=0) click to toggle source

Creates a socket at address

If address is multicast address then interface_address and multicast_interface can be set as optional.

A created socket is bound to interface_address. If you use IPv4 multicast then the interface of interface_address is used as the inbound interface. If interface_address is omitted or nil then '0.0.0.0' or '::1' is used.

If you use IPv6 multicast then multicast_interface is used as the inbound interface. multicast_interface is a network interface index. If multicast_interface is omitted then 0 (default interface) is used.

 
               # File rinda/ring.rb, line 131
def make_socket(address, interface_address=nil, multicast_interface=0)
  addrinfo = Addrinfo.udp(address, @port)

  socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
                      addrinfo.protocol)

  if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
    if Socket.const_defined?(:SO_REUSEPORT) then
      socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
    else
      socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
    end

    if addrinfo.ipv4_multicast? then
      interface_address = '0.0.0.0' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        IPAddr.new(interface_address).hton

      socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
    else
      interface_address = '::1' if interface_address.nil?
      socket.bind(Addrinfo.udp(interface_address, @port))

      mreq = IPAddr.new(addrinfo.ip_address).hton +
        [multicast_interface].pack('I')

      socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
    end
  else
    socket.bind(addrinfo)
  end

  socket
rescue
  socket = socket.close if socket
  raise
ensure
  @sockets << socket if socket
end
            
reply_service() click to toggle source

Creates a thread that notifies waiting clients from the TupleSpace.

 
               # File rinda/ring.rb, line 205
def reply_service
  Thread.new do
    loop do
      do_reply
    end
  end
end
            
shutdown() click to toggle source

Shuts down the RingServer

 
               # File rinda/ring.rb, line 226
def shutdown
  @renewer.renew = false

  @w_services.each do |thread|
    thread.kill
    thread.join
  end

  @sockets.each do |socket|
    socket.close
  end

  @r_service.kill
  @r_service.join
end
            
write_services() click to toggle source

Creates threads that pick up UDP packets and passes them to #do_write for decoding.

 
               # File rinda/ring.rb, line 177
def write_services
  @sockets.map do |s|
    Thread.new(s) do |socket|
      loop do
        msg = socket.recv(1024)
        do_write(msg)
      end
    end
  end
end
            
There is an updated format of the API docs for this version here.