libretime/backend/Transport.php

1833 lines
60 KiB
PHP

<?php
// Error codes used as PEAR error codes by the transport layer.
// NOTE(review): TRERR_ and TRERR_MD are not referenced in the visible part
// of this file - confirm their meaning against callers.
define('TRERR_', 70);
define('TRERR_MD', 71);
// Raised when an operation is attempted on a closed transport token.
define('TRERR_TOK', 72);
// Referenced only by the commented-out getSearchResults() ("not finished").
define('TRERR_NOTFIN', 73);
// Used when login or connection to the hub fails.
define('TRERR_XR_FAIL', 74);
// Logging verbosity threshold; the code logs extra detail when it is > 1
// and writes cron job output to transDir/debug.log when it is > 2.
#define('TR_LOG_LEVEL', 0);
define('TR_LOG_LEVEL', 10);
// Sent as the 'target' parameter of hub-initiated transfer XML-RPC calls.
define('HOSTNAME', 'storageServer');
include_once("XML/RPC.php");
include_once("TransportRecord.php");
/**
* Class for handling file transport between StorageServer and ArchiveServer<br>
* over unreliable network and from behind firewall<br><br>
*
* Transport states:
* <ul>
* <li>init: transport is prepared, but not started
* (e.g. no network connection is present)</li>
* <li>pending: transport is in progress, file is not fully transported to
* target system</li>
* <li>waiting: transport is in progress, but not running now</li>
* <li>finished: transport is finished, but file processing on target side
* is not completed</li>
* <li>closed: processing on target side is completed without errors</li>
* <li>failed: error - error message stored in errmsg field</li>
* <li>paused: transport has been paused</li>
* </ul>
*
* Transport types:
* <ul>
* <li>audioclip</li>
* <li>playlist</li>
* <li>metadata</li>
* <li>file</li>
* </ul>
*
* @package Campcaster
* @subpackage StorageServer
* @copyright 2010 Sourcefabric O.P.S.
* @license http://www.gnu.org/licenses/gpl.txt
*/
class Transport
{
/**
* Storage object handed to the constructor (kept by reference); it is
* passed to Prefs and used for the playlist export calls.
* NOTE(review): documented here as GreenBox, but the constructor docblock
* says LocStor - confirm which type is actually passed.
* @var GreenBox
*/
public $gb;
/**
* Resolved filesystem path of the cron worker script
* (transportCronJob.php); FALSE when realpath() cannot resolve it.
* @var string
*/
private $cronJobScript;
/**
* wget --read-timeout parameter [s]
* @var int
*/
private $downTimeout = 900;
/**
* wget --waitretry parameter [s]
* @var int
*/
private $downWaitretry = 10;
/**
* wget --limit-rate parameter; the option is omitted from the command
* line while this is NULL
* @var int|null
*/
private $downLimitRate = NULL;
# private $downLimitRate = 500;
/**
* wget -t parameter (number of retries)
* @var int
*/
private $downRetries = 6;
/**
* curl --max-time parameter
* @var int
*/
private $upTrMaxTime = 1800;
/**
* curl --speed-time parameter
* @var int
*/
private $upTrSpeedTime = 30;
/**
* curl --speed-limit parameter
* @var int
*/
private $upTrSpeedLimit = 30;
/**
* curl --connect-timeout parameter
* @var int
*/
private $upTrConnectTimeout = 20;
/**
* curl --limit-rate parameter; the option is omitted from the command
* line while this is NULL
* @var int|null
*/
private $upLimitRate = NULL;
# private $upLimitRate = 500;
/**
* Constructor
*
* @param LocStor $gb
* @return Transport
*/
public function __construct(&$gb)
{
    // Hold on to the caller's storage object by reference, not a copy.
    $this->gb =& $gb;
    // The cron worker script is located relative to this source file;
    // realpath() yields FALSE when the script cannot be resolved.
    $scriptLocation = dirname(__FILE__)
        . '/../../storageServer/var/cron/transportCronJob.php';
    $this->cronJobScript = realpath($scriptLocation);
}
/* ==================================================== transport methods */
/* ------------------------------------------------------- common methods */
/**
* Common "check" method for transports
*
* @param string $trtok
* transport token
* @return array
* struct/hasharray with fields:
* trtype: string -
* audioclip | playlist | playlistPkg | metadata | file
* state: string - transport state
* init | pending | waiting | finished | closed | failed
* direction: string - up | down
* expectedsize: int - file size in bytes
* realsize: int - currently transported bytes
* expectedsum: string - original file checksum
* realsum: string - transported file checksum
* title: string - dc:title or filename etc.
* errmsg: string - error message for failed transports
* ... ?
*/
function getTransportInfo($trtok)
{
    // Look the transport up by token; a PEAR error is passed straight back.
    $record = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($record)) {
        return $record;
    }
    $row = $record->row;

    // Copy the reported fields; fields missing from the row become NULL.
    $reported = array(
        'trtype', 'state', 'direction', 'expectedsize', 'realsize',
        'expectedsum', 'realsum', 'title', 'errmsg'
    );
    $info = array();
    foreach ($reported as $field) {
        $info[$field] = NULL;
        if (isset($row[$field])) {
            $info[$field] = $row[$field];
        }
    }

