698 lines
18 KiB
PHP
698 lines
18 KiB
PHP
<?php
|
|
|
|
/**
|
|
* AMQP protocol serialization/deserialization to/from wire format.
|
|
*
|
|
* http://code.google.com/p/php-amqplib/
|
|
* Vadim Zaliva <lord@crocodile.org>
|
|
*
|
|
*
|
|
* To understand all signed/unsinged and 32/64 bit madness in this
|
|
* code, please read first the following article:
|
|
*
|
|
* http://www.mysqlperformanceblog.com/2007/03/27/integers-in-php-running-with-scissors-and-portability/
|
|
*/
|
|
|
|
require_once('hexdump.inc');
|
|
|
|
/**
|
|
* AMQP protocol decimal value.
|
|
*
|
|
* Values are represented as (n,e) pairs. The actual value
|
|
* is n * 10^(-e).
|
|
*
|
|
* From 0.8 spec: Decimal values are
|
|
* not intended to support floating point values, but rather
|
|
* business values such as currency rates and amounts. The
|
|
* 'decimals' octet is not signed.
|
|
*/
|
|
class AMQPDecimal
|
|
{
|
|
public function __construct($n, $e)
|
|
{
|
|
if($e < 0)
|
|
throw new Exception("Decimal exponent value must be unsigned!");
|
|
$this->n = $n;
|
|
$this->e = $e;
|
|
}
|
|
|
|
public function asBCvalue()
|
|
{
|
|
return bcdiv($this->n, bcpow(10,$this->e));
|
|
}
|
|
}
|
|
|
|
class AMQPWriter
|
|
{
|
|
public function __construct()
|
|
{
|
|
$this->out = "";
|
|
$this->bits = array();
|
|
$this->bitcount = 0;
|
|
}
|
|
|
|
private static function chrbytesplit($x, $bytes)
|
|
{
|
|
return array_map('chr', AMQPWriter::bytesplit($x,$bytes));
|
|
}
|
|
|
|
/**
|
|
* Splits number (could be either int or string) into array of byte
|
|
* values (represented as integers) in big-endian byte order.
|
|
*/
|
|
private static function bytesplit($x, $bytes)
|
|
{
|
|
if(is_int($x))
|
|
{
|
|
if($x<0)
|
|
$x = sprintf("%u", $x);
|
|
}
|
|
|
|
$res = array();
|
|
for($i=0;$i<$bytes;$i++)
|
|
{
|
|
$b = bcmod($x,'256');
|
|
array_unshift($res,(int)$b);
|
|
$x=bcdiv($x,'256', 0);
|
|
}
|
|
if($x!=0)
|
|
throw new Exception("Value too big!");
|
|
return $res;
|
|
}
|
|
|
|
private function flushbits()
|
|
{
|
|
if(count($this->bits))
|
|
{
|
|
$this->out .= implode("", array_map('chr',$this->bits));
|
|
$this->bits = array();
|
|
$this->bitcount = 0;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get what's been encoded so far.
|
|
*/
|
|
public function getvalue()
|
|
{
|
|
$this->flushbits();
|
|
return $this->out;
|
|
}
|
|
|
|
/**
|
|
* Write a plain Python string, with no special encoding.
|
|
*/
|
|
public function write($s)
|
|
{
|
|
$this->flushbits();
|
|
$this->out .= $s;
|
|
}
|
|
|
|
/**
|
|
* Write a boolean value.
|
|
*/
|
|
public function write_bit($b)
|
|
{
|
|
if($b)
|
|
$b = 1;
|
|
else
|
|
$b = 0;
|
|
$shift = $this->bitcount % 8;
|
|
if($shift == 0)
|
|
$last = 0;
|
|
else
|
|
$last = array_pop($this->bits);
|
|
|
|
$last |= ($b << $shift);
|
|
array_push($this->bits, $last);
|
|
|
|
$this->bitcount += 1;
|
|
}
|
|
|
|
/**
|
|
* Write an integer as an unsigned 8-bit value.
|
|
*/
|
|
public function write_octet($n)
|
|
{
|
|
if($n < 0 || $n > 255)
|
|
throw new Exception('Octet out of range 0..255');
|
|
$this->flushbits();
|
|
$this->out .= chr($n);
|
|
}
|
|
|
|
/**
|
|
* Write an integer as an unsigned 16-bit value.
|
|
*/
|
|
public function write_short($n)
|
|
{
|
|
if($n < 0 || $n > 65535)
|
|
throw new Exception('Octet out of range 0..65535');
|
|
$this->flushbits();
|
|
$this->out .= pack('n', $n);
|
|
}
|
|
|
|
/**
|
|
* Write an integer as an unsigned 32-bit value.
|
|
*/
|
|
public function write_long($n)
|
|
{
|
|
$this->flushbits();
|
|
$this->out .= implode("", AMQPWriter::chrbytesplit($n,4));
|
|
}
|
|
|
|
private function write_signed_long($n)
|
|
{
|
|
$this->flushbits();
|
|
// although format spec for 'N' mentions unsigned
|
|
// it will deal with sinned integers as well. tested.
|
|
$this->out .= pack('N', $n);
|
|
}
|
|
|
|
/**
|
|
* Write an integer as an unsigned 64-bit value.
|
|
*/
|
|
public function write_longlong($n)
|
|
{
|
|
$this->flushbits();
|
|
$this->out .= implode("", AMQPWriter::chrbytesplit($n,8));
|
|
}
|
|
|
|
/**
|
|
* Write a string up to 255 bytes long after encoding.
|
|
* Assume UTF-8 encoding.
|
|
*/
|
|
public function write_shortstr($s)
|
|
{
|
|
$this->flushbits();
|
|
if(strlen($s) > 255)
|
|
throw new Exception('String too long');
|
|
$this->write_octet(strlen($s));
|
|
$this->out .= $s;
|
|
}
|
|
|
|
|
|
/*
|
|
* Write a string up to 2**32 bytes long. Assume UTF-8 encoding.
|
|
*/
|
|
public function write_longstr($s)
|
|
{
|
|
$this->flushbits();
|
|
$this->write_long(strlen($s));
|
|
$this->out .= $s;
|
|
}
|
|
|
|
|
|
/**
|
|
* Write unix time_t value as 64 bit timestamp.
|
|
*/
|
|
public function write_timestamp($v)
|
|
{
|
|
$this->write_longlong($v);
|
|
}
|
|
|
|
/**
|
|
* Write PHP array, as table. Input array format: keys are strings,
|
|
* values are (type,value) tuples.
|
|
*/
|
|
public function write_table($d)
|
|
{
|
|
$this->flushbits();
|
|
$table_data = new AMQPWriter();
|
|
foreach($d as $k=>$va)
|
|
{
|
|
list($ftype,$v) = $va;
|
|
$table_data->write_shortstr($k);
|
|
if($ftype=='S')
|
|
{
|
|
$table_data->write('S');
|
|
$table_data->write_longstr($v);
|
|
} else if($ftype=='I')
|
|
{
|
|
$table_data->write('I');
|
|
$table_data->write_signed_long($v);
|
|
} else if($ftype=='D')
|
|
{
|
|
// 'D' type values are passed AMQPDecimal instances.
|
|
$table_data->write('D');
|
|
$table_data->write_octet($v->e);
|
|
$table_data->write_signed_long($v->n);
|
|
} else if($ftype=='T')
|
|
{
|
|
$table_data->write('T');
|
|
$table_data->write_timestamp($v);
|
|
} else if($ftype=='F')
|
|
{
|
|
$table_data->write('F');
|
|
$table_data->write_table($v);
|
|
}
|
|
}
|
|
$table_data = $table_data->getvalue();
|
|
$this->write_long(strlen($table_data));
|
|
$this->write($table_data);
|
|
}
|
|
}
|
|
|
|
class AMQPReader
|
|
{
|
|
public function __construct($str, $sock=NULL)
|
|
{
|
|
$this->str = $str;
|
|
if ($sock !== NULL)
|
|
{
|
|
$this->sock = new BufferedInput($sock);
|
|
} else
|
|
{
|
|
$this->sock = NULL;
|
|
}
|
|
$this->offset = 0;
|
|
|
|
$this->bitcount = $this->bits = 0;
|
|
|
|
if(((int)4294967296)!=0)
|
|
$this->is64bits = true;
|
|
else
|
|
$this->is64bits = false;
|
|
|
|
if(!function_exists("bcmul"))
|
|
throw new Exception("'bc math' module required");
|
|
|
|
$this->buffer_read_timeout = 5; // in seconds
|
|
}
|
|
|
|
public function close()
|
|
{
|
|
if($this->sock)
|
|
$this->sock->close();
|
|
}
|
|
|
|
public function read($n)
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
return $this->rawread($n);
|
|
}
|
|
|
|
private function rawread($n)
|
|
{
|
|
if($this->sock)
|
|
{
|
|
$res = '';
|
|
$read = 0;
|
|
|
|
$start = time();
|
|
while($read < $n && !feof($this->sock->real_sock()) &&
|
|
(false !== ($buf = fread($this->sock->real_sock(), $n - $read))))
|
|
{
|
|
if ($buf == '')
|
|
{
|
|
usleep(100);
|
|
}
|
|
else
|
|
$start = time();
|
|
|
|
$read += strlen($buf);
|
|
$res .= $buf;
|
|
}
|
|
|
|
if(strlen($res)!=$n)
|
|
throw new Exception ("Error reading data. Recevived " .
|
|
strlen($res) . " instead of expected $n bytes");
|
|
$this->offset += $n;
|
|
} else
|
|
{
|
|
if(strlen($this->str) < $n)
|
|
throw new Exception ("Error reading data. Requested $n bytes while string buffer has only " .
|
|
strlen($this->str));
|
|
$res = substr($this->str,0,$n);
|
|
$this->str = substr($this->str,$n);
|
|
$this->offset += $n;
|
|
}
|
|
return $res;
|
|
}
|
|
|
|
public function read_bit()
|
|
{
|
|
if(!$this->bitcount)
|
|
{
|
|
$this->bits = ord($this->rawread(1));
|
|
$this->bitcount = 8;
|
|
}
|
|
$result = ($this->bits & 1) == 1;
|
|
$this->bits >>= 1;
|
|
$this->bitcount -= 1;
|
|
return $result;
|
|
}
|
|
|
|
public function read_octet()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
list(,$res) = unpack('C', $this->rawread(1));
|
|
return $res;
|
|
}
|
|
|
|
public function read_short()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
list(,$res) = unpack('n', $this->rawread(2));
|
|
return $res;
|
|
}
|
|
|
|
/**
|
|
* Reads 32 bit integer in big-endian byte order.
|
|
*
|
|
* On 64 bit systems it will return always usngined int
|
|
* value in 0..2^32 range.
|
|
*
|
|
* On 32 bit systems it will return signed int value in
|
|
* -2^31...+2^31 range.
|
|
*
|
|
* Use with caution!
|
|
*/
|
|
public function read_php_int()
|
|
{
|
|
list(,$res) = unpack('N', $this->rawread(4));
|
|
if($this->is64bits)
|
|
{
|
|
$sres = sprintf ( "%u", $res );
|
|
return (int)$sres;
|
|
} else {
|
|
return $res;
|
|
}
|
|
}
|
|
|
|
// PHP does not have unsigned 32 bit int,
|
|
// so we return it as a string
|
|
public function read_long()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
list(,$res) = unpack('N', $this->rawread(4));
|
|
$sres = sprintf ( "%u", $res );
|
|
return $sres;
|
|
}
|
|
|
|
private function read_signed_long()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
// In PHP unpack('N') always return signed value,
|
|
// on both 32 and 64 bit systems!
|
|
list(,$res) = unpack('N', $this->rawread(4));
|
|
return $res;
|
|
}
|
|
|
|
// Even on 64 bit systems PHP integers are singed.
|
|
// Since we need an unsigned value here we return it
|
|
// as a string.
|
|
public function read_longlong()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
$hi = unpack('N', $this->rawread(4));
|
|
$lo = unpack('N', $this->rawread(4));
|
|
|
|
// workaround signed/unsigned braindamage in php
|
|
$hi = sprintf ( "%u", $hi[1] );
|
|
$lo = sprintf ( "%u", $lo[1] );
|
|
|
|
return bcadd(bcmul($hi, "4294967296" ), $lo);
|
|
}
|
|
|
|
/**
|
|
* Read a utf-8 encoded string that's stored in up to
|
|
* 255 bytes. Return it decoded as a Python unicode object.
|
|
*/
|
|
public function read_shortstr()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
list(,$slen) = unpack('C', $this->rawread(1));
|
|
return $this->rawread($slen);
|
|
}
|
|
|
|
/**
|
|
* Read a string that's up to 2**32 bytes, the encoding
|
|
* isn't specified in the AMQP spec, so just return it as
|
|
* a plain PHP string.
|
|
*/
|
|
public function read_longstr()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
$slen = $this->read_php_int();
|
|
if($slen<0)
|
|
throw new Exception("Strings longer than supported on this platform");
|
|
return $this->rawread($slen);
|
|
}
|
|
|
|
/**
|
|
* Read and AMQP timestamp, which is a 64-bit integer representing
|
|
* seconds since the Unix epoch in 1-second resolution.
|
|
*/
|
|
function read_timestamp()
|
|
{
|
|
return $this->read_longlong();
|
|
}
|
|
|
|
/**
|
|
* Read an AMQP table, and return as a PHP array. keys are strings,
|
|
* values are (type,value) tuples.
|
|
*/
|
|
public function read_table()
|
|
{
|
|
$this->bitcount = $this->bits = 0;
|
|
$tlen = $this->read_php_int();
|
|
if($tlen<0)
|
|
throw new Exception("Table is longer than supported");
|
|
$table_data = new AMQPReader($this->rawread($tlen));
|
|
$result = array();
|
|
while($table_data->tell() < $tlen)
|
|
{
|
|
$name = $table_data->read_shortstr();
|
|
$ftype = $table_data->rawread(1);
|
|
if($ftype == 'S') {
|
|
$val = $table_data->read_longstr();
|
|
} else if($ftype == 'I') {
|
|
$val = $table_data->read_signed_long();
|
|
} else if($ftype == 'D')
|
|
{
|
|
$e = $table_data->read_octet();
|
|
$n = $table_data->read_signed_long();
|
|
$val = new AMQPDecimal($n, $e);
|
|
} else if($ftype == 'T')
|
|
{
|
|
$val = $table_data->read_timestamp();
|
|
} else if($ftype == 'F')
|
|
{
|
|
$val = $table_data->read_table(); // recursion
|
|
} else {
|
|
error_log("Usupported table field type $ftype");
|
|
$val = NULL;
|
|
}
|
|
$result[$name] = array($ftype,$val);
|
|
}
|
|
return $result;
|
|
}
|
|
|
|
|
|
protected function tell()
|
|
{
|
|
return $this->offset;
|
|
}
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
* Abstract base class for AMQP content. Subclasses should override
|
|
* the PROPERTIES attribute.
|
|
*/
|
|
class GenericContent
|
|
{
|
|
protected static $PROPERTIES = array(
|
|
"dummy" => "shortstr"
|
|
);
|
|
|
|
public function __construct($props, $prop_types=NULL)
|
|
{
|
|
if($prop_types)
|
|
$this->prop_types = $prop_types;
|
|
else
|
|
$this->prop_types = GenericContent::$PROPERTIES;
|
|
$d = array();
|
|
if ($props)
|
|
$d = array_intersect_key($props, $this->prop_types);
|
|
else
|
|
$d = array();
|
|
$this->properties = $d;
|
|
}
|
|
|
|
|
|
/**
|
|
* Look for additional properties in the 'properties' dictionary,
|
|
* and if present - the 'delivery_info' dictionary.
|
|
*/
|
|
public function get($name)
|
|
{
|
|
if(array_key_exists($name,$this->properties))
|
|
return $this->properties[$name];
|
|
|
|
if(isset($this->delivery_info))
|
|
if(array_key_exists($name,$this->delivery_info))
|
|
return $this->delivery_info[$name];
|
|
|
|
throw new Exception("No such property");
|
|
}
|
|
|
|
|
|
/**
|
|
* Given the raw bytes containing the property-flags and
|
|
* property-list from a content-frame-header, parse and insert
|
|
* into a dictionary stored in this object as an attribute named
|
|
* 'properties'.
|
|
*/
|
|
public function load_properties($raw_bytes)
|
|
{
|
|
$r = new AMQPReader($raw_bytes);
|
|
|
|
// Read 16-bit shorts until we get one with a low bit set to zero
|
|
$flags = array();
|
|
while(true)
|
|
{
|
|
$flag_bits = $r->read_short();
|
|
array_push($flags, $flag_bits);
|
|
if(($flag_bits & 1) == 0)
|
|
break;
|
|
}
|
|
|
|
$shift = 0;
|
|
$d = array();
|
|
foreach ($this->prop_types as $key => $proptype)
|
|
{
|
|
if($shift == 0) {
|
|
if(!$flags) {
|
|
break;
|
|
}
|
|
$flag_bits = array_shift($flags);
|
|
$shift = 15;
|
|
}
|
|
if($flag_bits & (1 << $shift))
|
|
$d[$key] = call_user_func(array($r,"read_".$proptype));
|
|
$shift -= 1;
|
|
}
|
|
$this->properties = $d;
|
|
}
|
|
|
|
|
|
/**
|
|
* serialize the 'properties' attribute (a dictionary) into the
|
|
* raw bytes making up a set of property flags and a property
|
|
* list, suitable for putting into a content frame header.
|
|
*/
|
|
public function serialize_properties()
|
|
{
|
|
$shift = 15;
|
|
$flag_bits = 0;
|
|
$flags = array();
|
|
$raw_bytes = new AMQPWriter();
|
|
foreach ($this->prop_types as $key => $proptype)
|
|
{
|
|
if(array_key_exists($key,$this->properties))
|
|
$val = $this->properties[$key];
|
|
else
|
|
$val = NULL;
|
|
if($val != NULL)
|
|
{
|
|
if($shift == 0)
|
|
{
|
|
array_push($flags, $flag_bits);
|
|
$flag_bits = 0;
|
|
$shift = 15;
|
|
}
|
|
|
|
$flag_bits |= (1 << $shift);
|
|
if($proptype != "bit")
|
|
call_user_func(array($raw_bytes, "write_" . $proptype),
|
|
$val);
|
|
}
|
|
$shift -= 1;
|
|
}
|
|
array_push($flags, $flag_bits);
|
|
$result = new AMQPWriter();
|
|
foreach($flags as $flag_bits)
|
|
$result->write_short($flag_bits);
|
|
$result->write($raw_bytes->getvalue());
|
|
|
|
return $result->getvalue();
|
|
}
|
|
}
|
|
|
|
class BufferedInput
|
|
{
|
|
public function __construct($sock)
|
|
{
|
|
$this->block_size = 8192;
|
|
|
|
$this->sock = $sock;
|
|
$this->reset("");
|
|
|
|
}
|
|
|
|
public function real_sock()
|
|
{
|
|
return $this->sock;
|
|
}
|
|
|
|
public function read($n)
|
|
{
|
|
if ($this->offset >= strlen($this->buffer))
|
|
{
|
|
if (!($rv = $this->populate_buffer()))
|
|
{
|
|
return $rv;
|
|
}
|
|
}
|
|
return $this->read_buffer($n);
|
|
}
|
|
|
|
public function close()
|
|
{
|
|
fclose($this->sock);
|
|
$this->reset("");
|
|
}
|
|
|
|
private function read_buffer($n)
|
|
{
|
|
$n = min($n, strlen($this->buffer) - $this->offset);
|
|
if ($n === 0)
|
|
{
|
|
// substr("", 0, 0) => FALSE, which screws up read loops that are
|
|
// expecting non-blocking reads to return "". This avoids that edge
|
|
// case when the buffer is empty/used up.
|
|
return "";
|
|
}
|
|
$block = substr($this->buffer, $this->offset, $n);
|
|
$this->offset += $n;
|
|
return $block;
|
|
}
|
|
|
|
private function reset($block)
|
|
{
|
|
$this->buffer = $block;
|
|
$this->offset = 0;
|
|
}
|
|
|
|
private function populate_buffer()
|
|
{
|
|
if(feof($this->sock))
|
|
{
|
|
$this->reset("");
|
|
return FALSE;
|
|
}
|
|
|
|
$block = fread($this->sock, $this->block_size);
|
|
if ($block !== FALSE)
|
|
{
|
|
$this->reset($block);
|
|
return TRUE;
|
|
} else
|
|
{
|
|
return $block;
|
|
}
|
|
}
|
|
}
|
|
?>
|