xiang 发布于  2024-2-28 17:42 

nodejs消费rabbitmq队列消息 nodejs

index.js

var amqp = require('amqplib/callback_api');

const MyConsume = require('./MyConsume');

amqp.connect('amqp://name:password!@localhost:5672/vhost', function (error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function (error1, channel) {
        if (error1) {
            throw error1;
        }

        var exchangeName = 'my_topic'; // 你的 exchange 名称
        var exchangeType = 'topic'; // exchange 类型,可以是 direct、fanout、topic 等
        var routingKey = 'my_routing_key'; // 你的 routing key
        var queueName = 'my_queue'; // 你的队列名称

        channel.assertExchange(exchangeName, exchangeType, { durable: true }, function(error2, ok) {
            if (error2) {
                throw error2;
            }
            console.log('Exchange ' + exchangeName + ' is ready');

            // 声明一个队列
            channel.assertQueue(queueName, { durable: true  }, function(error3, q) {
                if (error3) {
                    throw error3;
                }
                console.log('Queue ' +queueName + ' is created');

                // 将队列绑定到 exchange,并指定 routing key
                channel.bindQueue(queueName, exchangeName, routingKey);
                console.log('Queue ' + queueName + ' is bound to Exchange ' + exchangeName + ' with routing key ' + routingKey);

                // 在这里开始消费消息
                channel.consume(queueName, function(msg) {
                  MyConsume.handleMessage(msg,channel);
                }, {
                    noAck: false
                });
            });
        });
    });
});

MyConsume.js

const { UserModel } = require('./UserModel');

function handleMessage(msg,channel) {
    UserModel.create({
        user_name: 'Example Bookmark',
        url: 'http://www.example.com',
        type_id: 1,
        order: 1,
        is_recommend: true,
        status: true
      }).then(bookmark => {
        console.log('New bookmark created:');
        console.log(new Date());
      }).catch(error => {
        console.error('Error creating bookmark:', error);
      });
    // 在这里可以添加你的其他处理逻辑
    channel.ack(msg)
}

module.exports = {
    handleMessage: handleMessage
};

使用Mysql连接池,开10个mysql连接,消费3万rabbitmq消息,每条消息的逻辑是往mysql数据表写入一条数据,30秒写入完.

以上nodejs代码的写法我用php实现不到.哪怕是用swoole.如何用swoole来实现请高人指教.

标签: rabbitmq

xiang 发布于  2024-1-29 10:06 

php rabbitmq 队列持久化,消息持久化 php

ubuntu停止rabbitmq

service rabbitmq-server stop

ubuntu启动rabbitmq

service rabbitmq-server start

queue队列持久化

  $channel->queue_declare($queue, false, true, false, false);

durable = true 队列持久化


message消息持久化

 $msg = new AMQPMessage('Hello World!'.$time, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

重启 rabbitmq-server

queue和ready message依然存在

标签: rabbitmq

xiang 发布于  2024-1-13 10:27