    if ($row['direction'] == 'down') {
        // Downloads: measure the local file directly when it exists.
        if (file_exists($row['localfile'])) {
            $info['realsize'] = filesize($row['localfile']);
            $info['realsum'] = $this->_chsum($row['localfile']);
        }
    } elseif ($row['direction'] == 'up') {
        // Uploads: ask uploadCheck() for the current size and checksum;
        // on error the stored values are kept.
        $remote = $this->uploadCheck($row['pdtoken']);
        if (!PEAR::isError($remote)) {
            $info['realsize'] = $remote['size'];
            $info['realsum'] = $remote['realsum'];
        }
    }
    return $info;
}
/**
* Turn transports on/off, optionally return current state.
* (true=On / false=off)
*
* @param string $sessid
* session id
* @param boolean $onOff
* optional (if not used, current state is returned)
* @return boolean
* previous state
*/
function turnOnOffTransports($sessid, $onOff=NULL)
{
    // $CC_CONFIG lives in the global scope. Without this declaration the
    // lookup below reads an undefined local and the preference group name
    // silently becomes NULL.
    global $CC_CONFIG;
    require_once('Prefs.php');
    $pr = new Prefs($this->gb);
    $group = $CC_CONFIG['StationPrefsGr'];
    // The stored preference is the negation of the on/off state.
    $key = 'TransportsDenied';
    $res = $pr->loadGroupPref($group, $key);
    if (PEAR::isError($res)) {
        // GBERR_PREF is treated as "preference not set" and falls back to
        // the default (transports allowed); any other error is returned.
        if ($res->getCode() !== GBERR_PREF) {
            return $res;
        } else {
            $res = FALSE; // default
        }
    }
    $state = !$res;
    // Query-only mode: no new value supplied.
    if (is_null($onOff)) {
        return $state;
    }
    $res = $pr->saveGroupPref($sessid, $group, $key, !$onOff);
    if (PEAR::isError($res)) {
        return $res;
    }
    // The state that was in effect before this call.
    return $state;
}
/**
* Pause, resume or cancel transport
*
* @param string $trtok
* transport token
* @param string $action
* pause | resume | cancel
* @return string
* resulting transport state
*/
function doTransportAction($trtok, $action)
{
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    // A closed transport cannot be paused, resumed or cancelled.
    if ($trec->getState() == 'closed') {
        return PEAR::raiseError(
            "Transport::doTransportAction:".
            " closed transport token ($trtok)", TRERR_TOK
        );
    }
    // Map the action to the target state and decide whether the running
    // transfer process has to be stopped.
    $killJob = FALSE;
    switch ($action) {
        case 'pause':
            $newState = 'paused';
            $killJob = TRUE;
            break;
        case 'resume':
            $newState = 'waiting';
            break;
        case 'cancel':
            $newState = 'closed';
            $killJob = TRUE;
            break;
        default:
            return PEAR::raiseError(
                "Transport::doTransportAction:".
                " unknown action ($action)"
            );
    }
    $res = $trec->setState($newState);
    // Do not kill the job when the state change failed: the record would
    // keep its old state with no process left to move it forward.
    if (PEAR::isError($res)) {
        return $res;
    }
    if ($killJob) {
        $trec->killJob();
    }
    return $res;
}
/* ------------- special methods for audioClip/webstream object transport */
/**
* Start upload of audioClip/webstream/playlist from local storageServer
* to hub.
*
* @param string $gunid
* global unique id of object being transported
* @param boolean $withContent
* if true, transport playlist content too (optional)
* @param array $pars
* default parameters (optional, internal use)
* @return string
* transport token
*/
function upload2Hub($gunid, $withContent=TRUE, $pars=array())
{
    global $CC_CONFIG;
    $this->trLog("upload2Hub start: ".strftime("%H:%M:%S"));
    $ftype = BasicStor::GetType($gunid);
    // Return a lookup error as-is instead of letting it reach the
    // "not supported" message, where it would be interpolated as a string.
    if (PEAR::isError($ftype)) {
        return $ftype;
    }
    switch ($ftype) {
        case "audioclip":
        case "webstream":
            $storedFile = StoredFile::RecallByGunid($gunid);
            if (PEAR::isError($storedFile)) {
                return $storedFile;
            }
            // A NULL result used to be returned as if it were a token;
            // report it as an error so callers' isError checks catch it.
            if (is_null($storedFile)) {
                return PEAR::raiseError(
                    "Transport::upload2Hub: object not found ($gunid)"
                );
            }
            // handle metadata: it travels as its own transport and is
            // referenced from the media transport through 'mdtrtok'.
            $mdfpath = $storedFile->getRealMetadataFileName();
            if (PEAR::isError($mdfpath)) {
                return $mdfpath;
            }
            $mdtrec = $this->_uploadGeneralFileToHub($mdfpath, 'metadata',
                array_merge(array('gunid'=>$gunid, 'fname'=>'metadata',), $pars)
            );
            if (PEAR::isError($mdtrec)) {
                return $mdtrec;
            }
            // handle raw media file:
            $fpath = $storedFile->getRealFilePath();
            if (PEAR::isError($fpath)) {
                return $fpath;
            }
            $fname = $storedFile->getName();
            if (PEAR::isError($fname)) {
                return $fname;
            }
            $trec = $this->_uploadGeneralFileToHub($fpath, 'audioclip',
                array_merge(array(
                    'gunid'=>$gunid, 'fname'=>$fname, 'mdtrtok'=>$mdtrec->trtok,
                ), $pars)
            );
            if (PEAR::isError($trec)) {
                return $trec;
            }
            $this->startCronJobProcess($mdtrec->trtok);
            break;
        case "playlist":
            $plid = $gunid;
            require_once("Playlist.php");
            $pl = StoredFile::RecallByGunid($plid);
            if (PEAR::isError($pl)) {
                return $pl;
            }
            if (is_null($pl)) {
                return PEAR::raiseError(
                    "Transport::upload2Hub: object not found ($plid)"
                );
            }
            $fname = $pl->getName();
            if (PEAR::isError($fname)) {
                return $fname;
            }
            if ($withContent) {
                // Export the playlist with its content and upload a private
                // copy of the exported package.
                $this->trLog("upload2Hub exportPlaylistOpen BEGIN: ".strftime("%H:%M:%S"));
                $res = $this->gb->bsExportPlaylistOpen($plid);
                $this->trLog("upload2Hub exportPlaylistOpen END: ".strftime("%H:%M:%S"));
                if (PEAR::isError($res)) {
                    return $res;
                }
                $tmpn = tempnam($CC_CONFIG['transDir'], 'plExport_');
                $copied = FALSE;
                if ($tmpn !== FALSE) {
                    $plfpath = "$tmpn.lspl";
                    $this->trLog("upload2Hub begin copy: ".strftime("%H:%M:%S"));
                    $copied = copy($res['fname'], $plfpath);
                    $this->trLog("upload2Hub end copy: ".strftime("%H:%M:%S"));
                }
                // Always release the export token, even when the copy failed.
                $closeRes = $this->gb->bsExportPlaylistClose($res['token']);
                if (!$copied) {
                    // Without a complete copy there is nothing to upload;
                    // remove whatever was created and report the failure.
                    if ($tmpn !== FALSE) {
                        @unlink("$tmpn.lspl");
                        @unlink($tmpn);
                    }
                    return PEAR::raiseError(
                        "Transport::upload2Hub: cannot copy exported playlist ($plid)"
                    );
                }
                if (PEAR::isError($closeRes)) {
                    return $closeRes;
                }
                $fname = $fname.".lspl";
                $trtype = 'playlistPkg';
            } else {
                // Playlist without content: upload only its metadata file.
                $plfpath = $pl->getRealMetadataFileName();
                if (PEAR::isError($plfpath)) {
                    return $plfpath;
                }
                $trtype = 'playlist';
            }
            $trec = $this->_uploadGeneralFileToHub($plfpath, $trtype,
                array_merge(array('gunid'=>$plid,'fname'=>$fname,), $pars));
            if (PEAR::isError($trec)) {
                return $trec;
            }
            break;
        default:
            return PEAR::raiseError("Transport::upload2Hub: ftype not supported ($ftype)");
    }
    $this->startCronJobProcess($trec->trtok);
    $this->trLog("upload2Hub end: ".strftime("%H:%M:%S"));
    return $trec->trtok;
}
/**
* Start download of audioClip/webstream/playlist from hub to local
* storageServer
*
* @param int $uid
* local user id of transport owner
* (for downloading file to homedir in storage)
* @param string $gunid
* global unique id of object being transported
* @param boolean $withContent
* if true, transport playlist content too (optional)
* @param array $pars
* default parameters (optional, internal use)
* @return string
* transport token
*/
function downloadFromHub($uid, $gunid, $withContent=TRUE, $pars=array())
{
    // The transport is created as 'playlistPkg' or 'unknown';
    // cronDownloadInit() later stores the type reported by the hub.
    if ($withContent) {
        $trtype = 'playlistPkg';
    } else {
        $trtype = 'unknown';
    }
    $recordPars = array_merge(array('gunid'=>$gunid, 'uid'=>$uid), $pars);
    $record = TransportRecord::create($this, $trtype, 'down', $recordPars);
    if (PEAR::isError($record)) {
        return $record;
    }
    // Start the background worker for this transport.
    $this->startCronJobProcess($record->trtok);
    return $record->trtok;
}
/* ------------------------------------------------ remote-search methods */
/**
* Start search job on remote Campcaster instance.
*
* @param array $criteria
* LS criteria format (see localSearch)
* @param string $resultMode
* 'php' | 'xmlrpc'
* @return mixed
* result of the hub's locstor.searchMetadata call, or a PEAR error
*/
function remoteSearch($criteria, $resultMode='php')
{
    global $CC_CONFIG, $CC_DBC;
    $criteria['resultMode'] = $resultMode;
    // Logging in doubles as a check that the hub is reachable and that
    // the hub account is configured.
    $sessid = $this->loginToArchive();
    if (PEAR::isError($sessid)) {
        $code = intval($sessid->getCode());
        if ($code == 802) {
            return PEAR::raiseError("Can't login to Hub ({$sessid->getMessage()})", TRERR_XR_FAIL);
        }
        if ($code == TRERR_XR_FAIL) {
            return PEAR::raiseError("Can't connect to Hub ({$sessid->getMessage()})", TRERR_XR_FAIL);
        }
        // Any other error is handed back unchanged.
        return $sessid;
    }
    // Run the search on the hub, then close the session whatever the outcome.
    $searchResult = $this->xmlrpcCall(
        "locstor.searchMetadata",
        array("sessid" => $sessid, "criteria" => $criteria)
    );
    $this->logoutFromArchive($sessid);
    return $searchResult;
}
/**
* Start search job on network hub
*
* @param array $criteria
* LS criteria format (see localSearch)
* @param string $resultMode
* 'php' | 'xmlrpc'
* @param array $pars
* default parameters (optional, internal use)
* @return string
* transport token
*/
// function globalSearch($criteria, $resultMode='php', $pars=array())
// {
// global $CC_CONFIG, $CC_DBC;
// // testing of hub availability and hub account configuration.
// // it makes searchjob not async - should be removed for real async
// $r = $this->loginToArchive();
// if (PEAR::isError($r)) {
// switch(intval($r->getCode())) {
// case 802:
// return PEAR::raiseError("Can't login to Hub ({$r->getMessage()})", TRERR_XR_FAIL);
// case TRERR_XR_FAIL:
// return PEAR::raiseError("Can't connect to Hub ({$r->getMessage()})", TRERR_XR_FAIL);
// }
// return $r;
// }
// $this->logoutFromArchive($r);
// $criteria['resultMode'] = $resultMode;
// $localfile = tempnam($CC_CONFIG['transDir'], 'searchjob_');
// @chmod($localfile, 0660);
// $len = file_put_contents($localfile, serialize($criteria));
// $trec = $this->_uploadGeneralFileToHub($localfile, 'searchjob', $pars);
// if (PEAR::isError($trec)) {
// return $trec;
// }
// $this->startCronJobProcess($trec->trtok);
// return $trec->trtok;
// }
/**
* Get results from search job on network hub
*
* @param string $trtok
* transport token
* @param boolean $andClose
* if TRUE, close transport token
* @return array
* LS search result format (see localSearch)
*/
// function getSearchResults($trtok, $andClose=TRUE)
// {
// $trec = TransportRecord::recall($this, $trtok);
// if (PEAR::isError($trec)) {
// return $trec;
// }
// $row = $trec->row;
// switch ($st = $trec->getState()) {
// case "failed":
// return PEAR::raiseError(
// "Transport::getSearchResults:".
// " global search or results transport failed".
// " ({$trec->row['errmsg']})"
// );
// case "closed":
///*
// $res = file_get_contents($row['localfile']);
// $results = unserialize($res);
// return $results;
//*/
// return PEAR::raiseError(
// "Transport::getSearchResults:".
// " closed transport token ($trtok)", TRERR_TOK
// );
// case "finished":
// if ($row['direction'] == 'down') {
// // really finished
// $res = file_get_contents($row['localfile']);
// $results = unserialize($res);
// if ($andClose) {
// $ret = $this->xmlrpcCall('archive.downloadClose',
// array(
// 'token' => $row['pdtoken'] ,
// 'trtype' => $row['trtype'] ,
// ));
// if (PEAR::isError($ret)) {
// return $ret;
// }
// @unlink($row['localfile']);
// $r = $trec->close();
// if (PEAR::isError($r)) {
// return $r;
// }
// }
// return $results;
// }
// // otherwise not really finished - only request upload finished
// default:
// return PEAR::raiseError(
// "Transport::getSearchResults: not finished ($st)",
// TRERR_NOTFIN
// );
// }
// }
/* ------------------------ methods for ls-archive-format file transports */
/**
* Open async file transfer from local storageServer to network hub,
* file should be ls-archive-format file.
*
* @param string $filePath
* local path to uploaded file
* @param array $pars
* default parameters (optional, internal use)
* @return string
* transport token
*/
function uploadFile2Hub($filePath, $pars=array())
{
    if (file_exists($filePath)) {
        // Register the upload as a 'file' transport and start its worker.
        $record = $this->_uploadGeneralFileToHub($filePath, 'file', $pars);
        if (PEAR::isError($record)) {
            return $record;
        }
        $this->startCronJobProcess($record->trtok);
        return $record->trtok;
    }
    // Nothing to upload.
    return PEAR::raiseError(
        "Transport::uploadFile2Hub: file not found ($filePath)"
    );
}
/**
* Open async file transfer from network hub to local storageServer,
* file should be ls-archive-format file.
*
* @param string $url
* readable url
* @param string $chsum
* checksum from remote side
* @param int $size
* filesize from remote side
* @param array $pars
* default parameters (internal use)
* @return array
* trtok: string - transport token
* localfile: string - filepath of downloaded file
*/
function downloadFileFromHub($url, $chsum=NULL, $size=NULL, $pars=array())
{
    global $CC_CONFIG;
    // Reserve a uniquely named target file in the transport directory.
    $tmpn = tempnam($CC_CONFIG['transDir'], 'HITrans_');
    if ($tmpn === FALSE) {
        return PEAR::raiseError(
            "Transport::downloadFileFromHub: cannot create temporary file"
        );
    }
    $trec = TransportRecord::create($this, 'file', 'down',
        array_merge(array(
            'url' => $url,
            'localfile' => $tmpn,
            'expectedsum' => $chsum,
            'expectedsize' => $size,
        ), $pars)
    );
    if (PEAR::isError($trec)) {
        // No transport record refers to the placeholder file, so remove it
        // rather than leaving it behind in transDir.
        @unlink($tmpn);
        return $trec;
    }
    $this->startCronJobProcess($trec->trtok);
    return array('trtok'=>$trec->trtok, 'localfile'=>$tmpn);
}
/**
* Get list of prepared transfers initiated by hub
*
* @return array
* array of structs/hasharrays with fields:
* trtok: string transport token
*/
function getHubInitiatedTransfers()
{
    // Ask the hub for the transfers it has prepared for this host.
    $listing = $this->xmlrpcCall(
        'archive.listHubInitiatedTransfers',
        array('target' => HOSTNAME)
    );
    if (PEAR::isError($listing)) {
        return $listing;
    }
    // Reduce every entry to its transport token.
    $tokens = array();
    foreach ($listing as $entry) {
        $tokens[] = array('trtok' => $entry['trtok']);
    }
    return $tokens;
}
/**
* Start of download initiated by hub
*
* @param int $uid
* local user id of transport owner
* (for downloading file to homedir in storage)
* @param string $rtrtok
* transport token obtained from the getHubInitiatedTransfers method
* @return string
* transport token
*/
function startHubInitiatedTransfer($uid, $rtrtok)
{
    // Fetch the single hub-side transfer identified by $rtrtok.
    $ret = $this->xmlrpcCall('archive.listHubInitiatedTransfers',
        array(
            'target' => HOSTNAME,
            'trtok' => $rtrtok,
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }
    if (count($ret) != 1) {
        return PEAR::raiseError(
            "Transport::startHubInitiatedTransfer:".
            " wrong number of transports (".count($ret).")"
        );
    }
    $ta = $ret[0];
    // The hub reports the direction from its own side; invert it to the
    // local storage point of view.
    $direction = ( $ta['direction']=='up' ? 'down' : 'up' );
    $gunid = $ta['gunid'];
    // Shared message for unsupported direction/type combinations.
    $badCombination =
        "Transport::startHubInitiatedTransfer:".
        " wrong direction / transport type combination".
        " ({$ta['direction']}/{$ta['trtype']})";
    if ($direction == 'up') {
        switch ($ta['trtype']) {
            case "audioclip":
            case "playlist":
            case "playlistPkg":
                $trtok = $this->upload2Hub($gunid, TRUE,
                    array('rtrtok'=>$rtrtok));
                if (PEAR::isError($trtok)) {
                    return $trtok;
                }
                break;
            //case "searchjob": break; // not supported yet
            //case "file": break; // probably unusable
            default:
                return PEAR::raiseError($badCombination);
        }
    } else {
        switch ($ta['trtype']) {
            case "audioclip":
            case "playlist":
            case "playlistPkg":
                $trtok = $this->downloadFromHub($uid, $gunid, TRUE,
                    array('rtrtok'=>$rtrtok));
                if (PEAR::isError($trtok)) {
                    return $trtok;
                }
                break;
            //case "searchjob": break; // probably unusable
            case "file":
                $r = $this->downloadFileFromHub(
                    $ta['url'], $ta['expectedsum'], $ta['expectedsize'],
                    array('rtrtok'=>$rtrtok));
                if (PEAR::isError($r)) {
                    return $r;
                }
                // Read the token explicitly instead of extract()-ing the
                // result array into the local scope.
                $trtok = $r['trtok'];
                break;
            default:
                return PEAR::raiseError($badCombination);
        }
    }
    // Tell the hub that the transfer is now waiting on this side.
    $ret = $this->xmlrpcCall('archive.setHubInitiatedTransfer',
        array(
            'target' => HOSTNAME,
            'trtok' => $rtrtok,
            'state' => 'waiting',
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }
    // NOTE(review): upload2Hub/downloadFromHub/downloadFileFromHub already
    // start a cron job for this token; confirm the second start is intended.
    $this->startCronJobProcess($trtok);
    return $trtok;
}
/* =============================================== authentication methods */
/**
* Login to archive server
* (account info is taken from storageServer's config)
*
* @return string
* sessid or error
*/
function loginToArchive()
{
    global $CC_CONFIG;
    // The hub account credentials come from the configuration.
    $credentials = array(
        'login' => $CC_CONFIG['archiveAccountLogin'],
        'pass' => $CC_CONFIG['archiveAccountPass']
    );
    $reply = $this->xmlrpcCall('locstor.login', $credentials);
    // Errors are returned as-is; on success only the session id is returned.
    return PEAR::isError($reply) ? $reply : $reply['sessid'];
}
/**
* Logout from archive server
*
* @param unknown $sessid
* session id
* @return string
* Bye or error
*/
function logoutFromArchive($sessid)
{
    // Returns whatever the XML-RPC call yields (result or PEAR error).
    return $this->xmlrpcCall('locstor.logout', array('sessid'=>$sessid));
}
/* ========================================================= cron methods */
/* -------------------------------------------------- common cron methods */
/**
* Main method for periodical transport tasks - called by cron
*
* @param string $direction
* optional
* @return boolean
* TRUE
*/
function cronMain($direction=NULL)
{
    global $CC_CONFIG;
    // Without an explicit direction, handle uploads first, then downloads,
    // stopping at the first PEAR error.
    if (is_null($direction)) {
        foreach (array('up', 'down') as $oneWay) {
            $outcome = $this->cronMain($oneWay);
            if (PEAR::isError($outcome)) {
                return $outcome;
            }
        }
        return TRUE;
    }
    // fetch all opened transports
    $openTransports = $this->getTransports($direction);
    if (PEAR::isError($openTransports)) {
        $this->trLog("cronMain: DB error");
        return FALSE;
    }
    if (count($openTransports) == 0) {
        if (TR_LOG_LEVEL > 1) {
            $this->trLog("cronMain: $direction - nothing to do.");
        }
        return TRUE;
    }
    // ping to archive server (the result is not inspected):
    $this->ping();
    chdir($CC_CONFIG['transDir']);
    // start one worker per opened transport; start results are ignored
    foreach ($openTransports as $transportRow) {
        $this->startCronJobProcess($transportRow['trtok']);
    }
    return TRUE;
}
/**
* Cron job process starter
*
* @param string $trtok
* transport token
* @return boolean
* status
*/
function startCronJobProcess($trtok)
{
    global $CC_CONFIG;
    // The constructor stores FALSE here when realpath() cannot resolve the
    // script. Without this guard the token itself would be run as the
    // command.
    if (empty($this->cronJobScript)) {
        $this->trLog(
            "startCronJobProcess: cronJobScript not found (trtok {$trtok})"
        );
        return FALSE;
    }
    // Worker output goes to a debug log only at high log levels.
    if (TR_LOG_LEVEL > 2) {
        $redirect = $CC_CONFIG['transDir']."/debug.log";
    } else {
        $redirect = "/dev/null";
    }
    // Quote every argument on its own. escapeshellcmd() over the whole
    // command does not quote arguments, so a token or path containing
    // whitespace would be split into several words.
    $command_final =
        escapeshellarg($this->cronJobScript)." ".escapeshellarg($trtok).
        " >> ".escapeshellarg($redirect)." 2>&1 &";
    // The trailing '&' runs the worker in the background, so this call
    // returns without waiting for the transfer.
    $res = system($command_final, $status);
    if ($res === FALSE) {
        $this->trLog(
            "cronMain: Error on execute cronJobScript with trtok {$trtok}"
        );
        return FALSE;
    }
    return TRUE;
}
/**
* Dynamic method caller - wrapper
*
* @param string $trtok
* transport token
* @return mixed
* inherited from called method
*/
function cronCallMethod($trtok)
{
    global $CC_CONFIG;
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    $row = $trec->row;
    $state = $row['state'];
    // Known states. 'paused' must be listed, otherwise paused transports
    // are rejected as "invalid state" before the skip branch below is
    // ever reached.
    $states = array('init'=>'init',
        'pending'=>'pending',
        'waiting'=>'waiting',
        'finished'=>'finished',
        'failed'=>'failed',
        'closed'=>'closed',
        'paused'=>'paused');
    $directions = array('up'=>'upload', 'down'=>'download');
    // method name construction: cron + Upload|Download + State,
    // e.g. cronUploadWaiting
    $mname = "cron";
    if (isset($directions[$row['direction']])) {
        $mname .= ucfirst($directions[$row['direction']]);
    } else {
        return PEAR::raiseError(
            "Transport::cronCallMethod: invalid direction ({$row['direction']})"
        );
    }
    if (isset($states[$state])) {
        $mname .= ucfirst($states[$state]);
    } else {
        return PEAR::raiseError(
            "Transport::cronCallMethod: invalid state ({$state})"
        );
    }
    switch ($state) {
        // do nothing if closed, failed, pending or paused:
        case 'closed': // excluded in SQL query too, but let check it here
        case 'failed': // -"-
        case 'pending':
        case 'paused':
            return TRUE;
        case 'waiting':
            // Waiting transports run only while transports are allowed.
            require_once('Prefs.php');
            $pr = new Prefs($this->gb);
            $group = $CC_CONFIG['StationPrefsGr'];
            $key = 'TransportsDenied';
            $res = $pr->loadGroupPref($group, $key);
            if (PEAR::isError($res)) {
                if ($res->getCode() !== GBERR_PREF) {
                    return $res;
                } else {
                    $res = FALSE; // default
                }
            }
            // transfers turned off
            if ($res) {
                return PEAR::raiseError(
                    "Transport::cronCallMethod: transfers turned off"
                );
            }
            // NO break here!
        default:
            if (!method_exists($this, $mname)) {
                return PEAR::raiseError(
                    "Transport::cronCallMethod: unknown method ($mname)"
                );
            }
            // lock the job:
            $pid = getmypid();
            $r = $trec->setLock(TRUE, $pid);
            if (PEAR::isError($r)) {
                return $r;
            }
            // Re-read the record after locking. If that fails, release the
            // lock through the ORIGINAL record: the failed result is a PEAR
            // error and has no setLock() method.
            $fresh = TransportRecord::recall($this, $trtok);
            if (PEAR::isError($fresh)) {
                $trec->setLock(FALSE);
                return $fresh;
            }
            $trec = $fresh;
            $row = $trec->row;
            // login to archive server:
            $r = $this->loginToArchive();
            if (PEAR::isError($r)) {
                $trec->setLock(FALSE);
                return $r;
            }
            $asessid = $r;
            // method call:
            if (TR_LOG_LEVEL > 2) {
                $this->trLog("cronCallMethod($pid): $mname($trtok) >");
            }
            $ret = call_user_func(array($this, $mname), $row, $asessid);
            if (PEAR::isError($ret)) {
                $trec->setLock(FALSE);
                // Best-effort logout so the hub session is not left open.
                $this->logoutFromArchive($asessid);
                return $this->_failFatal($ret, $trec);
            }
            if (TR_LOG_LEVEL > 2) {
                $this->trLog("cronCallMethod($pid): $mname($trtok) <");
            }
            // unlock the job:
            $r = $trec->setLock(FALSE);
            if (PEAR::isError($r)) {
                // Best-effort logout before reporting the unlock failure.
                $this->logoutFromArchive($asessid);
                return $r;
            }
            // logout:
            $r = $this->logoutFromArchive($asessid);
            if (PEAR::isError($r)) {
                return $r;
            }
            return $ret;
    }
}
/**
* Upload initialization
*
* @param array $row
* row from getTransport results
* @param string $asessid
* session id (from network hub)
* @return mixed
* boolean TRUE or error object
*/
function cronUploadInit($row, $asessid)
{
    $record = TransportRecord::recall($this, $row['trtok']);
    if (PEAR::isError($record)) {
        return $record;
    }
    // Open the upload on the hub; the reply carries the target url and
    // the hub-side token.
    $opened = $this->xmlrpcCall(
        'archive.uploadOpen',
        array(
            'sessid' => $asessid ,
            'chsum' => $row['expectedsum'],
        )
    );
    if (PEAR::isError($opened)) {
        return $opened;
    }
    // Store url and token and move the transport to 'waiting'.
    $stateResult = $record->setState(
        'waiting',
        array('url'=>$opened['url'], 'pdtoken'=>$opened['token'])
    );
    return PEAR::isError($stateResult) ? $stateResult : TRUE;
}
/**
* Download initialization
*
* @param array $row
* row from getTransport results
* @param string $asessid
* session id (from network hub)
* @return mixed
* boolean TRUE or error object
*/
function cronDownloadInit($row, $asessid)
{
    global $CC_CONFIG;
    $trtok = $row['trtok'];
    $record = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($record)) {
        return $record;
    }
    // Open the download on the hub.
    $opened = $this->xmlrpcCall(
        'archive.downloadOpen',
        array(
            'sessid'=> $asessid,
            'trtype'=> $row['trtype'],
            'pars'=>array(
                'gunid' => $row['gunid'],
                'token' => $row['pdtoken'],
            ),
        )
    );
    if (PEAR::isError($opened)) {
        return $opened;
    }
    $trtype = $opened['trtype'];
    $title = $opened['title'];
    // "file" is tested first, exactly as the first case of a switch.
    $isFile = ($trtype == "file");
    $isAudioclip = !$isFile && ($trtype == "audioclip");

    $update = array();
    if ($isAudioclip) {
        // Audioclips get a companion metadata download, linked through
        // 'mdtrtok' and started straight away.
        $mdRecord = TransportRecord::create($this, 'metadata', 'down',
            array('gunid'=>$row['gunid'], 'uid'=>$row['uid'], )
        );
        if (PEAR::isError($mdRecord)) {
            return $mdRecord;
        }
        $this->startCronJobProcess($mdRecord->trtok);
        $update['mdtrtok'] = $mdRecord->trtok;
    }
    // Values stored with the 'waiting' state.
    $update['trtype'] = $trtype;
    $update['url'] = $opened['url'];
    $update['pdtoken'] = $opened['token'];
    $update['expectedsum'] = $opened['chsum'];
    $update['expectedsize'] = $opened['size'];
    $update['fname'] = $opened['filename'];
    if (!$isFile) {
        // Plain file transports do not store a title.
        $update['title'] = $title;
    }
    $update['localfile'] = $CC_CONFIG['transDir']."/$trtok";

    $stateResult = $record->setState('waiting', $update);
    if (PEAR::isError($stateResult)) {
        return $stateResult;
    }
    return TRUE;
}
/**
* Upload next part of transported file
*
* @param array $row
* row from getTransport results
* @param string $asessid
* session id (from network hub)
* @return mixed
* boolean TRUE or error object
*/
function cronUploadWaiting($row, $asessid)
{
    $trtok = $row['trtok'];
    // Ask uploadCheck() how much of the file has already been uploaded.
    $check = $this->uploadCheck($row['pdtoken']);
    if (PEAR::isError($check)) {
        return $check;
    }
    // the local source file must still be there
    if (!file_exists($row['localfile'])) {
        return PEAR::raiseError("Transport::cronUploadWaiting:".
            " file being uploaded does not exist! ({$row['localfile']})"
        );
    }
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    // Build the curl command; '-C <size>' continues the upload at the
    // offset reported by uploadCheck().
    $size = escapeshellarg($check['size']);
    $localfile = escapeshellarg($row['localfile']);
    $url = escapeshellarg($row['url']);
    $command =
        "curl -f -s -C $size --max-time {$this->upTrMaxTime}".
        " --speed-time {$this->upTrSpeedTime}".
        " --speed-limit {$this->upTrSpeedLimit}".
        " --connect-timeout {$this->upTrConnectTimeout}".
        (!is_null($this->upLimitRate)?
            " --limit-rate {$this->upLimitRate}" : "").
        " -T $localfile $url";
    // Move waiting -> pending; a FALSE result ends this run as a no-op.
    // NOTE(review): presumably FALSE means the record was no longer in
    // 'waiting' - confirm against TransportRecord::setState.
    $r = $trec->setState('pending', array(), 'waiting');
    if (PEAR::isError($r)) {
        return $r;
    }
    if ($r === FALSE) {
        return TRUE;
    }
    system($command, $status);
    // leave paused and closed transports
    $trec2 = TransportRecord::recall($this, $trtok);
    // Check the freshly recalled record. The original tested $trec here,
    // so a failed recall went on to read ->row from an error object.
    if (PEAR::isError($trec2)) {
        return $trec2;
    }
    $state2 = $trec2->row['state'];
    if ($state2 == 'paused' || $state2 == 'closed' ) {
        return TRUE;
    }
    // curl exit statuses that count as "try again / inspect the result":
    // status 0 - success
    // status 18 - Partial file. Only a part of the file was transported.
    // status 28 - Timeout. Too long/slow upload, try to resume next time rather.
    // status 6 - Couldn't resolve host.
    // status 7 - Failed to connect to host.
    // status 56 - Failure in receiving network data. Important - this status is
    // returned if file is locked on server side
    if (!in_array($status, array(0, 18, 28, 6, 7, 56))) {
        return PEAR::raiseError("Transport::cronUploadWaiting:".
            " wrong return status from curl: $status on $url".
            "($trtok)"
        );
    }
    $check = $this->uploadCheck($row['pdtoken']);
    if (PEAR::isError($check)) {
        return $check;
    }
    // test checksum
    if ($check['status'] == TRUE) {
        // finished
        $r = $trec->setState('finished',
            array('realsum'=>$check['realsum'], 'realsize'=>$check['size']));
        if (PEAR::isError($r)) {
            return $r;
        }
    } elseif (intval($check['size']) < $row['expectedsize']) {
        // not complete yet - go back to 'waiting' for the next run
        $r = $trec->setState('waiting',
            array('realsum'=>$check['realsum'], 'realsize'=>$check['size']));
        if (PEAR::isError($r)) {
            return $r;
        }
    } else {
        // wrong md5 at finish - TODO: start again
        // $this->xmlrpcCall('archive.uploadReset', array());
        $trec->fail('file uploaded with bad md5');
        return PEAR::raiseError("Transport::cronUploadWaiting:".
            " file uploaded with bad md5 ".
            "($trtok: {$check['realsum']}/{$check['expectedsum']})"
        );
    }
    return TRUE;
}
/**
* Download next part of transported file
*
* @param array $row
* row from getTransport results
* @param string $asessid
* session id (from network hub)
* @return mixed
* boolean TRUE or error object
*/
function cronDownloadWaiting($row, $asessid)
{
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    // Build the wget command; '-c' continues a partially downloaded file.
    $localfile = escapeshellarg($row['localfile']);
    $url = escapeshellarg($row['url']);
    $command =
        "wget -q -c".
        " --read-timeout={$this->downTimeout}".
        " --waitretry={$this->downWaitretry}".
        " -t {$this->downRetries}".
        (!is_null($this->downLimitRate)?
            " --limit-rate={$this->downLimitRate}" : "").
        " -O $localfile $url"
    ;
    // Move waiting -> pending; a FALSE result ends this run as a no-op.
    // NOTE(review): presumably FALSE means the record was no longer in
    // 'waiting' - confirm against TransportRecord::setState.
    $r = $trec->setState('pending', array(), 'waiting');
    if (PEAR::isError($r)) {
        return $r;
    }
    if ($r === FALSE) {
        return TRUE;
    }
    system($command, $status);
    // leave paused and closed transports
    $trec2 = TransportRecord::recall($this, $trtok);
    // Check the freshly recalled record. The original tested $trec here,
    // so a failed recall went on to read ->row from an error object.
    if (PEAR::isError($trec2)) {
        return $trec2;
    }
    $state2 = $trec2->row['state'];
    if ($state2 == 'paused' || $state2 == 'closed' ) {
        return TRUE;
    }
    // check consistency; wget changed the file outside of PHP, so drop
    // any cached stat data before measuring it
    clearstatcache();
    $size = filesize($row['localfile']);
    if ($size < $row['expectedsize']) {
        // not finished - return to the 'waiting' state
        $r = $trec->setState('waiting', array('realsize'=>$size));
        if (PEAR::isError($r)) {
            return $r;
        }
        return TRUE;
    }
    // size reached - verify the content
    $chsum = $this->_chsum($row['localfile']);
    if ($chsum == $row['expectedsum']) {
        // mark download as finished
        $r = $trec->setState('finished',
            array('realsum'=>$chsum, 'realsize'=>$size));
    } else {
        // bad checksum, retry from the scratch
        @unlink($row['localfile']);
        $r = $trec->setState('waiting',
            array('realsum'=>$chsum, 'realsize'=>$size));
    }
    if (PEAR::isError($r)) {
        return $r;
    }
    return TRUE;
}
/**
* Finish the upload
*
* @param array $row
* row from getTransport results
* @param string $asessid
* session id (from network hub)
* @return mixed
* boolean TRUE or error object
*/
function cronUploadFinished($row, $asessid)
{
    global $CC_CONFIG;
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    // don't close metadata transport - audioclip will close it
    if ($row['trtype'] == 'metadata') {
        return TRUE;
    }
    // handle metadata transport on audioclip trtype:
    if ($row['trtype'] == 'audioclip') {
        $mdtrec = TransportRecord::recall($this, $trec->row['mdtrtok']);
        if (PEAR::isError($mdtrec)) {
            return $mdtrec;
        }
        switch ($mdtrec->row['state']) {
            case 'failed':
            case 'closed':
                // The message reports this transport's token. The original
                // interpolated $this->trtok, a property this class does
                // not define.
                return PEAR::raiseError("Transport::cronUploadFinished:".
                    " metadata transport in wrong state: {$mdtrec->row['state']}".
                    " ({$trtok})"
                );
            // don't close transport with nonfinished metadata transport:
            case 'init':
            case 'waiting':
            case 'pending':
            case 'paused':
                return TRUE;
            default: // finished - ok close parent transport
                $mdpdtoken = $mdtrec->row['pdtoken'];
        }
    } else {
        $mdpdtoken = NULL;
    }
    // Close the upload on the hub, passing the metadata token if any.
    $ret = $this->xmlrpcCall('archive.uploadClose',
        array(
            'token' => $row['pdtoken'] ,
            'trtype' => $row['trtype'],
            'pars' => array(
                'gunid' => $row['gunid'],
                'name' => $row['fname'],
                'mdpdtoken' => $mdpdtoken,
            ),
        ));
    if (PEAR::isError($ret)) {
        // The metadata transport is closed as well when closing failed;
        // its own result is not inspected.
        if ($row['trtype'] == 'audioclip') {
            $mdtrec->close();
        }
        return $ret;
    }
    $r = $trec->close();
    if (PEAR::isError($r)) {
        return $r;
    }
    switch ($row['trtype']) {
        case 'audioclip':
            // close metadata transport:
            $r = $mdtrec->close();
            if (PEAR::isError($r)) {
                return $r;
            }
            break;
        case 'playlistPkg':
            // remove exported playlist (playlist with content)
            $ep = $row['localfile'];
            @unlink($ep);
            // Also remove the plExport_* file in transDir whose name the
            // package file name was derived from.
            if (preg_match("|/(plExport_[^\.]+)\.lspl$|", $ep, $va)) {
                list(,$tmpn) = $va; $tmpn = $CC_CONFIG['transDir']."/$tmpn";
                if (file_exists($tmpn)) {
                    @unlink($tmpn);
                }
            }
            break;
        default:
    }
    return TRUE;
}
/**
 * Finish the download
 *
 * Post-processes a downloaded transport according to its type:
 * - audioclip: waits until the paired metadata transport is finished,
 *   stores clip and metadata through StoredFile::Insert, closes the
 *   metadata download on the hub and closes the metadata record;
 * - metadata: nothing is closed here;
 * - playlist / playlistPkg: the downloaded file is stored / imported;
 * - file: no extra processing.
 * Afterwards the download is closed on the hub, a hub initiated transfer
 * (rtrtok) is reported as closed and the local record is closed.
 *
 * @param array $row
 *      row from getTransport results
 * @param string $asessid
 *      session id (from network hub); not read by this method
 * @return mixed
 *      boolean TRUE or error object
 */
function cronDownloadFinished($row, $asessid)
{
    $trtok = $row['trtok'];
    $trec = TransportRecord::recall($this, $trtok);
    if (PEAR::isError($trec)) {
        return $trec;
    }
    switch ($row['trtype']) {
        case "audioclip":
            $mdtrtok = $trec->row['mdtrtok'];
            $mdtrec = TransportRecord::recall($this, $mdtrtok);
            if (PEAR::isError($mdtrec)) {
                return $mdtrec;
            }
            // lock the metadata record with this process id while it is
            // being handled; every exit path below releases the lock again
            $pid = getmypid();
            $r = $mdtrec->setLock(TRUE, $pid);
            if (PEAR::isError($r)) {
                return $r;
            }
            switch ($mdtrec->row['state']) {
                // don't close transport with nonfinished metadata transport:
                case 'init':
                case 'waiting':
                case 'pending':
                case 'paused':
                    $r = $mdtrec->setLock(FALSE);
                    if (PEAR::isError($r)) {
                        return $r;
                    }
                    return TRUE;
                case 'finished': // metadata finished, close main transport
                    $values = array(
                        "filename" => $row['fname'],
                        "filepath" => $trec->row['localfile'],
                        "metadata" => $mdtrec->row['localfile'],
                        "gunid" => $row['gunid'],
                        "filetype" => "audioclip"
                    );
                    $storedFile = StoredFile::Insert($values);
                    if (PEAR::isError($storedFile)) {
                        $mdtrec->setLock(FALSE);
                        return $storedFile;
                    }
                    $res = $storedFile->getId();
                    // close the metadata download on the hub side
                    $ret = $this->xmlrpcCall('archive.downloadClose',
                        array(
                            'token' => $mdtrec->row['pdtoken'] ,
                            'trtype' => 'metadata' ,
                        ));
                    if (PEAR::isError($ret)) {
                        $mdtrec->setLock(FALSE);
                        return $ret;
                    }
                    $r = $mdtrec->close();
                    if (PEAR::isError($r)) {
                        $r2 = $mdtrec->setLock(FALSE);
                        return $r;
                    }
                    // both temporary files were handed to StoredFile::Insert
                    @unlink($trec->row['localfile']);
                    @unlink($mdtrec->row['localfile']);
                    break;
                default:
                    $r = $mdtrec->setLock(FALSE);
                    // NOTE(review): this message used to interpolate
                    // {$this->trtok}; no trtok property is declared in the
                    // visible part of this class, so the token of the
                    // transport being processed is reported instead.
                    return PEAR::raiseError("Transport::cronDownloadFinished:".
                        " metadata transport in wrong state: {$mdtrec->row['state']}".
                        " ({$trtok})"
                    );
            }
            $r = $mdtrec->setLock(FALSE);
            if (PEAR::isError($r)) {
                return $r;
            }
            break;
        case "metadata":
        // case "searchjob":
            return TRUE; // don't close - getSearchResults should close it
            break;
    }
    // close the download of the main transport on the hub side
    $ret = $this->xmlrpcCall('archive.downloadClose',
        array(
            'token' => $row['pdtoken'] ,
            'trtype' => $row['trtype'] ,
        ));
    if (PEAR::isError($ret)) {
        return $ret;
    }
    switch ($row['trtype']) {
        case "playlist":
            $values = array(
                "filename" => $row['fname'],
                "metadata" => $trec->row['localfile'],
                "gunid" => $row['gunid'],
                "filetype" => "playlist"
            );
            $storedFile = StoredFile::Insert($values);
            if (PEAR::isError($storedFile)) {
                return $storedFile;
            }
            $res = $storedFile->getId();
            @unlink($row['localfile']);
            break;
        case "playlistPkg":
            $subjid = $trec->row['uid'];
            $fname = $trec->row['localfile'];
            $res = $this->gb->bsImportPlaylist($fname, $subjid);
            if (PEAR::isError($res)) {
                return $res;
            }
            @unlink($fname);
            break;
        case "audioclip":
        case "metadata":
        // case "searchjob":
        case "file":
            break;
        default:
            return PEAR::raiseError("DEBUG: NotImpl ".var_export($row,TRUE));
    }
    // a non-null rtrtok means the record carries a hub initiated transfer
    // token; report that transfer as closed to the hub
    if (!is_null($rtrtok = $trec->row['rtrtok'])) {
        $ret = $this->xmlrpcCall('archive.setHubInitiatedTransfer',
            array(
                'target' => HOSTNAME,
                'trtok' => $rtrtok,
                'state' => 'closed',
            ));
        if (PEAR::isError($ret)) {
            return $ret;
        }
    }
    $r = $trec->close();
    if (PEAR::isError($r)) {
        return $r;
    }
    return TRUE;
}
/* ==================================================== auxiliary methods */
/**
 * Prepare upload for general file
 *
 * Creates an 'up' transport record describing the local file (path, base
 * name, checksum and size). Entries passed in $pars override these values.
 *
 * @param string $fpath
 *      local filepath of uploaded file
 * @param string $trtype
 *      transport type
 * @param array $pars
 *      default parameters (optional, internal use)
 * @return object
 *      whatever TransportRecord::create() returns: the transportRecord
 *      instance or an error object
 */
function _uploadGeneralFileToHub($fpath, $trtype, $pars=array())
{
    $fileInfo = array(
        'localfile'    => $fpath,
        'fname'        => basename($fpath),
        'expectedsum'  => $this->_chsum($fpath),
        'expectedsize' => filesize($fpath),
    );
    // record and error object are both handed back to the caller unchanged
    return TransportRecord::create(
        $this, $trtype, 'up', array_merge($fileInfo, $pars)
    );
}
/**
 * Create new transport token
 *
 * The token is the first 16 characters of the md5 hash of a seed built
 * from microtime(), the server address (when known), rand() and a fixed
 * suffix.
 *
 * @return string
 *      transport token
 */
function _createTransportToken()
{
    if (isset($_SERVER['SERVER_ADDR'])) {
        $serverAddr = $_SERVER['SERVER_ADDR'];
    } else {
        // e.g. when SERVER_ADDR is not provided by the environment
        $serverAddr = '';
    }
    $seed = microtime().$serverAddr.rand()."org.mdlf.campcaster";
    return substr(md5($seed), 0, 16);
}
/**
 * Get all relevant transport records
 *
 * Selects the transports that are not in state 'closed', 'failed' or
 * 'paused', newest first, optionally restricted by direction, target and
 * transport token. The gunid and pdtoken columns are normalized with
 * StoredFile::NormalizeGunid before the rows are returned.
 *
 * @param string $direction
 *      'up' | 'down'; any other value means no restriction
 * @param string $target
 *      target hostname
 * @param string $trtok
 *      transport token for specific query
 * @return array
 *      array of transportRecords (as hasharrays), or error object
 */
function getTransports($direction=NULL, $target=NULL, $trtok=NULL)
{
    global $CC_CONFIG, $CC_DBC;
    $conds = array();
    $params = array();
    // direction is matched against fixed literals only, so it never reaches
    // the SQL text as caller supplied data
    switch ($direction) {
        case 'up':
            $conds[] = "direction='up'";
            break;
        case 'down':
            $conds[] = "direction='down'";
            break;
        default:
            break;
    }
    // Caller supplied values are bound as parameters instead of being
    // interpolated into the statement; the string cast keeps them quoted
    // as text like the former '...' literals.
    // NOTE(review): assumes $CC_DBC is a PEAR DB connection, whose getAll()
    // accepts an array of values for '?' placeholders - confirm.
    if (!is_null($target)) {
        $conds[] = "target = ?";
        $params[] = (string) $target;
    }
    if (!is_null($trtok)) {
        $conds[] = "trtok = ?";
        $params[] = (string) $trtok;
    }
    $conds[] = "state not in ('closed', 'failed', 'paused')";
    $rows = $CC_DBC->getAll("
        SELECT
            id, trtok, state, trtype, direction,
            to_hex(gunid)as gunid, to_hex(pdtoken)as pdtoken,
            fname, localfile, expectedsum, expectedsize, url,
            uid, target
        FROM ".$CC_CONFIG['transTable']."
        WHERE ".implode(' AND ', $conds)."
        ORDER BY start DESC
    ", $params);
    if (PEAR::isError($rows)) {
        return $rows;
    }
    foreach ($rows as $i => $row) {
        $rows[$i]['pdtoken'] = StoredFile::NormalizeGunid($row['pdtoken']);
        $rows[$i]['gunid'] = StoredFile::NormalizeGunid($row['gunid']);
    }
    return $rows;
}
/**
 * Check remote state of uploaded file
 *
 * Forwards the token to the 'archive.uploadCheck' XML-RPC method.
 *
 * @param string $pdtoken
 *      put/download token (from network hub)
 * @return array
 *      hash: chsum, size, url (or whatever xmlrpcCall() returns on error)
 */
function uploadCheck($pdtoken)
{
    $pars = array('token'=>$pdtoken);
    return $this->xmlrpcCall('archive.uploadCheck', $pars);
}
/**
 * Ping to remote Campcaster server
 *
 * Calls the 'ping' XML-RPC method with the parameter 'ping_' followed by
 * the current time (H:i:s).
 *
 * @return string
 *      network hub response or error object
 */
function ping()
{
    $stamp = 'ping_'.date('H:i:s');
    return $this->xmlrpcCall('ping', array('par'=>$stamp));
}
/**
 * XMLRPC call to network hub.
 *
 * The hub location is taken from the 'archiveServerLocation' station
 * preference when it is set; otherwise the archiveUrl* entries of
 * $CC_CONFIG are used.
 *
 * @param string $method
 *      method name
 * @param array $pars
 *      call parameters
 * @return mixed
 *      decoded response, or error object: code TRERR_XR_FAIL when the
 *      request itself failed, the fault code/string when the hub
 *      answered with a fault
 */
function xmlrpcCall($method, $pars=array())
{
    global $CC_CONFIG;
    $xrp = XML_RPC_encode($pars);
    $pr = new Prefs($this->gb);
    $group = $CC_CONFIG["StationPrefsGr"];
    $key = 'archiveServerLocation';
    $archiveUrl = $pr->loadGroupPref($group, $key, false);
    // An error object is truthy, so it is excluded explicitly here and the
    // configured location is used instead of parsing a non-string value.
    if ($archiveUrl && !PEAR::isError($archiveUrl)) {
        $archiveUrlInfo = parse_url($archiveUrl);
        // parse_url() leaves out the 'port' key when the URL carries no
        // explicit port; !empty() avoids reading an undefined index.
        if (!empty($archiveUrlInfo['port'])) {
            $port = $archiveUrlInfo['port'];
        }
        else {
            $port = 80;
        }
        $c = new XML_RPC_Client($archiveUrlInfo['path'], $archiveUrlInfo['host'], $port);
    }
    else {
        $c = new XML_RPC_Client(
            $CC_CONFIG['archiveUrlPath']."/".$CC_CONFIG['archiveXMLRPC'],
            $CC_CONFIG['archiveUrlHost'], $CC_CONFIG['archiveUrlPort']
        );
    }
    $f = new XML_RPC_Message($method, array($xrp));
    $r = $c->send($f);
    if (!$r) {
        return PEAR::raiseError("XML-RPC request failed", TRERR_XR_FAIL);
    } elseif ($r->faultCode() > 0) {
        return PEAR::raiseError($r->faultString(), $r->faultCode());
    } else {
        $v = $r->value();
        return XML_RPC_decode($v);
    }
}
/**
 * Checksum of local file
 *
 * @param string $fpath
 *      local filepath
 * @return string
 *      md5 checksum of the file content, as returned by md5_file()
 */
function _chsum($fpath)
{
    $digest = md5_file($fpath);
    return $digest;
}
/**
 * Check exception and eventually mark transport as failed
 *
 * When $res is an error object with any code other than TRERR_XR_FAIL,
 * $trec->fail() is called with it. $res is returned unchanged in every
 * case.
 *
 * @param mixed $res
 *      result object to be checked
 * @param object $trec
 *      transport record object
 * @return mixed
 *      the unchanged $res
 */
function _failFatal($res, $trec)
{
    if (!PEAR::isError($res)) {
        return $res;
    }
    // TRERR_XR_FAIL is the only non fatal code; everything else is fatal
    if ($res->getCode() != TRERR_XR_FAIL) {
        $trec->fail('', $res);
    }
    return $res;
}
/**
 * Clean up transport jobs
 *
 * Deletes rows of the transport table whose ts is older than the given
 * interval. Unless $forced is set, only rows with state 'closed' and
 * lock 'N' are deleted.
 *
 * @param string $interval
 *      psql time interval - older closed jobs will be deleted
 * @param boolean $forced
 *      if true, delete non-closed jobs too
 * @return boolean true or error
 */
function _cleanUp($interval='1 minute'/*'1 hour'*/, $forced=FALSE)
{
    global $CC_CONFIG, $CC_DBC;
    if ($forced) {
        $stateFilter = '';
    } else {
        $stateFilter = " AND state='closed' AND lock = 'N'";
    }
    $sql = "\nDELETE FROM ".$CC_CONFIG['transTable'].
        "\nWHERE ts < now() - interval '$interval'".$stateFilter;
    $outcome = $CC_DBC->query($sql);
    return (PEAR::isError($outcome) ? $outcome : TRUE);
}
/**
 * Logging wrapper for PEAR error object
 *
 * Builds a log line from the error's message, user info and code. When a
 * transport row is given, the matching transport record is set to state
 * 'failed' with that line as errmsg, and the serialized row is appended
 * to the logged text.
 *
 * @param string $txt
 *      log message
 * @param PEAR_Error $eo
 * @param array $row
 *      array returned from getRow
 * @return void
 */
function trLogPear($txt, $eo, $row=NULL)
{
    $msg = $txt.$eo->getMessage();
    $msg .= " ".$eo->getUserInfo();
    $msg .= " [".$eo->getCode()."]";
    if ($row !== NULL) {
        $trec = TransportRecord::recall($this, $row['trtok']);
        if (!PEAR::isError($trec)) {
            // errmsg is stored without the serialized row added below
            $trec->setState('failed', array('errmsg'=>$msg));
        }
        $msg .= "\n ".serialize($row);
    }
    $this->trLog($msg);
}
/**
 * Logging for debug transports
 *
 * Appends a time stamped entry to activity.log in the transport
 * directory ($CC_CONFIG['transDir']).
 *
 * @param string $msg
 *      log message
 * @return mixed
 *      void or error object (when the log file can't be opened)
 */
function trLog($msg)
{
    global $CC_CONFIG;
    $logfile = $CC_CONFIG['transDir']."/activity.log";
    if (FALSE === ($fp = fopen($logfile, "a"))) {
        return PEAR::raiseError(
            "Transport::trLog: Can't write to log ($logfile)"
        );
    }
    // Writing requires an exclusive lock: a shared lock can be held by
    // several processes at once and would not keep entries apart.
    flock($fp, LOCK_EX);
    fputs($fp, "---".date("H:i:s")."---\n $msg\n");
    flock($fp, LOCK_UN);
    fclose($fp);
}
/* ====================================================== install methods */
/**
 * Delete all transports
 *
 * Removes every row from the transport table.
 *
 * @return mixed
 *      result of $CC_DBC->query() for the DELETE statement
 *      (presumably an error object on failure - confirm)
 */
function resetData()
{
    global $CC_CONFIG, $CC_DBC;
    $sql = "DELETE FROM ".$CC_CONFIG['transTable'];
    return $CC_DBC->query($sql);
}
}
?>