libretime/airtime_mvc/library/php-amqplib/amqp.inc

1573 lines
45 KiB
PHP

<?php
/**
* Simple AMQP client library for AMQP for protocol version 0.8
*
* http://code.google.com/p/php-amqplib/
* Vadim Zaliva <lord@crocodile.org>
*
*/
require_once('amqp_wire.inc');
require_once('hexdump.inc');
function debug_msg($s)
{
echo $s, "\n";
}
function methodSig($a)
{
if(is_string($a))
return $a;
else
return sprintf("%d,%d",$a[0] ,$a[1]);
}
class AMQPException extends Exception
{
public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_text,$reply_code);
$this->amqp_reply_code = $reply_code; // redundant, but kept for BC
$this->amqp_reply_text = $reply_text; // redundant, but kept for BC
$this->amqp_method_sig = $method_sig;
$ms=methodSig($method_sig);
if(array_key_exists($ms, AbstractChannel::$GLOBAL_METHOD_NAMES))
$mn = AbstractChannel::$GLOBAL_METHOD_NAMES[$ms];
else
$mn = "";
$this->args = array(
$reply_code,
$reply_text,
$method_sig,
$mn
);
}
}
class AMQPConnectionException extends AMQPException
{
public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_code, $reply_text, $method_sig);
}
}
class AMQPChannelException extends AMQPException
{
public function __construct($reply_code, $reply_text, $method_sig)
{
parent::__construct($reply_code, $reply_text, $method_sig);
}
}
class AbstractChannel
{
private static $CONTENT_METHODS = array(
"60,60", // Basic.deliver
"60,71", // Basic.get_ok
);
private static $CLOSE_METHODS = array(
"10,60", // Connection.close
"20,40", // Channel.close
);
// All the method names
public static $GLOBAL_METHOD_NAMES = array(
"10,10" => "Connection.start",
"10,11" => "Connection.start_ok",
"10,20" => "Connection.secure",
"10,21" => "Connection.secure_ok",
"10,30" => "Connection.tune",
"10,31" => "Connection.tune_ok",
"10,40" => "Connection.open",
"10,41" => "Connection.open_ok",
"10,50" => "Connection.redirect",
"10,60" => "Connection.close",
"10,61" => "Connection.close_ok",
"20,10" => "Channel.open",
"20,11" => "Channel.open_ok",
"20,20" => "Channel.flow",
"20,21" => "Channel.flow_ok",
"20,30" => "Channel.alert",
"20,40" => "Channel.close",
"20,41" => "Channel.close_ok",
"30,10" => "Channel.access_request",
"30,11" => "Channel.access_request_ok",
"40,10" => "Channel.exchange_declare",
"40,11" => "Channel.exchange_declare_ok",
"40,20" => "Channel.exchange_delete",
"40,21" => "Channel.exchange_delete_ok",
"50,10" => "Channel.queue_declare",
"50,11" => "Channel.queue_declare_ok",
"50,20" => "Channel.queue_bind",
"50,21" => "Channel.queue_bind_ok",
"50,30" => "Channel.queue_purge",
"50,31" => "Channel.queue_purge_ok",
"50,40" => "Channel.queue_delete",
"50,41" => "Channel.queue_delete_ok",
"50,50" => "Channel.queue_unbind",
"50,51" => "Channel.queue_unbind_ok",
"60,10" => "Channel.basic_qos",
"60,11" => "Channel.basic_qos_ok",
"60,20" => "Channel.basic_consume",
"60,21" => "Channel.basic_consume_ok",
"60,30" => "Channel.basic_cancel",
"60,31" => "Channel.basic_cancel_ok",
"60,40" => "Channel.basic_publish",
"60,50" => "Channel.basic_return",
"60,60" => "Channel.basic_deliver",
"60,70" => "Channel.basic_get",
"60,71" => "Channel.basic_get_ok",
"60,72" => "Channel.basic_get_empty",
"60,80" => "Channel.basic_ack",
"60,90" => "Channel.basic_reject",
"60,100" => "Channel.basic_recover",
"90,10" => "Channel.tx_select",
"90,11" => "Channel.tx_select_ok",
"90,20" => "Channel.tx_commit",
"90,21" => "Channel.tx_commit_ok",
"90,30" => "Channel.tx_rollback",
"90,31" => "Channel.tx_rollback_ok"
);
protected $debug;
public function __construct($connection, $channel_id)
{
$this->connection = $connection;
$this->channel_id = $channel_id;
$connection->channels[$channel_id] = $this;
$this->frame_queue = array(); // Lower level queue for frames
$this->method_queue = array(); // Higher level queue for methods
$this->auto_decode = false;
$this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false;
}
public function getChannelId()
{
return $this->channel_id;
}
function dispatch($method_sig, $args, $content)
{
if(!array_key_exists($method_sig, $this->method_map))
throw new Exception("Unknown AMQP method $method_sig");
$amqp_method = $this->method_map[$method_sig];
if($content == NULL)
return call_user_func(array($this,$amqp_method), $args);
else
return call_user_func(array($this,$amqp_method), $args, $content);
}
function next_frame()
{
if($this->debug)
{
debug_msg("waiting for a new frame");
}
if($this->frame_queue != NULL)
return array_pop($this->frame_queue);
return $this->connection->wait_channel($this->channel_id);
}
protected function send_method_frame($method_sig, $args="")
{
$this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
}
function wait_content()
{
$frm = $this->next_frame();
$frame_type = $frm[0];
$payload = $frm[1];
if($frame_type != 2)
throw new Exception("Expecting Content header");
$payload_reader = new AMQPReader(substr($payload,0,12));
$class_id = $payload_reader->read_short();
$weight = $payload_reader->read_short();
$body_size = $payload_reader->read_longlong();
$msg = new AMQPMessage();
$msg->load_properties(substr($payload,12));
$body_parts = array();
$body_received = 0;
while(bccomp($body_size,$body_received)==1)
{
$frm = $this->next_frame();
$frame_type = $frm[0];
$payload = $frm[1];
if($frame_type != 3)
throw new Exception("Expecting Content body, received frame type $frame_type");
$body_parts[] = $payload;
$body_received = bcadd($body_received, strlen($payload));
}
$msg->body = implode("",$body_parts);
if($this->auto_decode and isset($msg->content_encoding))
{
try
{
$msg->body = $msg->body->decode($msg->content_encoding);
} catch (Exception $e) {
if($this->debug)
{
debug_msg("Ignoring body decoding exception: " . $e->getMessage());
}
}
}
return $msg;
}
/**
* Wait for some expected AMQP methods and dispatch to them.
* Unexpected methods are queued up for later calls to this Python
* method.
*/
public function wait($allowed_methods=NULL)
{
if($allowed_methods)
{
if($this->debug)
{
debug_msg("waiting for " . implode(", ", $allowed_methods));
}
}
else
{
if($this->debug)
{
debug_msg("waiting for any method");
}
}
//Process deferred methods
foreach($this->method_queue as $qk=>$queued_method)
{
if($this->debug)
{
debug_msg("checking queue method " . $qk);
}
$method_sig = $queued_method[0];
if($allowed_methods==NULL || in_array($method_sig, $allowed_methods))
{
unset($this->method_queue[$qk]);
if($this->debug)
{
debug_msg("Executing queued method: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
}
return $this->dispatch($queued_method[0],
$queued_method[1],
$queued_method[2]);
}
}
// No deferred methods? wait for new ones
while(true)
{
$frm = $this->next_frame();
$frame_type = $frm[0];
$payload = $frm[1];
if($frame_type != 1)
throw new Exception("Expecting AMQP method, received frame type: $frame_type");
if(strlen($payload) < 4)
throw new Exception("Method frame too short");
$method_sig_array = unpack("n2", substr($payload,0,4));
$method_sig = "" . $method_sig_array[1] . "," . $method_sig_array[2];
$args = new AMQPReader(substr($payload,4));
if($this->debug)
{
debug_msg("> $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
}
if(in_array($method_sig, AbstractChannel::$CONTENT_METHODS))
$content = $this->wait_content();
else
$content = NULL;
if($allowed_methods==NULL ||
in_array($method_sig,$allowed_methods) ||
in_array($method_sig,AbstractChannel::$CLOSE_METHODS))
{
return $this->dispatch($method_sig, $args, $content);
}
// Wasn't what we were looking for? save it for later
if($this->debug)
{
debug_msg("Queueing for later: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
}
array_push($this->method_queue,array($method_sig, $args, $content));
}
}
}
class AMQPConnection extends AbstractChannel
{
public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x09\x01";
public static $LIBRARY_PROPERTIES = array(
"library" => array('S', "PHP Simple AMQP lib"),
"library_version" => array('S', "0.1")
);
protected $method_map = array(
"10,10" => "start",
"10,20" => "secure",
"10,30" => "tune",
"10,41" => "open_ok",
"10,50" => "redirect",
"10,60" => "_close",
"10,61" => "close_ok"
);
public function __construct($host, $port,
$user, $password,
$vhost="/",$insist=false,
$login_method="AMQPLAIN",
$login_response=NULL,
$locale="en_US",
$connection_timeout = 3,
$read_write_timeout = 3)
{
if($user && $password)
{
$login_response = new AMQPWriter();
$login_response->write_table(array("LOGIN" => array('S',$user),
"PASSWORD" => array('S',$password)));
$login_response = substr($login_response->getvalue(),4); //Skip the length
} else
$login_response = NULL;
$d = AMQPConnection::$LIBRARY_PROPERTIES;
while(true)
{
$this->channels = array();
// The connection object itself is treated as channel 0
parent::__construct($this, 0);
$this->channel_max = 65535;
$this->frame_max = 131072;
$errstr = $errno = NULL;
$this->sock = NULL;
if (!($this->sock = fsockopen($host,$port,$errno,$errstr,$connection_timeout)))
{
throw new Exception ("Error Connecting to server($errno): $errstr ");
}
stream_set_timeout($this->sock, $read_write_timeout);
stream_set_blocking($this->sock, 1);
$this->input = new AMQPReader(null, $this->sock);
$this->write(AMQPConnection::$AMQP_PROTOCOL_HEADER);
$this->wait(array("10,10"));
$this->x_start_ok($d, $login_method, $login_response, $locale);
$this->wait_tune_ok = true;
while($this->wait_tune_ok)
{
$this->wait(array(
"10,20", // secure
"10,30", // tune
));
}
$host = $this->x_open($vhost,"", $insist);
if(!$host)
return; // we weren't redirected
// we were redirected, close the socket, loop and try again
if($this->debug)
{
debug_msg("closing socket");
}
@fclose($this->sock); $this->sock=NULL;
}
}
public function __destruct()
{
if(isset($this->input))
if($this->input)
$this->close();
if($this->sock)
{
if($this->debug)
{
debug_msg("closing socket");
}
@fclose($this->sock);
}
}
protected function write($data)
{
if($this->debug)
{
debug_msg("< [hex]:\n" . hexdump($data, $htmloutput = false, $uppercase = true, $return = true));
}
$len = strlen($data);
while(true)
{
$written = fwrite($this->sock, $data);
if($written == false || $written <= 0)
{
throw new Exception ("Error sending data");
}
$len = $len - $written;
if($len>0)
$data=substr($data,0-$len);
else
break;
}
}
protected function do_close()
{
if(isset($this->input))
if($this->input)
{
$this->input->close();
$this->input = NULL;
}
if($this->sock)
{
if($this->debug)
{
debug_msg("closing socket");
}
@fclose($this->sock);
$this->sock = NULL;
}
}
public function get_free_channel_id()
{
for($i=1;$i<=$this->channel_max;$i++)
if(!array_key_exists($i,$this->channels))
return $i;
throw new Exception("No free channel ids");
}
public function send_content($channel, $class_id, $weight, $body_size,
$packed_properties, $body)
{
$pkt = new AMQPWriter();
$pkt->write_octet(2);
$pkt->write_short($channel);
$pkt->write_long(strlen($packed_properties)+12);
$pkt->write_short($class_id);
$pkt->write_short($weight);
$pkt->write_longlong($body_size);
$pkt->write($packed_properties);
$pkt->write_octet(0xCE);
$pkt = $pkt->getvalue();
$this->write($pkt);
while($body)
{
$payload = substr($body,0, $this->frame_max-8);
$body = substr($body,$this->frame_max-8);
$pkt = new AMQPWriter();
$pkt->write_octet(3);
$pkt->write_short($channel);
$pkt->write_long(strlen($payload));
$pkt->write($payload);
$pkt->write_octet(0xCE);
$pkt = $pkt->getvalue();
$this->write($pkt);
}
}
protected function send_channel_method_frame($channel, $method_sig, $args="")
{
if($args instanceof AMQPWriter)
$args = $args->getvalue();
$pkt = new AMQPWriter();
$pkt->write_octet(1);
$pkt->write_short($channel);
$pkt->write_long(strlen($args)+4); // 4 = length of class_id and method_id
// in payload
$pkt->write_short($method_sig[0]); // class_id
$pkt->write_short($method_sig[1]); // method_id
$pkt->write($args);
$pkt->write_octet(0xCE);
$pkt = $pkt->getvalue();
$this->write($pkt);
if($this->debug)
{
debug_msg("< " . methodSig($method_sig) . ": " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
}
}
/**
* Wait for a frame from the server
*/
protected function wait_frame()
{
$frame_type = $this->input->read_octet();
$channel = $this->input->read_short();
$size = $this->input->read_long();
$payload = $this->input->read($size);
$ch = $this->input->read_octet();
if($ch != 0xCE)
throw new Exception(sprintf("Framing error, unexpected byte: %x", $ch));
return array($frame_type, $channel, $payload);
}
/**
* Wait for a frame from the server destined for
* a particular channel.
*/
protected function wait_channel($channel_id)
{
while(true)
{
list($frame_type, $frame_channel, $payload) = $this->wait_frame();
if($frame_channel == $channel_id)
return array($frame_type, $payload);
// Not the channel we were looking for. Queue this frame
//for later, when the other channel is looking for frames.
array_push($this->channels[$frame_channel]->frame_queue,
array($frame_type, $payload));
// If we just queued up a method for channel 0 (the Connection
// itself) it's probably a close method in reaction to some
// error, so deal with it right away.
if(($frame_type == 1) && ($frame_channel == 0))
$this->wait();
}
}
/**
* Fetch a Channel object identified by the numeric channel_id, or
* create that object if it doesn't already exist.
*/
public function channel($channel_id=NULL)
{
if(array_key_exists($channel_id,$this->channels))
return $this->channels[$channel_id];
return new AMQPChannel($this->connection, $channel_id);
}
/**
* request a connection close
*/
public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0))
{
try {
$args = new AMQPWriter();
$args->write_short($reply_code);
$args->write_shortstr($reply_text);
$args->write_short($method_sig[0]); // class_id
$args->write_short($method_sig[1]); // method_id
$this->send_method_frame(array(10, 60), $args);
} catch(Exception $e) {
return;
}
return $this->wait(array(
"10,61", // Connection.close_ok
));
}
public static function dump_table($table)
{
$tokens = array();
foreach ($table as $name => $value)
{
switch ($value[0])
{
case 'D':
$val = $value[1]->n . 'E' . $value[1]->e;
break;
case 'F':
$val = '(' . self::dump_table($value[1]) . ')';
case 'T':
$val = date('Y-m-d H:i:s', $value[1]);
break;
default:
$val = $value[1];
}
$tokens[] = $name . '=' . $val;
}
return implode(', ', $tokens);
}
protected function _close($args)
{
$reply_code = $args->read_short();
$reply_text = $args->read_shortstr();
$class_id = $args->read_short();
$method_id = $args->read_short();
$this->x_close_ok();
throw new AMQPConnectionException($reply_code, $reply_text, array($class_id, $method_id));
}
/**
* confirm a connection close
*/
protected function x_close_ok()
{
$this->send_method_frame(array(10, 61));
$this->do_close();
}
/**
* confirm a connection close
*/
protected function close_ok($args)
{
$this->do_close();
}
protected function x_open($virtual_host, $capabilities="", $insist=false)
{
$args = new AMQPWriter();
$args->write_shortstr($virtual_host);
$args->write_shortstr($capabilities);
$args->write_bit($insist);
$this->send_method_frame(array(10, 40), $args);
return $this->wait(array(
"10,41", // Connection.open_ok
"10,50" // Connection.redirect
));
}
/**
* signal that the connection is ready
*/
protected function open_ok($args)
{
$this->known_hosts = $args->read_shortstr();
if($this->debug)
{
debug_msg("Open OK! known_hosts: " . $this->known_hosts);
}
return NULL;
}
/**
* asks the client to use a different server
*/
protected function redirect($args)
{
$host = $args->read_shortstr();
$this->known_hosts = $args->read_shortstr();
if($this->debug)
{
debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" );
}
return $host;
}
/**
* security mechanism challenge
*/
protected function secure($args)
{
$challenge = $args->read_longstr();
}
/**
* security mechanism response
*/
protected function x_secure_ok($response)
{
$args = new AMQPWriter();
$args->write_longstr($response);
$this->send_method_frame(array(10, 21), $args);
}
/**
* start connection negotiation
*/
protected function start($args)
{
$this->version_major = $args->read_octet();
$this->version_minor = $args->read_octet();
$this->server_properties = $args->read_table();
$this->mechanisms = explode(" ", $args->read_longstr());
$this->locales = explode(" ", $args->read_longstr());
if($this->debug)
{
debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s",
$this->version_major,
$this->version_minor,
self::dump_table($this->server_properties),
implode(', ', $this->mechanisms),
implode(', ', $this->locales)));
}
}
protected function x_start_ok($client_properties, $mechanism, $response, $locale)
{
$args = new AMQPWriter();
$args->write_table($client_properties);
$args->write_shortstr($mechanism);
$args->write_longstr($response);
$args->write_shortstr($locale);
$this->send_method_frame(array(10, 11), $args);
}
/**
* propose connection tuning parameters
*/
protected function tune($args)
{
$v=$args->read_short();
if($v)
$this->channel_max = $v;
$v=$args->read_long();
if($v)
$this->frame_max = $v;
$this->heartbeat = $args->read_short();
$this->x_tune_ok($this->channel_max, $this->frame_max, 0);
}
/**
* negotiate connection tuning parameters
*/
protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
{
$args = new AMQPWriter();
$args->write_short($channel_max);
$args->write_long($frame_max);
$args->write_short($heartbeat);
$this->send_method_frame(array(10, 31), $args);
$this->wait_tune_ok = False;
}
}
class AMQPChannel extends AbstractChannel
{
protected $method_map = array(
"20,11" => "open_ok",
"20,20" => "flow",
"20,21" => "flow_ok",
"20,30" => "alert",
"20,40" => "_close",
"20,41" => "close_ok",
"30,11" => "access_request_ok",
"40,11" => "exchange_declare_ok",
"40,21" => "exchange_delete_ok",
"50,11" => "queue_declare_ok",
"50,21" => "queue_bind_ok",
"50,31" => "queue_purge_ok",
"50,41" => "queue_delete_ok",
"50,51" => "queue_unbind_ok",
"60,11" => "basic_qos_ok",
"60,21" => "basic_consume_ok",
"60,31" => "basic_cancel_ok",
"60,50" => "basic_return",
"60,60" => "basic_deliver",
"60,71" => "basic_get_ok",
"60,72" => "basic_get_empty",
"90,11" => "tx_select_ok",
"90,21" => "tx_commit_ok",
"90,31" => "tx_rollback_ok"
);
public function __construct($connection,
$channel_id=NULL,
$auto_decode=true)
{
if($channel_id == NULL)
$channel_id = $connection->get_free_channel_id();
parent::__construct($connection, $channel_id);
if($this->debug)
{
debug_msg("using channel_id: " . $channel_id);
}
$this->default_ticket = 0;
$this->is_open = false;
$this->active = true; // Flow control
$this->alerts = array();
$this->callbacks = array();
$this->auto_decode = $auto_decode;
$this->x_open();
}
public function __destruct()
{
//TODO:???if($this->connection)
// $this->close("destroying channel");
}
/**
* Tear down this object, after we've agreed to close with the server.
*/
protected function do_close()
{
$this->is_open = false;
unset($this->connection->channels[$this->channel_id]);
$this->channel_id = $this->connection = NULL;
}
/**
* This method allows the server to send a non-fatal warning to
* the client. This is used for methods that are normally
* asynchronous and thus do not have confirmations, and for which
* the server may detect errors that need to be reported. Fatal
* errors are handled as channel or connection exceptions; non-
* fatal errors are sent through this method.
*/
protected function alert($args)
{
$reply_code = $args->read_short();
$reply_text = $args->read_shortstr();
$details = $args->read_table();
array_push($this->alerts,array($reply_code, $reply_text, $details));
}
/**
* request a channel close
*/
public function close($reply_code=0,
$reply_text="",
$method_sig=array(0, 0))
{
$args = new AMQPWriter();
$args->write_short($reply_code);
$args->write_shortstr($reply_text);
$args->write_short($method_sig[0]); // class_id
$args->write_short($method_sig[1]); // method_id
$this->send_method_frame(array(20, 40), $args);
return $this->wait(array(
"20,41" // Channel.close_ok
));
}
protected function _close($args)
{
$reply_code = $args->read_short();
$reply_text = $args->read_shortstr();
$class_id = $args->read_short();
$method_id = $args->read_short();
$this->send_method_frame(array(20, 41));
$this->do_close();
throw new AMQPChannelException($reply_code, $reply_text,
array($class_id, $method_id));
}
/**
* confirm a channel close
*/
protected function close_ok($args)
{
$this->do_close();
}
/**
* enable/disable flow from peer
*/
public function flow($active)
{
$args = new AMQPWriter();
$args->write_bit($active);
$this->send_method_frame(array(20, 20), $args);
return $this->wait(array(
"20,21" //Channel.flow_ok
));
}
protected function _flow($args)
{
$this->active = $args->read_bit();
$this->x_flow_ok($this->active);
}
protected function x_flow_ok($active)
{
$args = new AMQPWriter();
$args->write_bit($active);
$this->send_method_frame(array(20, 21), $args);
}
protected function flow_ok($args)
{
return $args->read_bit();
}
protected function x_open($out_of_band="")
{
if($this->is_open)
return;
$args = new AMQPWriter();
$args->write_shortstr($out_of_band);
$this->send_method_frame(array(20, 10), $args);
return $this->wait(array(
"20,11" //Channel.open_ok
));
}
protected function open_ok($args)
{
$this->is_open = true;
if($this->debug)
{
debug_msg("Channel open");
}
}
/**
* request an access ticket
*/
public function access_request($realm, $exclusive=false,
$passive=false, $active=false, $write=false, $read=false)
{
$args = new AMQPWriter();
$args->write_shortstr($realm);
$args->write_bit($exclusive);
$args->write_bit($passive);
$args->write_bit($active);
$args->write_bit($write);
$args->write_bit($read);
$this->send_method_frame(array(30, 10), $args);
return $this->wait(array(
"30,11" //Channel.access_request_ok
));
}
/**
* grant access to server resources
*/
protected function access_request_ok($args)
{
$this->default_ticket = $args->read_short();
return $this->default_ticket;
}
/**
* declare exchange, create if needed
*/
public function exchange_declare($exchange,
$type,
$passive=false,
$durable=false,
$auto_delete=true,
$internal=false,
$nowait=false,
$arguments=NULL,
$ticket=NULL)
{
if($arguments==NULL)
$arguments = array();
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($exchange);
$args->write_shortstr($type);
$args->write_bit($passive);
$args->write_bit($durable);
$args->write_bit($auto_delete);
$args->write_bit($internal);
$args->write_bit($nowait);
$args->write_table($arguments);
$this->send_method_frame(array(40, 10), $args);
if(!$nowait)
return $this->wait(array(
"40,11" //Channel.exchange_declare_ok
));
}
/**
* confirms an exchange declaration
*/
protected function exchange_declare_ok($args)
{
}
/**
* delete an exchange
*/
public function exchange_delete($exchange, $if_unused=false,
$nowait=false, $ticket=NULL)
{
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($exchange);
$args->write_bit($if_unused);
$args->write_bit($nowait);
$this->send_method_frame(array(40, 20), $args);
if(!$nowait)
return $this->wait(array(
"40,21" //Channel.exchange_delete_ok
));
}
/**
* confirm deletion of an exchange
*/
protected function exchange_delete_ok($args)
{
}
/**
* bind queue to an exchange
*/
public function queue_bind($queue, $exchange, $routing_key="",
$nowait=false, $arguments=NULL, $ticket=NULL)
{
if($arguments == NULL)
$arguments = array();
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_shortstr($exchange);
$args->write_shortstr($routing_key);
$args->write_bit($nowait);
$args->write_table($arguments);
$this->send_method_frame(array(50, 20), $args);
if(!$nowait)
return $this->wait(array(
"50,21" // Channel.queue_bind_ok
));
}
/**
* confirm bind successful
*/
protected function queue_bind_ok($args)
{
}
/**
* unbind queue from an exchange
*/
public function queue_unbind($queue, $exchange, $routing_key="",
$arguments=NULL, $ticket=NULL)
{
if($arguments == NULL)
$arguments = array();
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_shortstr($exchange);
$args->write_shortstr($routing_key);
$args->write_table($arguments);
$this->send_method_frame(array(50, 50), $args);
return $this->wait(array(
"50,51" // Channel.queue_unbind_ok
));
}
/**
* confirm unbind successful
*/
protected function queue_unbind_ok($args)
{
}
/**
* declare queue, create if needed
*/
public function queue_declare($queue="",
$passive=false,
$durable=false,
$exclusive=false,
$auto_delete=true,
$nowait=false,
$arguments=NULL,
$ticket=NULL)
{
if($arguments == NULL)
$arguments = array();
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_bit($passive);
$args->write_bit($durable);
$args->write_bit($exclusive);
$args->write_bit($auto_delete);
$args->write_bit($nowait);
$args->write_table($arguments);
$this->send_method_frame(array(50, 10), $args);
if(!$nowait)
return $this->wait(array(
"50,11" // Channel.queue_declare_ok
));
}
/**
* confirms a queue definition
*/
protected function queue_declare_ok($args)
{
$queue = $args->read_shortstr();
$message_count = $args->read_long();
$consumer_count = $args->read_long();
return array($queue, $message_count, $consumer_count);
}
/**
* delete a queue
*/
public function queue_delete($queue="", $if_unused=false, $if_empty=false,
$nowait=false, $ticket=NULL)
{
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_bit($if_unused);
$args->write_bit($if_empty);
$args->write_bit($nowait);
$this->send_method_frame(array(50, 40), $args);
if(!$nowait)
return $this->wait(array(
"50,41" //Channel.queue_delete_ok
));
}
/**
* confirm deletion of a queue
*/
protected function queue_delete_ok($args)
{
return $args->read_long();
}
/**
* purge a queue
*/
public function queue_purge($queue="", $nowait=false, $ticket=NULL)
{
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_bit($nowait);
$this->send_method_frame(array(50, 30), $args);
if(!$nowait)
return $this->wait(array(
"50,31" //Channel.queue_purge_ok
));
}
/**
* confirms a queue purge
*/
protected function queue_purge_ok($args)
{
return $args->read_long();
}
/**
* acknowledge one or more messages
*/
public function basic_ack($delivery_tag, $multiple=false)
{
$args = new AMQPWriter();
$args->write_longlong($delivery_tag);
$args->write_bit($multiple);
$this->send_method_frame(array(60, 80), $args);
}
/**
* end a queue consumer
*/
public function basic_cancel($consumer_tag, $nowait=false)
{
$args = new AMQPWriter();
$args->write_shortstr($consumer_tag);
$args->write_bit($nowait);
$this->send_method_frame(array(60, 30), $args);
return $this->wait(array(
"60,31" // Channel.basic_cancel_ok
));
}
/**
* confirm a cancelled consumer
*/
protected function basic_cancel_ok($args)
{
$consumer_tag = $args->read_shortstr();
unset($this->callbacks[$consumer_tag]);
}
/**
* start a queue consumer
*/
public function basic_consume($queue="", $consumer_tag="", $no_local=false,
$no_ack=false, $exclusive=false, $nowait=false,
$callback=NULL, $ticket=NULL)
{
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_shortstr($consumer_tag);
$args->write_bit($no_local);
$args->write_bit($no_ack);
$args->write_bit($exclusive);
$args->write_bit($nowait);
$this->send_method_frame(array(60, 20), $args);
if(!$nowait)
$consumer_tag = $this->wait(array(
"60,21" //Channel.basic_consume_ok
));
$this->callbacks[$consumer_tag] = $callback;
return $consumer_tag;
}
/**
* confirm a new consumer
*/
protected function basic_consume_ok($args)
{
return $args->read_shortstr();
}
/**
* notify the client of a consumer message
*/
protected function basic_deliver($args, $msg)
{
$consumer_tag = $args->read_shortstr();
$delivery_tag = $args->read_longlong();
$redelivered = $args->read_bit();
$exchange = $args->read_shortstr();
$routing_key = $args->read_shortstr();
$msg->delivery_info = array(
"channel" => $this,
"consumer_tag" => $consumer_tag,
"delivery_tag" => $delivery_tag,
"redelivered" => $redelivered,
"exchange" => $exchange,
"routing_key" => $routing_key
);
if(array_key_exists($consumer_tag, $this->callbacks))
$func = $this->callbacks[$consumer_tag];
else
$func = NULL;
if($func!=NULL)
call_user_func($func, $msg);
}
/**
* direct access to a queue
*/
public function basic_get($queue="", $no_ack=false, $ticket=NULL)
{
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($queue);
$args->write_bit($no_ack);
$this->send_method_frame(array(60, 70), $args);
return $this->wait(array(
"60,71", //Channel.basic_get_ok
"60,72" // Channel.basic_get_empty
));
}
/**
* indicate no messages available
*/
protected function basic_get_empty($args)
{
$cluster_id = $args->read_shortstr();
}
/**
* provide client with a message
*/
protected function basic_get_ok($args, $msg)
{
$delivery_tag = $args->read_longlong();
$redelivered = $args->read_bit();
$exchange = $args->read_shortstr();
$routing_key = $args->read_shortstr();
$message_count = $args->read_long();
$msg->delivery_info = array(
"delivery_tag" => $delivery_tag,
"redelivered" => $redelivered,
"exchange" => $exchange,
"routing_key" => $routing_key,
"message_count" => $message_count
);
return $msg;
}
/**
* publish a message
*/
public function basic_publish($msg, $exchange="", $routing_key="",
$mandatory=false, $immediate=false,
$ticket=NULL)
{
$args = new AMQPWriter();
if($ticket != NULL)
$args->write_short($ticket);
else
$args->write_short($this->default_ticket);
$args->write_shortstr($exchange);
$args->write_shortstr($routing_key);
$args->write_bit($mandatory);
$args->write_bit($immediate);
$this->send_method_frame(array(60, 40), $args);
$this->connection->send_content($this->channel_id, 60, 0,
strlen($msg->body),
$msg->serialize_properties(),
$msg->body);
}
/**
* specify quality of service
*/
public function basic_qos($prefetch_size, $prefetch_count, $a_global)
{
$args = new AMQPWriter();
$args->write_long($prefetch_size);
$args->write_short($prefetch_count);
$args->write_bit($a_global);
$this->send_method_frame(array(60, 10), $args);
return $this->wait(array(
"60,11" //Channel.basic_qos_ok
));
}
/**
* confirm the requested qos
*/
protected function basic_qos_ok($args)
{
}
/**
* redeliver unacknowledged messages
*/
public function basic_recover($requeue=false)
{
$args = new AMQPWriter();
$args->write_bit($requeue);
$this->send_method_frame(array(60, 100), $args);
}
/**
* reject an incoming message
*/
public function basic_reject($delivery_tag, $requeue)
{
$args = new AMQPWriter();
$args->write_longlong($delivery_tag);
$args->write_bit($requeue);
$this->send_method_frame(array(60, 90), $args);
}
/**
* return a failed message
*/
protected function basic_return($args)
{
$reply_code = $args->read_short();
$reply_text = $args->read_shortstr();
$exchange = $args->read_shortstr();
$routing_key = $args->read_shortstr();
$msg = $this->wait();
}
public function tx_commit()
{
$this->send_method_frame(array(90, 20));
return $this->wait(array(
"90,21" //Channel.tx_commit_ok
));
}
/**
* confirm a successful commit
*/
protected function tx_commit_ok($args)
{
}
/**
* abandon the current transaction
*/
public function tx_rollback()
{
$this->send_method_frame(array(90, 30));
return $this->wait(array(
"90,31" //Channel.tx_rollback_ok
));
}
/**
* confirm a successful rollback
*/
protected function tx_rollback_ok($args)
{
}
/**
* select standard transaction mode
*/
public function tx_select()
{
$this->send_method_frame(array(90, 10));
return $this->wait(array(
"90,11" //Channel.tx_select_ok
));
}
/**
* confirm transaction mode
*/
protected function tx_select_ok($args)
{
}
}
/**
* A Message for use with the Channnel.basic_* methods.
*/
class AMQPMessage extends GenericContent
{
protected static $PROPERTIES = array(
"content_type" => "shortstr",
"content_encoding" => "shortstr",
"application_headers" => "table",
"delivery_mode" => "octet",
"priority" => "octet",
"correlation_id" => "shortstr",
"reply_to" => "shortstr",
"expiration" => "shortstr",
"message_id" => "shortstr",
"timestamp" => "timestamp",
"type" => "shortstr",
"user_id" => "shortstr",
"app_id" => "shortstr",
"cluster_id" => "shortst"
);
public function __construct($body = '', $properties = null)
{
$this->body = $body;
parent::__construct($properties, $prop_types=AMQPMessage::$PROPERTIES);
}
}
?>