* * This source file is subject to the MIT license that is bundled * with this source code in the file LICENSE. */

declare(strict_types=1);

namespace App\Amqp\Consumer;

use App\Context\ApiUrlContext;
use App\Context\QueueContext;
use App\Log\Log;
use Exception;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\DbConnection\Db;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;

/**
 * Author: ykxiao
 * Date: 2025/6/3
 * Time: 7:18 PM
 * Description: Abstract base class for AMQP consumers.
 *
 * Populates the queue/API-URL contexts from the message payload, then runs
 * the subclass's handle() inside a database transaction that is committed on
 * success and rolled back on any failure.
 *
 * (c) ykxiao
 *
 * This source file is subject to the MIT license that is bundled
 * with this source code in the file LICENSE.
 */
abstract class BaseConsumer extends ConsumerMessage
{
    /**
     * Consume a single message: set up context, then delegate to handle()
     * inside a database transaction.
     *
     * @param mixed $data Decoded message payload. The optional keys `user`
     *                    and `api_url` are copied into QueueContext and
     *                    ApiUrlContext respectively when non-empty.
     * @param AMQPMessage $message The raw AMQP message (not used here).
     *
     * @return Result The result returned by handle() on success, or
     *                Result::ACK after an Exception was logged and the
     *                transaction rolled back.
     *
     * @throws Throwable Non-Exception throwables (e.g. TypeError) are
     *                   rethrown after the transaction has been rolled back.
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        $consumerClass = get_class($this);

        // Propagate the user information carried by the payload.
        if (!empty($data['user'])) {
            QueueContext::setUser($data['user']);
        }

        // Read the API URL from the payload, like `user` above. The previous
        // code read `$this->data`, a property this class never defines, so
        // the check was always false and the context was never set.
        if (!empty($data['api_url'])) {
            ApiUrlContext::setApiUrl($data['api_url']);
        }

        Db::beginTransaction();

        try {
            $result = $this->handle($data);
            Db::commit();

            return $result;
        } catch (AMQPChannelClosedException|AMQPConnectionClosedException $e) {
            Log::get('queue', 'queue')->error("AMQP通道关闭异常 ($consumerClass): " . $e->getMessage(), [
                'data' => $data,
                'exception' => $e,
            ]);
            Db::rollBack();

            // Optional: reconnect logic, or drop the message.
            return Result::ACK; // or NACK
        } catch (Exception $e) {
            Log::get('queue', 'queue')->error("AMQP消费者异常 ($consumerClass): " . $e->getMessage(), [
                'data' => $data,
                'exception' => $e,
            ]);
            Db::rollBack();

            return Result::ACK;
        } catch (Throwable $e) {
            // Error/TypeError are not Exceptions and used to bypass both
            // branches above, leaving the transaction open. Roll back and
            // rethrow so the caller still observes the original failure.
            Db::rollBack();

            throw $e;
        }
    }

    /**
     * Core processing logic implemented by subclasses.
     *
     * Called by consumeMessage() inside an open database transaction.
     *
     * @param mixed $data Decoded message payload.
     *
     * @return Result Result to report back for this message.
     */
    abstract protected function handle(mixed $data): Result;
}