Got rid of all the stuff related to GUNID hex-to-int conversion. Commented out lots of functions that are either not in use or will no longer work. Pypo: made things more generic and pluggable, added documentation. Added the PHP scripts to serve the right info back to pypo.
1833 lines
No EOL
58 KiB
PHP
1833 lines
No EOL
58 KiB
PHP
<?php
|
|
// Error codes passed to PEAR::raiseError() by the transport layer.
const TRERR_ = 70;
const TRERR_MD = 71;
// Used when an action is requested on an already closed transport token.
const TRERR_TOK = 72;
const TRERR_NOTFIN = 73;
// Used when logging in to, or connecting to, the hub fails.
const TRERR_XR_FAIL = 74;

// Transport log verbosity: messages in this file are guarded by
// "TR_LOG_LEVEL > 1" and "TR_LOG_LEVEL > 2"; a value of 0 skips them all.
const TR_LOG_LEVEL = 10;

// Value sent as 'target' in the archive.*HubInitiatedTransfer* XML-RPC calls.
const HOSTNAME = 'storageServer';
|
|
|
|
include_once("XML/RPC.php");
|
|
include_once("TransportRecord.php");
|
|
|
|
/**
|
|
* Class for handling file transport between StorageServer and ArchiveServer<br>
|
|
* over unreliable network and from behind firewall<br><br>
|
|
*
|
|
* Transport states:
|
|
* <ul>
|
|
* <li>init: transport is prepared, but not started
|
|
* (e.g. no network connection is present)</li>
|
|
* <li>pending: transport is in progress, file is not fully transported to
|
|
* target system</li>
|
|
* <li>waiting: transport is in progress, but not running now</li>
|
|
* <li>finished: transport is finished, but file processing on target side
|
|
* is not completed</li>
|
|
* <li>closed: processing on target side is completed without errors</li>
|
|
* <li>failed: error - error message stored in errmsg field</li>
|
|
* <li>paused: transport has been paused</li>
|
|
* </ul>
|
|
*
|
|
* Transport types:
|
|
* <ul>
|
|
* <li>audioclip</li>
|
|
* <li>playlist</li>
|
|
* <li>metadata</li>
|
|
* <li>file</li>
|
|
* </ul>
|
|
*
|
|
* @package Campcaster
|
|
* @subpackage StorageServer
|
|
* @copyright 2010 Sourcefabric O.P.S.
|
|
* @license http://www.gnu.org/licenses/gpl.txt
|
|
*/
|
|
class Transport
|
|
{
|
|
/**
|
|
* @var GreenBox
|
|
*/
|
|
public $gb;
|
|
|
|
/**
|
|
* File name
|
|
* @var string
|
|
*/
|
|
private $cronJobScript;
|
|
|
|
/**
|
|
* wget --read-timeout parameter [s]
|
|
* @var int
|
|
*/
|
|
private $downTimeout = 900;
|
|
|
|
/**
|
|
* wget --waitretry parameter [s]
|
|
* @var int
|
|
*/
|
|
private $downWaitretry = 10;
|
|
|
|
/**
|
|
* wget --limit-rate parameter
|
|
*/
|
|
private $downLimitRate = NULL;
|
|
# private $downLimitRate = 500;
|
|
|
|
/**
|
|
* wget -t parameter
|
|
* @var int
|
|
*/
|
|
private $downRetries = 6;
|
|
|
|
/**
|
|
* curl --max-time parameter
|
|
* @var int
|
|
*/
|
|
private $upTrMaxTime = 1800;
|
|
|
|
/**
|
|
* curl --speed-time parameter
|
|
* @var int
|
|
*/
|
|
private $upTrSpeedTime = 30;
|
|
|
|
/**
|
|
* curl --speed-limit parameter
|
|
* @var int
|
|
*/
|
|
private $upTrSpeedLimit = 30;
|
|
|
|
/**
|
|
* curl --connect-timeout parameter
|
|
* @var int
|
|
*/
|
|
private $upTrConnectTimeout = 20;
|
|
|
|
/**
|
|
* curl --limit-rate parameter
|
|
* @var int
|
|
*/
|
|
private $upLimitRate = NULL;
|
|
# private $upLimitRate = 500;
|
|
|
|
|
|
/**
 * Constructor
 *
 * Keeps a reference to the storage object and resolves the absolute path
 * of the cron job script that startCronJobProcess() launches.
 *
 * @param LocStor $gb
 *      storage object (NOTE(review): the $gb property is annotated
 *      as GreenBox - confirm which type is actually passed)
 */
public function __construct(&$gb)
{
    $this->gb =& $gb;
    $scriptPath = dirname(__FILE__).
        '/../../storageServer/var/cron/transportCronJob.php';
    // realpath() gives FALSE when the script cannot be resolved
    $this->cronJobScript = realpath($scriptPath);
}
|
|
|
|
|
|
/* ==================================================== transport methods */
|
|
/* ------------------------------------------------------- common methods */
|
|
/**
 * Common "check" method for transports
 *
 * Reads the stored transport record and refreshes the size/checksum
 * fields with live values: from the local file for downloads, from
 * uploadCheck() for uploads.
 *
 * @param string $trtok
 *      transport token
 * @return array
 *      struct/hasharray with fields (NULL when not set in the record):
 *          trtype: string -
 *              audioclip | playlist | playlistPkg | metadata | file
 *          state: string - transport state
 *              init | pending | waiting | finished | closed | failed
 *          direction: string - up | down
 *          expectedsize: int - file size in bytes
 *          realsize: int - currently transported bytes
 *          expectedsum: string - original file checksum
 *          realsum: string - transported file checksum
 *          title: string - dc:title or filename etc.
 *          errmsg: string - error message for failed transports
 *      or the PEAR error returned by TransportRecord::recall()
 */
function getTransportInfo($trtok)
{
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $fields = array(
        'trtype', 'state', 'direction', 'expectedsize', 'realsize',
        'expectedsum', 'realsum', 'title', 'errmsg'
    );
    $info = array();
    foreach ($fields as $field) {
        if (isset($trec->row[$field])) {
            $info[$field] = $trec->row[$field];
        } else {
            $info[$field] = NULL;
        }
    }
    // download: measure what is already present on the local disk
    if ($trec->row['direction'] == 'down') {
        if (file_exists($trec->row['localfile'])) {
            $info['realsize'] = filesize($trec->row['localfile']);
            $info['realsum'] = $this->_chsum($trec->row['localfile']);
        }
    }
    // upload: use uploadCheck(); its failure is ignored and the stored
    // values are kept
    if ($trec->row['direction'] == 'up') {
        $remote = $this->uploadCheck($trec->row['pdtoken']);
        if (!PEAR::isError($remote)) {
            $info['realsize'] = $remote['size'];
            $info['realsum'] = $remote['realsum'];
        }
    }
    return $info;
}
|
|
|
|
|
|
/**
 * Turn transports on/off, optionally return current state.
 * (true=On / false=off)
 *
 * The state is stored inverted in the 'TransportsDenied' group
 * preference of the station preferences group.
 *
 * @param string $sessid
 *      session id
 * @param boolean $onOff
 *      optional (if not used, current state is returned)
 * @return boolean
 *      previous state, or a PEAR error from the preference calls
 */
function turnOnOffTransports($sessid, $onOff=NULL)
{
    // without this declaration $CC_CONFIG is an undefined local and the
    // preference group would always be NULL
    global $CC_CONFIG;
    require_once('Prefs.php');
    $pr = new Prefs($this->gb);
    $group = $CC_CONFIG['StationPrefsGr'];
    $key = 'TransportsDenied';
    $res = $pr->loadGroupPref($group, $key);
    if (PEAR::isError($res)) {
        if ($res->getCode() !== GBERR_PREF) {
            return $res;
        } else {
            // a GBERR_PREF error is treated as "transports not denied"
            $res = FALSE; // default
        }
    }
    $state = !$res;
    if (is_null($onOff)) {
        return $state;
    }
    $res = $pr->saveGroupPref($sessid, $group, $key, !$onOff);
    if (PEAR::isError($res)) {
        return $res;
    }
    return $state;
}
|
|
|
|
|
|
/**
 * Pause, resume or cancel transport
 *
 * @param string $trtok
 *      transport token
 * @param string $action
 *      pause | resume | cancel
 * @return string
 *      result of TransportRecord::setState() for the new state, or a
 *      PEAR error (closed token, unknown action, failed recall)
 */
function doTransportAction($trtok, $action)
{
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    if ($trec->getState() == 'closed') {
        return PEAR::raiseError(
            "Transport::doTransportAction:".
            " closed transport token ($trtok)", TRERR_TOK
        );
    }
    // map the action to the target state and decide whether a running
    // job has to be killed afterwards
    switch ($action) {
        case 'pause':
            $newState = 'paused';
            $killRunningJob = TRUE;
            break;
        case 'resume':
            $newState = 'waiting';
            $killRunningJob = FALSE;
            break;
        case 'cancel':
            $newState = 'closed';
            $killRunningJob = TRUE;
            break;
        default:
            return PEAR::raiseError(
                "Transport::doTransportAction:".
                " unknown action ($action)"
            );
    }
    $res = $trec->setState($newState);
    if ($killRunningJob) {
        $trec->killJob();
    }
    return $res;
}
|
|
|
|
/* ------------- special methods for audioClip/webstream object transport */
|
|
|
|
/**
 * Start upload of audioClip/webstream/playlist from local storageServer
 * to hub.
 *
 * For audioclips and webstreams two transports are created: one for the
 * metadata file and one for the media file (which carries the metadata
 * transport token in 'mdtrtok'). For playlists either the exported
 * package (.lspl) or only the metadata file is transported.
 *
 * @param string $gunid
 *      global unique id of object being transported
 * @param boolean $withContent
 *      if true, transport playlist content too (optional)
 * @param array $pars
 *      default parameters (optional, internal use)
 * @return string
 *      transport token; a PEAR error on failure; NULL when
 *      StoredFile::RecallByGunid() returns NULL
 */
function upload2Hub($gunid, $withContent=TRUE, $pars=array())
{
    global $CC_CONFIG;
    // date("H:i:s") yields the same text as the deprecated
    // strftime("%H:%M:%S")
    $this->trLog("upload2Hub start: ".date("H:i:s"));
    switch ($ftype = BasicStor::GetType($gunid)) {
        case "audioclip":
        case "webstream":
            $storedFile = StoredFile::RecallByGunid($gunid);
            if (is_null($storedFile) || PEAR::isError($storedFile)) {
                return $storedFile;
            }
            // handle metadata:
            $mdfpath = $storedFile->getRealMetadataFileName();
            if (PEAR::isError($mdfpath)) {
                return $mdfpath;
            }
            $mdtrec = $this->_uploadGeneralFileToHub($mdfpath, 'metadata',
                array_merge(array('gunid'=>$gunid, 'fname'=>'metadata',), $pars)
            );
            if (PEAR::isError($mdtrec)) {
                return $mdtrec;
            }
            // handle raw media file:
            $fpath = $storedFile->getRealFileName();
            if (PEAR::isError($fpath)) {
                return $fpath;
            }
            $fname = $storedFile->getName();
            if (PEAR::isError($fname)) {
                return $fname;
            }
            $trec = $this->_uploadGeneralFileToHub($fpath, 'audioclip',
                array_merge(array(
                    'gunid'=>$gunid, 'fname'=>$fname, 'mdtrtok'=>$mdtrec->trtok,
                ), $pars)
            );
            if (PEAR::isError($trec)) {
                return $trec;
            }
            $this->startCronJobProcess($mdtrec->trtok);
            break;

        case "playlist":
            $plid = $gunid;
            require_once("Playlist.php");
            $pl = StoredFile::RecallByGunid($plid);
            if (is_null($pl) || PEAR::isError($pl)) {
                return $pl;
            }
            $fname = $pl->getName();
            if (PEAR::isError($fname)) {
                return $fname;
            }
            if ($withContent) {
                $this->trLog("upload2Hub exportPlaylistOpen BEGIN: ".date("H:i:s"));
                $res = $this->gb->bsExportPlaylistOpen($plid);
                $this->trLog("upload2Hub exportPlaylistOpen END: ".date("H:i:s"));
                if (PEAR::isError($res)) {
                    return $res;
                }
                $tmpn = tempnam($CC_CONFIG['transDir'], 'plExport_');
                $plfpath = "$tmpn.lspl";
                $this->trLog("upload2Hub begin copy: ".date("H:i:s"));
                $copied = copy($res['fname'], $plfpath);
                $this->trLog("upload2Hub end copy: ".date("H:i:s"));
                // always close the export, even when the copy failed
                $res = $this->gb->bsExportPlaylistClose($res['token']);
                if (PEAR::isError($res)) {
                    return $res;
                }
                if (!$copied) {
                    // do not register a transport for a file that is not there
                    return PEAR::raiseError(
                        "Transport::upload2Hub:".
                        " cannot copy exported playlist ($plfpath)"
                    );
                }
                $fname = $fname.".lspl";
                $trtype = 'playlistPkg';
            } else {
                $plfpath = $pl->getRealMetadataFileName();
                if (PEAR::isError($plfpath)) {
                    return $plfpath;
                }
                $trtype = 'playlist';
            }
            $trec = $this->_uploadGeneralFileToHub($plfpath, $trtype,
                array_merge(array('gunid'=>$plid,'fname'=>$fname,), $pars));
            if (PEAR::isError($trec)) {
                return $trec;
            }
            break;
        default:
            return PEAR::raiseError("Transport::upload2Hub: ftype not supported ($ftype)");
    }
    $this->startCronJobProcess($trec->trtok);
    $this->trLog("upload2Hub end: ".date("H:i:s"));
    return $trec->trtok;
}
|
|
|
|
|
|
/**
 * Start download of audioClip/webstream/playlist from hub to local
 * storageServer
 *
 * Creates a 'down' transport record and starts its cron job. The record
 * is created with type 'playlistPkg' when $withContent is true and
 * 'unknown' otherwise.
 *
 * @param int $uid
 *      local user id of transport owner
 *      (for downloading file to homedir in storage)
 * @param string $gunid
 *      global unique id of object being transported
 * @param boolean $withContent
 *      if true, transport playlist content too (optional)
 * @param array $pars
 *      default parameters (optional, internal use)
 * @return string
 *      transport token, or PEAR error from TransportRecord::create()
 */
function downloadFromHub($uid, $gunid, $withContent=TRUE, $pars=array())
{
    if ($withContent) {
        $trtype = 'playlistPkg';
    } else {
        $trtype = 'unknown';
    }
    $recordPars = array_merge(array('gunid'=>$gunid, 'uid'=>$uid), $pars);
    $trec = TransportRecord::create($this, $trtype, 'down', $recordPars);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $this->startCronJobProcess($trec->trtok);
    return $trec->trtok;
}
|
|
|
|
|
|
/* ------------------------------------------------ remote-search methods */
|
|
/**
 * Run a metadata search on the remote Campcaster instance (hub).
 *
 * Logs in to the hub, calls locstor.searchMetadata there and logs out
 * again; the call is synchronous.
 *
 * @param array $criteria
 *      LS criteria format (see localSearch)
 * @param string $resultMode
 *      'php' | 'xmlrpc' (stored into $criteria['resultMode'])
 * @return mixed
 *      result of the locstor.searchMetadata XML-RPC call, or PEAR error;
 *      login errors with code 802 or TRERR_XR_FAIL are re-raised with
 *      code TRERR_XR_FAIL
 */
function remoteSearch($criteria, $resultMode='php')
{
    global $CC_CONFIG, $CC_DBC;
    $criteria['resultMode'] = $resultMode;

    // testing of hub availability and hub account configuration.
    $sessid = $this->loginToArchive();
    if (PEAR::isError($sessid)) {
        $code = intval($sessid->getCode());
        if ($code == 802) {
            return PEAR::raiseError("Can't login to Hub ({$sessid->getMessage()})", TRERR_XR_FAIL);
        }
        if ($code == TRERR_XR_FAIL) {
            return PEAR::raiseError("Can't connect to Hub ({$sessid->getMessage()})", TRERR_XR_FAIL);
        }
        return $sessid;
    }
    $result = $this->xmlrpcCall(
        "locstor.searchMetadata",
        array("sessid" => $sessid, "criteria" => $criteria)
    );
    // the logout result is not checked
    $this->logoutFromArchive($sessid);
    return $result;
}
|
|
|
|
/**
|
|
* Start search job on network hub
|
|
*
|
|
* @param array $criteria
|
|
* LS criteria format (see localSearch)
|
|
* @param string $resultMode
|
|
* 'php' | 'xmlrpc'
|
|
* @param array $pars
|
|
* default parameters (optional, internal use)
|
|
* @return string
|
|
* transport token
|
|
*/
|
|
// function globalSearch($criteria, $resultMode='php', $pars=array())
|
|
// {
|
|
// global $CC_CONFIG, $CC_DBC;
|
|
// // testing of hub availability and hub account configuration.
|
|
// // it makes searchjob not async - should be removed for real async
|
|
// $r = $this->loginToArchive();
|
|
// if (PEAR::isError($r)) {
|
|
// switch(intval($r->getCode())) {
|
|
// case 802:
|
|
// return PEAR::raiseError("Can't login to Hub ({$r->getMessage()})", TRERR_XR_FAIL);
|
|
// case TRERR_XR_FAIL:
|
|
// return PEAR::raiseError("Can't connect to Hub ({$r->getMessage()})", TRERR_XR_FAIL);
|
|
// }
|
|
// return $r;
|
|
// }
|
|
// $this->logoutFromArchive($r);
|
|
// $criteria['resultMode'] = $resultMode;
|
|
// $localfile = tempnam($CC_CONFIG['transDir'], 'searchjob_');
|
|
// @chmod($localfile, 0660);
|
|
// $len = file_put_contents($localfile, serialize($criteria));
|
|
// $trec = $this->_uploadGeneralFileToHub($localfile, 'searchjob', $pars);
|
|
// if (PEAR::isError($trec)) {
|
|
// return $trec;
|
|
// }
|
|
// $this->startCronJobProcess($trec->trtok);
|
|
// return $trec->trtok;
|
|
// }
|
|
|
|
|
|
/**
|
|
* Get results from search job on network hub
|
|
*
|
|
* @param string $trtok
|
|
* transport token
|
|
* @param boolean $andClose
|
|
* if TRUE, close transport token
|
|
* @return array
|
|
* LS search result format (see localSearch)
|
|
*/
|
|
// function getSearchResults($trtok, $andClose=TRUE)
|
|
// {
|
|
// $trec = TransportRecord::recall($this, $trtok);
|
|
// if (PEAR::isError($trec)) {
|
|
// return $trec;
|
|
// }
|
|
// $row = $trec->row;
|
|
// switch ($st = $trec->getState()) {
|
|
// case "failed":
|
|
// return PEAR::raiseError(
|
|
// "Transport::getSearchResults:".
|
|
// " global search or results transport failed".
|
|
// " ({$trec->row['errmsg']})"
|
|
// );
|
|
// case "closed":
|
|
///*
|
|
// $res = file_get_contents($row['localfile']);
|
|
// $results = unserialize($res);
|
|
// return $results;
|
|
//*/
|
|
// return PEAR::raiseError(
|
|
// "Transport::getSearchResults:".
|
|
// " closed transport token ($trtok)", TRERR_TOK
|
|
// );
|
|
// case "finished":
|
|
// if ($row['direction'] == 'down') {
|
|
// // really finished
|
|
// $res = file_get_contents($row['localfile']);
|
|
// $results = unserialize($res);
|
|
// if ($andClose) {
|
|
// $ret = $this->xmlrpcCall('archive.downloadClose',
|
|
// array(
|
|
// 'token' => $row['pdtoken'] ,
|
|
// 'trtype' => $row['trtype'] ,
|
|
// ));
|
|
// if (PEAR::isError($ret)) {
|
|
// return $ret;
|
|
// }
|
|
// @unlink($row['localfile']);
|
|
// $r = $trec->close();
|
|
// if (PEAR::isError($r)) {
|
|
// return $r;
|
|
// }
|
|
// }
|
|
// return $results;
|
|
// }
|
|
// // otherwise not really finished - only request upload finished
|
|
// default:
|
|
// return PEAR::raiseError(
|
|
// "Transport::getSearchResults: not finished ($st)",
|
|
// TRERR_NOTFIN
|
|
// );
|
|
// }
|
|
// }
|
|
|
|
|
|
/* ------------------------ methods for ls-archive-format file transports */
|
|
/**
 * Open async file transfer from local storageServer to network hub,
 * file should be ls-archive-format file.
 *
 * @param string $filePath
 *      local path to uploaded file
 * @param array $pars
 *      default parameters (optional, internal use)
 * @return string
 *      transport token, or PEAR error (also when the file is missing)
 */
function uploadFile2Hub($filePath, $pars=array())
{
    $fileIsPresent = file_exists($filePath);
    if (!$fileIsPresent) {
        return PEAR::raiseError(
            "Transport::uploadFile2Hub: file not found ($filePath)"
        );
    }
    $trec = $this->_uploadGeneralFileToHub($filePath, 'file', $pars);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $token = $trec->trtok;
    $this->startCronJobProcess($token);
    return $trec->trtok;
}
|
|
|
|
|
|
/**
 * Open async file transfer from network hub to local storageServer,
 * file should be ls-archive-format file.
 *
 * The download target is a temporary file created in the configured
 * 'transDir' directory.
 *
 * @param string $url
 *      readable url
 * @param string $chsum
 *      checksum from remote side
 * @param int $size
 *      filesize from remote side
 * @param array $pars
 *      default parameters (internal use)
 * @return array
 *      trtok: string - transport token
 *      localfile: string - filepath of downloaded file
 *      (or PEAR error from TransportRecord::create())
 */
function downloadFileFromHub($url, $chsum=NULL, $size=NULL, $pars=array())
{
    global $CC_CONFIG, $CC_DBC;
    $tmpn = tempnam($CC_CONFIG['transDir'], 'HITrans_');
    $recordPars = array_merge(
        array(
            'url' => $url,
            'localfile' => $tmpn,
            'expectedsum' => $chsum,
            'expectedsize' => $size,
        ),
        $pars
    );
    $trec = TransportRecord::create($this, 'file', 'down', $recordPars);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $this->startCronJobProcess($trec->trtok);
    return array('trtok'=>$trec->trtok, 'localfile'=>$tmpn);
}
|
|
|
|
|
|
/**
 * Get list of prepared transfers initiated by hub
 *
 * Asks the hub (archive.listHubInitiatedTransfers) for the transfers
 * targeted at HOSTNAME and keeps only their tokens.
 *
 * @return array
 *      array of structs/hasharrays with fields:
 *          trtok: string transport token
 *      (or PEAR error from the XML-RPC call)
 */
function getHubInitiatedTransfers()
{
    $listed = $this->xmlrpcCall(
        'archive.listHubInitiatedTransfers',
        array('target' => HOSTNAME)
    );
    if (PEAR::isError($listed)) {
        return $listed;
    }
    $tokens = array();
    foreach ($listed as $transfer) {
        $tokens[] = array('trtok'=>$transfer['trtok']);
    }
    return $tokens;
}
|
|
|
|
|
|
/**
 * Start of download initiated by hub
 *
 * Looks the hub-side transfer up, starts the matching local transport
 * (the direction is inverted to the local point of view), marks the
 * hub-side transfer as 'waiting' and starts the local cron job.
 *
 * @param int $uid
 *      local user id of transport owner
 *      (for downloading file to homedir in storage)
 * @param string $rtrtok
 *      transport token obtained from the getHubInitiatedTransfers method
 * @return string
 *      transport token (local), or PEAR error
 */
function startHubInitiatedTransfer($uid, $rtrtok)
{
    $ret = $this->xmlrpcCall('archive.listHubInitiatedTransfers',
        array(
            'target' => HOSTNAME,
            'trtok' => $rtrtok,
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }
    if (count($ret) != 1) {
        return PEAR::raiseError(
            "Transport::startHubInitiatedTransfer:".
            " wrong number of transports (".count($ret).")"
        );
    }
    $ta = $ret[0];
    // direction invertation to locstor point of view:
    $direction = ( $ta['direction']=='up' ? 'down' : 'up' );
    $gunid = $ta['gunid'];
    $localPars = array('rtrtok'=>$rtrtok);
    $objectTypes = array("audioclip", "playlist", "playlistPkg");
    switch ($direction) {
        case "up":
            $trtype = $ta['trtype'];
            if (in_array($trtype, $objectTypes)) {
                $trtok = $this->upload2Hub($gunid, TRUE, $localPars);
                if (PEAR::isError($trtok)) {
                    return $trtok;
                }
            } else {
                // searchjob: not supported yet; file: probably unusable
                return PEAR::raiseError(
                    "Transport::startHubInitiatedTransfer:".
                    " wrong direction / transport type combination".
                    " ({$ta['direction']}/{$ta['trtype']})"
                );
            }
            break;
        case "down":
            $trtype = $ta['trtype'];
            if (in_array($trtype, $objectTypes)) {
                $trtok = $this->downloadFromHub($uid, $gunid, TRUE, $localPars);
                if (PEAR::isError($trtok)) {
                    return $trtok;
                }
            } elseif ($trtype == "file") {
                $r = $this->downloadFileFromHub(
                    $ta['url'], $ta['expectedsum'], $ta['expectedsize'],
                    $localPars);
                if (PEAR::isError($r)) {
                    return $r;
                }
                // only the token of the (trtok, localfile) pair is needed
                $trtok = $r['trtok'];
            } else {
                // searchjob: probably unusable
                return PEAR::raiseError(
                    "Transport::startHubInitiatedTransfer:".
                    " wrong direction / transport type combination".
                    " ({$ta['direction']}/{$ta['trtype']})"
                );
            }
            break;
        default:
            return PEAR::raiseError(
                "Transport::startHubInitiatedTransfer: ???"
            );
    }
    $ret = $this->xmlrpcCall('archive.setHubInitiatedTransfer',
        array(
            'target' => HOSTNAME,
            'trtok' => $rtrtok,
            'state' => 'waiting',
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }
    $this->startCronJobProcess($trtok);
    return $trtok;
}
|
|
|
|
|
|
/* =============================================== authentication methods */
|
|
|
|
/**
 * Login to archive server
 * (account info is taken from storageServer's config:
 * 'archiveAccountLogin' and 'archiveAccountPass')
 *
 * @return string
 *      sessid or error
 */
function loginToArchive()
{
    global $CC_CONFIG;
    $credentials = array(
        'login' => $CC_CONFIG['archiveAccountLogin'],
        'pass' => $CC_CONFIG['archiveAccountPass']
    );
    $res = $this->xmlrpcCall('locstor.login', $credentials);
    if (!PEAR::isError($res)) {
        return $res['sessid'];
    }
    return $res;
}
|
|
|
|
|
|
/**
 * Logout from archive server
 *
 * @param string $sessid
 *      session id obtained from loginToArchive()
 * @return string
 *      result of the locstor.logout XML-RPC call (Bye or error)
 */
function logoutFromArchive($sessid)
{
    return $this->xmlrpcCall('locstor.logout', array('sessid'=>$sessid));
}
|
|
|
|
|
|
/* ========================================================= cron methods */
|
|
/* -------------------------------------------------- common cron methods */
|
|
/**
 * Main method for periodical transport tasks - called by cron
 *
 * Without a direction both 'up' and 'down' are processed in that order.
 * For one direction, a cron job process is started for every transport
 * returned by getTransports().
 *
 * @param string $direction
 *      optional, 'up' or 'down'
 * @return boolean
 *      TRUE; FALSE when getTransports() fails for the given direction;
 *      PEAR error when a per-direction call returns one
 */
function cronMain($direction=NULL)
{
    global $CC_CONFIG;
    if (is_null($direction)) {
        foreach (array('up', 'down') as $oneDirection) {
            $r = $this->cronMain($oneDirection);
            // only PEAR errors stop the run; a FALSE result is not propagated
            if (PEAR::isError($r)) {
                return $r;
            }
        }
        return TRUE;
    }
    // fetch all opened transports
    $transports = $this->getTransports($direction);
    if (PEAR::isError($transports)) {
        $this->trLog("cronMain: DB error");
        return FALSE;
    }
    if (count($transports) == 0) {
        if (TR_LOG_LEVEL > 1) {
            $this->trLog("cronMain: $direction - nothing to do.");
        }
        return TRUE;
    }
    // ping to archive server (the result is not evaluated):
    $r = $this->ping();
    chdir($CC_CONFIG['transDir']);
    // for all opened transports:
    foreach ($transports as $row) {
        $r = $this->startCronJobProcess($row['trtok']);
    }
    return TRUE;
}
|
|
|
|
|
|
/**
 * Cron job process starter
 *
 * Runs the cron job script in the background with the transport token
 * as its only argument. Output is appended to transDir/debug.log when
 * TR_LOG_LEVEL > 2, otherwise discarded.
 *
 * @param string $trtok
 *      transport token
 * @return boolean
 *      status (FALSE when the script path is unknown or system() fails)
 */
function startCronJobProcess($trtok)
{
    global $CC_CONFIG;
    if (!$this->cronJobScript) {
        // realpath() in the constructor could not resolve the script;
        // without this guard the token itself would be run as a command
        $this->trLog(
            "cronMain: Error on execute cronJobScript with trtok {$trtok}"
        );
        return FALSE;
    }
    if (TR_LOG_LEVEL > 2) {
        $redirect = $CC_CONFIG['transDir']."/debug.log";
    } else {
        $redirect = "/dev/null";
    }
    // quote each argument separately: escapeshellcmd() on the whole
    // command line does not stop whitespace in the token or in a path
    // from splitting or adding arguments
    $command_final = escapeshellarg($this->cronJobScript).
        " ".escapeshellarg($trtok).
        " >> ".escapeshellarg($redirect)." 2>&1 &";
    $res = system($command_final, $status);
    if ($res === FALSE) {
        $this->trLog(
            "cronMain: Error on execute cronJobScript with trtok {$trtok}"
        );
        return FALSE;
    }
    return TRUE;
}
|
|
|
|
|
|
/**
 * Dynamic method caller - wrapper
 *
 * Builds a method name of the form cron<Upload|Download><State> from the
 * transport record, locks the record, logs in to the archive server,
 * calls the method with ($row, $asessid) and then unlocks and logs out.
 *
 * @param string $trtok
 *      transport token
 * @return mixed
 *      inherited from called method; TRUE without calling anything for
 *      closed, failed, pending and paused transports; PEAR error on
 *      invalid direction/state, when transfers are turned off, or when
 *      any step fails
 */
function cronCallMethod($trtok)
{
    global $CC_CONFIG;
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $row = $trec->row;
    $state = $row['state'];

    // 'paused' has to be listed here, otherwise the "case 'paused'"
    // branch below can never be reached
    $states = array('init'=>'init',
        'pending'=>'pending',
        'waiting'=>'waiting',
        'paused'=>'paused',
        'finished'=>'finished',
        'failed'=>'failed',
        'closed'=>'closed');
    $directions = array('up'=>'upload', 'down'=>'download');
    // method name construction:
    $mname = "cron";
    if (isset($directions[$row['direction']])) {
        $mname .= ucfirst($directions[$row['direction']]);
    } else {
        return PEAR::raiseError(
            "Transport::cronCallMethod: invalid direction ({$row['direction']})"
        );
    }
    if (isset($states[$state])) {
        $mname .= ucfirst($states[$state]);
    } else {
        return PEAR::raiseError(
            "Transport::cronCallMethod: invalid state ({$state})"
        );
    }
    switch ($state) {
        // do nothing if closed, failed, pending or paused:
        case 'closed':  // excluded in SQL query too, but let check it here
        case 'failed':  // -"-
        case 'pending':
        case 'paused':
            return TRUE;
        case 'waiting':
            require_once('Prefs.php');
            $pr = new Prefs($this->gb);
            $group = $CC_CONFIG['StationPrefsGr'];
            $key = 'TransportsDenied';
            $res = $pr->loadGroupPref($group, $key);
            if (PEAR::isError($res)) {
                if ($res->getCode() !== GBERR_PREF) {
                    return $res;
                } else {
                    $res = FALSE; // default
                }
            }
            // transfers turned off
            if ($res) {
                return PEAR::raiseError(
                    "Transport::cronCallMethod: transfers turned off"
                );
            }
            // NO break here!
        default:
            if (!method_exists($this, $mname)) {
                return PEAR::raiseError(
                    "Transport::cronCallMethod: unknown method ($mname)"
                );
            }
            // lock the job:
            $pid = getmypid();
            $r = $trec->setLock(TRUE, $pid);
            if (PEAR::isError($r)) {
                return $r;
            }
            // re-read the record after locking; keep the old object so
            // the lock can still be released when the re-read fails
            $lockedTrec = TransportRecord::recall($this, $trtok);
            if (PEAR::isError($lockedTrec)) {
                $trec->setLock(FALSE);
                return $lockedTrec;
            }
            $trec = $lockedTrec;
            $row = $trec->row;
            $state = $row['state'];

            // login to archive server:
            $r = $this->loginToArchive();
            if (PEAR::isError($r)) {
                $r2 = $trec->setLock(FALSE);
                return $r;
            }
            $asessid = $r;
            // method call:
            if (TR_LOG_LEVEL > 2) {
                $this->trLog("cronCallMethod($pid): $mname($trtok) >");
            }
            $ret = call_user_func(array($this, $mname), $row, $asessid);
            if (PEAR::isError($ret)) {
                $trec->setLock(FALSE);
                return $this->_failFatal($ret, $trec);
            }
            if (TR_LOG_LEVEL > 2) {
                $this->trLog("cronCallMethod($pid): $mname($trtok) <");
            }
            // unlock the job:
            $r = $trec->setLock(FALSE);
            if (PEAR::isError($r)) {
                return $r;
            }
            // logout:
            $r = $this->logoutFromArchive($asessid);
            if (PEAR::isError($r)) {
                return $r;
            }
            return $ret;
    }
}
|
|
|
|
|
|
/**
 * Upload initialization
 *
 * Opens the upload on the hub (archive.uploadOpen) and moves the
 * transport to the 'waiting' state, storing the url and token that the
 * hub returned.
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub)
 * @return mixed
 *      boolean TRUE or error object
 */
function cronUploadInit($row, $asessid)
{
    $trec = TransportRecord::recall($this, $row['trtok']);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $openPars = array(
        'sessid' => $asessid ,
        'chsum' => $row['expectedsum'],
    );
    $opened = $this->xmlrpcCall('archive.uploadOpen', $openPars);
    if (PEAR::isError($opened)) {
        return $opened;
    }
    $stateResult = $trec->setState(
        'waiting',
        array('url'=>$opened['url'], 'pdtoken'=>$opened['token'])
    );
    if (PEAR::isError($stateResult)) {
        return $stateResult;
    }
    return TRUE;
}
|
|
|
|
|
|
/**
 * Download initialization
 *
 * Opens the download on the hub (archive.downloadOpen) and moves the
 * transport to the 'waiting' state with the url, token, checksum, size
 * and file name returned by the hub. For audioclips an extra 'metadata'
 * download transport is created and started first, and its token is
 * stored as 'mdtrtok'.
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub)
 * @return mixed
 *      boolean TRUE or error object
 */
function cronDownloadInit($row, $asessid)
{
    global $CC_CONFIG;
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $ret = $this->xmlrpcCall('archive.downloadOpen',
        array(
            'sessid'=> $asessid,
            'trtype'=> $row['trtype'],
            'pars'=>array(
                'gunid' => $row['gunid'],
                'token' => $row['pdtoken'],
            ),
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }
    $trtype = $ret['trtype'];
    $title = $ret['title'];
    $pars = array();
    $isPlainFile = ($trtype == "file");
    if (!$isPlainFile && $trtype == "audioclip") {
        // audioclips need their metadata downloaded by a separate transport
        $mdtrec = TransportRecord::create($this, 'metadata', 'down',
            array('gunid'=>$row['gunid'], 'uid'=>$row['uid'], )
        );
        if (PEAR::isError($mdtrec)) {
            return $mdtrec;
        }
        $this->startCronJobProcess($mdtrec->trtok);
        $pars = array('mdtrtok'=>$mdtrec->trtok);
    }
    $fields = array(
        'trtype'=>$trtype,
        'url'=>$ret['url'], 'pdtoken'=>$ret['token'],
        'expectedsum'=>$ret['chsum'], 'expectedsize'=>$ret['size'],
        'fname'=>$ret['filename'],
    );
    if (!$isPlainFile) {
        // plain file transports are stored without a title
        $fields['title'] = $title;
    }
    $fields['localfile'] = $CC_CONFIG['transDir']."/$trtok";
    $r = $trec->setState('waiting', array_merge($pars, $fields));
    if (PEAR::isError($r)) {
        return $r;
    }
    return TRUE;
}
|
|
|
|
|
|
/**
 * Upload next part of transported file
 *
 * Resumes the upload with curl from the size reported by uploadCheck(),
 * then sets the transport to 'finished', back to 'waiting' (file still
 * incomplete) or failed (size reached but checksum check not passed).
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub)
 * @return mixed
 *      boolean TRUE or error object
 */
function cronUploadWaiting($row, $asessid)
{
    $trtok = $row['trtok'];
    $check = $this->uploadCheck($row['pdtoken']);
    if (PEAR::isError($check)) {
        return $check;
    }
    // test filesize
    if (!file_exists($row['localfile'])) {
        return PEAR::raiseError("Transport::cronUploadWaiting:".
            " file being uploaded does not exist! ({$row['localfile']})"
        );
    }
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $size = escapeshellarg($check['size']);
    $localfile = escapeshellarg($row['localfile']);
    $url = escapeshellarg($row['url']);
    $command =
        "curl -f -s -C $size --max-time {$this->upTrMaxTime}".
        " --speed-time {$this->upTrSpeedTime}".
        " --speed-limit {$this->upTrSpeedLimit}".
        " --connect-timeout {$this->upTrConnectTimeout}".
        (!is_null($this->upLimitRate)?
            " --limit-rate {$this->upLimitRate}" : "").
        " -T $localfile $url";
    // switch waiting -> pending; a FALSE result means there is nothing
    // to do in this run
    $r = $trec->setState('pending', array(), 'waiting');
    if (PEAR::isError($r)) {
        return $r;
    }
    if ($r === FALSE) {
        return TRUE;
    }
    $res = system($command, $status);

    // leave paused and closed transports
    $trec2 = TransportRecord::recall($this, $trtok);
    // check the freshly recalled record, not the one loaded before curl
    if (PEAR::isError($trec2)) {
        return $trec2;
    }
    $state2 = $trec2->row['state'];
    if ($state2 == 'paused' || $state2 == 'closed' ) {
        return TRUE;
    }

    // status 18 - Partial file. Only a part of the file was transported.
    // status 28 - Timeout. Too long/slow upload, try to resume next time rather.
    // status 6 - Couldn't resolve host.
    // status 7 - Failed to connect to host.
    // status 56 - Failure in receiving network data. Important - this status is
    //             returned if file is locked on server side
    if ($status == 0 || $status == 18 || $status == 28 || $status == 6 || $status == 7 || $status == 56) {
        $check = $this->uploadCheck($row['pdtoken']);
        if (PEAR::isError($check)) {
            return $check;
        }
        // test checksum
        if ($check['status'] == TRUE) {
            // finished
            $r = $trec->setState('finished',
                array('realsum'=>$check['realsum'], 'realsize'=>$check['size']));
            if (PEAR::isError($r)) {
                return $r;
            }
        } else {
            if (intval($check['size']) < $row['expectedsize']) {
                // not everything is uploaded yet - continue next time
                $r = $trec->setState('waiting',
                    array('realsum'=>$check['realsum'], 'realsize'=>$check['size']));
                if (PEAR::isError($r)) {
                    return $r;
                }
            } else {
                // wrong md5 at finish - TODO: start again
                // $this->xmlrpcCall('archive.uploadReset', array());
                $trec->fail('file uploaded with bad md5');
                return PEAR::raiseError("Transport::cronUploadWaiting:".
                    " file uploaded with bad md5 ".
                    "($trtok: {$check['realsum']}/{$check['expectedsum']})"
                );
            }
        }
    } else {
        return PEAR::raiseError("Transport::cronUploadWaiting:".
            " wrong return status from curl: $status on $url".
            "($trtok)"
        );
    }
    return TRUE;
}
|
|
|
|
|
|
/**
 * Download next part of transported file
 *
 * Runs wget in "continue" mode to fetch $row['url'] into $row['localfile'],
 * then compares the local file with the expected size and checksum and
 * moves the transport record either back to 'waiting' (to be retried by a
 * later run) or on to 'finished'.
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub); not used by this method
 * @return mixed
 *      boolean TRUE or error object
 */
function cronDownloadWaiting($row, $asessid)
{
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }

    // wget the file
    $localfile = escapeshellarg($row['localfile']);
    $url = escapeshellarg($row['url']);
    $command =
        "wget -q -c".
        " --read-timeout={$this->downTimeout}".
        " --waitretry={$this->downWaitretry}".
        " -t {$this->downRetries}".
        (!is_null($this->downLimitRate)?
            " --limit-rate={$this->downLimitRate}" : "").
        " -O $localfile $url"
    ;

    // NOTE(review): the third argument looks like the state the record is
    // required to be in for the switch to 'pending' to happen - confirm
    // against TransportRecord::setState. A FALSE result is treated as
    // "nothing to do" and the download is not started.
    $r = $trec->setState('pending', array(), 'waiting');
    if (PEAR::isError($r)) {
        return $r;
    }
    if ($r === FALSE) {
        return TRUE;
    }
    $res = system($command, $status);

    // leave paused and closed transports
    // (the record is re-read because its state may have changed while
    // wget was running; the check must look at the fresh record)
    $trec2 = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec2)) {
        return $trec2;
    }
    $state2 = $trec2->row['state'];
    if ($state2 == 'paused' || $state2 == 'closed' ) {
        return TRUE;
    }

    // check consistency
    // filesize() results are cached by PHP; drop the cache so that the
    // size written by the external wget process is seen.
    clearstatcache();
    $size = (file_exists($row['localfile']) ? filesize($row['localfile']) : 0);
    if ($size < $row['expectedsize']) {
        // not finished - return to the 'waiting' state
        $r = $trec->setState('waiting', array('realsize'=>$size));
        if (PEAR::isError($r)) {
            return $r;
        }
        return TRUE;
    }

    $chsum = $this->_chsum($row['localfile']);
    if ($chsum == $row['expectedsum']) {
        // mark download as finished
        $r = $trec->setState('finished',
            array('realsum'=>$chsum, 'realsize'=>$size));
    } else {
        // bad checksum, retry from the scratch
        @unlink($row['localfile']);
        $r = $trec->setState('waiting',
            array('realsum'=>$chsum, 'realsize'=>$size));
    }
    if (PEAR::isError($r)) {
        return $r;
    }
    return TRUE;
}
|
|
|
|
|
|
/**
 * Finish the upload
 *
 * Tells the network hub (archive.uploadClose) that the upload described
 * by $row is complete and closes the local transport record. For
 * 'audioclip' transports the companion metadata transport must be
 * finished first and is closed together with the audioclip; 'metadata'
 * transports are never closed here. For 'playlistPkg' transports the
 * exported package file is removed afterwards.
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub); not used by this method
 * @return mixed
 *      boolean TRUE or error object
 */
function cronUploadFinished($row, $asessid)
{
    global $CC_CONFIG;
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    // don't close metadata transport - audioclip will close it
    if ($row['trtype'] == 'metadata') {
        return TRUE;
    }

    // handle metadata transport on audioclip trtype:
    $mdtrec = NULL;
    $mdpdtoken = NULL;
    if ($row['trtype'] == 'audioclip') {
        $mdtrec = TransportRecord::recall($this, $trec->row['mdtrtok']);
        if (PEAR::isError($mdtrec)) {
            return $mdtrec;
        }
        switch ($mdtrec->row['state']) {
            case 'failed':
            case 'closed':
                return PEAR::raiseError("Transport::cronUploadFinished:".
                    " metadata transport in wrong state: {$mdtrec->row['state']}".
                    " ($trtok)"
                );
            // don't close transport with nonfinished metadata transport:
            case 'init':
            case 'waiting':
            case 'pending':
            case 'paused':
                return TRUE;
            default:    // finished - ok close parent transport
                $mdpdtoken = $mdtrec->row['pdtoken'];
        }
    }

    $ret = $this->xmlrpcCall('archive.uploadClose',
        array(
            'token'     => $row['pdtoken'] ,
            'trtype'    => $row['trtype'],
            'pars'      => array(
                'gunid'     => $row['gunid'],
                'name'      => $row['fname'],
                'mdpdtoken' => $mdpdtoken,
            ),
        ));
    if (PEAR::isError($ret)) {
        // NOTE(review): the metadata transport is closed even though the
        // hub call failed - kept as in the original, confirm it is intended.
        if ($row['trtype'] == 'audioclip') {
            $mdtrec->close();
        }
        return $ret;
    }

    // (a disabled 'searchjob' path used to re-initialise the record for
    // a download here instead of closing it)
    $r = $trec->close();
    if (PEAR::isError($r)) {
        return $r;
    }

    switch ($row['trtype']) {
        case 'audioclip':
            // close metadata transport:
            $r = $mdtrec->close();
            if (PEAR::isError($r)) {
                return $r;
            }
            break;
        case 'playlistPkg':
            // remove exported playlist (playlist with content)
            $ep = $row['localfile'];
            @unlink($ep);
            // also remove the entry named after the package (without
            // the .lspl extension) from the transport directory
            if (preg_match("|/(plExport_[^\.]+)\.lspl$|", $ep, $va)) {
                $tmpn = $CC_CONFIG['transDir']."/".$va[1];
                if (file_exists($tmpn)) {
                    @unlink($tmpn);
                }
            }
            break;
        default:
    }

    return TRUE;
}
|
|
|
|
|
|
/**
 * Finish the download
 *
 * Closes a fully downloaded transport: reports it to the network hub
 * (archive.downloadClose), imports the downloaded data into local
 * storage where the transport type requires it, notifies the hub about
 * hub-initiated transfers (rtrtok) and closes the local transport record.
 *
 * For 'audioclip' transports the companion metadata transport is locked,
 * must be in the 'finished' state, and is imported and closed together
 * with the audioclip. 'metadata' transports are never closed here.
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub); not used by this method
 * @return mixed
 *      boolean TRUE or error object
 */
function cronDownloadFinished($row, $asessid)
{
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }

    switch ($row['trtype']) {
        case "audioclip":
            $mdtrtok = $trec->row['mdtrtok'];
            $mdtrec = TransportRecord::recall($this, $mdtrtok);
            if (PEAR::isError($mdtrec)) {
                return $mdtrec;
            }
            $pid = getmypid();
            $r = $mdtrec->setLock(TRUE, $pid);
            if (PEAR::isError($r)) {
                return $r;
            }
            // every exit below this point has to release the lock again
            switch ($mdtrec->row['state']) {
                // don't close transport with nonfinished metadata transport:
                case 'init':
                case 'waiting':
                case 'pending':
                case 'paused':
                    $r = $mdtrec->setLock(FALSE);
                    if (PEAR::isError($r)) {
                        return $r;
                    }
                    return TRUE;
                case 'finished':    // metadata finished, close main transport
                    $values = array(
                        "filename" => $row['fname'],
                        "filepath" => $trec->row['localfile'],
                        "metadata" => $mdtrec->row['localfile'],
                        "gunid" => $row['gunid'],
                        "filetype" => "audioclip"
                    );
                    $storedFile = StoredFile::Insert($values);
                    if (PEAR::isError($storedFile)) {
                        $mdtrec->setLock(FALSE);
                        return $storedFile;
                    }
                    $ret = $this->xmlrpcCall('archive.downloadClose',
                        array(
                            'token'     => $mdtrec->row['pdtoken'] ,
                            'trtype'    => 'metadata' ,
                        ));
                    if (PEAR::isError($ret)) {
                        $mdtrec->setLock(FALSE);
                        return $ret;
                    }
                    $r = $mdtrec->close();
                    if (PEAR::isError($r)) {
                        $mdtrec->setLock(FALSE);
                        return $r;
                    }
                    @unlink($trec->row['localfile']);
                    @unlink($mdtrec->row['localfile']);
                    break;
                default:
                    $mdtrec->setLock(FALSE);
                    return PEAR::raiseError("Transport::cronDownloadFinished:".
                        " metadata transport in wrong state: {$mdtrec->row['state']}".
                        " ($trtok)"
                    );
            }
            $r = $mdtrec->setLock(FALSE);
            if (PEAR::isError($r)) {
                return $r;
            }
            break;
        case "metadata":
            // don't close here - the audioclip branch above closes the
            // metadata transport together with its audioclip
            return TRUE;
    }

    $ret = $this->xmlrpcCall('archive.downloadClose',
        array(
            'token'     => $row['pdtoken'] ,
            'trtype'    => $row['trtype'] ,
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }

    switch ($row['trtype']) {
        case "playlist":
            $values = array(
                "filename" => $row['fname'],
                "metadata" => $trec->row['localfile'],
                "gunid" => $row['gunid'],
                "filetype" => "playlist"
            );
            $storedFile = StoredFile::Insert($values);
            if (PEAR::isError($storedFile)) {
                return $storedFile;
            }
            @unlink($row['localfile']);
            break;
        case "playlistPkg":
            $subjid = $trec->row['uid'];
            $fname = $trec->row['localfile'];
            $res = $this->gb->bsImportPlaylist($fname, $subjid);
            if (PEAR::isError($res)) {
                return $res;
            }
            @unlink($fname);
            break;
        case "audioclip":   // already imported above
        case "metadata":
        case "file":
            break;
        default:
            return PEAR::raiseError("DEBUG: NotImpl ".var_export($row,TRUE));
    }

    // hub-initiated transfer: report back that it is closed
    if (!is_null($rtrtok = $trec->row['rtrtok'])) {
        $ret = $this->xmlrpcCall('archive.setHubInitiatedTransfer',
            array(
                'target'    => HOSTNAME,
                'trtok'     => $rtrtok,
                'state'     => 'closed',
            ));
        if (PEAR::isError($ret)) {
            return $ret;
        }
    }

    $r = $trec->close();
    if (PEAR::isError($r)) {
        return $r;
    }
    return TRUE;
}
|
|
|
|
|
|
/* ==================================================== auxiliary methods */
|
|
/**
 * Prepare upload for general file
 *
 * Creates an 'up' transport record for the given local file, seeded with
 * the file's path, base name, checksum and size. Values passed in $pars
 * override these seeded values.
 *
 * @param string $fpath
 *      local filepath of uploaded file
 * @param string $trtype
 *      transport type
 * @param array $pars
 *      default parameters (optional, internal use)
 * @return object - transportRecord instance
 *      (or whatever TransportRecord::create returned, including an
 *      error object)
 */
function _uploadGeneralFileToHub($fpath, $trtype, $pars=array())
{
    $fileInfo = array(
        'localfile'     => $fpath,
        'fname'         => basename($fpath),
        'expectedsum'   => $this->_chsum($fpath),
        'expectedsize'  => filesize($fpath),
    );
    // the result is handed back as-is, error object or not
    return TransportRecord::create(
        $this, $trtype, 'up', array_merge($fileInfo, $pars)
    );
}
|
|
|
|
|
|
/**
 * Create new transport token
 *
 * The token is the first 16 hex characters of an md5 hash built from the
 * current microtime, the server address (when available), a random
 * number and a fixed suffix.
 *
 * @return string
 *      transport token
 */
function _createTransportToken()
{
    $serverAddr = '';
    if (isset($_SERVER['SERVER_ADDR'])) {
        $serverAddr = $_SERVER['SERVER_ADDR'];
    }
    $seed = microtime().$serverAddr.rand()."org.mdlf.campcaster";
    return substr(md5($seed), 0, 16);
}
|
|
|
|
|
|
/**
 * Get all relevant transport records
 *
 * Returns the transports that are not 'closed', 'failed' or 'paused',
 * newest first, optionally restricted by direction, target and token.
 *
 * @param string $direction
 *      'up' | 'down' (any other value means no restriction)
 * @param string $target
 *      target hostname
 * @param string $trtok
 *      transport token for specific query
 * @return array
 *      array of transportRecords (as hasharrays), or error object
 */
function getTransports($direction=NULL, $target=NULL, $trtok=NULL)
{
    global $CC_CONFIG, $CC_DBC;

    // $direction is only ever mapped to one of these fixed fragments
    switch ($direction) {
        case 'up':
            $dirCond = "direction='up' AND";
            break;
        case 'down':
            $dirCond = "direction='down' AND";
            break;
        default:
            $dirCond = '';
            break;
    }

    // $target and $trtok end up inside SQL string literals, so they are
    // escaped before being interpolated.
    if (is_null($target)) {
        $targetCond = "";
    } else {
        $targetCond = "target='".pg_escape_string($target)."' AND";
    }
    if (is_null($trtok)) {
        $trtokCond = "";
    } else {
        $trtokCond = "trtok='".pg_escape_string($trtok)."' AND";
    }

    $rows = $CC_DBC->getAll("
        SELECT
            id, trtok, state, trtype, direction,
            to_hex(gunid)as gunid, to_hex(pdtoken)as pdtoken,
            fname, localfile, expectedsum, expectedsize, url,
            uid, target
        FROM ".$CC_CONFIG['transTable']."
        WHERE $dirCond $targetCond $trtokCond
            state not in ('closed', 'failed', 'paused')
        ORDER BY start DESC
    ");
    if (PEAR::isError($rows)) {
        return $rows;
    }
    foreach ($rows as $i => $row) {
        $rows[$i]['pdtoken'] = StoredFile::NormalizeGunid($row['pdtoken']);
        $rows[$i]['gunid'] = StoredFile::NormalizeGunid($row['gunid']);
    }
    return $rows;
}
|
|
|
|
|
|
/**
 * Check remote state of uploaded file
 *
 * Thin wrapper around the hub's 'archive.uploadCheck' XML-RPC method.
 *
 * @param string $pdtoken
 *      put/download token (from network hub)
 * @return array
 *      hash returned by the hub, or error object
 */
function uploadCheck($pdtoken)
{
    return $this->xmlrpcCall(
        'archive.uploadCheck',
        array('token'=>$pdtoken)
    );
}
|
|
|
|
|
|
/**
 * Ping to remote Campcaster server
 *
 * Sends the string 'ping_' followed by the current time (H:i:s) to the
 * hub's 'ping' XML-RPC method.
 *
 * @return string
 *      network hub response or error object
 */
function ping()
{
    $payload = 'ping_'.date('H:i:s');
    return $this->xmlrpcCall('ping', array('par'=>$payload));
}
|
|
|
|
|
|
/**
 * XMLRPC call to network hub.
 *
 * The hub address is taken from the 'archiveServerLocation' station
 * preference when that holds a URL with a host part; otherwise the
 * archiveUrl* settings from $CC_CONFIG are used.
 *
 * @param string $method
 *      method name
 * @param array $pars
 *      call parameters
 * @return mixed
 *      decoded response, or error object (code TRERR_XR_FAIL when the
 *      request itself failed, the hub's fault code when the hub
 *      answered with a fault)
 */
function xmlrpcCall($method, $pars=array())
{
    global $CC_CONFIG;
    $xrp = XML_RPC_encode($pars);

    $pr = new Prefs($this->gb);
    $group = $CC_CONFIG["StationPrefsGr"];
    $key = 'archiveServerLocation';
    $archiveUrl = $pr->loadGroupPref($group, $key, false);

    // parse_url() returns FALSE for seriously malformed URLs and leaves
    // out components that are not present, so each one is checked.
    $archiveUrlInfo = FALSE;
    if (is_string($archiveUrl) && $archiveUrl) {
        $archiveUrlInfo = parse_url($archiveUrl);
    }

    if (is_array($archiveUrlInfo) && !empty($archiveUrlInfo['host'])) {
        // NOTE(review): the URL scheme is ignored; port 80 is assumed
        // when the URL does not name a port.
        $port = (!empty($archiveUrlInfo['port']) ? $archiveUrlInfo['port'] : 80);
        $path = (isset($archiveUrlInfo['path']) ? $archiveUrlInfo['path'] : '');
        $c = new XML_RPC_Client($path, $archiveUrlInfo['host'], $port);
    } else {
        $c = new XML_RPC_Client(
            $CC_CONFIG['archiveUrlPath']."/".$CC_CONFIG['archiveXMLRPC'],
            $CC_CONFIG['archiveUrlHost'], $CC_CONFIG['archiveUrlPort']
        );
    }

    $f = new XML_RPC_Message($method, array($xrp));
    $r = $c->send($f);
    if (!$r) {
        return PEAR::raiseError("XML-RPC request failed", TRERR_XR_FAIL);
    }
    if ($r->faultCode() > 0) {
        return PEAR::raiseError($r->faultString(), $r->faultCode());
    }
    return XML_RPC_decode($r->value());
}
|
|
|
|
|
|
/**
 * Checksum of local file
 *
 * @param string $fpath
 *      local filepath
 * @return string
 *      md5 checksum of the file contents, as returned by md5_file()
 */
function _chsum($fpath)
{
    $digest = md5_file($fpath);
    return $digest;
}
|
|
|
|
|
|
/**
 * Check exception and eventually mark transport as failed
 *
 * An error with code TRERR_XR_FAIL is regarded as non fatal and leaves
 * the transport untouched; any other error marks the transport as
 * failed. Non-error values pass through unchanged.
 *
 * @param mixed $res
 *      result object to be checked
 * @param unknown $trec
 *      transport record object
 * @return unknown
 *      $res, exactly as it was passed in
 */
function _failFatal($res, $trec)
{
    if (PEAR::isError($res) && $res->getCode() != TRERR_XR_FAIL) {
        // fatal
        $trec->fail('', $res);
    }
    return $res;
}
|
|
|
|
|
|
/**
 * Clean up transport jobs
 *
 * Deletes transport rows whose 'ts' is older than the given interval.
 * Unless $forced is set, only rows in state 'closed' with lock 'N' are
 * deleted.
 *
 * @param string $interval
 *      psql time interval - older closed jobs will be deleted
 * @param boolean $forced
 *      if true, delete non-closed jobs too
 * @return boolean true or error
 */
function _cleanUp($interval='1 minute'/*'1 hour'*/, $forced=FALSE)
{
    global $CC_CONFIG, $CC_DBC;

    $stateFilter = '';
    if (!$forced) {
        $stateFilter = " AND state='closed' AND lock = 'N'";
    }
    $sql = "
        DELETE FROM ".$CC_CONFIG['transTable']."
        WHERE ts < now() - interval '$interval'".$stateFilter;

    $outcome = $CC_DBC->query($sql);
    return (PEAR::isError($outcome) ? $outcome : TRUE);
}
|
|
|
|
|
|
/**
 * Logging wrapper for PEAR error object
 *
 * Builds a log line from the error's message, user info and code. When
 * a transport row is given, the matching transport record (if it can be
 * recalled) is put into the 'failed' state with that line as errmsg,
 * and the serialized row is appended to the logged text.
 *
 * @param string $txt
 *      log message
 * @param PEAR_Error $eo
 * @param array $row
 *      array returned from getRow
 * @return mixed
 *      void or error object
 */
function trLogPear($txt, $eo, $row=NULL)
{
    $line = $txt.$eo->getMessage();
    $line .= " ".$eo->getUserInfo();
    $line .= " [".$eo->getCode()."]";

    if (is_null($row)) {
        $this->trLog($line);
        return;
    }

    $record = TransportRecord::recall($this, $row['trtok']);
    if (!PEAR::isError($record)) {
        $record->setState('failed', array('errmsg'=>$line));
    }
    $this->trLog($line."\n ".serialize($row));
}
|
|
|
|
|
|
/**
 * Logging for debug transports
 *
 * Appends a time-stamped entry to activity.log in the transport
 * directory ($CC_CONFIG['transDir']).
 *
 * @param string $msg
 *      log message
 * @return mixed
 *      void or error object (when the log file cannot be opened)
 */
function trLog($msg)
{
    global $CC_CONFIG;
    $logfile = $CC_CONFIG['transDir']."/activity.log";
    if (FALSE === ($fp = fopen($logfile, "a"))) {
        return PEAR::raiseError(
            "Transport::trLog: Can't write to log ($logfile)"
        );
    }
    // writers need an exclusive lock; a shared lock would let several
    // processes write at the same time
    flock($fp, LOCK_EX);
    fputs($fp, "---".date("H:i:s")."---\n $msg\n");
    flock($fp, LOCK_UN);
    fclose($fp);
}
|
|
|
|
|
|
/* ====================================================== install methods */
|
|
/**
 * Delete all transports
 *
 * Removes every row from the transport table, regardless of state.
 *
 * @return mixed
 *      result of the DELETE query as returned by the database layer
 *      (an error object on failure)
 */
function resetData()
{
    global $CC_CONFIG, $CC_DBC;
    $sql = "DELETE FROM ".$CC_CONFIG['transTable'];
    return $CC_DBC->query($sql);
}
|
|
|
|
}
|
|
|
|
?>
|