diff options
author | keiju <keiju@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2007-03-20 12:38:58 +0000 |
---|---|---|
committer | keiju <keiju@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2007-03-20 12:38:58 +0000 |
commit | 42a81120e510c2c2fb7822ff4d5738db6ba392bf (patch) | |
tree | 3f6024dcb1fe3a3ca6b3fd613f436fd99c80c80c /lib/shell/process-controller.rb | |
parent | d9d838a45beff13576e4fba42e0d5b611d0df706 (diff) |
* lib/shell.rb, lib/shell: support for ruby 1.9(YARV) thread model.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@12110 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'lib/shell/process-controller.rb')
-rw-r--r-- | lib/shell/process-controller.rb | 158 |
1 files changed, 109 insertions, 49 deletions
diff --git a/lib/shell/process-controller.rb b/lib/shell/process-controller.rb index 8929805506..c38d8bc987 100644 --- a/lib/shell/process-controller.rb +++ b/lib/shell/process-controller.rb @@ -1,33 +1,38 @@ # # shell/process-controller.rb - -# $Release Version: 0.6.0 $ +# $Release Version: 0.7 $ # $Revision$ # $Date$ -# by Keiju ISHITSUKA(Nihon Rational Software Co.,Ltd) +# by Keiju ISHITSUKA([email protected]) # # -- # # # +require "forwardable" -require "mutex_m" -require "monitor" +require "thread" require "sync" class Shell class ProcessController @ProcessControllers = {} - @ProcessControllers.extend Mutex_m + @ProcessControllersMonitor = Mutex.new + @ProcessControllersCV = ConditionVariable.new + + @BlockOutputMonitor = Mutex.new + @BlockOutputCV = ConditionVariable.new class<<self + extend Forwardable - def process_controllers_exclusive - begin - @ProcessControllers.lock unless Thread.critical - yield - ensure - @ProcessControllers.unlock unless Thread.critical + def_delegator("@ProcessControllersMonitor", + "synchronize", "process_controllers_exclusive") + + def active_process_controllers + process_controllers_exclusive do + @ProcessControllers.dup end end @@ -43,6 +48,7 @@ class Shell if @ProcessControllers[pc] if (@ProcessControllers[pc] -= 1) == 0 @ProcessControllers.delete(pc) + @ProcessControllersCV.signal end end end @@ -55,6 +61,34 @@ class Shell end end end + + def block_output_synchronize(&b) + @BlockOutputMonitor.synchronize &b + end + + def wait_to_finish_all_process_controllers + process_controllers_exclusive do + while [email protected]? + Shell::notify("Process finishing, but active shell exists", + "You can use Shell#transact or Shell#check_point for more safe execution.") + if Shell.debug? + for pc in @ProcessControllers.keys + Shell::notify(" Not finished jobs in "+pc.shell.to_s) + for com in pc.jobs + com.notify(" Jobs: %id") + end + end + end + @ProcessControllersCV.wait(@ProcessControllersMonitor) + end + end + end + end + + # for shell-command complete finish at this prosess exit. + USING_AT_EXIT_WHEN_PROCESS_EXIT = true + at_exit do + wait_to_finish_all_process_controllers unless $@ end def initialize(shell) @@ -67,6 +101,8 @@ class Shell @job_condition = ConditionVariable.new end + attr_reader :shell + def jobs jobs = [] @jobs_sync.synchronize(:SH) do @@ -122,15 +158,19 @@ class Shell @waiting_jobs.delete command else command = @waiting_jobs.shift +# command.notify "job(%id) pre-start.", @shell.debug? + return unless command end @active_jobs.push command command.start +# command.notify "job(%id) post-start.", @shell.debug? # start all jobs that input from the job - for job in @waiting_jobs + for job in @waiting_jobs.dup start_job(job) if job.input == command end +# command.notify "job(%id) post2-start.", @shell.debug? end end @@ -152,6 +192,7 @@ class Shell @active_jobs.delete command ProcessController.inactivate(self) if @active_jobs.empty? + command.notify("start_jon in ierminate_jon(%id)", Shell::debug?) start_job end end @@ -159,7 +200,7 @@ class Shell # kill a job def kill_job(sig, command) - @jobs_sync.synchronize(:SH) do + @jobs_sync.synchronize(:EX) do if @waiting_jobs.delete command ProcessController.inactivate(self) return @@ -183,6 +224,9 @@ class Shell begin while !jobs.empty? @job_condition.wait(@job_monitor) + for job in jobs + job.notify("waiting job(%id)", Shell::debug?) + end end ensure redo unless jobs.empty? @@ -194,66 +238,82 @@ class Shell def sfork(command, &block) pipe_me_in, pipe_peer_out = IO.pipe pipe_peer_in, pipe_me_out = IO.pipe - Thread.critical = true - STDOUT.flush - ProcessController.each_active_object do |pc| - for jobs in pc.active_jobs - jobs.flush - end - end - - pid = fork { - Thread.critical = true - - Thread.list.each do |th| - th.kill unless [Thread.main, Thread.current].include?(th) - end - STDIN.reopen(pipe_peer_in) - STDOUT.reopen(pipe_peer_out) + pid = nil + pid_mutex = Mutex.new + pid_cv = ConditionVariable.new - ObjectSpace.each_object(IO) do |io| - if ![STDIN, STDOUT, STDERR].include?(io) - io.close unless io.closed? + Thread.start do + ProcessController.block_output_synchronize do + STDOUT.flush + ProcessController.each_active_object do |pc| + for jobs in pc.active_jobs + jobs.flush + end end + + pid = fork { + Thread.list.each do |th| +# th.kill unless [Thread.main, Thread.current].include?(th) + th.kill unless Thread.current == th + end + + STDIN.reopen(pipe_peer_in) + STDOUT.reopen(pipe_peer_out) + + ObjectSpace.each_object(IO) do |io| + if ![STDIN, STDOUT, STDERR].include?(io) + io.close unless io.closed? + end + end + + yield + } end - yield - } + pid_cv.signal - pipe_peer_in.close - pipe_peer_out.close - command.notify "job(%name:##{pid}) start", @shell.debug? - Thread.critical = false + pipe_peer_in.close + pipe_peer_out.close + command.notify "job(%name:##{pid}) start", @shell.debug? - th = Thread.start { - Thread.critical = true begin _pid = nil command.notify("job(%id) start to waiting finish.", @shell.debug?) - Thread.critical = false _pid = Process.waitpid(pid, nil) rescue Errno::ECHILD command.notify "warn: job(%id) was done already waitipd." _pid = true + # rescue + # STDERR.puts $! ensure + command.notify("Job(%id): Wait to finish when Process finished.", @shell.debug?) # when the process ends, wait until the command termintes - if _pid + if USING_AT_EXIT_WHEN_PROCESS_EXIT or _pid else command.notify("notice: Process finishing...", "wait for Job[%id] to finish.", "You can use Shell#transact or Shell#check_point for more safe execution.") redo end - Thread.exclusive do - @job_monitor.synchronize do - terminate_job(command) - @job_condition.signal - command.notify "job(%id) finish.", @shell.debug? - end + +# command.notify "job(%id) pre-pre-finish.", @shell.debug? + @job_monitor.synchronize do +# command.notify "job(%id) pre-finish.", @shell.debug? + terminate_job(command) +# command.notify "job(%id) pre-finish2.", @shell.debug? + @job_condition.signal + command.notify "job(%id) finish.", @shell.debug? end end - } + end + + pid_mutex.synchronize do + while !pid + pid_cv.wait(pid_mutex) + end + end + return pid, pipe_me_in, pipe_me_out end end |