connection = $connection; $this->channel_id = $channel_id; $connection->channels[$channel_id] = $this; $this->frame_queue = array(); // Lower level queue for frames $this->method_queue = array(); // Higher level queue for methods $this->auto_decode = false; $this->debug = defined('AMQP_DEBUG') ? AMQP_DEBUG : false; $this->msg_property_reader = new AMQPReader(null); $this->wait_content_reader = new AMQPReader(null); $this->dispatch_reader = new AMQPReader(null); $this->protocolVersion = self::getProtocolVersion(); switch ($this->protocolVersion) { case self::PROTOCOL_091: self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants091'; $c = self::$PROTOCOL_CONSTANTS_CLASS; $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER; $this->protocolWriter = new Protocol091(); $this->waitHelper = new Wait091(); $this->methodMap = new MethodMap091(); break; case self::PROTOCOL_080: self::$PROTOCOL_CONSTANTS_CLASS = 'PhpAmqpLib\Wire\Constants080'; $c = self::$PROTOCOL_CONSTANTS_CLASS; $this->amqp_protocol_header = $c::$AMQP_PROTOCOL_HEADER; $this->protocolWriter = new Protocol080(); $this->waitHelper = new Wait080(); $this->methodMap = new MethodMap080(); break; default: //this is logic exception (requires code changes to fix), so OutOfRange, not OutOfBounds or Runtime throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $this->protocolVersion)); } } /** * @return string * @throws AMQPOutOfRangeException */ public static function getProtocolVersion() { $protocol = defined('AMQP_PROTOCOL') ? AMQP_PROTOCOL : self::PROTOCOL_091; //adding check here to catch unknown protocol ASAP, as this method may be called from the outside if (!in_array($protocol, array(self::PROTOCOL_080, self::PROTOCOL_091), TRUE)) { throw new AMQPOutOfRangeException(sprintf('Protocol version %s not implemented.', $protocol)); } return $protocol; } /** * @return string */ public function getChannelId() { return $this->channel_id; } public function setBodySizeLimit($max_bytes) { $max_bytes = intval($max_bytes); if ( $max_bytes > 0 ) { $this->body_size_max = $max_bytes; } else { $this->body_size_max = null; } } /** * @return AbstractConnection */ public function getConnection() { return $this->connection; } /** * @return array */ public function getMethodQueue() { return $this->method_queue; } /** * @param string $method_sig * @param string $args * @param $content * @return mixed * @throws \PhpAmqpLib\Exception\AMQPRuntimeException */ public function dispatch($method_sig, $args, $content) { if (!$this->methodMap->valid_method($method_sig)) { throw new AMQPRuntimeException(sprintf( 'Unknown AMQP method "%s"', $method_sig )); } $amqp_method = $this->methodMap->get_method($method_sig); if (!method_exists($this, $amqp_method)) { throw new AMQPRuntimeException(sprintf( 'Method: "%s" not implemented by class: %s', $amqp_method, get_class($this) )); } $this->dispatch_reader->reuse($args); if ($content == null) { return call_user_func(array($this, $amqp_method), $this->dispatch_reader); } return call_user_func(array($this, $amqp_method), $this->dispatch_reader, $content); } /** * @param int $timeout * @return array|mixed */ public function next_frame($timeout = 0) { if ($this->debug) { MiscHelper::debug_msg('waiting for a new frame'); } if (!empty($this->frame_queue)) { return array_shift($this->frame_queue); } return $this->connection->wait_channel($this->channel_id, $timeout); } /** * @param $method_sig * @param string $args */ protected function send_method_frame($method_sig, $args = '') { $this->connection->send_channel_method_frame($this->channel_id, $method_sig, $args); } /** * This is here for performance reasons to batch calls to fwrite from basic.publish * * @param $method_sig * @param string $args * @param null $pkt * @return null|\PhpAmqpLib\Wire\AMQPWriter */ protected function prepare_method_frame($method_sig, $args = '', $pkt = null) { return $this->connection->prepare_channel_method_frame($this->channel_id, $method_sig, $args, $pkt); } /** * @return AMQPMessage * @throws \PhpAmqpLib\Exception\AMQPRuntimeException */ public function wait_content() { $frm = $this->next_frame(); $frame_type = $frm[0]; $payload = $frm[1]; if ($frame_type != 2) { throw new AMQPRuntimeException('Expecting Content header'); } $this->wait_content_reader->reuse(mb_substr($payload, 0, 12, 'ASCII')); // $payload_reader = new AMQPReader(substr($payload,0,12)); $class_id = $this->wait_content_reader->read_short(); $weight = $this->wait_content_reader->read_short(); $body_size = $this->wait_content_reader->read_longlong(); //hack to avoid creating new instances of AMQPReader; $this->msg_property_reader->reuse(mb_substr($payload, 12, mb_strlen($payload, 'ASCII') - 12, 'ASCII')); $msg = new AMQPMessage(); $msg->load_properties($this->msg_property_reader); $msg->body_size = $body_size; $body_parts = array(); $body_received = 0; while (bccomp($body_size, $body_received, 0) == 1) { $frm = $this->next_frame(); $frame_type = $frm[0]; $payload = $frm[1]; if ($frame_type != 3) { $PROTOCOL_CONSTANTS_CLASS = self::$PROTOCOL_CONSTANTS_CLASS; throw new AMQPRuntimeException(sprintf( 'Expecting Content body, received frame type %s (%s)', $frame_type, $PROTOCOL_CONSTANTS_CLASS::$FRAME_TYPES[$frame_type] )); } $body_received = bcadd($body_received, mb_strlen($payload, 'ASCII'), 0); if ( ! is_null($this->body_size_max) && $body_received > $this->body_size_max ) { $msg->is_truncated = true; continue; } $body_parts[] = $payload; } $msg->body = implode('', $body_parts); if ($this->auto_decode && isset($msg->content_encoding)) { try { $msg->body = $msg->body->decode($msg->content_encoding); } catch (\Exception $e) { if ($this->debug) { MiscHelper::debug_msg('Ignoring body decoding exception: ' . $e->getMessage()); } } } return $msg; } /** * Wait for some expected AMQP methods and dispatch to them. * Unexpected methods are queued up for later calls to this PHP * method. * * @param array $allowed_methods * @param bool $non_blocking * @param int $timeout * @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException * @throws \PhpAmqpLib\Exception\AMQPRuntimeException * @return mixed */ public function wait($allowed_methods = null, $non_blocking = false, $timeout = 0) { $PROTOCOL_CONSTANTS_CLASS = self::$PROTOCOL_CONSTANTS_CLASS; if ($allowed_methods && $this->debug) { MiscHelper::debug_msg('waiting for ' . implode(', ', $allowed_methods)); } elseif ($this->debug) { MiscHelper::debug_msg('waiting for any method'); } //Process deferred methods foreach ($this->method_queue as $qk => $queued_method) { if ($this->debug) { MiscHelper::debug_msg('checking queue method ' . $qk); } $method_sig = $queued_method[0]; if ($allowed_methods == null || in_array($method_sig, $allowed_methods)) { unset($this->method_queue[$qk]); if ($this->debug) { MiscHelper::debug_msg(sprintf( 'Executing queued method: %s: %s', $method_sig, $PROTOCOL_CONSTANTS_CLASS::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)] )); } return $this->dispatch($queued_method[0], $queued_method[1], $queued_method[2]); } } // No deferred methods? wait for new ones while (true) { $frm = $this->next_frame($timeout); $frame_type = $frm[0]; $payload = $frm[1]; if ($frame_type != 1) { throw new AMQPRuntimeException(sprintf( 'Expecting AMQP method, received frame type: %s (%s)', $frame_type, $PROTOCOL_CONSTANTS_CLASS::$FRAME_TYPES[$frame_type] )); } if (mb_strlen($payload, 'ASCII') < 4) { throw new AMQPOutOfBoundsException('Method frame too short'); } $method_sig_array = unpack('n2', mb_substr($payload, 0, 4, 'ASCII')); $method_sig = '' . $method_sig_array[1] . ',' . $method_sig_array[2]; $args = mb_substr($payload, 4, mb_strlen($payload, 'ASCII') - 4, 'ASCII'); if ($this->debug) { MiscHelper::debug_msg(sprintf( '> %s: %s', $method_sig, $PROTOCOL_CONSTANTS_CLASS::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)] )); } if (in_array($method_sig, $PROTOCOL_CONSTANTS_CLASS::$CONTENT_METHODS)) { $content = $this->wait_content(); } else { $content = null; } if ($allowed_methods == null || in_array($method_sig, $allowed_methods) || in_array($method_sig, $PROTOCOL_CONSTANTS_CLASS::$CLOSE_METHODS) ) { return $this->dispatch($method_sig, $args, $content); } // Wasn't what we were looking for? save it for later if ($this->debug) { MiscHelper::debug_msg('Queueing for later: $method_sig: ' . $PROTOCOL_CONSTANTS_CLASS::$GLOBAL_METHOD_NAMES[MiscHelper::methodSig($method_sig)]); } $this->method_queue[] = array($method_sig, $args, $content); if ($non_blocking) { break; } } } /** * @param $handler * @param array $arguments */ protected function dispatch_to_handler($handler, array $arguments) { if (is_callable($handler)) { call_user_func_array($handler, $arguments); } } }