1573 lines
45 KiB
PHP
1573 lines
45 KiB
PHP
<?php
|
|
/**
|
|
* Simple AMQP client library for AMQP for protocol version 0.8
|
|
*
|
|
* http://code.google.com/p/php-amqplib/
|
|
* Vadim Zaliva <lord@crocodile.org>
|
|
*
|
|
*/
|
|
|
|
require_once('amqp_wire.inc');
|
|
require_once('hexdump.inc');
|
|
|
|
function debug_msg($s)
|
|
{
|
|
echo $s, "\n";
|
|
}
|
|
|
|
function methodSig($a)
|
|
{
|
|
if(is_string($a))
|
|
return $a;
|
|
else
|
|
return sprintf("%d,%d",$a[0] ,$a[1]);
|
|
}
|
|
|
|
|
|
class AMQPException extends Exception
|
|
{
|
|
public function __construct($reply_code, $reply_text, $method_sig)
|
|
{
|
|
parent::__construct($reply_text,$reply_code);
|
|
|
|
$this->amqp_reply_code = $reply_code; // redundant, but kept for BC
|
|
$this->amqp_reply_text = $reply_text; // redundant, but kept for BC
|
|
$this->amqp_method_sig = $method_sig;
|
|
|
|
$ms=methodSig($method_sig);
|
|
if(array_key_exists($ms, AbstractChannel::$GLOBAL_METHOD_NAMES))
|
|
$mn = AbstractChannel::$GLOBAL_METHOD_NAMES[$ms];
|
|
else
|
|
$mn = "";
|
|
$this->args = array(
|
|
$reply_code,
|
|
$reply_text,
|
|
$method_sig,
|
|
$mn
|
|
);
|
|
}
|
|
}
|
|
|
|
class AMQPConnectionException extends AMQPException
|
|
{
|
|
public function __construct($reply_code, $reply_text, $method_sig)
|
|
{
|
|
parent::__construct($reply_code, $reply_text, $method_sig);
|
|
}
|
|
}
|
|
|
|
class AMQPChannelException extends AMQPException
|
|
{
|
|
public function __construct($reply_code, $reply_text, $method_sig)
|
|
{
|
|
parent::__construct($reply_code, $reply_text, $method_sig);
|
|
}
|
|
|
|
}
|
|
|
|
class AbstractChannel
|
|
{
|
|
private static $CONTENT_METHODS = array(
|
|
"60,60", // Basic.deliver
|
|
"60,71", // Basic.get_ok
|
|
);
|
|
|
|
private static $CLOSE_METHODS = array(
|
|
"10,60", // Connection.close
|
|
"20,40", // Channel.close
|
|
);
|
|
|
|
// All the method names
|
|
public static $GLOBAL_METHOD_NAMES = array(
|
|
"10,10" => "Connection.start",
|
|
"10,11" => "Connection.start_ok",
|
|
"10,20" => "Connection.secure",
|
|
"10,21" => "Connection.secure_ok",
|
|
"10,30" => "Connection.tune",
|
|
"10,31" => "Connection.tune_ok",
|
|
"10,40" => "Connection.open",
|
|
"10,41" => "Connection.open_ok",
|
|
"10,50" => "Connection.redirect",
|
|
"10,60" => "Connection.close",
|
|
"10,61" => "Connection.close_ok",
|
|
"20,10" => "Channel.open",
|
|
"20,11" => "Channel.open_ok",
|
|
"20,20" => "Channel.flow",
|
|
"20,21" => "Channel.flow_ok",
|
|
"20,30" => "Channel.alert",
|
|
"20,40" => "Channel.close",
|
|
"20,41" => "Channel.close_ok",
|
|
"30,10" => "Channel.access_request",
|
|
"30,11" => "Channel.access_request_ok",
|
|
"40,10" => "Channel.exchange_declare",
|
|
"40,11" => "Channel.exchange_declare_ok",
|
|
"40,20" => "Channel.exchange_delete",
|
|
"40,21" => "Channel.exchange_delete_ok",
|
|
"50,10" => "Channel.queue_declare",
|
|
"50,11" => "Channel.queue_declare_ok",
|
|
"50,20" => "Channel.queue_bind",
|
|
"50,21" => "Channel.queue_bind_ok",
|
|
"50,30" => "Channel.queue_purge",
|
|
"50,31" => "Channel.queue_purge_ok",
|
|
"50,40" => "Channel.queue_delete",
|
|
"50,41" => "Channel.queue_delete_ok",
|
|
"50,50" => "Channel.queue_unbind",
|
|
"50,51" => "Channel.queue_unbind_ok",
|
|
"60,10" => "Channel.basic_qos",
|
|
"60,11" => "Channel.basic_qos_ok",
|
|
"60,20" => "Channel.basic_consume",
|
|
"60,21" => "Channel.basic_consume_ok",
|
|
"60,30" => "Channel.basic_cancel",
|
|
"60,31" => "Channel.basic_cancel_ok",
|
|
"60,40" => "Channel.basic_publish",
|
|
"60,50" => "Channel.basic_return",
|
|
"60,60" => "Channel.basic_deliver",
|
|
"60,70" => "Channel.basic_get",
|
|
"60,71" => "Channel.basic_get_ok",
|
|
"60,72" => "Channel.basic_get_empty",
|
|
"60,80" => "Channel.basic_ack",
|
|
"60,90" => "Channel.basic_reject",
|
|
"60,100" => "Channel.basic_recover",
|
|
"90,10" => "Channel.tx_select",
|
|
"90,11" => "Channel.tx_select_ok",
|
|
"90,20" => "Channel.tx_commit",
|
|
"90,21" => "Channel.tx_commit_ok",
|
|
"90,30" => "Channel.tx_rollback",
|
|
"90,31" => "Channel.tx_rollback_ok"
|
|
);
|
|
|
|
protected $debug;
|
|
|
|
public function __construct($connection, $channel_id)
|
|
{
|
|
$this->connection = $connection;
|
|
$this->channel_id = $channel_id;
|
|
$connection->channels[$channel_id] = $this;
|
|
$this->frame_queue = array(); // Lower level queue for frames
|
|
$this->method_queue = array(); // Higher level queue for methods
|
|
$this->auto_decode = false;
|
|
$this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false;
|
|
}
|
|
|
|
public function getChannelId()
|
|
{
|
|
return $this->channel_id;
|
|
}
|
|
|
|
|
|
function dispatch($method_sig, $args, $content)
|
|
{
|
|
if(!array_key_exists($method_sig, $this->method_map))
|
|
throw new Exception("Unknown AMQP method $method_sig");
|
|
|
|
$amqp_method = $this->method_map[$method_sig];
|
|
if($content == NULL)
|
|
return call_user_func(array($this,$amqp_method), $args);
|
|
else
|
|
return call_user_func(array($this,$amqp_method), $args, $content);
|
|
}
|
|
|
|
function next_frame()
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("waiting for a new frame");
|
|
}
|
|
if($this->frame_queue != NULL)
|
|
return array_pop($this->frame_queue);
|
|
return $this->connection->wait_channel($this->channel_id);
|
|
}
|
|
|
|
protected function send_method_frame($method_sig, $args="")
|
|
{
|
|
$this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args);
|
|
}
|
|
|
|
function wait_content()
|
|
{
|
|
$frm = $this->next_frame();
|
|
$frame_type = $frm[0];
|
|
$payload = $frm[1];
|
|
if($frame_type != 2)
|
|
throw new Exception("Expecting Content header");
|
|
|
|
$payload_reader = new AMQPReader(substr($payload,0,12));
|
|
$class_id = $payload_reader->read_short();
|
|
$weight = $payload_reader->read_short();
|
|
|
|
$body_size = $payload_reader->read_longlong();
|
|
$msg = new AMQPMessage();
|
|
$msg->load_properties(substr($payload,12));
|
|
|
|
$body_parts = array();
|
|
$body_received = 0;
|
|
while(bccomp($body_size,$body_received)==1)
|
|
{
|
|
$frm = $this->next_frame();
|
|
$frame_type = $frm[0];
|
|
$payload = $frm[1];
|
|
if($frame_type != 3)
|
|
throw new Exception("Expecting Content body, received frame type $frame_type");
|
|
$body_parts[] = $payload;
|
|
$body_received = bcadd($body_received, strlen($payload));
|
|
}
|
|
|
|
$msg->body = implode("",$body_parts);
|
|
|
|
if($this->auto_decode and isset($msg->content_encoding))
|
|
{
|
|
try
|
|
{
|
|
$msg->body = $msg->body->decode($msg->content_encoding);
|
|
} catch (Exception $e) {
|
|
if($this->debug)
|
|
{
|
|
debug_msg("Ignoring body decoding exception: " . $e->getMessage());
|
|
}
|
|
}
|
|
}
|
|
|
|
return $msg;
|
|
}
|
|
|
|
/**
|
|
* Wait for some expected AMQP methods and dispatch to them.
|
|
* Unexpected methods are queued up for later calls to this Python
|
|
* method.
|
|
*/
|
|
public function wait($allowed_methods=NULL)
|
|
{
|
|
if($allowed_methods)
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("waiting for " . implode(", ", $allowed_methods));
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("waiting for any method");
|
|
}
|
|
}
|
|
|
|
//Process deferred methods
|
|
foreach($this->method_queue as $qk=>$queued_method)
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("checking queue method " . $qk);
|
|
}
|
|
|
|
$method_sig = $queued_method[0];
|
|
if($allowed_methods==NULL || in_array($method_sig, $allowed_methods))
|
|
{
|
|
unset($this->method_queue[$qk]);
|
|
|
|
if($this->debug)
|
|
{
|
|
debug_msg("Executing queued method: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
|
|
}
|
|
|
|
return $this->dispatch($queued_method[0],
|
|
$queued_method[1],
|
|
$queued_method[2]);
|
|
}
|
|
}
|
|
|
|
// No deferred methods? wait for new ones
|
|
while(true)
|
|
{
|
|
$frm = $this->next_frame();
|
|
$frame_type = $frm[0];
|
|
$payload = $frm[1];
|
|
|
|
if($frame_type != 1)
|
|
throw new Exception("Expecting AMQP method, received frame type: $frame_type");
|
|
|
|
if(strlen($payload) < 4)
|
|
throw new Exception("Method frame too short");
|
|
|
|
$method_sig_array = unpack("n2", substr($payload,0,4));
|
|
$method_sig = "" . $method_sig_array[1] . "," . $method_sig_array[2];
|
|
$args = new AMQPReader(substr($payload,4));
|
|
|
|
if($this->debug)
|
|
{
|
|
debug_msg("> $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
|
|
}
|
|
|
|
|
|
if(in_array($method_sig, AbstractChannel::$CONTENT_METHODS))
|
|
$content = $this->wait_content();
|
|
else
|
|
$content = NULL;
|
|
|
|
if($allowed_methods==NULL ||
|
|
in_array($method_sig,$allowed_methods) ||
|
|
in_array($method_sig,AbstractChannel::$CLOSE_METHODS))
|
|
{
|
|
return $this->dispatch($method_sig, $args, $content);
|
|
}
|
|
|
|
// Wasn't what we were looking for? save it for later
|
|
if($this->debug)
|
|
{
|
|
debug_msg("Queueing for later: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
|
|
}
|
|
array_push($this->method_queue,array($method_sig, $args, $content));
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
class AMQPConnection extends AbstractChannel
|
|
{
|
|
public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x09\x01";
|
|
|
|
public static $LIBRARY_PROPERTIES = array(
|
|
"library" => array('S', "PHP Simple AMQP lib"),
|
|
"library_version" => array('S', "0.1")
|
|
);
|
|
|
|
protected $method_map = array(
|
|
"10,10" => "start",
|
|
"10,20" => "secure",
|
|
"10,30" => "tune",
|
|
"10,41" => "open_ok",
|
|
"10,50" => "redirect",
|
|
"10,60" => "_close",
|
|
"10,61" => "close_ok"
|
|
);
|
|
|
|
public function __construct($host, $port,
|
|
$user, $password,
|
|
$vhost="/",$insist=false,
|
|
$login_method="AMQPLAIN",
|
|
$login_response=NULL,
|
|
$locale="en_US",
|
|
$connection_timeout = 3,
|
|
$read_write_timeout = 3)
|
|
{
|
|
|
|
if($user && $password)
|
|
{
|
|
$login_response = new AMQPWriter();
|
|
$login_response->write_table(array("LOGIN" => array('S',$user),
|
|
"PASSWORD" => array('S',$password)));
|
|
$login_response = substr($login_response->getvalue(),4); //Skip the length
|
|
} else
|
|
$login_response = NULL;
|
|
|
|
|
|
$d = AMQPConnection::$LIBRARY_PROPERTIES;
|
|
while(true)
|
|
{
|
|
$this->channels = array();
|
|
// The connection object itself is treated as channel 0
|
|
parent::__construct($this, 0);
|
|
|
|
$this->channel_max = 65535;
|
|
$this->frame_max = 131072;
|
|
|
|
$errstr = $errno = NULL;
|
|
$this->sock = NULL;
|
|
if (!($this->sock = fsockopen($host,$port,$errno,$errstr,$connection_timeout)))
|
|
{
|
|
throw new Exception ("Error Connecting to server($errno): $errstr ");
|
|
}
|
|
|
|
stream_set_timeout($this->sock, $read_write_timeout);
|
|
stream_set_blocking($this->sock, 1);
|
|
$this->input = new AMQPReader(null, $this->sock);
|
|
|
|
$this->write(AMQPConnection::$AMQP_PROTOCOL_HEADER);
|
|
$this->wait(array("10,10"));
|
|
$this->x_start_ok($d, $login_method, $login_response, $locale);
|
|
|
|
$this->wait_tune_ok = true;
|
|
while($this->wait_tune_ok)
|
|
{
|
|
$this->wait(array(
|
|
"10,20", // secure
|
|
"10,30", // tune
|
|
));
|
|
}
|
|
|
|
$host = $this->x_open($vhost,"", $insist);
|
|
if(!$host)
|
|
return; // we weren't redirected
|
|
|
|
// we were redirected, close the socket, loop and try again
|
|
if($this->debug)
|
|
{
|
|
debug_msg("closing socket");
|
|
}
|
|
|
|
@fclose($this->sock); $this->sock=NULL;
|
|
}
|
|
}
|
|
|
|
public function __destruct()
|
|
{
|
|
if(isset($this->input))
|
|
if($this->input)
|
|
$this->close();
|
|
|
|
if($this->sock)
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("closing socket");
|
|
}
|
|
|
|
@fclose($this->sock);
|
|
}
|
|
}
|
|
|
|
protected function write($data)
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("< [hex]:\n" . hexdump($data, $htmloutput = false, $uppercase = true, $return = true));
|
|
}
|
|
|
|
$len = strlen($data);
|
|
while(true)
|
|
{
|
|
$written = fwrite($this->sock, $data);
|
|
|
|
if($written == false || $written <= 0)
|
|
{
|
|
throw new Exception ("Error sending data");
|
|
}
|
|
|
|
$len = $len - $written;
|
|
if($len>0)
|
|
$data=substr($data,0-$len);
|
|
else
|
|
break;
|
|
}
|
|
}
|
|
|
|
protected function do_close()
|
|
{
|
|
if(isset($this->input))
|
|
if($this->input)
|
|
{
|
|
$this->input->close();
|
|
$this->input = NULL;
|
|
}
|
|
|
|
if($this->sock)
|
|
{
|
|
if($this->debug)
|
|
{
|
|
debug_msg("closing socket");
|
|
}
|
|
|
|
@fclose($this->sock);
|
|
$this->sock = NULL;
|
|
}
|
|
}
|
|
|
|
public function get_free_channel_id()
|
|
{
|
|
for($i=1;$i<=$this->channel_max;$i++)
|
|
if(!array_key_exists($i,$this->channels))
|
|
return $i;
|
|
throw new Exception("No free channel ids");
|
|
}
|
|
|
|
public function send_content($channel, $class_id, $weight, $body_size,
|
|
$packed_properties, $body)
|
|
{
|
|
$pkt = new AMQPWriter();
|
|
|
|
$pkt->write_octet(2);
|
|
$pkt->write_short($channel);
|
|
$pkt->write_long(strlen($packed_properties)+12);
|
|
|
|
$pkt->write_short($class_id);
|
|
$pkt->write_short($weight);
|
|
$pkt->write_longlong($body_size);
|
|
$pkt->write($packed_properties);
|
|
|
|
$pkt->write_octet(0xCE);
|
|
$pkt = $pkt->getvalue();
|
|
$this->write($pkt);
|
|
|
|
while($body)
|
|
{
|
|
$payload = substr($body,0, $this->frame_max-8);
|
|
$body = substr($body,$this->frame_max-8);
|
|
$pkt = new AMQPWriter();
|
|
|
|
$pkt->write_octet(3);
|
|
$pkt->write_short($channel);
|
|
$pkt->write_long(strlen($payload));
|
|
|
|
$pkt->write($payload);
|
|
|
|
$pkt->write_octet(0xCE);
|
|
$pkt = $pkt->getvalue();
|
|
$this->write($pkt);
|
|
}
|
|
}
|
|
|
|
protected function send_channel_method_frame($channel, $method_sig, $args="")
|
|
{
|
|
if($args instanceof AMQPWriter)
|
|
$args = $args->getvalue();
|
|
|
|
$pkt = new AMQPWriter();
|
|
|
|
$pkt->write_octet(1);
|
|
$pkt->write_short($channel);
|
|
$pkt->write_long(strlen($args)+4); // 4 = length of class_id and method_id
|
|
// in payload
|
|
|
|
$pkt->write_short($method_sig[0]); // class_id
|
|
$pkt->write_short($method_sig[1]); // method_id
|
|
$pkt->write($args);
|
|
|
|
$pkt->write_octet(0xCE);
|
|
$pkt = $pkt->getvalue();
|
|
$this->write($pkt);
|
|
|
|
if($this->debug)
|
|
{
|
|
debug_msg("< " . methodSig($method_sig) . ": " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]);
|
|
}
|
|
|
|
}
|
|
|
|
/**
|
|
* Wait for a frame from the server
|
|
*/
|
|
protected function wait_frame()
|
|
{
|
|
$frame_type = $this->input->read_octet();
|
|
$channel = $this->input->read_short();
|
|
$size = $this->input->read_long();
|
|
$payload = $this->input->read($size);
|
|
|
|
$ch = $this->input->read_octet();
|
|
if($ch != 0xCE)
|
|
throw new Exception(sprintf("Framing error, unexpected byte: %x", $ch));
|
|
|
|
return array($frame_type, $channel, $payload);
|
|
}
|
|
|
|
/**
|
|
* Wait for a frame from the server destined for
|
|
* a particular channel.
|
|
*/
|
|
protected function wait_channel($channel_id)
|
|
{
|
|
while(true)
|
|
{
|
|
list($frame_type, $frame_channel, $payload) = $this->wait_frame();
|
|
if($frame_channel == $channel_id)
|
|
return array($frame_type, $payload);
|
|
|
|
// Not the channel we were looking for. Queue this frame
|
|
//for later, when the other channel is looking for frames.
|
|
array_push($this->channels[$frame_channel]->frame_queue,
|
|
array($frame_type, $payload));
|
|
|
|
// If we just queued up a method for channel 0 (the Connection
|
|
// itself) it's probably a close method in reaction to some
|
|
// error, so deal with it right away.
|
|
if(($frame_type == 1) && ($frame_channel == 0))
|
|
$this->wait();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Fetch a Channel object identified by the numeric channel_id, or
|
|
* create that object if it doesn't already exist.
|
|
*/
|
|
public function channel($channel_id=NULL)
|
|
{
|
|
if(array_key_exists($channel_id,$this->channels))
|
|
return $this->channels[$channel_id];
|
|
|
|
return new AMQPChannel($this->connection, $channel_id);
|
|
}
|
|
|
|
/**
|
|
* request a connection close
|
|
*/
|
|
public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0))
|
|
{
|
|
try {
|
|
$args = new AMQPWriter();
|
|
$args->write_short($reply_code);
|
|
$args->write_shortstr($reply_text);
|
|
$args->write_short($method_sig[0]); // class_id
|
|
$args->write_short($method_sig[1]); // method_id
|
|
$this->send_method_frame(array(10, 60), $args);
|
|
} catch(Exception $e) {
|
|
return;
|
|
}
|
|
|
|
return $this->wait(array(
|
|
"10,61", // Connection.close_ok
|
|
));
|
|
}
|
|
|
|
public static function dump_table($table)
|
|
{
|
|
$tokens = array();
|
|
foreach ($table as $name => $value)
|
|
{
|
|
switch ($value[0])
|
|
{
|
|
case 'D':
|
|
$val = $value[1]->n . 'E' . $value[1]->e;
|
|
break;
|
|
case 'F':
|
|
$val = '(' . self::dump_table($value[1]) . ')';
|
|
case 'T':
|
|
$val = date('Y-m-d H:i:s', $value[1]);
|
|
break;
|
|
default:
|
|
$val = $value[1];
|
|
}
|
|
$tokens[] = $name . '=' . $val;
|
|
}
|
|
return implode(', ', $tokens);
|
|
|
|
}
|
|
|
|
protected function _close($args)
|
|
{
|
|
$reply_code = $args->read_short();
|
|
$reply_text = $args->read_shortstr();
|
|
$class_id = $args->read_short();
|
|
$method_id = $args->read_short();
|
|
|
|
$this->x_close_ok();
|
|
|
|
throw new AMQPConnectionException($reply_code, $reply_text, array($class_id, $method_id));
|
|
}
|
|
|
|
|
|
/**
|
|
* confirm a connection close
|
|
*/
|
|
protected function x_close_ok()
|
|
{
|
|
$this->send_method_frame(array(10, 61));
|
|
$this->do_close();
|
|
}
|
|
|
|
/**
|
|
* confirm a connection close
|
|
*/
|
|
protected function close_ok($args)
|
|
{
|
|
$this->do_close();
|
|
}
|
|
|
|
protected function x_open($virtual_host, $capabilities="", $insist=false)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_shortstr($virtual_host);
|
|
$args->write_shortstr($capabilities);
|
|
$args->write_bit($insist);
|
|
$this->send_method_frame(array(10, 40), $args);
|
|
return $this->wait(array(
|
|
"10,41", // Connection.open_ok
|
|
"10,50" // Connection.redirect
|
|
));
|
|
}
|
|
|
|
|
|
/**
|
|
* signal that the connection is ready
|
|
*/
|
|
protected function open_ok($args)
|
|
{
|
|
$this->known_hosts = $args->read_shortstr();
|
|
if($this->debug)
|
|
{
|
|
debug_msg("Open OK! known_hosts: " . $this->known_hosts);
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
/**
|
|
* asks the client to use a different server
|
|
*/
|
|
protected function redirect($args)
|
|
{
|
|
$host = $args->read_shortstr();
|
|
$this->known_hosts = $args->read_shortstr();
|
|
if($this->debug)
|
|
{
|
|
debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" );
|
|
}
|
|
return $host;
|
|
}
|
|
|
|
/**
|
|
* security mechanism challenge
|
|
*/
|
|
protected function secure($args)
|
|
{
|
|
$challenge = $args->read_longstr();
|
|
}
|
|
|
|
/**
|
|
* security mechanism response
|
|
*/
|
|
protected function x_secure_ok($response)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_longstr($response);
|
|
$this->send_method_frame(array(10, 21), $args);
|
|
}
|
|
|
|
/**
|
|
* start connection negotiation
|
|
*/
|
|
protected function start($args)
|
|
{
|
|
$this->version_major = $args->read_octet();
|
|
$this->version_minor = $args->read_octet();
|
|
$this->server_properties = $args->read_table();
|
|
$this->mechanisms = explode(" ", $args->read_longstr());
|
|
$this->locales = explode(" ", $args->read_longstr());
|
|
|
|
if($this->debug)
|
|
{
|
|
debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s",
|
|
$this->version_major,
|
|
$this->version_minor,
|
|
self::dump_table($this->server_properties),
|
|
implode(', ', $this->mechanisms),
|
|
implode(', ', $this->locales)));
|
|
}
|
|
|
|
}
|
|
|
|
|
|
protected function x_start_ok($client_properties, $mechanism, $response, $locale)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_table($client_properties);
|
|
$args->write_shortstr($mechanism);
|
|
$args->write_longstr($response);
|
|
$args->write_shortstr($locale);
|
|
$this->send_method_frame(array(10, 11), $args);
|
|
}
|
|
|
|
/**
|
|
* propose connection tuning parameters
|
|
*/
|
|
protected function tune($args)
|
|
{
|
|
$v=$args->read_short();
|
|
if($v)
|
|
$this->channel_max = $v;
|
|
$v=$args->read_long();
|
|
if($v)
|
|
$this->frame_max = $v;
|
|
$this->heartbeat = $args->read_short();
|
|
|
|
$this->x_tune_ok($this->channel_max, $this->frame_max, 0);
|
|
}
|
|
|
|
/**
|
|
* negotiate connection tuning parameters
|
|
*/
|
|
protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_short($channel_max);
|
|
$args->write_long($frame_max);
|
|
$args->write_short($heartbeat);
|
|
$this->send_method_frame(array(10, 31), $args);
|
|
$this->wait_tune_ok = False;
|
|
}
|
|
|
|
}
|
|
|
|
class AMQPChannel extends AbstractChannel
|
|
{
|
|
protected $method_map = array(
|
|
"20,11" => "open_ok",
|
|
"20,20" => "flow",
|
|
"20,21" => "flow_ok",
|
|
"20,30" => "alert",
|
|
"20,40" => "_close",
|
|
"20,41" => "close_ok",
|
|
"30,11" => "access_request_ok",
|
|
"40,11" => "exchange_declare_ok",
|
|
"40,21" => "exchange_delete_ok",
|
|
"50,11" => "queue_declare_ok",
|
|
"50,21" => "queue_bind_ok",
|
|
"50,31" => "queue_purge_ok",
|
|
"50,41" => "queue_delete_ok",
|
|
"50,51" => "queue_unbind_ok",
|
|
"60,11" => "basic_qos_ok",
|
|
"60,21" => "basic_consume_ok",
|
|
"60,31" => "basic_cancel_ok",
|
|
"60,50" => "basic_return",
|
|
"60,60" => "basic_deliver",
|
|
"60,71" => "basic_get_ok",
|
|
"60,72" => "basic_get_empty",
|
|
"90,11" => "tx_select_ok",
|
|
"90,21" => "tx_commit_ok",
|
|
"90,31" => "tx_rollback_ok"
|
|
);
|
|
|
|
public function __construct($connection,
|
|
$channel_id=NULL,
|
|
$auto_decode=true)
|
|
{
|
|
|
|
if($channel_id == NULL)
|
|
$channel_id = $connection->get_free_channel_id();
|
|
|
|
parent::__construct($connection, $channel_id);
|
|
|
|
if($this->debug)
|
|
{
|
|
debug_msg("using channel_id: " . $channel_id);
|
|
}
|
|
|
|
$this->default_ticket = 0;
|
|
$this->is_open = false;
|
|
$this->active = true; // Flow control
|
|
$this->alerts = array();
|
|
$this->callbacks = array();
|
|
$this->auto_decode = $auto_decode;
|
|
|
|
$this->x_open();
|
|
}
|
|
|
|
public function __destruct()
|
|
{
|
|
//TODO:???if($this->connection)
|
|
// $this->close("destroying channel");
|
|
}
|
|
|
|
/**
|
|
* Tear down this object, after we've agreed to close with the server.
|
|
*/
|
|
protected function do_close()
|
|
{
|
|
$this->is_open = false;
|
|
unset($this->connection->channels[$this->channel_id]);
|
|
$this->channel_id = $this->connection = NULL;
|
|
}
|
|
|
|
/**
|
|
* This method allows the server to send a non-fatal warning to
|
|
* the client. This is used for methods that are normally
|
|
* asynchronous and thus do not have confirmations, and for which
|
|
* the server may detect errors that need to be reported. Fatal
|
|
* errors are handled as channel or connection exceptions; non-
|
|
* fatal errors are sent through this method.
|
|
*/
|
|
protected function alert($args)
|
|
{
|
|
$reply_code = $args->read_short();
|
|
$reply_text = $args->read_shortstr();
|
|
$details = $args->read_table();
|
|
|
|
array_push($this->alerts,array($reply_code, $reply_text, $details));
|
|
}
|
|
|
|
/**
|
|
* request a channel close
|
|
*/
|
|
public function close($reply_code=0,
|
|
$reply_text="",
|
|
$method_sig=array(0, 0))
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_short($reply_code);
|
|
$args->write_shortstr($reply_text);
|
|
$args->write_short($method_sig[0]); // class_id
|
|
$args->write_short($method_sig[1]); // method_id
|
|
$this->send_method_frame(array(20, 40), $args);
|
|
return $this->wait(array(
|
|
"20,41" // Channel.close_ok
|
|
));
|
|
}
|
|
|
|
|
|
protected function _close($args)
|
|
{
|
|
$reply_code = $args->read_short();
|
|
$reply_text = $args->read_shortstr();
|
|
$class_id = $args->read_short();
|
|
$method_id = $args->read_short();
|
|
|
|
$this->send_method_frame(array(20, 41));
|
|
$this->do_close();
|
|
|
|
throw new AMQPChannelException($reply_code, $reply_text,
|
|
array($class_id, $method_id));
|
|
}
|
|
|
|
/**
|
|
* confirm a channel close
|
|
*/
|
|
protected function close_ok($args)
|
|
{
|
|
$this->do_close();
|
|
}
|
|
|
|
/**
|
|
* enable/disable flow from peer
|
|
*/
|
|
public function flow($active)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_bit($active);
|
|
$this->send_method_frame(array(20, 20), $args);
|
|
return $this->wait(array(
|
|
"20,21" //Channel.flow_ok
|
|
));
|
|
}
|
|
|
|
protected function _flow($args)
|
|
{
|
|
$this->active = $args->read_bit();
|
|
$this->x_flow_ok($this->active);
|
|
}
|
|
|
|
protected function x_flow_ok($active)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_bit($active);
|
|
$this->send_method_frame(array(20, 21), $args);
|
|
}
|
|
|
|
protected function flow_ok($args)
|
|
{
|
|
return $args->read_bit();
|
|
}
|
|
|
|
protected function x_open($out_of_band="")
|
|
{
|
|
if($this->is_open)
|
|
return;
|
|
|
|
$args = new AMQPWriter();
|
|
$args->write_shortstr($out_of_band);
|
|
$this->send_method_frame(array(20, 10), $args);
|
|
return $this->wait(array(
|
|
"20,11" //Channel.open_ok
|
|
));
|
|
}
|
|
|
|
protected function open_ok($args)
|
|
{
|
|
$this->is_open = true;
|
|
if($this->debug)
|
|
{
|
|
debug_msg("Channel open");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* request an access ticket
|
|
*/
|
|
public function access_request($realm, $exclusive=false,
|
|
$passive=false, $active=false, $write=false, $read=false)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_shortstr($realm);
|
|
$args->write_bit($exclusive);
|
|
$args->write_bit($passive);
|
|
$args->write_bit($active);
|
|
$args->write_bit($write);
|
|
$args->write_bit($read);
|
|
$this->send_method_frame(array(30, 10), $args);
|
|
return $this->wait(array(
|
|
"30,11" //Channel.access_request_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* grant access to server resources
|
|
*/
|
|
protected function access_request_ok($args)
|
|
{
|
|
$this->default_ticket = $args->read_short();
|
|
return $this->default_ticket;
|
|
}
|
|
|
|
|
|
/**
|
|
* declare exchange, create if needed
|
|
*/
|
|
public function exchange_declare($exchange,
|
|
$type,
|
|
$passive=false,
|
|
$durable=false,
|
|
$auto_delete=true,
|
|
$internal=false,
|
|
$nowait=false,
|
|
$arguments=NULL,
|
|
$ticket=NULL)
|
|
{
|
|
if($arguments==NULL)
|
|
$arguments = array();
|
|
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($exchange);
|
|
$args->write_shortstr($type);
|
|
$args->write_bit($passive);
|
|
$args->write_bit($durable);
|
|
$args->write_bit($auto_delete);
|
|
$args->write_bit($internal);
|
|
$args->write_bit($nowait);
|
|
$args->write_table($arguments);
|
|
$this->send_method_frame(array(40, 10), $args);
|
|
|
|
if(!$nowait)
|
|
return $this->wait(array(
|
|
"40,11" //Channel.exchange_declare_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirms an exchange declaration
|
|
*/
|
|
protected function exchange_declare_ok($args)
|
|
{
|
|
}
|
|
|
|
/**
|
|
* delete an exchange
|
|
*/
|
|
public function exchange_delete($exchange, $if_unused=false,
|
|
$nowait=false, $ticket=NULL)
|
|
{
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($exchange);
|
|
$args->write_bit($if_unused);
|
|
$args->write_bit($nowait);
|
|
$this->send_method_frame(array(40, 20), $args);
|
|
|
|
if(!$nowait)
|
|
return $this->wait(array(
|
|
"40,21" //Channel.exchange_delete_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm deletion of an exchange
|
|
*/
|
|
protected function exchange_delete_ok($args)
|
|
{
|
|
}
|
|
|
|
|
|
/**
|
|
* bind queue to an exchange
|
|
*/
|
|
public function queue_bind($queue, $exchange, $routing_key="",
|
|
$nowait=false, $arguments=NULL, $ticket=NULL)
|
|
{
|
|
if($arguments == NULL)
|
|
$arguments = array();
|
|
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($queue);
|
|
$args->write_shortstr($exchange);
|
|
$args->write_shortstr($routing_key);
|
|
$args->write_bit($nowait);
|
|
$args->write_table($arguments);
|
|
$this->send_method_frame(array(50, 20), $args);
|
|
|
|
if(!$nowait)
|
|
return $this->wait(array(
|
|
"50,21" // Channel.queue_bind_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm bind successful
|
|
*/
|
|
protected function queue_bind_ok($args)
|
|
{
|
|
}
|
|
|
|
/**
|
|
* unbind queue from an exchange
|
|
*/
|
|
public function queue_unbind($queue, $exchange, $routing_key="",
|
|
$arguments=NULL, $ticket=NULL)
|
|
{
|
|
if($arguments == NULL)
|
|
$arguments = array();
|
|
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($queue);
|
|
$args->write_shortstr($exchange);
|
|
$args->write_shortstr($routing_key);
|
|
$args->write_table($arguments);
|
|
$this->send_method_frame(array(50, 50), $args);
|
|
|
|
return $this->wait(array(
|
|
"50,51" // Channel.queue_unbind_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm unbind successful
|
|
*/
|
|
protected function queue_unbind_ok($args)
|
|
{
|
|
}
|
|
|
|
/**
|
|
* declare queue, create if needed
|
|
*/
|
|
public function queue_declare($queue="",
|
|
$passive=false,
|
|
$durable=false,
|
|
$exclusive=false,
|
|
$auto_delete=true,
|
|
$nowait=false,
|
|
$arguments=NULL,
|
|
$ticket=NULL)
|
|
{
|
|
if($arguments == NULL)
|
|
$arguments = array();
|
|
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($queue);
|
|
$args->write_bit($passive);
|
|
$args->write_bit($durable);
|
|
$args->write_bit($exclusive);
|
|
$args->write_bit($auto_delete);
|
|
$args->write_bit($nowait);
|
|
$args->write_table($arguments);
|
|
$this->send_method_frame(array(50, 10), $args);
|
|
|
|
if(!$nowait)
|
|
return $this->wait(array(
|
|
"50,11" // Channel.queue_declare_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirms a queue definition
|
|
*/
|
|
protected function queue_declare_ok($args)
|
|
{
|
|
$queue = $args->read_shortstr();
|
|
$message_count = $args->read_long();
|
|
$consumer_count = $args->read_long();
|
|
|
|
return array($queue, $message_count, $consumer_count);
|
|
}
|
|
|
|
/**
|
|
* delete a queue
|
|
*/
|
|
public function queue_delete($queue="", $if_unused=false, $if_empty=false,
|
|
$nowait=false, $ticket=NULL)
|
|
{
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
|
|
$args->write_shortstr($queue);
|
|
$args->write_bit($if_unused);
|
|
$args->write_bit($if_empty);
|
|
$args->write_bit($nowait);
|
|
$this->send_method_frame(array(50, 40), $args);
|
|
|
|
if(!$nowait)
|
|
return $this->wait(array(
|
|
"50,41" //Channel.queue_delete_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm deletion of a queue
|
|
*/
|
|
protected function queue_delete_ok($args)
|
|
{
|
|
return $args->read_long();
|
|
}
|
|
|
|
/**
|
|
* purge a queue
|
|
*/
|
|
public function queue_purge($queue="", $nowait=false, $ticket=NULL)
|
|
{
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($queue);
|
|
$args->write_bit($nowait);
|
|
$this->send_method_frame(array(50, 30), $args);
|
|
|
|
if(!$nowait)
|
|
return $this->wait(array(
|
|
"50,31" //Channel.queue_purge_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirms a queue purge
|
|
*/
|
|
protected function queue_purge_ok($args)
|
|
{
|
|
return $args->read_long();
|
|
}
|
|
|
|
/**
|
|
* acknowledge one or more messages
|
|
*/
|
|
public function basic_ack($delivery_tag, $multiple=false)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_longlong($delivery_tag);
|
|
$args->write_bit($multiple);
|
|
$this->send_method_frame(array(60, 80), $args);
|
|
}
|
|
|
|
/**
|
|
* end a queue consumer
|
|
*/
|
|
public function basic_cancel($consumer_tag, $nowait=false)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_shortstr($consumer_tag);
|
|
$args->write_bit($nowait);
|
|
$this->send_method_frame(array(60, 30), $args);
|
|
return $this->wait(array(
|
|
"60,31" // Channel.basic_cancel_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm a cancelled consumer
|
|
*/
|
|
protected function basic_cancel_ok($args)
|
|
{
|
|
$consumer_tag = $args->read_shortstr();
|
|
unset($this->callbacks[$consumer_tag]);
|
|
}
|
|
|
|
/**
|
|
* start a queue consumer
|
|
*/
|
|
public function basic_consume($queue="", $consumer_tag="", $no_local=false,
|
|
$no_ack=false, $exclusive=false, $nowait=false,
|
|
$callback=NULL, $ticket=NULL)
|
|
{
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($queue);
|
|
$args->write_shortstr($consumer_tag);
|
|
$args->write_bit($no_local);
|
|
$args->write_bit($no_ack);
|
|
$args->write_bit($exclusive);
|
|
$args->write_bit($nowait);
|
|
$this->send_method_frame(array(60, 20), $args);
|
|
|
|
if(!$nowait)
|
|
$consumer_tag = $this->wait(array(
|
|
"60,21" //Channel.basic_consume_ok
|
|
));
|
|
|
|
$this->callbacks[$consumer_tag] = $callback;
|
|
return $consumer_tag;
|
|
}
|
|
|
|
/**
|
|
* confirm a new consumer
|
|
*/
|
|
protected function basic_consume_ok($args)
|
|
{
|
|
return $args->read_shortstr();
|
|
}
|
|
|
|
/**
|
|
* notify the client of a consumer message
|
|
*/
|
|
protected function basic_deliver($args, $msg)
|
|
{
|
|
$consumer_tag = $args->read_shortstr();
|
|
$delivery_tag = $args->read_longlong();
|
|
$redelivered = $args->read_bit();
|
|
$exchange = $args->read_shortstr();
|
|
$routing_key = $args->read_shortstr();
|
|
|
|
$msg->delivery_info = array(
|
|
"channel" => $this,
|
|
"consumer_tag" => $consumer_tag,
|
|
"delivery_tag" => $delivery_tag,
|
|
"redelivered" => $redelivered,
|
|
"exchange" => $exchange,
|
|
"routing_key" => $routing_key
|
|
);
|
|
|
|
if(array_key_exists($consumer_tag, $this->callbacks))
|
|
$func = $this->callbacks[$consumer_tag];
|
|
else
|
|
$func = NULL;
|
|
|
|
if($func!=NULL)
|
|
call_user_func($func, $msg);
|
|
}
|
|
|
|
/**
|
|
* direct access to a queue
|
|
*/
|
|
public function basic_get($queue="", $no_ack=false, $ticket=NULL)
|
|
{
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($queue);
|
|
$args->write_bit($no_ack);
|
|
$this->send_method_frame(array(60, 70), $args);
|
|
return $this->wait(array(
|
|
"60,71", //Channel.basic_get_ok
|
|
"60,72" // Channel.basic_get_empty
|
|
));
|
|
}
|
|
|
|
/**
|
|
* indicate no messages available
|
|
*/
|
|
protected function basic_get_empty($args)
|
|
{
|
|
$cluster_id = $args->read_shortstr();
|
|
}
|
|
|
|
/**
|
|
* provide client with a message
|
|
*/
|
|
protected function basic_get_ok($args, $msg)
|
|
{
|
|
$delivery_tag = $args->read_longlong();
|
|
$redelivered = $args->read_bit();
|
|
$exchange = $args->read_shortstr();
|
|
$routing_key = $args->read_shortstr();
|
|
$message_count = $args->read_long();
|
|
|
|
$msg->delivery_info = array(
|
|
"delivery_tag" => $delivery_tag,
|
|
"redelivered" => $redelivered,
|
|
"exchange" => $exchange,
|
|
"routing_key" => $routing_key,
|
|
"message_count" => $message_count
|
|
);
|
|
return $msg;
|
|
}
|
|
|
|
/**
|
|
* publish a message
|
|
*/
|
|
public function basic_publish($msg, $exchange="", $routing_key="",
|
|
$mandatory=false, $immediate=false,
|
|
$ticket=NULL)
|
|
{
|
|
$args = new AMQPWriter();
|
|
if($ticket != NULL)
|
|
$args->write_short($ticket);
|
|
else
|
|
$args->write_short($this->default_ticket);
|
|
$args->write_shortstr($exchange);
|
|
$args->write_shortstr($routing_key);
|
|
$args->write_bit($mandatory);
|
|
$args->write_bit($immediate);
|
|
$this->send_method_frame(array(60, 40), $args);
|
|
|
|
$this->connection->send_content($this->channel_id, 60, 0,
|
|
strlen($msg->body),
|
|
$msg->serialize_properties(),
|
|
$msg->body);
|
|
}
|
|
|
|
|
|
/**
|
|
* specify quality of service
|
|
*/
|
|
public function basic_qos($prefetch_size, $prefetch_count, $a_global)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_long($prefetch_size);
|
|
$args->write_short($prefetch_count);
|
|
$args->write_bit($a_global);
|
|
$this->send_method_frame(array(60, 10), $args);
|
|
return $this->wait(array(
|
|
"60,11" //Channel.basic_qos_ok
|
|
));
|
|
}
|
|
|
|
|
|
/**
|
|
* confirm the requested qos
|
|
*/
|
|
protected function basic_qos_ok($args)
|
|
{
|
|
}
|
|
|
|
/**
|
|
* redeliver unacknowledged messages
|
|
*/
|
|
public function basic_recover($requeue=false)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_bit($requeue);
|
|
$this->send_method_frame(array(60, 100), $args);
|
|
}
|
|
|
|
/**
|
|
* reject an incoming message
|
|
*/
|
|
public function basic_reject($delivery_tag, $requeue)
|
|
{
|
|
$args = new AMQPWriter();
|
|
$args->write_longlong($delivery_tag);
|
|
$args->write_bit($requeue);
|
|
$this->send_method_frame(array(60, 90), $args);
|
|
}
|
|
|
|
/**
|
|
* return a failed message
|
|
*/
|
|
protected function basic_return($args)
|
|
{
|
|
$reply_code = $args->read_short();
|
|
$reply_text = $args->read_shortstr();
|
|
$exchange = $args->read_shortstr();
|
|
$routing_key = $args->read_shortstr();
|
|
$msg = $this->wait();
|
|
}
|
|
|
|
|
|
public function tx_commit()
|
|
{
|
|
$this->send_method_frame(array(90, 20));
|
|
return $this->wait(array(
|
|
"90,21" //Channel.tx_commit_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm a successful commit
|
|
*/
|
|
protected function tx_commit_ok($args)
|
|
{
|
|
}
|
|
|
|
|
|
/**
|
|
* abandon the current transaction
|
|
*/
|
|
public function tx_rollback()
|
|
{
|
|
$this->send_method_frame(array(90, 30));
|
|
return $this->wait(array(
|
|
"90,31" //Channel.tx_rollback_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm a successful rollback
|
|
*/
|
|
protected function tx_rollback_ok($args)
|
|
{
|
|
}
|
|
|
|
/**
|
|
* select standard transaction mode
|
|
*/
|
|
public function tx_select()
|
|
{
|
|
$this->send_method_frame(array(90, 10));
|
|
return $this->wait(array(
|
|
"90,11" //Channel.tx_select_ok
|
|
));
|
|
}
|
|
|
|
/**
|
|
* confirm transaction mode
|
|
*/
|
|
protected function tx_select_ok($args)
|
|
{
|
|
}
|
|
|
|
}
|
|
|
|
/**
|
|
* A Message for use with the Channnel.basic_* methods.
|
|
*/
|
|
class AMQPMessage extends GenericContent
|
|
{
|
|
protected static $PROPERTIES = array(
|
|
"content_type" => "shortstr",
|
|
"content_encoding" => "shortstr",
|
|
"application_headers" => "table",
|
|
"delivery_mode" => "octet",
|
|
"priority" => "octet",
|
|
"correlation_id" => "shortstr",
|
|
"reply_to" => "shortstr",
|
|
"expiration" => "shortstr",
|
|
"message_id" => "shortstr",
|
|
"timestamp" => "timestamp",
|
|
"type" => "shortstr",
|
|
"user_id" => "shortstr",
|
|
"app_id" => "shortstr",
|
|
"cluster_id" => "shortst"
|
|
);
|
|
|
|
public function __construct($body = '', $properties = null)
|
|
{
|
|
$this->body = $body;
|
|
|
|
parent::__construct($properties, $prop_types=AMQPMessage::$PROPERTIES);
|
|
}
|
|
}
|
|
|
|
?>
|