
RabbitMQ (Basic)

Pull the RabbitMQ image (with the management plugin) from Docker Hub:

docker pull rabbitmq:management

Then start the container. Once it is running, the management UI is available at http://localhost:15672/

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Default account for login:

username: guest
password: guest

Start With NodeJS

Let's use rabbitmq-client to connect to the server.

https://www.npmjs.com/package/rabbitmq-client

yarn add rabbitmq-client dotenv
import 'dotenv/config'
import { Connection } from 'rabbitmq-client'
// .env: MQ_CONNECT_STRING=amqp://guest:guest@localhost:5672
const MQ_STRING = process.env.MQ_CONNECT_STRING || ""
const rabbit = new Connection(MQ_STRING)
rabbit.on('error', (err) => {
  console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established')
})
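The connection code above falls back to an empty string when MQ_CONNECT_STRING is not set, which only surfaces later as a connection error. As a minimal sketch, assuming you would rather fail fast at startup, the value can be checked first with the standard URL class. The helper name parseAmqpUrl and the AmqpTarget type are hypothetical and are not part of rabbitmq-client.

```typescript
// Hypothetical helper (not part of rabbitmq-client): validate the AMQP
// connection string up front so a missing or malformed value fails fast.
interface AmqpTarget {
  protocol: string
  hostname: string
  port: number
  username: string
}

function parseAmqpUrl(raw: string | undefined): AmqpTarget {
  if (!raw) {
    throw new Error('MQ_CONNECT_STRING is not set')
  }
  const url = new URL(raw) // throws a TypeError on malformed input
  if (url.protocol !== 'amqp:' && url.protocol !== 'amqps:') {
    throw new Error(`Unsupported protocol: ${url.protocol}`)
  }
  // 5672 is the default AMQP port, 5671 the default for AMQP over TLS
  const defaultPort = url.protocol === 'amqps:' ? 5671 : 5672
  return {
    protocol: url.protocol,
    hostname: url.hostname,
    port: url.port ? Number(url.port) : defaultPort,
    username: decodeURIComponent(url.username),
  }
}

// Example: log where we are about to connect, without printing the password
const target = parseAmqpUrl('amqp://guest:guest@localhost:5672')
console.log(`Connecting to ${target.hostname}:${target.port} as ${target.username}`)
```

In the real application you would pass process.env.MQ_CONNECT_STRING to the helper and hand the original string to new Connection(...) only after the check succeeds.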

To test the Publisher and Consumer sections, we will add a Hono web server.

yarn add hono @hono/node-server
import 'dotenv/config'
import { Connection } from 'rabbitmq-client'
import { serve } from '@hono/node-server'
import { Hono } from 'hono'
import { showRoutes } from "hono/dev";
// .env: MQ_CONNECT_STRING=amqp://guest:guest@localhost:5672
const MQ_STRING = process.env.MQ_CONNECT_STRING || ""
const rabbit = new Connection(MQ_STRING)
rabbit.on('error', (err) => {
  console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established')
})
const app = new Hono();
const PORT = process.env.PORT ? +process.env.PORT : 8181;
showRoutes(app, {
  verbose: process.env.NODE_ENV === "development",
  colorize: true,
});
console.log(`Server is running on port http://localhost:${PORT}`);
serve({
  fetch: app.fetch,
  port: PORT,
});

Publisher: Send Data to a Queue

Assume that you have already created a queue named test_emails in RabbitMQ.

import { randomUUID } from 'node:crypto'
// Let's assume we have a queue called "test_emails"
const MY_QUERY_NAME = "test_emails";
// A route that publishes one message
app.get('/push1Data', async (c) => {
  const pub = rabbit.createPublisher({
    confirm: true,
    maxAttempts: 2,
    // Optionally ensure the existence of an exchange before we use it
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
  })
  const data = { id: randomUUID(), rngId: randomUUID() }
  // A string target publishes directly to the queue with that name
  await pub.send(MY_QUERY_NAME, data)
  return c.json({ status: true, data })
})
// A route that publishes 100 messages
app.get('/push100Data', async (c) => {
  const pub = rabbit.createPublisher({
    confirm: true,
    maxAttempts: 2,
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
  })
  const makeBigMessage = Array(100).fill(null).map(_ => ({ id: randomUUID(), rngId: randomUUID() }))
  // Send the messages one at a time, waiting for each confirmation
  for (const v of makeBigMessage) {
    await pub.send(MY_QUERY_NAME, v)
  }
  return c.json({ status: true, data: makeBigMessage })
})

Consumer: Get Data from a Queue

// Let's assume we have a queue called "test_emails"
const MY_QUERY_NAME = "test_emails";
const sub = rabbit.createConsumer({
  queue: MY_QUERY_NAME,
  queueOptions: { durable: true },
  // Fetch one message at a time.
  // Setting this to 2 or more does NOT mean the handler receives an array;
  // it means that several messages can be in flight, so the handler
  // may run for more than one message at once.
  qos: { prefetchCount: 1 },
  // Requeue the message when the handler throws an Error (default is true)
  requeue: true,
  // Optionally ensure the existence of an exchange before we use it
  exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
}, async (msg) => {
  console.log('received message (user-events)', msg)
  const data = msg.body; // { id: string, rngId: string }
  console.log(data);
  // Assume this message takes 5 seconds to process
  await timer(5000)
})
sub.on('error', (err) => {
  // Maybe the consumer was cancelled, or the connection was reset before a
  // message could be acknowledged.
  console.log('consumer error (user-events)', err)
})
/** Promise-based timer: resolves after `ms` milliseconds */
export function timer(ms: number = 1000): Promise<void> {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  });
}
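The consumer above treats msg.body as { id: string, rngId: string } without checking it. Because requeue is enabled, a handler that throws on an unexpected body would see the same message delivered again, so it can help to check the shape and return early instead. The following is a minimal sketch of such a check; the TestEmailJob type and both function names are hypothetical and only mirror the payload used in this guide.

```typescript
// Hypothetical shape of the messages published in this guide
interface TestEmailJob {
  id: string
  rngId: string
}

// Type guard: returns true only when `body` looks like a TestEmailJob
function isTestEmailJob(body: unknown): body is TestEmailJob {
  if (typeof body !== 'object' || body === null) return false
  const candidate = body as Record<string, unknown>
  return typeof candidate.id === 'string' && typeof candidate.rngId === 'string'
}

// Small demonstration of the guard narrowing an unknown value
function describeBody(body: unknown): string {
  return isTestEmailJob(body) ? `job ${body.id}` : 'malformed message'
}

console.log(describeBody({ id: 'a1', rngId: 'b2' })) // prints "job a1"
console.log(describeBody('not an object'))           // prints "malformed message"
```

Inside the handler, one option is to call isTestEmailJob(msg.body) first, log the malformed message, and return without throwing.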

Queue: Declaring a Queue in Code

You can also create your own queue from code with rabbit.queueDeclare():

// http://localhost:8181/makeQuery/hello_mate
app.get('/makeQuery/:name', async (c) => {
  const queueName = c.req.param("name")
  await rabbit.queueDeclare({
    queue: queueName,
    // Exclusive queues may only be accessed by the current connection,
    // and are deleted when that connection closes.
    exclusive: true
  })
  return c.json({ status: true, name: queueName })
})
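Since the route above takes the queue name straight from the URL, it may be worth checking the name before declaring the queue. The sketch below assumes two RabbitMQ rules: queue names may be at most 255 bytes of UTF-8, and names starting with "amq." are reserved for the broker. The function validateQueueName is hypothetical, and rejecting empty names is a choice made for this endpoint only.

```typescript
// Hypothetical helper: returns an error message, or null when the name is acceptable
function validateQueueName(name: string): string | null {
  // Assumption for this endpoint: callers must always supply an explicit name
  if (name.length === 0) {
    return 'queue name must not be empty'
  }
  // Names beginning with "amq." are reserved for internal use by the broker
  if (name.startsWith('amq.')) {
    return 'queue names starting with "amq." are reserved'
  }
  // The limit is measured in UTF-8 bytes, not in characters
  if (new TextEncoder().encode(name).length > 255) {
    return 'queue name must be at most 255 bytes'
  }
  return null
}

console.log(validateQueueName('hello_mate')) // prints null
console.log(validateQueueName('amq.custom'))
```

In the route handler, a non-null result could be returned to the caller as a 400 response instead of calling rabbit.queueDeclare().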

Advanced: Publisher with Routing

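When publishing to an exchange, you can add a routing key to control which queue, and therefore which consumer, receives each message (a simple form of internal load balancing).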

// Let's assume we have a queue called "test_emails" and an exchange with the same name
const MY_QUERY_NAME = "test_emails";
// A route that publishes one message (randomUUID comes from 'node:crypto')
app.get('/push1Data', async (c) => {
  const pub = rabbit.createPublisher({
    confirm: true,
    maxAttempts: 2,
    // Optionally ensure the existence of an exchange before we use it
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
  })
  const data = { id: randomUUID(), rngId: randomUUID() }
  const routingKey = Math.random() >= 0.5 ? "A" : "B";
  await pub.send(
    {
      exchange: MY_QUERY_NAME,
      // The message is delivered to queues whose binding matches this key
      routingKey: `route.${routingKey}`
    },
    data
  )
  return c.json({ status: true, data })
})
const sub = rabbit.createConsumer({
  // ...
  // Optionally ensure the existence of an exchange before we use it
  exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}],
  // With a "topic" exchange, messages matching this pattern are routed to the queue
  queueBindings: [{exchange: MY_QUERY_NAME, routingKey: 'route.*'}],
  // To handle only the "A" messages, bind like this instead:
  // queueBindings: [{exchange: MY_QUERY_NAME, routingKey: 'route.A'}],
}, async (msg) => {
  // ...
})
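To make the difference between the bindings route.* and route.A concrete, here is a simplified sketch of how a topic exchange compares a binding pattern with a routing key: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The function topicMatches is hypothetical and only illustrates the rule. The broker does this matching itself, and edge cases such as empty routing keys are not covered here.

```typescript
// Hypothetical illustration of topic-exchange matching (the broker does this for you)
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.')
  const k = routingKey.split('.')

  // Does pattern word i onwards match key word j onwards?
  const match = (i: number, j: number): boolean => {
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // '#' may consume zero or more words
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true
      }
      return false
    }
    if (j === k.length) return false
    // '*' matches exactly one word; otherwise the words must be equal
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1)
    return false
  }

  return match(0, 0)
}

console.log(topicMatches('route.*', 'route.A')) // prints true
console.log(topicMatches('route.*', 'route.B')) // prints true
console.log(topicMatches('route.A', 'route.B')) // prints false
```

So a consumer bound with route.* receives both the A and B messages published above, while one bound with route.A receives only the A messages.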