* */ require_once('amqp_wire.inc'); require_once('hexdump.inc'); function debug_msg($s) { echo $s, "\n"; } function methodSig($a) { if(is_string($a)) return $a; else return sprintf("%d,%d",$a[0] ,$a[1]); } class AMQPException extends Exception { public function __construct($reply_code, $reply_text, $method_sig) { parent::__construct($reply_text,$reply_code); $this->amqp_reply_code = $reply_code; // redundant, but kept for BC $this->amqp_reply_text = $reply_text; // redundant, but kept for BC $this->amqp_method_sig = $method_sig; $ms=methodSig($method_sig); if(array_key_exists($ms, AbstractChannel::$GLOBAL_METHOD_NAMES)) $mn = AbstractChannel::$GLOBAL_METHOD_NAMES[$ms]; else $mn = ""; $this->args = array( $reply_code, $reply_text, $method_sig, $mn ); } } class AMQPConnectionException extends AMQPException { public function __construct($reply_code, $reply_text, $method_sig) { parent::__construct($reply_code, $reply_text, $method_sig); } } class AMQPChannelException extends AMQPException { public function __construct($reply_code, $reply_text, $method_sig) { parent::__construct($reply_code, $reply_text, $method_sig); } } class AbstractChannel { private static $CONTENT_METHODS = array( "60,60", // Basic.deliver "60,71", // Basic.get_ok ); private static $CLOSE_METHODS = array( "10,60", // Connection.close "20,40", // Channel.close ); // All the method names public static $GLOBAL_METHOD_NAMES = array( "10,10" => "Connection.start", "10,11" => "Connection.start_ok", "10,20" => "Connection.secure", "10,21" => "Connection.secure_ok", "10,30" => "Connection.tune", "10,31" => "Connection.tune_ok", "10,40" => "Connection.open", "10,41" => "Connection.open_ok", "10,50" => "Connection.redirect", "10,60" => "Connection.close", "10,61" => "Connection.close_ok", "20,10" => "Channel.open", "20,11" => "Channel.open_ok", "20,20" => "Channel.flow", "20,21" => "Channel.flow_ok", "20,30" => "Channel.alert", "20,40" => "Channel.close", "20,41" => "Channel.close_ok", "30,10" => "Channel.access_request", "30,11" => "Channel.access_request_ok", "40,10" => "Channel.exchange_declare", "40,11" => "Channel.exchange_declare_ok", "40,20" => "Channel.exchange_delete", "40,21" => "Channel.exchange_delete_ok", "50,10" => "Channel.queue_declare", "50,11" => "Channel.queue_declare_ok", "50,20" => "Channel.queue_bind", "50,21" => "Channel.queue_bind_ok", "50,30" => "Channel.queue_purge", "50,31" => "Channel.queue_purge_ok", "50,40" => "Channel.queue_delete", "50,41" => "Channel.queue_delete_ok", "50,50" => "Channel.queue_unbind", "50,51" => "Channel.queue_unbind_ok", "60,10" => "Channel.basic_qos", "60,11" => "Channel.basic_qos_ok", "60,20" => "Channel.basic_consume", "60,21" => "Channel.basic_consume_ok", "60,30" => "Channel.basic_cancel", "60,31" => "Channel.basic_cancel_ok", "60,40" => "Channel.basic_publish", "60,50" => "Channel.basic_return", "60,60" => "Channel.basic_deliver", "60,70" => "Channel.basic_get", "60,71" => "Channel.basic_get_ok", "60,72" => "Channel.basic_get_empty", "60,80" => "Channel.basic_ack", "60,90" => "Channel.basic_reject", "60,100" => "Channel.basic_recover", "90,10" => "Channel.tx_select", "90,11" => "Channel.tx_select_ok", "90,20" => "Channel.tx_commit", "90,21" => "Channel.tx_commit_ok", "90,30" => "Channel.tx_rollback", "90,31" => "Channel.tx_rollback_ok" ); protected $debug; public function __construct($connection, $channel_id) { $this->connection = $connection; $this->channel_id = $channel_id; $connection->channels[$channel_id] = $this; $this->frame_queue = array(); // Lower level queue for frames $this->method_queue = array(); // Higher level queue for methods $this->auto_decode = false; $this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false; } public function getChannelId() { return $this->channel_id; } function dispatch($method_sig, $args, $content) { if(!array_key_exists($method_sig, $this->method_map)) throw new Exception("Unknown AMQP method $method_sig"); $amqp_method = $this->method_map[$method_sig]; if($content == NULL) return call_user_func(array($this,$amqp_method), $args); else return call_user_func(array($this,$amqp_method), $args, $content); } function next_frame() { if($this->debug) { debug_msg("waiting for a new frame"); } if($this->frame_queue != NULL) return array_pop($this->frame_queue); return $this->connection->wait_channel($this->channel_id); } protected function send_method_frame($method_sig, $args="") { $this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args); } function wait_content() { $frm = $this->next_frame(); $frame_type = $frm[0]; $payload = $frm[1]; if($frame_type != 2) throw new Exception("Expecting Content header"); $payload_reader = new AMQPReader(substr($payload,0,12)); $class_id = $payload_reader->read_short(); $weight = $payload_reader->read_short(); $body_size = $payload_reader->read_longlong(); $msg = new AMQPMessage(); $msg->load_properties(substr($payload,12)); $body_parts = array(); $body_received = 0; while(bccomp($body_size,$body_received)==1) { $frm = $this->next_frame(); $frame_type = $frm[0]; $payload = $frm[1]; if($frame_type != 3) throw new Exception("Expecting Content body, received frame type $frame_type"); $body_parts[] = $payload; $body_received = bcadd($body_received, strlen($payload)); } $msg->body = implode("",$body_parts); if($this->auto_decode and isset($msg->content_encoding)) { try { $msg->body = $msg->body->decode($msg->content_encoding); } catch (Exception $e) { if($this->debug) { debug_msg("Ignoring body decoding exception: " . $e->getMessage()); } } } return $msg; } /** * Wait for some expected AMQP methods and dispatch to them. * Unexpected methods are queued up for later calls to this Python * method. */ public function wait($allowed_methods=NULL) { if($allowed_methods) { if($this->debug) { debug_msg("waiting for " . implode(", ", $allowed_methods)); } } else { if($this->debug) { debug_msg("waiting for any method"); } } //Process deferred methods foreach($this->method_queue as $qk=>$queued_method) { if($this->debug) { debug_msg("checking queue method " . $qk); } $method_sig = $queued_method[0]; if($allowed_methods==NULL || in_array($method_sig, $allowed_methods)) { unset($this->method_queue[$qk]); if($this->debug) { debug_msg("Executing queued method: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]); } return $this->dispatch($queued_method[0], $queued_method[1], $queued_method[2]); } } // No deferred methods? wait for new ones while(true) { $frm = $this->next_frame(); $frame_type = $frm[0]; $payload = $frm[1]; if($frame_type != 1) throw new Exception("Expecting AMQP method, received frame type: $frame_type"); if(strlen($payload) < 4) throw new Exception("Method frame too short"); $method_sig_array = unpack("n2", substr($payload,0,4)); $method_sig = "" . $method_sig_array[1] . "," . $method_sig_array[2]; $args = new AMQPReader(substr($payload,4)); if($this->debug) { debug_msg("> $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]); } if(in_array($method_sig, AbstractChannel::$CONTENT_METHODS)) $content = $this->wait_content(); else $content = NULL; if($allowed_methods==NULL || in_array($method_sig,$allowed_methods) || in_array($method_sig,AbstractChannel::$CLOSE_METHODS)) { return $this->dispatch($method_sig, $args, $content); } // Wasn't what we were looking for? save it for later if($this->debug) { debug_msg("Queueing for later: $method_sig: " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]); } array_push($this->method_queue,array($method_sig, $args, $content)); } } } class AMQPConnection extends AbstractChannel { public static $AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x09\x01"; public static $LIBRARY_PROPERTIES = array( "library" => array('S', "PHP Simple AMQP lib"), "library_version" => array('S', "0.1") ); protected $method_map = array( "10,10" => "start", "10,20" => "secure", "10,30" => "tune", "10,41" => "open_ok", "10,50" => "redirect", "10,60" => "_close", "10,61" => "close_ok" ); public function __construct($host, $port, $user, $password, $vhost="/",$insist=false, $login_method="AMQPLAIN", $login_response=NULL, $locale="en_US", $connection_timeout = 3, $read_write_timeout = 3) { if($user && $password) { $login_response = new AMQPWriter(); $login_response->write_table(array("LOGIN" => array('S',$user), "PASSWORD" => array('S',$password))); $login_response = substr($login_response->getvalue(),4); //Skip the length } else $login_response = NULL; $d = AMQPConnection::$LIBRARY_PROPERTIES; while(true) { $this->channels = array(); // The connection object itself is treated as channel 0 parent::__construct($this, 0); $this->channel_max = 65535; $this->frame_max = 131072; $errstr = $errno = NULL; $this->sock = NULL; if (!($this->sock = fsockopen($host,$port,$errno,$errstr,$connection_timeout))) { throw new Exception ("Error Connecting to server($errno): $errstr "); } stream_set_timeout($this->sock, $read_write_timeout); stream_set_blocking($this->sock, 1); $this->input = new AMQPReader(null, $this->sock); $this->write(AMQPConnection::$AMQP_PROTOCOL_HEADER); $this->wait(array("10,10")); $this->x_start_ok($d, $login_method, $login_response, $locale); $this->wait_tune_ok = true; while($this->wait_tune_ok) { $this->wait(array( "10,20", // secure "10,30", // tune )); } $host = $this->x_open($vhost,"", $insist); if(!$host) return; // we weren't redirected // we were redirected, close the socket, loop and try again if($this->debug) { debug_msg("closing socket"); } @fclose($this->sock); $this->sock=NULL; } } public function __destruct() { if(isset($this->input)) if($this->input) $this->close(); if($this->sock) { if($this->debug) { debug_msg("closing socket"); } @fclose($this->sock); } } protected function write($data) { if($this->debug) { debug_msg("< [hex]:\n" . hexdump($data, $htmloutput = false, $uppercase = true, $return = true)); } $len = strlen($data); while(true) { if(false === ($written = fwrite($this->sock, $data))) { throw new Exception ("Error sending data"); } $len = $len - $written; if($len>0) $data=substr($data,0-$len); else break; } } protected function do_close() { if(isset($this->input)) if($this->input) { $this->input->close(); $this->input = NULL; } if($this->sock) { if($this->debug) { debug_msg("closing socket"); } @fclose($this->sock); $this->sock = NULL; } } public function get_free_channel_id() { for($i=1;$i<=$this->channel_max;$i++) if(!array_key_exists($i,$this->channels)) return $i; throw new Exception("No free channel ids"); } public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body) { $pkt = new AMQPWriter(); $pkt->write_octet(2); $pkt->write_short($channel); $pkt->write_long(strlen($packed_properties)+12); $pkt->write_short($class_id); $pkt->write_short($weight); $pkt->write_longlong($body_size); $pkt->write($packed_properties); $pkt->write_octet(0xCE); $pkt = $pkt->getvalue(); $this->write($pkt); while($body) { $payload = substr($body,0, $this->frame_max-8); $body = substr($body,$this->frame_max-8); $pkt = new AMQPWriter(); $pkt->write_octet(3); $pkt->write_short($channel); $pkt->write_long(strlen($payload)); $pkt->write($payload); $pkt->write_octet(0xCE); $pkt = $pkt->getvalue(); $this->write($pkt); } } protected function send_channel_method_frame($channel, $method_sig, $args="") { if($args instanceof AMQPWriter) $args = $args->getvalue(); $pkt = new AMQPWriter(); $pkt->write_octet(1); $pkt->write_short($channel); $pkt->write_long(strlen($args)+4); // 4 = length of class_id and method_id // in payload $pkt->write_short($method_sig[0]); // class_id $pkt->write_short($method_sig[1]); // method_id $pkt->write($args); $pkt->write_octet(0xCE); $pkt = $pkt->getvalue(); $this->write($pkt); if($this->debug) { debug_msg("< " . methodSig($method_sig) . ": " . AbstractChannel::$GLOBAL_METHOD_NAMES[methodSig($method_sig)]); } } /** * Wait for a frame from the server */ protected function wait_frame() { $frame_type = $this->input->read_octet(); $channel = $this->input->read_short(); $size = $this->input->read_long(); $payload = $this->input->read($size); $ch = $this->input->read_octet(); if($ch != 0xCE) throw new Exception(sprintf("Framing error, unexpected byte: %x", $ch)); return array($frame_type, $channel, $payload); } /** * Wait for a frame from the server destined for * a particular channel. */ protected function wait_channel($channel_id) { while(true) { list($frame_type, $frame_channel, $payload) = $this->wait_frame(); if($frame_channel == $channel_id) return array($frame_type, $payload); // Not the channel we were looking for. Queue this frame //for later, when the other channel is looking for frames. array_push($this->channels[$frame_channel]->frame_queue, array($frame_type, $payload)); // If we just queued up a method for channel 0 (the Connection // itself) it's probably a close method in reaction to some // error, so deal with it right away. if(($frame_type == 1) && ($frame_channel == 0)) $this->wait(); } } /** * Fetch a Channel object identified by the numeric channel_id, or * create that object if it doesn't already exist. */ public function channel($channel_id=NULL) { if(array_key_exists($channel_id,$this->channels)) return $this->channels[$channel_id]; return new AMQPChannel($this->connection, $channel_id); } /** * request a connection close */ public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0)) { $args = new AMQPWriter(); $args->write_short($reply_code); $args->write_shortstr($reply_text); $args->write_short($method_sig[0]); // class_id $args->write_short($method_sig[1]); // method_id $this->send_method_frame(array(10, 60), $args); return $this->wait(array( "10,61", // Connection.close_ok )); } public static function dump_table($table) { $tokens = array(); foreach ($table as $name => $value) { switch ($value[0]) { case 'D': $val = $value[1]->n . 'E' . $value[1]->e; break; case 'F': $val = '(' . self::dump_table($value[1]) . ')'; case 'T': $val = date('Y-m-d H:i:s', $value[1]); break; default: $val = $value[1]; } $tokens[] = $name . '=' . $val; } return implode(', ', $tokens); } protected function _close($args) { $reply_code = $args->read_short(); $reply_text = $args->read_shortstr(); $class_id = $args->read_short(); $method_id = $args->read_short(); $this->x_close_ok(); throw new AMQPConnectionException($reply_code, $reply_text, array($class_id, $method_id)); } /** * confirm a connection close */ protected function x_close_ok() { $this->send_method_frame(array(10, 61)); $this->do_close(); } /** * confirm a connection close */ protected function close_ok($args) { $this->do_close(); } protected function x_open($virtual_host, $capabilities="", $insist=false) { $args = new AMQPWriter(); $args->write_shortstr($virtual_host); $args->write_shortstr($capabilities); $args->write_bit($insist); $this->send_method_frame(array(10, 40), $args); return $this->wait(array( "10,41", // Connection.open_ok "10,50" // Connection.redirect )); } /** * signal that the connection is ready */ protected function open_ok($args) { $this->known_hosts = $args->read_shortstr(); if($this->debug) { debug_msg("Open OK! known_hosts: " . $this->known_hosts); } return NULL; } /** * asks the client to use a different server */ protected function redirect($args) { $host = $args->read_shortstr(); $this->known_hosts = $args->read_shortstr(); if($this->debug) { debug_msg("Redirected to [". $host . "], known_hosts [" . $this->known_hosts . "]" ); } return $host; } /** * security mechanism challenge */ protected function secure($args) { $challenge = $args->read_longstr(); } /** * security mechanism response */ protected function x_secure_ok($response) { $args = new AMQPWriter(); $args->write_longstr($response); $this->send_method_frame(array(10, 21), $args); } /** * start connection negotiation */ protected function start($args) { $this->version_major = $args->read_octet(); $this->version_minor = $args->read_octet(); $this->server_properties = $args->read_table(); $this->mechanisms = explode(" ", $args->read_longstr()); $this->locales = explode(" ", $args->read_longstr()); if($this->debug) { debug_msg(sprintf("Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s", $this->version_major, $this->version_minor, self::dump_table($this->server_properties), implode(', ', $this->mechanisms), implode(', ', $this->locales))); } } protected function x_start_ok($client_properties, $mechanism, $response, $locale) { $args = new AMQPWriter(); $args->write_table($client_properties); $args->write_shortstr($mechanism); $args->write_longstr($response); $args->write_shortstr($locale); $this->send_method_frame(array(10, 11), $args); } /** * propose connection tuning parameters */ protected function tune($args) { $v=$args->read_short(); if($v) $this->channel_max = $v; $v=$args->read_long(); if($v) $this->frame_max = $v; $this->heartbeat = $args->read_short(); $this->x_tune_ok($this->channel_max, $this->frame_max, 0); } /** * negotiate connection tuning parameters */ protected function x_tune_ok($channel_max, $frame_max, $heartbeat) { $args = new AMQPWriter(); $args->write_short($channel_max); $args->write_long($frame_max); $args->write_short($heartbeat); $this->send_method_frame(array(10, 31), $args); $this->wait_tune_ok = False; } } class AMQPChannel extends AbstractChannel { protected $method_map = array( "20,11" => "open_ok", "20,20" => "flow", "20,21" => "flow_ok", "20,30" => "alert", "20,40" => "_close", "20,41" => "close_ok", "30,11" => "access_request_ok", "40,11" => "exchange_declare_ok", "40,21" => "exchange_delete_ok", "50,11" => "queue_declare_ok", "50,21" => "queue_bind_ok", "50,31" => "queue_purge_ok", "50,41" => "queue_delete_ok", "50,51" => "queue_unbind_ok", "60,11" => "basic_qos_ok", "60,21" => "basic_consume_ok", "60,31" => "basic_cancel_ok", "60,50" => "basic_return", "60,60" => "basic_deliver", "60,71" => "basic_get_ok", "60,72" => "basic_get_empty", "90,11" => "tx_select_ok", "90,21" => "tx_commit_ok", "90,31" => "tx_rollback_ok" ); public function __construct($connection, $channel_id=NULL, $auto_decode=true) { if($channel_id == NULL) $channel_id = $connection->get_free_channel_id(); parent::__construct($connection, $channel_id); if($this->debug) { debug_msg("using channel_id: " . $channel_id); } $this->default_ticket = 0; $this->is_open = false; $this->active = true; // Flow control $this->alerts = array(); $this->callbacks = array(); $this->auto_decode = $auto_decode; $this->x_open(); } public function __destruct() { //TODO:???if($this->connection) // $this->close("destroying channel"); } /** * Tear down this object, after we've agreed to close with the server. */ protected function do_close() { $this->is_open = false; unset($this->connection->channels[$this->channel_id]); $this->channel_id = $this->connection = NULL; } /** * This method allows the server to send a non-fatal warning to * the client. This is used for methods that are normally * asynchronous and thus do not have confirmations, and for which * the server may detect errors that need to be reported. Fatal * errors are handled as channel or connection exceptions; non- * fatal errors are sent through this method. */ protected function alert($args) { $reply_code = $args->read_short(); $reply_text = $args->read_shortstr(); $details = $args->read_table(); array_push($this->alerts,array($reply_code, $reply_text, $details)); } /** * request a channel close */ public function close($reply_code=0, $reply_text="", $method_sig=array(0, 0)) { $args = new AMQPWriter(); $args->write_short($reply_code); $args->write_shortstr($reply_text); $args->write_short($method_sig[0]); // class_id $args->write_short($method_sig[1]); // method_id $this->send_method_frame(array(20, 40), $args); return $this->wait(array( "20,41" // Channel.close_ok )); } protected function _close($args) { $reply_code = $args->read_short(); $reply_text = $args->read_shortstr(); $class_id = $args->read_short(); $method_id = $args->read_short(); $this->send_method_frame(array(20, 41)); $this->do_close(); throw new AMQPChannelException($reply_code, $reply_text, array($class_id, $method_id)); } /** * confirm a channel close */ protected function close_ok($args) { $this->do_close(); } /** * enable/disable flow from peer */ public function flow($active) { $args = new AMQPWriter(); $args->write_bit($active); $this->send_method_frame(array(20, 20), $args); return $this->wait(array( "20,21" //Channel.flow_ok )); } protected function _flow($args) { $this->active = $args->read_bit(); $this->x_flow_ok($this->active); } protected function x_flow_ok($active) { $args = new AMQPWriter(); $args->write_bit($active); $this->send_method_frame(array(20, 21), $args); } protected function flow_ok($args) { return $args->read_bit(); } protected function x_open($out_of_band="") { if($this->is_open) return; $args = new AMQPWriter(); $args->write_shortstr($out_of_band); $this->send_method_frame(array(20, 10), $args); return $this->wait(array( "20,11" //Channel.open_ok )); } protected function open_ok($args) { $this->is_open = true; if($this->debug) { debug_msg("Channel open"); } } /** * request an access ticket */ public function access_request($realm, $exclusive=false, $passive=false, $active=false, $write=false, $read=false) { $args = new AMQPWriter(); $args->write_shortstr($realm); $args->write_bit($exclusive); $args->write_bit($passive); $args->write_bit($active); $args->write_bit($write); $args->write_bit($read); $this->send_method_frame(array(30, 10), $args); return $this->wait(array( "30,11" //Channel.access_request_ok )); } /** * grant access to server resources */ protected function access_request_ok($args) { $this->default_ticket = $args->read_short(); return $this->default_ticket; } /** * declare exchange, create if needed */ public function exchange_declare($exchange, $type, $passive=false, $durable=false, $auto_delete=true, $internal=false, $nowait=false, $arguments=NULL, $ticket=NULL) { if($arguments==NULL) $arguments = array(); $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($exchange); $args->write_shortstr($type); $args->write_bit($passive); $args->write_bit($durable); $args->write_bit($auto_delete); $args->write_bit($internal); $args->write_bit($nowait); $args->write_table($arguments); $this->send_method_frame(array(40, 10), $args); if(!$nowait) return $this->wait(array( "40,11" //Channel.exchange_declare_ok )); } /** * confirms an exchange declaration */ protected function exchange_declare_ok($args) { } /** * delete an exchange */ public function exchange_delete($exchange, $if_unused=false, $nowait=false, $ticket=NULL) { $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($exchange); $args->write_bit($if_unused); $args->write_bit($nowait); $this->send_method_frame(array(40, 20), $args); if(!$nowait) return $this->wait(array( "40,21" //Channel.exchange_delete_ok )); } /** * confirm deletion of an exchange */ protected function exchange_delete_ok($args) { } /** * bind queue to an exchange */ public function queue_bind($queue, $exchange, $routing_key="", $nowait=false, $arguments=NULL, $ticket=NULL) { if($arguments == NULL) $arguments = array(); $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_shortstr($exchange); $args->write_shortstr($routing_key); $args->write_bit($nowait); $args->write_table($arguments); $this->send_method_frame(array(50, 20), $args); if(!$nowait) return $this->wait(array( "50,21" // Channel.queue_bind_ok )); } /** * confirm bind successful */ protected function queue_bind_ok($args) { } /** * unbind queue from an exchange */ public function queue_unbind($queue, $exchange, $routing_key="", $arguments=NULL, $ticket=NULL) { if($arguments == NULL) $arguments = array(); $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_shortstr($exchange); $args->write_shortstr($routing_key); $args->write_table($arguments); $this->send_method_frame(array(50, 50), $args); return $this->wait(array( "50,51" // Channel.queue_unbind_ok )); } /** * confirm unbind successful */ protected function queue_unbind_ok($args) { } /** * declare queue, create if needed */ public function queue_declare($queue="", $passive=false, $durable=false, $exclusive=false, $auto_delete=true, $nowait=false, $arguments=NULL, $ticket=NULL) { if($arguments == NULL) $arguments = array(); $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_bit($passive); $args->write_bit($durable); $args->write_bit($exclusive); $args->write_bit($auto_delete); $args->write_bit($nowait); $args->write_table($arguments); $this->send_method_frame(array(50, 10), $args); if(!$nowait) return $this->wait(array( "50,11" // Channel.queue_declare_ok )); } /** * confirms a queue definition */ protected function queue_declare_ok($args) { $queue = $args->read_shortstr(); $message_count = $args->read_long(); $consumer_count = $args->read_long(); return array($queue, $message_count, $consumer_count); } /** * delete a queue */ public function queue_delete($queue="", $if_unused=false, $if_empty=false, $nowait=false, $ticket=NULL) { $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_bit($if_unused); $args->write_bit($if_empty); $args->write_bit($nowait); $this->send_method_frame(array(50, 40), $args); if(!$nowait) return $this->wait(array( "50,41" //Channel.queue_delete_ok )); } /** * confirm deletion of a queue */ protected function queue_delete_ok($args) { return $args->read_long(); } /** * purge a queue */ public function queue_purge($queue="", $nowait=false, $ticket=NULL) { $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_bit($nowait); $this->send_method_frame(array(50, 30), $args); if(!$nowait) return $this->wait(array( "50,31" //Channel.queue_purge_ok )); } /** * confirms a queue purge */ protected function queue_purge_ok($args) { return $args->read_long(); } /** * acknowledge one or more messages */ public function basic_ack($delivery_tag, $multiple=false) { $args = new AMQPWriter(); $args->write_longlong($delivery_tag); $args->write_bit($multiple); $this->send_method_frame(array(60, 80), $args); } /** * end a queue consumer */ public function basic_cancel($consumer_tag, $nowait=false) { $args = new AMQPWriter(); $args->write_shortstr($consumer_tag); $args->write_bit($nowait); $this->send_method_frame(array(60, 30), $args); return $this->wait(array( "60,31" // Channel.basic_cancel_ok )); } /** * confirm a cancelled consumer */ protected function basic_cancel_ok($args) { $consumer_tag = $args->read_shortstr(); unset($this->callbacks[$consumer_tag]); } /** * start a queue consumer */ public function basic_consume($queue="", $consumer_tag="", $no_local=false, $no_ack=false, $exclusive=false, $nowait=false, $callback=NULL, $ticket=NULL) { $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_shortstr($consumer_tag); $args->write_bit($no_local); $args->write_bit($no_ack); $args->write_bit($exclusive); $args->write_bit($nowait); $this->send_method_frame(array(60, 20), $args); if(!$nowait) $consumer_tag = $this->wait(array( "60,21" //Channel.basic_consume_ok )); $this->callbacks[$consumer_tag] = $callback; return $consumer_tag; } /** * confirm a new consumer */ protected function basic_consume_ok($args) { return $args->read_shortstr(); } /** * notify the client of a consumer message */ protected function basic_deliver($args, $msg) { $consumer_tag = $args->read_shortstr(); $delivery_tag = $args->read_longlong(); $redelivered = $args->read_bit(); $exchange = $args->read_shortstr(); $routing_key = $args->read_shortstr(); $msg->delivery_info = array( "channel" => $this, "consumer_tag" => $consumer_tag, "delivery_tag" => $delivery_tag, "redelivered" => $redelivered, "exchange" => $exchange, "routing_key" => $routing_key ); if(array_key_exists($consumer_tag, $this->callbacks)) $func = $this->callbacks[$consumer_tag]; else $func = NULL; if($func!=NULL) call_user_func($func, $msg); } /** * direct access to a queue */ public function basic_get($queue="", $no_ack=false, $ticket=NULL) { $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($queue); $args->write_bit($no_ack); $this->send_method_frame(array(60, 70), $args); return $this->wait(array( "60,71", //Channel.basic_get_ok "60,72" // Channel.basic_get_empty )); } /** * indicate no messages available */ protected function basic_get_empty($args) { $cluster_id = $args->read_shortstr(); } /** * provide client with a message */ protected function basic_get_ok($args, $msg) { $delivery_tag = $args->read_longlong(); $redelivered = $args->read_bit(); $exchange = $args->read_shortstr(); $routing_key = $args->read_shortstr(); $message_count = $args->read_long(); $msg->delivery_info = array( "delivery_tag" => $delivery_tag, "redelivered" => $redelivered, "exchange" => $exchange, "routing_key" => $routing_key, "message_count" => $message_count ); return $msg; } /** * publish a message */ public function basic_publish($msg, $exchange="", $routing_key="", $mandatory=false, $immediate=false, $ticket=NULL) { $args = new AMQPWriter(); if($ticket != NULL) $args->write_short($ticket); else $args->write_short($this->default_ticket); $args->write_shortstr($exchange); $args->write_shortstr($routing_key); $args->write_bit($mandatory); $args->write_bit($immediate); $this->send_method_frame(array(60, 40), $args); $this->connection->send_content($this->channel_id, 60, 0, strlen($msg->body), $msg->serialize_properties(), $msg->body); } /** * specify quality of service */ public function basic_qos($prefetch_size, $prefetch_count, $a_global) { $args = new AMQPWriter(); $args->write_long($prefetch_size); $args->write_short($prefetch_count); $args->write_bit($a_global); $this->send_method_frame(array(60, 10), $args); return $this->wait(array( "60,11" //Channel.basic_qos_ok )); } /** * confirm the requested qos */ protected function basic_qos_ok($args) { } /** * redeliver unacknowledged messages */ public function basic_recover($requeue=false) { $args = new AMQPWriter(); $args->write_bit($requeue); $this->send_method_frame(array(60, 100), $args); } /** * reject an incoming message */ public function basic_reject($delivery_tag, $requeue) { $args = new AMQPWriter(); $args->write_longlong($delivery_tag); $args->write_bit($requeue); $this->send_method_frame(array(60, 90), $args); } /** * return a failed message */ protected function basic_return($args) { $reply_code = $args->read_short(); $reply_text = $args->read_shortstr(); $exchange = $args->read_shortstr(); $routing_key = $args->read_shortstr(); $msg = $this->wait(); } public function tx_commit() { $this->send_method_frame(array(90, 20)); return $this->wait(array( "90,21" //Channel.tx_commit_ok )); } /** * confirm a successful commit */ protected function tx_commit_ok($args) { } /** * abandon the current transaction */ public function tx_rollback() { $this->send_method_frame(array(90, 30)); return $this->wait(array( "90,31" //Channel.tx_rollback_ok )); } /** * confirm a successful rollback */ protected function tx_rollback_ok($args) { } /** * select standard transaction mode */ public function tx_select() { $this->send_method_frame(array(90, 10)); return $this->wait(array( "90,11" //Channel.tx_select_ok )); } /** * confirm transaction mode */ protected function tx_select_ok($args) { } } /** * A Message for use with the Channnel.basic_* methods. */ class AMQPMessage extends GenericContent { protected static $PROPERTIES = array( "content_type" => "shortstr", "content_encoding" => "shortstr", "application_headers" => "table", "delivery_mode" => "octet", "priority" => "octet", "correlation_id" => "shortstr", "reply_to" => "shortstr", "expiration" => "shortstr", "message_id" => "shortstr", "timestamp" => "timestamp", "type" => "shortstr", "user_id" => "shortstr", "app_id" => "shortstr", "cluster_id" => "shortst" ); public function __construct($body = '', $properties = null) { $this->body = $body; parent::__construct($properties, $prop_types=AMQPMessage::$PROPERTIES); } } ?>