*
 * This source file is subject to the MIT license that is bundled
 * with this source code in the file LICENSE.
 */

declare(strict_types=1);

namespace App\Amqp\Consumer;

use App\Log\Log;
use Exception;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Author: ykxiao
 * Date: 2025/6/3
 * Time: 7:18 PM
 * Description: Abstract base class for consumers.
 *
 * Wraps the subclass-provided handle() call so that any Exception it raises
 * is written to the "queue" log and the message is still acknowledged.
 *
 * (c) ykxiao
 *
 * This source file is subject to the MIT license that is bundled
 * with this source code in the file LICENSE.
 */
abstract class BaseConsumer extends ConsumerMessage
{
    /**
     * Entry point for a delivered message: delegates to handle() and turns
     * any Exception into a log entry plus an ACK.
     *
     * @param mixed $data payload forwarded unchanged to handle()
     * @param AMQPMessage $message the raw AMQP message (not used by this base implementation)
     * @return Result the result of handle() on success, Result::ACK after a logged failure
     * @throws Exception exceptions raised by handle() are caught here, so this
     *                   presumably covers failures raised while logging (TODO confirm)
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        $consumerClass = $this::class;

        try {
            return $this->handle($data);
        } catch (AMQPChannelClosedException | AMQPConnectionClosedException $e) {
            // Channel/connection closed while handling. A reconnect or NACK
            // strategy could be chosen here instead; currently the message
            // is acknowledged like any other failure.
            $summary = "AMQP通道关闭异常 ($consumerClass): " . $e->getMessage();
        } catch (Exception $e) {
            $summary = "AMQP消费者异常 ($consumerClass): " . $e->getMessage();
        }

        // Only reached when handle() threw: both failure kinds share the same
        // log context and the same acknowledgement.
        Log::get('queue', 'queue')->error($summary, [
            'data' => $data,
            'exception' => $e,
        ]);

        return Result::ACK;
    }

    /**
     * Core processing logic supplied by each concrete consumer.
     *
     * @param mixed $data payload received by consumeMessage()
     * @return Result the outcome to report for this message
     */
    abstract protected function handle(mixed $data): Result;
}