class Bundler::ParallelWorkers::UnixWorker

UnixWorker is used only on platforms where fork is available. It forks a preconfigured number of worker processes, then starts a preconfigured number of threads that write to the pipes connected to those workers.

Public Class Methods

new(size, job)
Calls superclass method
# File lib/bundler/parallel_workers/unix_worker.rb, line 17
def initialize(size, job)
  # Close the persistent connections for the main thread before forking
  Net::HTTP::Persistent.new('bundler', :ENV).shutdown
  super
end

Private Instance Methods

prepare_threads(size)

Start the threads. Each thread waits for incoming messages on the request queue and writes each message to the pipe connected to its child worker. It then reads the worker's reply from the pipe and puts it on the response queue.

@param size [Integer] Number of threads to be started

# File lib/bundler/parallel_workers/unix_worker.rb, line 67
def prepare_threads(size)
  @threads = size.times.map do |i|
    Thread.start do
      worker = @workers[i]
      loop do
        obj = @request_queue.deq
        break if obj.equal? POISON
        @response_queue.enq worker.work(obj)
      end
    end
  end
end
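
The queue handling described above can be illustrated without any forking. The following is a minimal sketch, not Bundler's implementation: run_thread_pool is a hypothetical helper, the local poison object stands in for the POISON constant, and doubling the input stands in for worker.work(obj).

```ruby
require "thread"

# Hypothetical helper: runs each input through a pool of threads and
# returns the collected results (arrival order is not guaranteed).
def run_thread_pool(inputs, pool_size)
  poison = Object.new # unique sentinel, stand-in for POISON
  request_queue = Queue.new
  response_queue = Queue.new

  threads = pool_size.times.map do
    Thread.start do
      loop do
        obj = request_queue.deq # blocks until a message arrives
        break if obj.equal?(poison) # identity check, as in prepare_threads
        response_queue.enq(obj * 2) # stand-in for worker.work(obj)
      end
    end
  end

  inputs.each { |obj| request_queue.enq(obj) }
  results = Array.new(inputs.size) { response_queue.deq }

  # One sentinel per thread so that every loop terminates.
  pool_size.times { request_queue.enq(poison) }
  threads.each(&:join)
  results
end

p run_thread_pool([1, 2, 3], 2).sort # => [2, 4, 6]
```

Comparing with equal? rather than == means only the sentinel object itself stops a thread, so an ordinary job that happens to compare equal to it cannot end the loop by accident.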

prepare_workers(size, func)

Start forked workers for downloading gems. This worker variant is used only on platforms where fork is available.

@param size [Integer] Size of worker pool
@param func [Proc] Job that should be executed in the worker

# File lib/bundler/parallel_workers/unix_worker.rb, line 30
def prepare_workers(size, func)
  @workers = size.times.map do |num|
    child_read, parent_write = IO.pipe
    parent_read, child_write = IO.pipe

    pid = Process.fork do
      begin
        parent_read.close
        parent_write.close

        while !child_read.eof?
          obj = Marshal.load child_read
          Marshal.dump func.call(obj, num), child_write
        end
      rescue Exception => e
        begin
          Marshal.dump WrappedException.new(e), child_write
        rescue Errno::EPIPE
          nil
        end
      ensure
        child_read.close
        child_write.close
      end
    end

    child_read.close
    child_write.close
    JobHandler.new pid, parent_read, parent_write
  end
end
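
The pipe protocol used above can be tried in isolation with a single child. The following is a minimal sketch under stated assumptions: fork_round_trip is a hypothetical helper, it omits the WrappedException handling, and it runs only on platforms where fork is available.

```ruby
# Hypothetical helper: forks one child that applies `func` to every request
# read from its pipe, sends the given inputs, and collects the replies.
def fork_round_trip(inputs, func)
  child_read, parent_write = IO.pipe
  parent_read, child_write = IO.pipe

  pid = Process.fork do
    # The child keeps only its own ends of the two pipes.
    parent_read.close
    parent_write.close
    until child_read.eof?
      obj = Marshal.load(child_read)
      Marshal.dump(func.call(obj), child_write)
    end
    child_read.close
    child_write.close
  end

  # The parent keeps only its own ends.
  child_read.close
  child_write.close

  results = inputs.map do |obj|
    Marshal.dump(obj, parent_write) # send one request
    Marshal.load(parent_read)       # block until the reply arrives
  end

  parent_write.close # EOF on child_read ends the child's loop
  parent_read.close
  Process.waitpid(pid)
  results
end

p fork_round_trip([1, 2, 3], ->(n) { n * n }) # => [1, 4, 9]
```

Two pipes are needed because each pipe carries data in one direction only: one for requests from parent to child, one for replies from child to parent. Closing the unused ends in each process is what lets eof? become true once the parent closes parent_write.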

stop_workers()

Close the parent's ends of the pipes, kill the forked workers by sending SIGINT to them, and then wait for each child process to exit.

# File lib/bundler/parallel_workers/unix_worker.rb, line 81
def stop_workers
  @workers.each do |worker|
    worker.io_r.close unless worker.io_r.closed?
    worker.io_w.close unless worker.io_w.closed?
    begin
      Process.kill :INT, worker.pid
    rescue Errno::ESRCH
      nil
    end
  end
  @workers.each do |worker|
    begin
      Process.waitpid worker.pid
    rescue Errno::ECHILD
      nil
    end
  end
end
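
The two-pass shutdown above (signal every child first, then reap every child) can be sketched on its own. This is an illustrative example only: spawn_and_stop is a hypothetical helper, and the children simply sleep instead of doing real work.

```ruby
# Hypothetical helper: forks `count` idle children, interrupts them,
# and reaps them so that no zombie processes are left behind.
def spawn_and_stop(count)
  pids = Array.new(count) { Process.fork { sleep } }

  # First pass: signal every child before waiting on any of them.
  pids.each do |pid|
    begin
      Process.kill(:INT, pid)
    rescue Errno::ESRCH
      nil # the process no longer exists
    end
  end

  # Second pass: wait for each child; waitpid returns the reaped pid.
  pids.map do |pid|
    begin
      Process.waitpid(pid)
    rescue Errno::ECHILD
      nil # the child was already reaped
    end
  end
end

reaped = spawn_and_stop(2)
puts "reaped #{reaped.compact.size} children"
```

Signalling all children before waiting lets them shut down concurrently; waiting inside the first loop would make the total shutdown time the sum of the individual exit times.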