php获取后台Job管理的实现代码 |
本文标签:Job管理 复制代码 代码如下: <?php defined(SYSPATH) OR die(No direct access allowed.); class Controller_Jobs extends Controller_Base{ public function before(){ parent::before(); if(Request::$protocol != "cli"){ die("Only cli allowed!\n"); } } public function after(){ parent::after(); //do some cleaning tasks } private function _execJobCommand($joburi,$paras){ $php_exec = Kohana::config("picsou.php_exec"); $php_index = APPINDEX; $command_args = array(); $command_args[] = $php_index; $command_args[] = "--uri=".$joburi; foreach ($paras as $para => $value){ $command_args[] = "--".$para."=".$value; } //var_dump($command_args);exit; echo "exec commmand:".$php_exec."\n"; pcntl_exec($php_exec,$command_args); } /* * Running jobs in queues */ public function action_run(){ $requestCount = 0; while(true){ $sql = "select * from job_queue where status=1 and approved=1 order by id"; $jobs = DB::query(Database::SELECT,$sql)->execute()->as_array(); if($jobs){ foreach ($jobs as $job){ $requestCount ++; //update the jobs status as running DB::update(job_queue)->set(array(status=>2)) ->where(id,=,$job[id])->execute(); $job_pid = pcntl_fork(); if($job_pid == -1){ die("Could not fork Child"); } else if($job_pid == 0 ){ $this->_execJobCommand($job[job_uri],json_decode($job[paras],true)); echo "finish Child\n"; exit(0); //run jobs here } else{ echo "Waiting for job\n"; ob_flush(); $child_pid = pcntl_waitpid($job_pid,$status, WUNTRACED); echo "waitpid end:".$status."\n"; if($status == 0){ //job completed DB::update(job_queue)->set(array(status=>999)) ->where(id,=,$job[id])->execute(); echo "Child Finished\n"; ob_flush(); }else{ DB::update(job_queue)->set(array(status=>-1)) ->where(id,=,$job[id])->execute(); echo "Child Failed\n"; ob_flush(); } } } } else{ if($requestCount >=10){ echo "Have a rest, I have processed 10 jobs\n"; exit; } //no job to run //echo "No job\n"; ob_flush(); sleep(5); } } } } |