This commit is contained in:
34
app/Amqp/Consumer/AliSlsConsumer.php
Normal file
34
app/Amqp/Consumer/AliSlsConsumer.php
Normal file
@ -0,0 +1,34 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Amqp\Consumer;
|
||||
|
||||
use App\Service\AliLogsSignService;
|
||||
use Hyperf\Amqp\Result;
|
||||
use Hyperf\Amqp\Annotation\Consumer;
|
||||
use Hyperf\Di\Annotation\Inject;
|
||||
|
||||
/**
|
||||
* Author: ykxiao
|
||||
* Date: 2025/6/3
|
||||
* Time: 下午7:18
|
||||
* Description: 阿里日志消费.
|
||||
*
|
||||
* (c) ykxiao <yk_9001@hotmail.com>
|
||||
*
|
||||
* This source file is subject to the MIT license that is bundled
|
||||
* with this source code in the file LICENSE.
|
||||
*/
|
||||
#[Consumer(exchange: 'wh_ali_sls', routingKey: 'wh_ali_sls_key', queue: 'wh_ali_sls_queue', name: "AliSlsConsumer", nums: 5)]
|
||||
class AliSlsConsumer extends BaseConsumer
|
||||
{
|
||||
#[Inject]
|
||||
protected AliLogsSignService $aliLogsSignService;
|
||||
public function handle($data): Result
|
||||
{
|
||||
$this->aliLogsSignService->putWebTracking($data);
|
||||
|
||||
return Result::ACK;
|
||||
}
|
||||
}
|
89
app/Amqp/Consumer/BaseConsumer.php
Normal file
89
app/Amqp/Consumer/BaseConsumer.php
Normal file
@ -0,0 +1,89 @@
|
||||
<?php
|
||||
/**
|
||||
* Author: ykxiao
|
||||
* Date: 2025/6/3
|
||||
* Time: 下午6:37
|
||||
* Description:
|
||||
*
|
||||
* (c) ykxiao <yk_9001@hotmail.com>
|
||||
*
|
||||
* This source file is subject to the MIT license that is bundled
|
||||
* with this source code in the file LICENSE.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Amqp\Consumer;
|
||||
|
||||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||||
use App\Context\QueueContext;
|
||||
use Hyperf\Amqp\Result;
|
||||
use Hyperf\DbConnection\Db;
|
||||
use PhpAmqpLib\Exception\AMQPChannelClosedException;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
use App\Log\Log;
|
||||
use Exception;
|
||||
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
|
||||
|
||||
/**
|
||||
* Author: ykxiao
|
||||
* Date: 2025/6/3
|
||||
* Time: 下午7:18
|
||||
* Description: 消费者抽象基类.
|
||||
*
|
||||
* (c) ykxiao <yk_9001@hotmail.com>
|
||||
*
|
||||
* This source file is subject to the MIT license that is bundled
|
||||
* with this source code in the file LICENSE.
|
||||
*/
|
||||
abstract class BaseConsumer extends ConsumerMessage
|
||||
{
|
||||
public function consumeMessage($data, AMQPMessage $message): Result
|
||||
{
|
||||
$consumerClass = get_class($this);
|
||||
|
||||
// 设置用户信息上下文信息
|
||||
if (!empty($data['user'])) {
|
||||
QueueContext::setUser($data['user']);
|
||||
}
|
||||
if (!empty($company = $data['company'])) {
|
||||
QueueContext::setCompanyInfo($company);
|
||||
}
|
||||
|
||||
Db::beginTransaction();
|
||||
|
||||
try {
|
||||
$handle = $this->handle($data);
|
||||
|
||||
Db::commit();
|
||||
|
||||
return $handle;
|
||||
} catch (AMQPChannelClosedException|AMQPConnectionClosedException $e) {
|
||||
Log::get('queue', 'queue')->error("AMQP通道关闭异常 ($consumerClass): " . $e->getMessage(), [
|
||||
'data' => $data,
|
||||
'exception' => $e,
|
||||
]);
|
||||
|
||||
Db::rollBack();
|
||||
|
||||
// 可选:重连逻辑 or 丢弃
|
||||
return Result::ACK; // 或 NACK
|
||||
} catch (Exception $e) {
|
||||
Log::get('queue', 'queue')->error("AMQP消费者异常 ($consumerClass): " . $e->getMessage(), [
|
||||
'data' => $data,
|
||||
'exception' => $e,
|
||||
]);
|
||||
|
||||
Db::rollBack();
|
||||
|
||||
return Result::ACK;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 子类实现的核心处理逻辑
|
||||
* @param mixed $data
|
||||
* @return Result
|
||||
*/
|
||||
abstract protected function handle(mixed $data): Result;
|
||||
}
|
19
app/Amqp/Consumer/UserImportConsumer.php
Normal file
19
app/Amqp/Consumer/UserImportConsumer.php
Normal file
@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Amqp\Consumer;
|
||||
|
||||
use Hyperf\Amqp\Result;
|
||||
use Hyperf\Amqp\Annotation\Consumer;
|
||||
use Hyperf\Amqp\Message\ConsumerMessage;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
|
||||
#[Consumer(exchange: 'wh_user_import', routingKey: 'wh_user_import_key', queue: 'wh_user_import_queue', name: "UserImportConsumer", nums: 5)]
|
||||
class UserImportConsumer extends BaseConsumer
|
||||
{
|
||||
public function handle($data): Result
|
||||
{
|
||||
return Result::ACK;
|
||||
}
|
||||
}
|
12
app/Amqp/Producer/AliSlsProducer.php
Normal file
12
app/Amqp/Producer/AliSlsProducer.php
Normal file
@ -0,0 +1,12 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Amqp\Producer;
|
||||
|
||||
use Hyperf\Amqp\Annotation\Producer;
|
||||
|
||||
#[Producer(exchange: 'wh_ali_sls', routingKey: 'wh_ali_sls_key')]
|
||||
class AliSlsProducer extends BaseProducer
|
||||
{
|
||||
}
|
44
app/Amqp/Producer/BaseProducer.php
Normal file
44
app/Amqp/Producer/BaseProducer.php
Normal file
@ -0,0 +1,44 @@
|
||||
<?php
|
||||
/**
|
||||
* Author: ykxiao
|
||||
* Date: 2025/6/6
|
||||
* Time: 上午9:53
|
||||
* Description:
|
||||
*
|
||||
* (c) ykxiao <yk_9001@hotmail.com>
|
||||
*
|
||||
* This source file is subject to the MIT license that is bundled
|
||||
* with this source code in the file LICENSE.
|
||||
*/
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Amqp\Producer;
|
||||
|
||||
use App\Context\UserContext;
|
||||
use Hyperf\Amqp\Message\ProducerMessage;
|
||||
|
||||
/**
|
||||
* Author: ykxiao
|
||||
* Date: 2025/6/6
|
||||
* Time: 上午9:57
|
||||
* Description: amqp生产者抽象基类.
|
||||
*
|
||||
* (c) ykxiao <yk_9001@hotmail.com>
|
||||
*
|
||||
* This source file is subject to the MIT license that is bundled
|
||||
* with this source code in the file LICENSE.
|
||||
*/
|
||||
abstract class BaseProducer extends ProducerMessage
|
||||
{
|
||||
public function __construct(array $data)
|
||||
{
|
||||
// 设置用户信息上下文信息
|
||||
if (UserContext::hasCurrentUser()) {
|
||||
$data['user'] = UserContext::getCurrentUser();
|
||||
}
|
||||
|
||||
$this->payload = $data;
|
||||
$this->properties['delivery_mode'] = 2; // 消息持久化
|
||||
}
|
||||
}
|
13
app/Amqp/Producer/UserImportProducer.php
Normal file
13
app/Amqp/Producer/UserImportProducer.php
Normal file
@ -0,0 +1,13 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Amqp\Producer;
|
||||
|
||||
use Hyperf\Amqp\Annotation\Producer;
|
||||
use Hyperf\Amqp\Message\ProducerMessage;
|
||||
|
||||
#[Producer(exchange: 'wh_user_import', routingKey: 'wh_user_import_key')]
|
||||
class UserImportProducer extends BaseProducer
|
||||
{
|
||||
}
|
Reference in New Issue
Block a user