#2058 cancel transport kill already running transport processes too

This commit is contained in:
tomash 2007-01-24 03:04:56 +00:00
parent 1e48e8c7da
commit 8818ae3537
9 changed files with 129 additions and 46 deletions

View File

@ -669,7 +669,7 @@ class BasicStor {
if (!PEAR::isError($storedFile)) {
$storedFile->setMd5($md5sum);
} else {
$error = $storedFile;
# $error = $storedFile;
}
}
@ -845,7 +845,7 @@ class BasicStor {
} else {
$storedFile = StoredFile::RecallByGunid($gunid);
}
if (PEAR::isError($storedFile)) {
if (is_null($storedFile) || PEAR::isError($storedFile)) {
return $storedFile;
}
$r = $storedFile->md->getMetadataValue('dc:title', $lang, $deflang);
@ -1180,7 +1180,7 @@ class BasicStor {
if (PEAR::isError($storedFile)) {
return $storedFile;
}
$MDfname = $storedFile->md->getName();
$MDfname = $storedFile->md->getFileName();
if (PEAR::isError($MDfname)) {
return $MDfname;
}
@ -2104,11 +2104,18 @@ class BasicStor {
*/
public static function RemoveObj($id, $forced=FALSE)
{
switch ($ot = BasicStor::GetObjType($id)) {
$ot = BasicStor::GetObjType($id);
if (PEAR::isError($ot)) {
return $ot;
}
switch ($ot) {
case "audioclip":
case "playlist":
case "webstream":
$storedFile = StoredFile::Recall($id);
if (is_null($storedFile)) {
return TRUE;
}
if (PEAR::isError($storedFile)) {
return $storedFile;
}

View File

@ -64,7 +64,7 @@ class LocStor extends BasicStor {
// Check if specified gunid exists.
$storedFile =& StoredFile::RecallByGunid($gunid);
if (!PEAR::isError($storedFile)) {
if (!is_null($storedFile) && !PEAR::isError($storedFile)) {
// gunid exists - do replace
$oid = $storedFile->getId();
if (($res = BasicStor::Authorize('write', $oid, $sessid)) !== TRUE) {
@ -75,7 +75,7 @@ class LocStor extends BasicStor {
'LocStor::storeAudioClipOpen: is accessed'
);
}
$res = $storedFile->replace($oid, $storedFile->name, '', $metadata, 'string');
$res = $storedFile->replace($oid, $storedFile->getName(), '', $metadata, 'string');
if (PEAR::isError($res)) {
return $res;
}
@ -535,6 +535,9 @@ class LocStor extends BasicStor {
protected function deleteAudioClip($sessid, $gunid, $forced=FALSE)
{
$storedFile =& StoredFile::RecallByGunid($gunid);
if (is_null($storedFile)) {
return TRUE;
}
if (PEAR::isError($storedFile)) {
if ($storedFile->getCode()==GBERR_FOBJNEX && $forced) {
return TRUE;

View File

@ -133,6 +133,7 @@ class MetaData {
return $res;
}
}
unset($this->metadata);
return $this->insert($mdata, $loc, $format);
}
@ -951,8 +952,11 @@ class MetaData {
}
if (is_null($row)) {
$node = XML_Util::createTagFromArray(array(
'localpart'=>'none'
'localPart'=>'none'
));
if (PEAR::isError($node)) {
return $node;
}
} else {
$node = $this->genXMLNode($row);
if (PEAR::isError($node)) {

View File

@ -345,7 +345,7 @@ class StoredFile {
global $CC_CONFIG;
global $CC_DBC;
$this->gunid = $p_gunid;
if (is_null($this->gunid)) {
if (empty($this->gunid)) {
$this->gunid = StoredFile::CreateGunid();
}
$this->resDir = $this->_getResDir($this->gunid);
@ -386,14 +386,14 @@ class StoredFile {
if ($storedFile->ftype == 'playlist') {
$storedFile->mime = 'application/smil';
} else {
$storedFile->mime = $p_values["mime"];
$storedFile->mime = (isset($p_values["mime"]) ? $p_values["mime"] : NULL );
}
$storedFile->filepath = $p_values['filepath'];
# $storedFile->filepath = $p_values['filepath'];
if (isset($p_values['md5'])) {
$storedFile->md5 = $p_values['md5'];
} elseif (file_exists($storedFile->filepath)) {
echo "StoredFile::Insert: WARNING: Having to recalculate MD5 value\n";
$storedFile->md5 = md5_file($storedFile->filepath);
} elseif (file_exists($p_values['filepath'])) {
# echo "StoredFile::Insert: WARNING: Having to recalculate MD5 value\n";
$storedFile->md5 = md5_file($p_values['filepath']);
}
$storedFile->exists = FALSE;
@ -417,7 +417,9 @@ class StoredFile {
// Insert metadata
$metadata = $p_values['metadata'];
$mdataLoc = ($metadata[0]=="/")?"file":"string";
// $mdataLoc = ($metadata[0]=="/")? "file":"string";
// for non-absolute paths:
$mdataLoc = ($metadata[0]!="<")? "file":"string";
if (is_null($metadata) || ($metadata == '') ) {
$metadata = dirname(__FILE__).'/emptyMdata.xml';
$mdataLoc = 'file';
@ -435,18 +437,18 @@ class StoredFile {
}
// Save media file
if (!empty($storedFile->filepath)) {
if (!file_exists($storedFile->filepath)) {
if (!empty($p_values['filepath'])) {
if (!file_exists($p_values['filepath'])) {
return PEAR::raiseError("StoredFile::Insert: ".
"media file not found ($storedFile->filepath)");
"media file not found ({$p_values['filepath']})");
}
$res = $storedFile->addFile($storedFile->filepath, $p_copyMedia);
$res = $storedFile->addFile($p_values['filepath'], $p_copyMedia);
if (PEAR::isError($res)) {
$CC_DBC->query("ROLLBACK");
return $res;
}
if (empty($storedFile->mime)) {
echo "StoredFile::Insert: WARNING: Having to recalculate MIME value\n";
# echo "StoredFile::Insert: WARNING: Having to recalculate MIME value\n";
$storedFile->setMime($storedFile->getMime());
}
$emptyState = FALSE;
@ -514,6 +516,8 @@ class StoredFile {
$storedFile = new StoredFile($gunid);
} elseif ($row['ftype'] == 'playlist') {
$storedFile = new Playlist($gunid);
} else { // fallback
$storedFile = new StoredFile($gunid);
}
$storedFile->gunidBigint = $row['gunid_bigint'];
$storedFile->md->gunidBigint = $row['gunid_bigint'];

View File

@ -77,6 +77,7 @@ class Transport
* wget --limit-rate parameter
*/
private $downLimitRate = NULL;
# private $downLimitRate = 500;
/**
* wget -t parameter
@ -113,6 +114,7 @@ class Transport
* @var int
*/
private $upLimitRate = NULL;
# private $upLimitRate = 500;
/**
@ -262,6 +264,11 @@ class Transport
);
}
$res = $trec->setState($newState);
switch ($action) {
case 'pause';
case 'cancel';
$trec->killJob();
}
return $res;
}
@ -800,8 +807,11 @@ class Transport
} else {
$redirect = "/dev/null";
}
$command = "{$this->cronJobScript} {$trtok} >> $redirect &";
$res = system($command, $status);
$redirect_escaped = escapeshellcmd($redirect);
$command = "{$this->cronJobScript} {$trtok}";
$command_escaped = escapeshellcmd($command);
$command_final = "$command_escaped >> $redirect_escaped 2>&1 &";
$res = system($command_final, $status);
if ($res === FALSE) {
$this->trLog(
"cronMain: Error on execute cronJobScript with trtok {$trtok}"
@ -883,7 +893,8 @@ class Transport
default:
if (method_exists($this, $mname)) {
// lock the job:
$r = $trec->setLock(TRUE);
$pid = getmypid();
$r = $trec->setLock(TRUE, $pid);
if (PEAR::isError($r)) {
return $r;
}
@ -904,7 +915,7 @@ class Transport
$asessid = $r;
// method call:
if (TR_LOG_LEVEL > 2) {
$this->trLog("cronCallMethod: $mname($trtok) >");
$this->trLog("cronCallMethod($pid): $mname($trtok) >");
}
$ret = call_user_func(array($this, $mname), $row, $asessid);
if (PEAR::isError($ret)) {
@ -912,7 +923,7 @@ class Transport
return $this->_failFatal($ret, $trec);
}
if (TR_LOG_LEVEL > 2) {
$this->trLog("cronCallMethod: $mname($trtok) <");
$this->trLog("cronCallMethod($pid): $mname($trtok) <");
}
// unlock the job:
$r = $trec->setLock(FALSE);
@ -980,6 +991,7 @@ class Transport
*/
function cronDownloadInit($row, $asessid)
{
global $CC_CONFIG;
$trtok = $row['trtok'];
$trec = TransportRecord::recall($this, $trtok);
if (PEAR::isError($trec)) {
@ -1085,6 +1097,18 @@ class Transport
return TRUE;
}
$res = system($command, $status);
// leave paused and closed transports
$trec2 = TransportRecord::recall($this, $trtok);
if (PEAR::isError($trec)) {
return $trec;
}
$state2 = $trec2->row['state'];
if ($state2 == 'paused' || $state2 == 'closed' ) {
return TRUE;
}
// status 18 - Partial file. Only a part of the file was transported.
// status 28 - Timeout. Too long/slow upload, try to resume next time rather.
// status 6 - Couldn't resolve host.
@ -1168,31 +1192,43 @@ class Transport
return TRUE;
}
$res = system($command, $status);
// leave paused and closed transports
$trec2 = TransportRecord::recall($this, $trtok);
if (PEAR::isError($trec)) {
return $trec;
}
$state2 = $trec2->row['state'];
if ($state2 == 'paused' || $state2 == 'closed' ) {
return TRUE;
}
// check consistency
$size = filesize($row['localfile']);
if ($status == 0 || ($status == 1 && $size >= $row['expectedsize'])) {
$chsum = $this->_chsum($row['localfile']);
if ($size < $row['expectedsize']) {
// not finished - return to the 'waiting' state
$r = $trec->setState('waiting', array('realsize'=>$size));
if (PEAR::isError($r)) {
return $r;
}
} elseif ($size >= $row['expectedsize']) {
$chsum = $this->_chsum($row['localfile']);
if ($chsum == $row['expectedsum']) {
// mark download as finished
$r = $trec->setState('finished',
array('realsum'=>$chsum, 'realsize'=>$size));
if (PEAR::isError($r)) {
return $r;
return $r;
}
} else {
// bad checksum
// bad checksum, retry from the scratch
@unlink($row['localfile']);
$r = $trec->setState('waiting',
array('realsum'=>$chsum, 'realsize'=>$size));
if (PEAR::isError($r)) {
return $r;
return $r;
}
}
} else {
return PEAR::raiseError("Transport::cronDownloadWaiting:".
" wrong return status from wget: $status ".
"($trtok)"
);
}
return TRUE;
}
@ -1210,6 +1246,7 @@ class Transport
*/
function cronUploadFinished($row, $asessid)
{
global $CC_CONFIG;
$trtok = $row['trtok'];
$trec = TransportRecord::recall($this, $trtok);
if (PEAR::isError($trec)) {
@ -1330,7 +1367,8 @@ class Transport
if (PEAR::isError($mdtrec)) {
return $mdtrec;
}
$r = $mdtrec->setLock(TRUE);
$pid = getmypid();
$r = $mdtrec->setLock(TRUE, $pid);
if (PEAR::isError($r)) {
return $r;
}
@ -1730,6 +1768,7 @@ class Transport
*/
function trLog($msg)
{
global $CC_CONFIG;
$logfile = $CC_CONFIG['transDir']."/activity.log";
if (FALSE === ($fp = fopen($logfile, "a"))) {
return PEAR::raiseError(

View File

@ -116,7 +116,7 @@ class TransportRecord
*/
function recall(&$tr, $trtok)
{
global $CC_DBC;
global $CC_DBC, $CC_CONFIG;
$trec = new TransportRecord($tr);
$trec->trtok = $trtok;
$row = $CC_DBC->getRow("
@ -125,7 +125,7 @@ class TransportRecord
to_hex(gunid)as gunid, to_hex(pdtoken)as pdtoken,
fname, localfile, url, rtrtok, mdtrtok, uid,
expectedsize, realsize, expectedsum, realsum,
errmsg, title
errmsg, title, jobpid
FROM ".$CC_CONFIG['transTable']."
WHERE trtok='$trtok'
");
@ -207,16 +207,19 @@ class TransportRecord
/**
* Set lock on transport record
* Set lock on transport record and save/clear process id
*
* @param boolean $lock
* lock if true, release lock if false
* @param int $pid
* process id
* @return mixed
* true or error
*/
function setLock($lock)
function setLock($lock, $pid=NULL)
{
global $CC_CONFIG, $CC_DBC;
$pidsql = (is_null($pid) ? "NULL" : "$pid" );
if ($this->dropped) {
return TRUE;
}
@ -225,7 +228,7 @@ class TransportRecord
$snlock = ($nlock ? 'Y' : 'N');
$r = $CC_DBC->query("
UPDATE ".$CC_CONFIG['transTable']."
SET lock='$slock', ts=now()
SET lock='$slock', jobpid=$pidsql, ts=now()
WHERE trtok='{$this->trtok}' AND lock = '$snlock'"
);
if (PEAR::isError($r)) {
@ -235,7 +238,7 @@ class TransportRecord
if (PEAR::isError($affRows)) {
return $affRows;
}
if ($affRows != 1) {
if ($affRows === 0) {
$ltxt = ($lock ? 'lock' : 'unlock' );
return PEAR::raiseError(
"TransportRecord::setLock: can't $ltxt ({$this->trtok})"
@ -262,6 +265,24 @@ class TransportRecord
}
/**
* Kill transport job (on pause or cancel)
*
* @return string
* Transport type
*/
function killJob()
{
if (!$this->recalled) {
return PEAR::raiseError("TransportRecord::getTransportType:".
" not recalled ({$this->trtok})", TRERR_TOK
);
}
$jobpid = $this->row['jobpid'];
$res = system("pkill -P $jobpid", $status);
}
/**
* Set state to failed and set error message in transport record
*
@ -374,6 +395,9 @@ class TransportRecord
case "playlistPkg":
case "metadata":
$title = $this->gb->bsGetTitle(NULL, $this->row['gunid']);
if (is_null($title)) {
$title = $defStr;
}
if (PEAR::isError($title)) {
if ($title->getCode() == GBERR_FOBJNEX) {
$title = $defStr;

View File

@ -217,7 +217,7 @@ class XmlParser {
$parser = new XmlParser($data);
if ($parser->isError()) {
return PEAR::raiseError(
"SmilPlaylist::parse: ".$parser->getError()
"XmlParser::parse: ".$parser->getError()
);
}
$tree = $parser->getTree();

View File

@ -12,9 +12,10 @@ $CC_DBC->setFetchMode(DB_FETCHMODE_ASSOC);
$gb = new LocStor();
$tr = new Transport($gb);
$pid = getmypid();
list(, $trtok) = $_SERVER['argv'];
if (TR_LOG_LEVEL > 1) {
$tr->trLog("transportCronJob start ($trtok)");
$tr->trLog("transportCronJob($pid) start ($trtok)");
}
// 4-pass on job:
@ -23,11 +24,11 @@ for ($i = 0; $i < $cnt; $i++, sleep(1)) {
// run the action:
$r = $tr->cronCallMethod($trtok);
if (PEAR::isError($r)) {
$tr->trLogPear("transportCronJob: ($trtok): ", $r);
$tr->trLogPear("transportCronJob($pid): ($trtok): ", $r);
} else {
# $tr->trLog("X transportCronJob: ".var_export($r, TRUE));
if ($r !== TRUE) {
$tr->trLog("transportCronJob: ($trtok): nonTRUE returned");
$tr->trLog("transportCronJob($pid): ($trtok): nonTRUE returned");
}
}
#if(!$r) exit(1);
@ -35,7 +36,7 @@ for ($i = 0; $i < $cnt; $i++, sleep(1)) {
}
if (TR_LOG_LEVEL>1) {
$tr->trLog("transportCronJob end ($trtok)");
$tr->trLog("transportCronJob($pid) end ($trtok)");
}
exit(0);
?>

View File

@ -407,6 +407,7 @@ if (!camp_db_table_exists($CC_CONFIG['transTable'])) {
realsize int, -- filesize of transported part
uid int, -- local user id of transport owner
errmsg varchar(255), -- error message string for failed tr.
jobpid int, -- pid of transport job
start timestamp, -- starttime
ts timestamp -- mtime
)";