* over unreliable network and from behind firewall

* * Transport states: * * * Transport types: * * * @package Campcaster * @subpackage StorageServer * @copyright 2010 Sourcefabric O.P.S. * @license http://www.gnu.org/licenses/gpl.txt */ class Transport { /** * @var GreenBox */ public $gb; /** * File name * @var string */ private $cronJobScript; /** * wget --read-timeout parameter [s] * @var int */ private $downTimeout = 900; /** * wget --waitretry parameter [s] * @var int */ private $downWaitretry = 10; /** * wget --limit-rate parameter */ private $downLimitRate = NULL; # private $downLimitRate = 500; /** * wget -t parameter * @var int */ private $downRetries = 6; /** * curl --max-time parameter * @var int */ private $upTrMaxTime = 1800; /** * curl --speed-time parameter * @var int */ private $upTrSpeedTime = 30; /** * curl --speed-limit parameter * @var int */ private $upTrSpeedLimit = 30; /** * curl --connect-timeout parameter * @var int */ private $upTrConnectTimeout = 20; /** * curl --limit-rate parameter * @var int */ private $upLimitRate = NULL; # private $upLimitRate = 500; /** * Constructor * * @param LocStor $gb * @return Transport */ public function __construct(&$gb) { $this->gb =& $gb; $this->cronJobScript = realpath( dirname(__FILE__). '/../../storageServer/var/cron/transportCronJob.php' ); } /* ==================================================== transport methods */ /* ------------------------------------------------------- common methods */ /** * Common "check" method for transports * * @param string $trtok * transport token * @return array * struct/hasharray with fields: * trtype: string - * audioclip | playlist | playlistPkg | metadata | file * state: string - transport state * init | pending | waiting | finished | closed | failed * direction: string - up | down * expectedsize: int - file size in bytes * realsize: int - currently transported bytes * expectedsum: string - orginal file checksum * realsum: string - transported file checksum * title: string - dc:title or filename etc. * errmsg: string - error message for failed transports * ... ? */ function getTransportInfo($trtok) { $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $res = array(); foreach (array( 'trtype', 'state', 'direction', 'expectedsize', 'realsize', 'expectedsum', 'realsum', 'title', 'errmsg' ) as $k) { $res[$k] = ( isset($trec->row[$k]) ? $trec->row[$k] : NULL ); } if ( ($trec->row['direction'] == 'down') && file_exists($trec->row['localfile']) ){ $res['realsize'] = filesize($trec->row['localfile']); $res['realsum'] = $this->_chsum($trec->row['localfile']); } if ( ($trec->row['direction'] == 'up') ){ $check = $this->uploadCheck($trec->row['pdtoken']); if (!PEAR::isError($check)) { $res['realsize'] = $check['size']; $res['realsum'] = $check['realsum']; } } return $res; } /** * Turn transports on/off, optionaly return current state. * (true=On / false=off) * * @param string $sessid * session id * @param boolean $onOff * optional (if not used, current state is returned) * @return boolea * previous state */ function turnOnOffTransports($sessid, $onOff=NULL) { require_once('Prefs.php'); $pr = new Prefs($this->gb); $group = $CC_CONFIG['StationPrefsGr']; $key = 'TransportsDenied'; $res = $pr->loadGroupPref($group, $key); if (PEAR::isError($res)) { if ($res->getCode() !== GBERR_PREF) { return $res; } else { $res = FALSE; // default } } $state = !$res; if (is_null($onOff)) { return $state; } $res = $pr->saveGroupPref($sessid, $group, $key, !$onOff); if (PEAR::isError($res)) { return $res; } return $state; } /** * Pause, resume or cancel transport * * @param string $trtok * transport token * @param string $action * pause | resume | cancel * @return string * resulting transport state */ function doTransportAction($trtok, $action) { $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } if ($trec->getState() == 'closed') { return PEAR::raiseError( "Transport::doTransportAction:". " closed transport token ($trtok)", TRERR_TOK ); } switch ($action) { case 'pause'; $newState = 'paused'; break; case 'resume'; $newState = 'waiting'; break; case 'cancel'; $newState = 'closed'; break; default: return PEAR::raiseError( "Transport::doTransportAction:". " unknown action ($action)" ); } $res = $trec->setState($newState); switch ($action) { case 'pause'; case 'cancel'; $trec->killJob(); } return $res; } /* ------------- special methods for audioClip/webstream object transport */ /** * Start upload of audioClip/webstream/playlist from local storageServer * to hub. * * @param string $gunid * global unique id of object being transported * @param boolean $withContent * if true, transport playlist content too (optional) * @param array $pars * default parameters (optional, internal use) * @return string * transport token */ function upload2Hub($gunid, $withContent=TRUE, $pars=array()) { global $CC_CONFIG, $CC_DBC; $this->trLog("upload2Hub start: ".strftime("%H:%M:%S")); switch ($ftype = BasicStor::GetType($gunid)) { case "audioclip": case "webstream": $storedFile = StoredFile::RecallByGunid($gunid); if (is_null($storedFile) || PEAR::isError($storedFile)) { return $storedFile; } // handle metadata: $mdfpath = $storedFile->getRealMetadataFileName(); if (PEAR::isError($mdfpath)) { return $mdfpath; } $mdtrec = $this->_uploadGeneralFileToHub($mdfpath, 'metadata', array_merge(array('gunid'=>$gunid, 'fname'=>'metadata',), $pars) ); if (PEAR::isError($mdtrec)) { return $mdtrec; } // handle raw media file: $fpath = $storedFile->getRealFileName(); if (PEAR::isError($fpath)) { return $fpath; } $fname = $storedFile->getName(); if (PEAR::isError($fname)) { return $fname; } $trec = $this->_uploadGeneralFileToHub($fpath, 'audioclip', array_merge(array( 'gunid'=>$gunid, 'fname'=>$fname, 'mdtrtok'=>$mdtrec->trtok, ), $pars) ); if (PEAR::isError($trec)) { return $trec; } $this->startCronJobProcess($mdtrec->trtok); break; case "playlist": $plid = $gunid; require_once("Playlist.php"); $pl = StoredFile::RecallByGunid($plid); if (is_null($pl) || PEAR::isError($pl)) { return $pl; } $fname = $pl->getName(); if (PEAR::isError($fname)) { return $fname; } if ($withContent) { $this->trLog("upload2Hub exportPlaylistOpen BEGIN: ".strftime("%H:%M:%S")); $res = $this->gb->bsExportPlaylistOpen($plid); $this->trLog("upload2Hub exportPlaylistOpen END: ".strftime("%H:%M:%S")); if (PEAR::isError($res)) { return $res; } $tmpn = tempnam($CC_CONFIG['transDir'], 'plExport_'); $plfpath = "$tmpn.lspl"; $this->trLog("upload2Hub begin copy: ".strftime("%H:%M:%S")); copy($res['fname'], $plfpath); $this->trLog("upload2Hub end copy: ".strftime("%H:%M:%S")); $res = $this->gb->bsExportPlaylistClose($res['token']); if (PEAR::isError($res)) { return $res; } $fname = $fname.".lspl"; $trtype = 'playlistPkg'; } else { $plfpath = $pl->getRealMetadataFileName(); if (PEAR::isError($plfpath)) { return $plfpath; } $trtype = 'playlist'; } $trec = $this->_uploadGeneralFileToHub($plfpath, $trtype, array_merge(array('gunid'=>$plid,'fname'=>$fname,), $pars)); if (PEAR::isError($trec)) { return $trec; } break; default: return PEAR::raiseError("Transport::upload2Hub: ftype not supported ($ftype)"); } $this->startCronJobProcess($trec->trtok); $this->trLog("upload2Hub end: ".strftime("%H:%M:%S")); return $trec->trtok; } /** * Start download of audioClip/webstream/playlist from hub to local * storageServer * * @param int $uid * local user id of transport owner * (for downloading file to homedir in storage) * @param string $gunid * global unique id of object being transported * @param boolean $withContent * if true, transport playlist content too (optional) * @param array $pars * default parameters (optional, internal use) * @return string * transport token */ function downloadFromHub($uid, $gunid, $withContent=TRUE, $pars=array()) { $trtype = ($withContent ? 'playlistPkg' : 'unknown' ); $trec = TransportRecord::create($this, $trtype, 'down', array_merge(array('gunid'=>$gunid, 'uid'=>$uid), $pars)); if (PEAR::isError($trec)) { return $trec; } $this->startCronJobProcess($trec->trtok); return $trec->trtok; } /* ------------------------------------------------ remote-search methods */ /** * Start search job on remote Campcaster instance. * * @param array $criteria * LS criteria format (see localSearch) * @param string $resultMode * 'php' | 'xmlrpc' * @param array $pars * default parameters (optional, internal use) * @return string * transport token */ function remoteSearch($criteria, $resultMode='php') { global $CC_CONFIG, $CC_DBC; $criteria['resultMode'] = $resultMode; // testing of hub availability and hub account configuration. $sessid = $this->loginToArchive(); if (PEAR::isError($sessid)) { switch(intval($sessid->getCode())) { case 802: return PEAR::raiseError("Can't login to Hub ({$sessid->getMessage()})", TRERR_XR_FAIL); case TRERR_XR_FAIL: return PEAR::raiseError("Can't connect to Hub ({$sessid->getMessage()})", TRERR_XR_FAIL); } return $sessid; } $params = array("sessid" => $sessid, "criteria" => $criteria); $result = $this->xmlrpcCall("locstor.searchMetadata", $params); //$result = $this->xmlrpcCall("locstor.ping", array("par" => "foo")); $this->logoutFromArchive($sessid); return $result; } /** * Start search job on network hub * * @param array $criteria * LS criteria format (see localSearch) * @param string $resultMode * 'php' | 'xmlrpc' * @param array $pars * default parameters (optional, internal use) * @return string * transport token */ // function globalSearch($criteria, $resultMode='php', $pars=array()) // { // global $CC_CONFIG, $CC_DBC; // // testing of hub availability and hub account configuration. // // it makes searchjob not async - should be removed for real async // $r = $this->loginToArchive(); // if (PEAR::isError($r)) { // switch(intval($r->getCode())) { // case 802: // return PEAR::raiseError("Can't login to Hub ({$r->getMessage()})", TRERR_XR_FAIL); // case TRERR_XR_FAIL: // return PEAR::raiseError("Can't connect to Hub ({$r->getMessage()})", TRERR_XR_FAIL); // } // return $r; // } // $this->logoutFromArchive($r); // $criteria['resultMode'] = $resultMode; // $localfile = tempnam($CC_CONFIG['transDir'], 'searchjob_'); // @chmod($localfile, 0660); // $len = file_put_contents($localfile, serialize($criteria)); // $trec = $this->_uploadGeneralFileToHub($localfile, 'searchjob', $pars); // if (PEAR::isError($trec)) { // return $trec; // } // $this->startCronJobProcess($trec->trtok); // return $trec->trtok; // } /** * Get results from search job on network hub * * @param string $trtok * transport token * @param boolean $andClose * if TRUE, close transport token * @return array * LS search result format (see localSearch) */ // function getSearchResults($trtok, $andClose=TRUE) // { // $trec = TransportRecord::recall($this, $trtok); // if (PEAR::isError($trec)) { // return $trec; // } // $row = $trec->row; // switch ($st = $trec->getState()) { // case "failed": // return PEAR::raiseError( // "Transport::getSearchResults:". // " global search or results transport failed". // " ({$trec->row['errmsg']})" // ); // case "closed": ///* // $res = file_get_contents($row['localfile']); // $results = unserialize($res); // return $results; //*/ // return PEAR::raiseError( // "Transport::getSearchResults:". // " closed transport token ($trtok)", TRERR_TOK // ); // case "finished": // if ($row['direction'] == 'down') { // // really finished // $res = file_get_contents($row['localfile']); // $results = unserialize($res); // if ($andClose) { // $ret = $this->xmlrpcCall('archive.downloadClose', // array( // 'token' => $row['pdtoken'] , // 'trtype' => $row['trtype'] , // )); // if (PEAR::isError($ret)) { // return $ret; // } // @unlink($row['localfile']); // $r = $trec->close(); // if (PEAR::isError($r)) { // return $r; // } // } // return $results; // } // // otherwise not really finished - only request upload finished // default: // return PEAR::raiseError( // "Transport::getSearchResults: not finished ($st)", // TRERR_NOTFIN // ); // } // } /* ------------------------ methods for ls-archive-format file transports */ /** * Open async file transfer from local storageServer to network hub, * file should be ls-archive-format file. * * @param string $filePath * local path to uploaded file * @param array $pars * default parameters (optional, internal use) * @return string * transport token */ function uploadFile2Hub($filePath, $pars=array()) { if (!file_exists($filePath)) { return PEAR::raiseError( "Transport::uploadFile2Hub: file not found ($filePath)" ); } $trec = $this->_uploadGeneralFileToHub($filePath, 'file', $pars); if (PEAR::isError($trec)) { return $trec; } $this->startCronJobProcess($trec->trtok); return $trec->trtok; } /** * Open async file transfer from network hub to local storageServer, * file should be ls-archive-format file. * * @param string $url * readable url * @param string $chsum * checksum from remote side * @param int $size * filesize from remote side * @param array $pars * default parameters (internal use) * @return array * trtok: string - transport token * localfile: string - filepath of downloaded file */ function downloadFileFromHub($url, $chsum=NULL, $size=NULL, $pars=array()) { global $CC_CONFIG, $CC_DBC; $tmpn = tempnam($CC_CONFIG['transDir'], 'HITrans_'); $trec = TransportRecord::create($this, 'file', 'down', array_merge(array( 'url' => $url, 'localfile' => $tmpn, 'expectedsum' => $chsum, 'expectedsize' => $size, ), $pars) ); if (PEAR::isError($trec)) { return $trec; } $this->startCronJobProcess($trec->trtok); return array('trtok'=>$trec->trtok, 'localfile'=>$tmpn); } /** * Get list of prepared transfers initiated by hub * * @return array * array of structs/hasharrays with fields: * trtok: string transport token */ function getHubInitiatedTransfers() { $ret = $this->xmlrpcCall('archive.listHubInitiatedTransfers', array('target' => HOSTNAME)); if (PEAR::isError($ret)) { return $ret; } $res = array(); foreach ($ret as $it) { $res[] = array('trtok'=>$it['trtok']); } return $res; } /** * Start of download initiated by hub * * @param int $uid * local user id of transport owner * (for downloading file to homedir in storage) * @param string $rtrtok * transport token obtained from the getHubInitiatedTransfers method * @return string * transport token */ function startHubInitiatedTransfer($uid, $rtrtok) { $ret = $this->xmlrpcCall('archive.listHubInitiatedTransfers', array( 'target' => HOSTNAME, 'trtok' => $rtrtok, )); if (PEAR::isError($ret)) { return $ret; } if (count($ret) != 1) { return PEAR::raiseError( "Transport::startHubInitiatedTransfer:". " wrong number of transports (".count($ret).")" ); } $ta = $ret[0]; // direction invertation to locstor point of view: $direction = ( $ta['direction']=='up' ? 'down' : 'up' ); $gunid = $ta['gunid']; switch ($direction) { case "up": switch ($ta['trtype']) { case "audioclip": case "playlist": case "playlistPkg": $trtok = $this->upload2Hub($gunid, TRUE, array('rtrtok'=>$rtrtok)); if (PEAR::isError($trtok)) { return $trtok; } break; //case "searchjob": break; // not supported yet //case "file": break; // probably unusable default: return PEAR::raiseError( "Transport::startHubInitiatedTransfer:". " wrong direction / transport type combination". " ({$ta['direction']}/{$ta['trtype']})" ); } break; case "down": switch ($ta['trtype']) { case "audioclip": case "playlist": case "playlistPkg": $trtok = $this->downloadFromHub($uid, $gunid, TRUE, array('rtrtok'=>$rtrtok)); if (PEAR::isError($trtok)) { return $trtok; } break; //case "searchjob": break; // probably unusable case "file": $r = $this->downloadFileFromHub( $ta['url'], $ta['expectedsum'], $ta['expectedsize'], array('rtrtok'=>$rtrtok)); if (PEAR::isError($r)) { return $r; } extract($r); // trtok, localfile break; default: return PEAR::raiseError( "Transport::startHubInitiatedTransfer:". " wrong direction / transport type combination". " ({$ta['direction']}/{$ta['trtype']})" ); } break; default: return PEAR::raiseError( "Transport::startHubInitiatedTransfer: ???" ); } $ret = $this->xmlrpcCall('archive.setHubInitiatedTransfer', array( 'target' => HOSTNAME, 'trtok' => $rtrtok, 'state' => 'waiting', )); if (PEAR::isError($ret)) { return $ret; } $this->startCronJobProcess($trtok); return $trtok; } /* =============================================== authentication methods */ /** * Login to archive server * (account info is taken from storageServer's config) * * @return string * sessid or error */ function loginToArchive() { global $CC_CONFIG; $res = $this->xmlrpcCall('locstor.login', array( 'login' => $CC_CONFIG['archiveAccountLogin'], 'pass' => $CC_CONFIG['archiveAccountPass'] )); if (PEAR::isError($res)) { return $res; } return $res['sessid']; } /** * Logout from archive server * * @param unknown $sessid * session id * @return string * Bye or error */ function logoutFromArchive($sessid) { $res = $this->xmlrpcCall('locstor.logout', array('sessid'=>$sessid)); return $res; } /* ========================================================= cron methods */ /* -------------------------------------------------- common cron methods */ /** * Main method for periodical transport tasks - called by cron * * @param string $direction * optional * @return boolean * TRUE */ function cronMain($direction=NULL) { global $CC_CONFIG; if (is_null($direction)) { $r = $this->cronMain('up'); if (PEAR::isError($r)) { return $r; } $r = $this->cronMain('down'); if (PEAR::isError($r)) { return $r; } return TRUE; } // fetch all opened transports $transports = $this->getTransports($direction); if (PEAR::isError($transports)) { $this->trLog("cronMain: DB error"); return FALSE; } if (count($transports) == 0) { if (TR_LOG_LEVEL > 1) { $this->trLog("cronMain: $direction - nothing to do."); } return TRUE; } // ping to archive server: $r = $this->ping(); chdir($CC_CONFIG['transDir']); // for all opened transports: foreach ($transports as $i => $row) { $r = $this->startCronJobProcess($row['trtok']); } // foreach transports return TRUE; } /** * Cron job process starter * * @param string $trtok * transport token * @return boolean * status */ function startCronJobProcess($trtok) { global $CC_CONFIG, $CC_DBC; if (TR_LOG_LEVEL > 2) { $redirect = $CC_CONFIG['transDir']."/debug.log"; } else { $redirect = "/dev/null"; } $redirect_escaped = escapeshellcmd($redirect); $command = "{$this->cronJobScript} {$trtok}"; $command_escaped = escapeshellcmd($command); $command_final = "$command_escaped >> $redirect_escaped 2>&1 &"; $res = system($command_final, $status); if ($res === FALSE) { $this->trLog( "cronMain: Error on execute cronJobScript with trtok {$trtok}" ); return FALSE; } return TRUE; } /** * Dynamic method caller - wrapper * * @param string $trtok * transport token * @return mixed * inherited from called method */ function cronCallMethod($trtok) { global $CC_CONFIG; $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $row = $trec->row; $state = $row['state']; $states = array('init'=>'init', 'pending'=>'pending', 'waiting'=>'waiting', 'finished'=>'finished', 'failed'=>'failed', 'closed'=>'closed'); $directions = array('up'=>'upload', 'down'=>'download'); // method name construction: $mname = "cron"; if (isset($directions[$row['direction']])) { $mname .= ucfirst($directions[$row['direction']]); } else { return PEAR::raiseError( "Transport::cronCallMethod: invalid direction ({$row['direction']})" ); } if (isset($states[$state])) { $mname .= ucfirst($states[$state]); } else { return PEAR::raiseError( "Transport::cronCallMethod: invalid state ({$state})" ); } switch ($state) { // do nothing if closed, penfing or failed: case 'closed': // excluded in SQL query too, but let check it here case 'failed': // -"- case 'pending': case 'paused': return TRUE; case 'waiting': require_once('Prefs.php'); $pr = new Prefs($this->gb); $group = $CC_CONFIG['StationPrefsGr']; $key = 'TransportsDenied'; $res = $pr->loadGroupPref($group, $key); if (PEAR::isError($res)) { if ($res->getCode() !== GBERR_PREF) { return $res; } else { $res = FALSE; // default } } // transfers turned off // if ($res) { return TRUE; break; } if ($res) { return PEAR::raiseError( "Transport::cronCallMethod: transfers turned off" ); } // NO break here! default: if (method_exists($this, $mname)) { // lock the job: $pid = getmypid(); $r = $trec->setLock(TRUE, $pid); if (PEAR::isError($r)) { return $r; } $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { $trec->setLock(FALSE); return $trec; } $row = $trec->row; $state = $row['state']; // login to archive server: $r = $this->loginToArchive(); if (PEAR::isError($r)) { $r2 = $trec->setLock(FALSE); return $r; } $asessid = $r; // method call: if (TR_LOG_LEVEL > 2) { $this->trLog("cronCallMethod($pid): $mname($trtok) >"); } $ret = call_user_func(array($this, $mname), $row, $asessid); if (PEAR::isError($ret)) { $trec->setLock(FALSE); return $this->_failFatal($ret, $trec); } if (TR_LOG_LEVEL > 2) { $this->trLog("cronCallMethod($pid): $mname($trtok) <"); } // unlock the job: $r = $trec->setLock(FALSE); if (PEAR::isError($r)) { return $r; } // logout: $r = $this->logoutFromArchive($asessid); if (PEAR::isError($r)) { return $r; } return $ret; } else { return PEAR::raiseError( "Transport::cronCallMethod: unknown method ($mname)" ); } } } /** * Upload initialization * * @param array $row * row from getTransport results * @param string $asessid * session id (from network hub) * @return mixed * boolean TRUE or error object */ function cronUploadInit($row, $asessid) { $trtok = $row['trtok']; $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $ret = $this->xmlrpcCall('archive.uploadOpen', array( 'sessid' => $asessid , 'chsum' => $row['expectedsum'], )); if (PEAR::isError($ret)) { return $ret; } $r = $trec->setState('waiting', array('url'=>$ret['url'], 'pdtoken'=>$ret['token'])); if (PEAR::isError($r)) { return $r; } return TRUE; } /** * Download initialization * * @param array $row * row from getTransport results * @param string $asessid * session id (from network hub) * @return mixed * boolean TRUE or error object */ function cronDownloadInit($row, $asessid) { global $CC_CONFIG; $trtok = $row['trtok']; $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $ret = $this->xmlrpcCall('archive.downloadOpen', array( 'sessid'=> $asessid, 'trtype'=> $row['trtype'], 'pars'=>array( 'gunid' => $row['gunid'], 'token' => $row['pdtoken'], ), )); if (PEAR::isError($ret)) { return $ret; } $trtype = $ret['trtype']; $title = $ret['title']; $pars = array(); switch ($trtype) { // case "searchjob": // $r = $trec->setState('waiting', $pars); // break; case "file": $r = $trec->setState('waiting',array_merge($pars, array( 'trtype'=>$trtype, 'url'=>$ret['url'], 'pdtoken'=>$ret['token'], 'expectedsum'=>$ret['chsum'], 'expectedsize'=>$ret['size'], 'fname'=>$ret['filename'], 'localfile'=>$CC_CONFIG['transDir']."/$trtok", ))); break; case "audioclip": $mdtrec = TransportRecord::create($this, 'metadata', 'down', array('gunid'=>$row['gunid'], 'uid'=>$row['uid'], ) ); if (PEAR::isError($mdtrec)) { return $mdtrec; } $this->startCronJobProcess($mdtrec->trtok); $pars = array('mdtrtok'=>$mdtrec->trtok); // NO break here ! default: $r = $trec->setState('waiting',array_merge($pars, array( 'trtype'=>$trtype, 'url'=>$ret['url'], 'pdtoken'=>$ret['token'], 'expectedsum'=>$ret['chsum'], 'expectedsize'=>$ret['size'], 'fname'=>$ret['filename'], 'title'=>$title, 'localfile'=>$CC_CONFIG['transDir']."/$trtok", ))); } if (PEAR::isError($r)) { return $r; } return TRUE; } /** * Upload next part of transported file * * @param array $row * row from getTransport results * @param string $asessid * session id (from network hub) * @return mixed * boolean TRUE or error object */ function cronUploadWaiting($row, $asessid) { $trtok = $row['trtok']; $check = $this->uploadCheck($row['pdtoken']); if (PEAR::isError($check)) { return $check; } // test filesize if (!file_exists($row['localfile'])) { return PEAR::raiseError("Transport::cronUploadWaiting:". " file being uploaded does not exist! ({$row['localfile']})" ); } $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $size = escapeshellarg($check['size']); $localfile = escapeshellarg($row['localfile']); $url = escapeshellarg($row['url']); $command = "curl -f -s -C $size --max-time {$this->upTrMaxTime}". " --speed-time {$this->upTrSpeedTime}". " --speed-limit {$this->upTrSpeedLimit}". " --connect-timeout {$this->upTrConnectTimeout}". (!is_null($this->upLimitRate)? " --limit-rate {$this->upLimitRate}" : ""). " -T $localfile $url"; $r = $trec->setState('pending', array(), 'waiting'); if (PEAR::isError($r)) { return $r; } if ($r === FALSE) { return TRUE; } $res = system($command, $status); // leave paused and closed transports $trec2 = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $state2 = $trec2->row['state']; if ($state2 == 'paused' || $state2 == 'closed' ) { return TRUE; } // status 18 - Partial file. Only a part of the file was transported. // status 28 - Timeout. Too long/slow upload, try to resume next time rather. // status 6 - Couldn't resolve host. // status 7 - Failed to connect to host. // status 56 - Failure in receiving network data. Important - this status is // returned if file is locked on server side if ($status == 0 || $status == 18 || $status == 28 || $status == 6 || $status == 7 || $status == 56) { $check = $this->uploadCheck($row['pdtoken']); if (PEAR::isError($check)) { return $check; } // test checksum if ($check['status'] == TRUE) { // finished $r = $trec->setState('finished', array('realsum'=>$check['realsum'], 'realsize'=>$check['size'])); if (PEAR::isError($r)) { return $r; } } else { if (intval($check['size']) < $row['expectedsize']) { $r = $trec->setState('waiting', array('realsum'=>$check['realsum'], 'realsize'=>$check['size'])); if (PEAR::isError($r)) { return $r; } } else { // wrong md5 at finish - TODO: start again // $this->xmlrpcCall('archive.uploadReset', array()); $trec->fail('file uploaded with bad md5'); return PEAR::raiseError("Transport::cronUploadWaiting:". " file uploaded with bad md5 ". "($trtok: {$check['realsum']}/{$check['expectedsum']})" ); } } } else { return PEAR::raiseError("Transport::cronUploadWaiting:". " wrong return status from curl: $status on $url". "($trtok)" ); } return TRUE; } /** * Download next part of transported file * * @param array $row * row from getTransport results * @param string $asessid * session id (from network hub) * @return mixed * boolean TRUE or error object */ function cronDownloadWaiting($row, $asessid) { $trtok = $row['trtok']; // wget the file $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $localfile = escapeshellarg($row['localfile']); $url = escapeshellarg($row['url']); $command = "wget -q -c". " --read-timeout={$this->downTimeout}". " --waitretry={$this->downWaitretry}". " -t {$this->downRetries}". (!is_null($this->downLimitRate)? " --limit-rate={$this->downLimitRate}" : ""). " -O $localfile $url" ; $r = $trec->setState('pending', array(), 'waiting'); if (PEAR::isError($r)) { return $r; } if ($r === FALSE) { return TRUE; } $res = system($command, $status); // leave paused and closed transports $trec2 = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } $state2 = $trec2->row['state']; if ($state2 == 'paused' || $state2 == 'closed' ) { return TRUE; } // check consistency $size = filesize($row['localfile']); if ($size < $row['expectedsize']) { // not finished - return to the 'waiting' state $r = $trec->setState('waiting', array('realsize'=>$size)); if (PEAR::isError($r)) { return $r; } } elseif ($size >= $row['expectedsize']) { $chsum = $this->_chsum($row['localfile']); if ($chsum == $row['expectedsum']) { // mark download as finished $r = $trec->setState('finished', array('realsum'=>$chsum, 'realsize'=>$size)); if (PEAR::isError($r)) { return $r; } } else { // bad checksum, retry from the scratch @unlink($row['localfile']); $r = $trec->setState('waiting', array('realsum'=>$chsum, 'realsize'=>$size)); if (PEAR::isError($r)) { return $r; } } } return TRUE; } /** * Finish the upload * * @param array $row * row from getTransport results * @param string $asessid * session id (from network hub) * @return mixed * boolean TRUE or error object */ function cronUploadFinished($row, $asessid) { global $CC_CONFIG; $trtok = $row['trtok']; $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } // don't close metadata transport - audioclip will close it if ($row['trtype'] == 'metadata') { return TRUE; } // handle metadata transport on audioclip trtype: if ($row['trtype'] == 'audioclip') { $mdtrec = TransportRecord::recall($this, $trec->row['mdtrtok']); if (PEAR::isError($mdtrec)) { return $mdtrec; } switch ($mdtrec->row['state']) { case 'failed': case 'closed': return PEAR::raiseError("Transport::cronUploadFinished:". " metadata transport in wrong state: {$mdtrec->row['state']}". " ({$this->trtok})" ); break; // don't close transport with nonfinished metadata transport: case 'init': case 'waiting': case 'pending': case 'paused': return TRUE; default: // finished - ok close parent transport $mdpdtoken = $mdtrec->row['pdtoken']; } } else { $mdpdtoken = NULL; } $ret = $this->xmlrpcCall('archive.uploadClose', array( 'token' => $row['pdtoken'] , 'trtype' => $row['trtype'], 'pars' => array( 'gunid' => $row['gunid'], 'name' => $row['fname'], 'mdpdtoken' => $mdpdtoken, ), )); if (PEAR::isError($ret)) { if ($row['trtype'] == 'audioclip') { $r2 = $mdtrec->close(); } return $ret; } // if ($row['trtype'] == 'searchjob') { // @unlink($row['localfile']); // $r = $trec->setState('init', array( // 'direction' => 'down', // 'pdtoken' => $ret['token'], // 'expectedsum' => $ret['chsum'], // 'expectedsize' => $ret['size'], // 'url' => $ret['url'], // 'realsize' => 0, // )); // $this->startCronJobProcess($trec->trtok); // } else { $r = $trec->close(); // } if (PEAR::isError($r)) { return $r; } switch ($row['trtype']) { case 'audioclip': // close metadata transport: $r = $mdtrec->close(); if (PEAR::isError($r)) { return $r; } break; case 'playlistPkg': // remove exported playlist (playlist with content) $ep = $row['localfile']; @unlink($ep); if (preg_match("|/(plExport_[^\.]+)\.lspl$|", $ep, $va)) { list(,$tmpn) = $va; $tmpn = $CC_CONFIG['transDir']."/$tmpn"; if (file_exists($tmpn)) { @unlink($tmpn); } } break; default: } return TRUE; } /** * Finish the download * * @param array $row * row from getTransport results * @param string $asessid * session id (from network hub) * @return mixed * boolean TRUE or error object */ function cronDownloadFinished($row, $asessid) { $trtok = $row['trtok']; $trec = TransportRecord::recall($this, $trtok); if (PEAR::isError($trec)) { return $trec; } switch ($row['trtype']) { case "audioclip": $mdtrtok = $trec->row['mdtrtok']; $mdtrec = TransportRecord::recall($this, $mdtrtok); if (PEAR::isError($mdtrec)) { return $mdtrec; } $pid = getmypid(); $r = $mdtrec->setLock(TRUE, $pid); if (PEAR::isError($r)) { return $r; } switch ($mdtrec->row['state']) { // don't close transport with nonfinished metadata transport: case 'init': case 'waiting': case 'pending': case 'paused': $r = $mdtrec->setLock(FALSE); if (PEAR::isError($r)) { return $r; } return TRUE; case 'finished': // metadata finished, close main transport $values = array( "filename" => $row['fname'], "filepath" => $trec->row['localfile'], "metadata" => $mdtrec->row['localfile'], "gunid" => $row['gunid'], "filetype" => "audioclip" ); $storedFile = $this->gb->bsPutFile($values); if (PEAR::isError($storedFile)) { $mdtrec->setLock(FALSE); return $storedFile; } $res = $storedFile->getId(); $ret = $this->xmlrpcCall('archive.downloadClose', array( 'token' => $mdtrec->row['pdtoken'] , 'trtype' => 'metadata' , )); if (PEAR::isError($ret)) { $mdtrec->setLock(FALSE); return $ret; } $r = $mdtrec->close(); if (PEAR::isError($r)) { $r2 = $mdtrec->setLock(FALSE); return $r; } @unlink($trec->row['localfile']); @unlink($mdtrec->row['localfile']); break; default: $r = $mdtrec->setLock(FALSE); return PEAR::raiseError("Transport::cronDownloadFinished:". " metadata transport in wrong state: {$mdtrec->row['state']}". " ({$this->trtok})" ); } $r = $mdtrec->setLock(FALSE); if (PEAR::isError($r)) { return $r; } break; case "metadata": // case "searchjob": return TRUE; // don't close - getSearchResults should close it break; } $ret = $this->xmlrpcCall('archive.downloadClose', array( 'token' => $row['pdtoken'] , 'trtype' => $row['trtype'] , )); if (PEAR::isError($ret)) { return $ret; } switch ($row['trtype']) { case "playlist": $values = array( "filename" => $row['fname'], "metadata" => $trec->row['localfile'], "gunid" => $row['gunid'], "filetype" => "playlist" ); $storedFile = $this->gb->bsPutFile($values); if (PEAR::isError($storedFile)) { return $storedFile; } $res = $storedFile->getId(); @unlink($row['localfile']); break; case "playlistPkg": $subjid = $trec->row['uid']; $fname = $trec->row['localfile']; $res = $this->gb->bsImportPlaylist($fname, $subjid); if (PEAR::isError($res)) { return $res; } @unlink($fname); break; case "audioclip": case "metadata": // case "searchjob": case "file": break; default: return PEAR::raiseError("DEBUG: NotImpl ".var_export($row,TRUE)); } if (!is_null($rtrtok = $trec->row['rtrtok'])) { $ret = $this->xmlrpcCall('archive.setHubInitiatedTransfer', array( 'target' => HOSTNAME, 'trtok' => $rtrtok, 'state' => 'closed', )); if (PEAR::isError($ret)) { return $ret; } } $r = $trec->close(); if (PEAR::isError($r)) { return $r; } return TRUE; } /* ==================================================== auxiliary methods */ /** * Prepare upload for general file * * @param string $fpath * local filepath of uploaded file * @param string $trtype * transport type * @param array $pars * default parameters (optional, internal use) * @return object - transportRecord instance */ function _uploadGeneralFileToHub($fpath, $trtype, $pars=array()) { $chsum = $this->_chsum($fpath); $size = filesize($fpath); $trec = TransportRecord::create($this, $trtype, 'up', array_merge(array( 'localfile'=>$fpath, 'fname'=>basename($fpath), 'expectedsum'=>$chsum, 'expectedsize'=>$size ), $pars) ); if (PEAR::isError($trec)) { return $trec; } return $trec; } /** * Create new transport token * * @return string * transport token */ function _createTransportToken() { $ip = (isset($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : ''); $initString = microtime().$ip.rand()."org.mdlf.campcaster"; $hash = md5($initString); $res = substr($hash, 0, 16); return $res; } /** * Get all relevant transport records * * @param string $direction * 'up' | 'down' * @param string $target * target hostname * @param string $trtok * transport token for specific query * @return array * array of transportRecords (as hasharrays) */ function getTransports($direction=NULL, $target=NULL, $trtok=NULL) { global $CC_CONFIG, $CC_DBC; switch ($direction) { case 'up': $dirCond = "direction='up' AND"; break; case 'down': $dirCond = "direction='down' AND"; break; default: $dirCond = ''; break; } if (is_null($target)) { $targetCond = ""; } else { $targetCond = "target='$target' AND"; } if (is_null($trtok)) { $trtokCond = ""; } else { $trtokCond = "trtok='$trtok' AND"; } $rows = $CC_DBC->getAll(" SELECT id, trtok, state, trtype, direction, to_hex(gunid)as gunid, to_hex(pdtoken)as pdtoken, fname, localfile, expectedsum, expectedsize, url, uid, target FROM ".$CC_CONFIG['transTable']." WHERE $dirCond $targetCond $trtokCond state not in ('closed', 'failed', 'paused') ORDER BY start DESC "); if (PEAR::isError($rows)) { return $rows; } foreach ($rows as $i => $row) { $rows[$i]['pdtoken'] = StoredFile::NormalizeGunid($row['pdtoken']); $rows[$i]['gunid'] = StoredFile::NormalizeGunid($row['gunid']); } return $rows; } /** * Check remote state of uploaded file * * @param string $pdtoken * put/download token (from network hub) * @return array * hash: chsum, size, url */ function uploadCheck($pdtoken) { $ret = $this->xmlrpcCall('archive.uploadCheck', array('token'=>$pdtoken)); return $ret; } /** * Ping to remote Campcaster server * * @return string * network hub response or error object */ function ping() { $res = $this->xmlrpcCall('ping', array('par'=>'ping_'.date('H:i:s'))); return $res; } /** * XMLRPC call to network hub. * * @param string $method * method name * @param array $pars * call parameters * @return mixed * response */ function xmlrpcCall($method, $pars=array()) { global $CC_CONFIG; $xrp = XML_RPC_encode($pars); $pr = new Prefs($this->gb); $group = $CC_CONFIG["StationPrefsGr"]; $key = 'archiveServerLocation'; $archiveUrl = $pr->loadGroupPref($group, $key, false); if ($archiveUrl) { $archiveUrlInfo = parse_url($archiveUrl); if ($archiveUrlInfo['port']) { $port = $archiveUrlInfo['port']; } else { $port = 80; } $c = new XML_RPC_Client($archiveUrlInfo['path'], $archiveUrlInfo['host'], $port); } else { $c = new XML_RPC_Client( $CC_CONFIG['archiveUrlPath']."/".$CC_CONFIG['archiveXMLRPC'], $CC_CONFIG['archiveUrlHost'], $CC_CONFIG['archiveUrlPort'] ); } $f = new XML_RPC_Message($method, array($xrp)); $r = $c->send($f); if (!$r) { return PEAR::raiseError("XML-RPC request failed", TRERR_XR_FAIL); } elseif ($r->faultCode() > 0) { return PEAR::raiseError($r->faultString(), $r->faultCode()); // return PEAR::raiseError($r->faultString(). // " (code ".$r->faultCode().")", TRERR_XR_FAIL); } else { $v = $r->value(); return XML_RPC_decode($v); } } /** * Checksum of local file * * @param string $fpath * local filepath * @return string * checksum */ function _chsum($fpath) { return md5_file($fpath); } /** * Check exception and eventually mark transport as failed * * @param mixed $res * result object to be checked * @param unknown $trec * transport record object * @return unknown */ function _failFatal($res, $trec) { if (PEAR::isError($res)) { switch ($res->getCode()) { // non fatal: case TRERR_XR_FAIL: break; // fatal: default: $trec->fail('', $res); } } return $res; } /** * Clean up transport jobs * * @param string $interval * psql time interval - older closed jobs will be deleted * @param boolean $forced * if true, delete non-closed jobs too * @return boolean true or error */ function _cleanUp($interval='1 minute'/*'1 hour'*/, $forced=FALSE) { global $CC_CONFIG, $CC_DBC; $cond = ($forced ? '' : " AND state='closed' AND lock = 'N'"); $r = $CC_DBC->query(" DELETE FROM ".$CC_CONFIG['transTable']." WHERE ts < now() - interval '$interval'".$cond ); if (PEAR::isError($r)) { return $r; } return TRUE; } /** * Logging wrapper for PEAR error object * * @param string $txt * log message * @param PEAR_Error $eo * @param array $row * array returned from getRow * @return mixed * void or error object */ function trLogPear($txt, $eo, $row=NULL) { $msg = $txt.$eo->getMessage()." ".$eo->getUserInfo(). " [".$eo->getCode()."]"; if (!is_null($row)) { $trec = TransportRecord::recall($this, $row['trtok']); if (!PEAR::isError($trec)) { $trec->setState('failed', array('errmsg'=>$msg)); } $msg .= "\n ".serialize($row); } $this->trLog($msg); } /** * Logging for debug transports * * @param string $msg * log message * @return mixed * void or error object */ function trLog($msg) { global $CC_CONFIG; $logfile = $CC_CONFIG['transDir']."/activity.log"; if (FALSE === ($fp = fopen($logfile, "a"))) { return PEAR::raiseError( "Transport::trLog: Can't write to log ($logfile)" ); } flock($fp,LOCK_SH); fputs($fp, "---".date("H:i:s")."---\n $msg\n"); flock($fp,LOCK_UN); fclose($fp); } /* ====================================================== install methods */ /** * Delete all transports * * @return mixed * void or error object */ function resetData() { global $CC_CONFIG, $CC_DBC; return $CC_DBC->query("DELETE FROM ".$CC_CONFIG['transTable']); } } ?>