#
# To change this template, choose Tools | Templates
# and open the template in the editor.

require 'drb'
require 'ruby2ruby'

# Process-wide dispatcher that ships blocks to DRb workers and runs them there.
#
# A block is registered with #prepare_lambda (or wrapped with #lambda), which
# sends its s-expression to every known worker under an integer key. #call then
# checks a worker out of the pool, asks it to run the block for that key, and
# returns the worker to the pool.
#
# The pool always starts with one +nil+ entry. Checking out +nil+ means "run
# the block in this process", so calls still work when no worker is attached.
#
# Workers are expected to respond to +ping+, +prepare_lambda(sexp, key)+ and
# +call(key, *args)+; those are the only remote methods invoked here.
class Sender
  class NoServers < RuntimeError; end

  # Errors treated as "this worker/call failed, carry on". Deliberately
  # narrower than Exception so Interrupt, SystemExit and NoMemoryError are
  # never swallowed. ScriptError is included because a worker may surface
  # eval/load failures for a shipped block.
  RECOVERABLE_ERRORS = [StandardError, ScriptError].freeze

  @@instance = nil

  # Returns the shared Sender, creating it on first use.
  # Arguments are forwarded to +new+ only on that first call.
  def self.instance(*args)
    @@instance ||= self.new(*args)
  end

  # Starts the local DRb service, seeds the pool with the local (+nil+) slot
  # and launches the background pinger thread.
  def initialize
    $logger.debug "creating first sender"
    @worker_pool = []
    @worker_mutex = Mutex.new
    @lambdas = {}
    @workers = {}
    DRb.start_service
    @worker_pool << nil # local execution slot, see #call
    pinger
  end

  # Connects to the worker at +address+, replays every already-prepared lambda
  # to it and adds it to the pool. Failures are logged and otherwise ignored
  # (best effort); the worker is simply not registered.
  #
  # Returns self so calls can be chained.
  def add_worker(address)
    $logger.debug "adding worker: #{address}"
    begin
      drb_object = DRb::DRbObject.new(nil, address)
      drb_object.ping
      # @lambdas maps key => block; send both, matching #prepare_lambda.
      # Iterate a copy so a concurrent #prepare_lambda cannot break the loop.
      @lambdas.to_a.each do |block_hash, block|
        drb_object.prepare_lambda(block.to_sexp, block_hash)
      end
      @worker_mutex.synchronize do
        @workers[address] = drb_object
        @worker_pool << drb_object
      end
    rescue *RECOVERABLE_ERRORS => e
      $logger.error "failed to add worker #{address}: #{e.inspect}"
    end
    self
  end

  # Forgets the worker registered under +address+ and drops it from the pool.
  # Unknown addresses are a no-op.
  #
  # Returns the removed worker proxy, or nil if the address was not registered.
  def remove_worker(address)
    $logger.debug "removing worker: #{address}"
    @worker_mutex.synchronize do
      worker = @workers.delete(address)
      # Guard against nil: deleting nil would remove the local execution slot.
      @worker_pool.delete(worker) unless worker.nil?
      worker
    end
  end

  # Starts a thread that pings every registered worker, sleeps 5 seconds and
  # repeats; a worker whose ping raises is removed.
  #
  # Returns the Thread.
  def pinger
    Thread.new do
      loop do
        worker_snapshot.each do |address, drb|
          begin
            drb.ping
          rescue *RECOVERABLE_ERRORS
            remove_worker(address)
          end
        end
        sleep 5
      end
    end
  end

  # Registers +block+ and sends its s-expression to every known worker.
  # A block whose s-expression hash is already registered is not sent again.
  #
  # Returns the integer key (hash of the block's s-expression) to pass to #call.
  # Errors raised by a worker while preparing propagate to the caller, and the
  # block is then not recorded locally.
  def prepare_lambda(&block)
    block_sexp = block.to_sexp
    block_hash = block_sexp.hash
    return block_hash if @lambdas[block_hash]

    $logger.debug "started preparing lambda: #{block_hash}"
    worker_snapshot.each do |_address, drb|
      drb.prepare_lambda(block_sexp, block_hash)
    end
    @lambdas[block_hash] = block
    $logger.debug "finished preparing lambda: #{block_hash}"
    block_hash
  end

  # Runs the prepared lambda identified by +lambda+ (a key returned by
  # #prepare_lambda) with +args+ and returns its result.
  #
  # Blocks until a pool slot is free. With a remote worker the call is made
  # over DRb; if that raises, the error is logged, the worker is NOT returned
  # to the pool, and the lambda is run locally instead. With the local slot the
  # lambda runs in-process; its errors are logged and re-raised, and the slot
  # is always returned to the pool.
  def call(lambda, *args)
    $logger.debug "processing call"
    drb = checkout_worker
    if drb.nil?
      call_in_local_slot(lambda, args)
    else
      call_on_worker(drb, lambda, args)
    end
  end

  # Prepares +block+ and returns a local lambda that forwards its arguments to
  # Sender.instance.call for that block.
  def lambda(&block)
    hash = prepare_lambda(&block)
    Kernel.lambda { |*args| Sender.instance.call(hash, *args) }
  end

  private

  # Copy of the address => worker table as [address, worker] pairs, taken under
  # the mutex so callers can iterate while workers are added or removed.
  def worker_snapshot
    @worker_mutex.synchronize { @workers.to_a }
  end

  # Waits (polling every 0.1s) until the pool is non-empty and takes its first
  # entry. The emptiness check and the shift happen under one lock so two
  # callers cannot both claim the last entry. May return nil (local slot).
  def checkout_worker
    loop do
      @worker_mutex.synchronize do
        return @worker_pool.shift unless @worker_pool.empty?
      end
      sleep 0.1
    end
  end

  # Puts +drb+ (a worker proxy, or nil for the local slot) back into the pool.
  def checkin_worker(drb)
    @worker_mutex.synchronize { @worker_pool.push(drb) }
  end

  # Runs the lambda in this process while holding the local slot.
  def call_in_local_slot(lambda_hash, args)
    result = @lambdas[lambda_hash].call(*args)
    $logger.debug "finished remote call"
    result
  rescue *RECOVERABLE_ERRORS => e
    log_call_error(e)
    raise # already ran locally; running it again would only repeat side effects
  ensure
    checkin_worker(nil) # never lose the local slot, or later calls would wait forever
  end

  # Runs the lambda on +drb+; on failure logs and falls back to local execution.
  def call_on_worker(drb, lambda_hash, args)
    begin
      result = drb.call(lambda_hash, *args)
    rescue *RECOVERABLE_ERRORS => e
      log_call_error(e)
      # The failed worker is intentionally not checked back in.
      return @lambdas[lambda_hash].call(*args)
    end
    checkin_worker(drb)
    $logger.debug "finished remote call"
    result
  end

  # Logs a failed call with its backtrace (which may be absent).
  def log_call_error(e)
    $logger.error "error while processing call #{e.inspect} \n Backtrace:\n#{Array(e.backtrace).join("\n")}\n"
  end
end