RabbitMQ (Basic)

Pull the RabbitMQ image (with the management plugin included) from Docker Hub:

docker pull rabbitmq:management

Then start the container. Once it is running, the management UI is available at http://localhost:15672/

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

Default account for login:

username: guest
password: guest

Start With NodeJS

Let's use rabbitmq-client to connect to the server.

https://www.npmjs.com/package/rabbitmq-client

yarn add rabbitmq-client dotenv

import 'dotenv/config'
import { Connection } from 'rabbitmq-client'
 
// MQ_CONNECT_STRING=amqp://guest:guest@localhost:5672
const MQ_STRING = process.env.MQ_CONNECT_STRING || ""
const rabbit = new Connection(MQ_STRING)
 
rabbit.on('error', (err) => {
    console.log('RabbitMQ connection error', err)
})
 
rabbit.on('connection', () => {
    console.log('Connection successfully (re)established')
})
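
The connection code above falls back to an empty string when MQ_CONNECT_STRING is not set, which can hide a configuration mistake. A minimal sketch of failing fast instead, and of logging the connection string without the password, is shown here. Both requireEnv and maskPassword are hypothetical helpers written for this note; they are not part of rabbitmq-client or dotenv.

```typescript
// Hypothetical helpers (not part of rabbitmq-client or dotenv).

/** Return a required variable from an env-like object, or throw if it is missing or blank. */
function requireEnv(env: Record<string, string | undefined>, name: string): string {
    const value = env[name];
    if (value === undefined || value.trim() === "") {
        throw new Error(`Missing required environment variable: ${name}`);
    }
    return value;
}

/** Hide the password in a URL such as amqp://user:pass@host:port so it can be logged. */
function maskPassword(url: string): string {
    // Keeps "scheme://user", replaces everything between ":" and "@" with ***
    return url.replace(/^(\w+:\/\/[^:@\/]+):[^@]*@/, "$1:***@");
}

// Possible usage in the connection file (assumption, adapt as needed):
//   const MQ_STRING = requireEnv(process.env, "MQ_CONNECT_STRING")
//   console.log("Connecting to", maskPassword(MQ_STRING))
```

A URL without credentials does not match the pattern and is returned unchanged.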

To test the Publisher and Consumer sections, we will add a Hono web server.

yarn add hono @hono/node-server

import 'dotenv/config'
import { Connection } from 'rabbitmq-client'
 
import { serve } from '@hono/node-server'
import { Hono } from 'hono'
import { showRoutes } from "hono/dev";
 
// MQ_CONNECT_STRING=amqp://guest:guest@localhost:5672
const MQ_STRING = process.env.MQ_CONNECT_STRING || ""
const rabbit = new Connection(MQ_STRING)
 
rabbit.on('error', (err) => {
    console.log('RabbitMQ connection error', err)
})
 
rabbit.on('connection', () => {
    console.log('Connection successfully (re)established')
})
 
const app = new Hono();
 
const PORT = process.env.PORT ? +process.env.PORT : 8181;
 
showRoutes(app, {
    verbose: process.env.NODE_ENV === "development",
    colorize: true,
});
 
console.log(`Server is running on port http://localhost:${PORT}`);
 
serve({
    fetch: app.fetch,
    port: PORT,
});
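
The PORT line above converts the variable with a unary plus, which yields NaN if PORT is set to something that is not a number. A small sketch of a stricter parser follows; parsePort is a hypothetical helper for this note, not something Hono provides.

```typescript
/** Parse a TCP port from an optional string, using `fallback` when it is missing or invalid. */
function parsePort(raw: string | undefined, fallback: number): number {
    if (raw === undefined || raw.trim() === "") return fallback;
    const port = Number(raw);
    // Accept only whole numbers in the valid TCP port range
    const valid = Number.isInteger(port) && port >= 1 && port <= 65535;
    return valid ? port : fallback;
}

// Possible usage (assumption): const PORT = parsePort(process.env.PORT, 8181)
```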

Publisher: Send Data to a Queue

Assume you have already created a queue named test_emails in RabbitMQ.

import { randomUUID } from 'node:crypto'

// Let's assume we have a queue called "test_emails"
const MY_QUERY_NAME = "test_emails";

// Create the publisher once and reuse it across requests;
// creating one inside every route handler would open a new channel per request
const pub = rabbit.createPublisher({
    // Enable publisher confirms
    confirm: true,
    // Allow up to 2 attempts per message
    maxAttempts: 2,
    // Optionally ensure the existence of an exchange before we use it
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
})

// A route that publishes one message
app.get('/push1Data', async (c) => {
    const data = {
        id: randomUUID(),
        rngId: randomUUID()
    }

    // Passing a string publishes directly to the queue with that name
    await pub.send(MY_QUERY_NAME, data)

    return c.json({ status: true, data })
})

// A route that publishes 100 messages
app.get('/push100Data', async (c) => {
    const makeBigMessage = Array.from({ length: 100 }, () => ({
        id: randomUUID(),
        rngId: randomUUID()
    }))

    // Send the messages one by one, waiting for each confirmation
    for (const v of makeBigMessage) {
        await pub.send(MY_QUERY_NAME, v)
    }

    return c.json({ status: true, data: makeBigMessage })
})
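
The 100-message route above waits for each send to finish before starting the next one. If strict ordering is not required, one option is to send in small concurrent batches. The sketch below shows the idea with two hypothetical helpers, chunk and sendInBatches; the send callback is a stand-in for a call to the publisher and is an assumption of this example.

```typescript
/** Split an array into consecutive batches of at most `size` items. */
function chunk<T>(items: T[], size: number): T[][] {
    if (!Number.isInteger(size) || size < 1) {
        throw new RangeError("size must be a positive integer");
    }
    const batches: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        batches.push(items.slice(i, i + size));
    }
    return batches;
}

/**
 * Send every item, running at most `batchSize` sends concurrently.
 * `send` is a stand-in for something like (v) => pub.send(MY_QUERY_NAME, v).
 */
async function sendInBatches<T>(
    items: T[],
    batchSize: number,
    send: (item: T) => Promise<void>,
): Promise<void> {
    for (const batch of chunk(items, batchSize)) {
        // Wait for the whole batch to settle before starting the next one
        await Promise.all(batch.map((item) => send(item)));
    }
}

// Possible usage inside the route (assumption):
//   await sendInBatches(makeBigMessage, 10, (v) => pub.send(MY_QUERY_NAME, v))
```

The batch size is a trade-off: larger batches finish sooner but put more unconfirmed messages in flight at once.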

Consumer: Get Data from a Queue

// Let's assume we have a queue called "test_emails"
const MY_QUERY_NAME = "test_emails";
 
const sub = rabbit.createConsumer({
    queue: MY_QUERY_NAME,
    queueOptions: { durable: true },
 
    // Receive one message at a time
    qos: { prefetchCount: 1 },
    // Setting this to 2 or more does NOT mean the handler receives an array;
    // it means this consumer may be handling up to that many messages at once

    // Requeue the message when the handler throws an Error (default is true)
    requeue: true,

    // Optionally ensure the existence of an exchange before we use it
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
 
}, async (msg) => {
    console.log('received message (test_emails)', msg)

    const data = msg.body; // { id: string, rngId: string }
    console.log(data);

    // Simulate 5 seconds of processing
    await timer(5000)
})

sub.on('error', (err) => {
    // Maybe the consumer was cancelled, or the connection was reset before a
    // message could be acknowledged.
    console.log('consumer error (test_emails)', err)
})
 
/** Promise Timer */
export function timer(ms: number = 1000): Promise<void> {
    return new Promise(function (resolve) {
        setTimeout(resolve, ms);
    });
}
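
With requeue enabled, a handler that throws gets the same message delivered again. That is useful for temporary failures, but a malformed message would fail every time. A minimal sketch of checking the body before processing is shown below. The EmailJob type, isEmailJob and triage are hypothetical names for this note, and the message shape is assumed from the publisher example.

```typescript
// Hypothetical message shape, matching what the publisher routes send
type EmailJob = { id: string; rngId: string };

/** Narrow an unknown message body to EmailJob. */
function isEmailJob(body: unknown): body is EmailJob {
    if (typeof body !== "object" || body === null) return false;
    const candidate = body as Record<string, unknown>;
    return typeof candidate["id"] === "string" && typeof candidate["rngId"] === "string";
}

/**
 * Decide what to do with a message body:
 *  - "process": well-formed, hand it to the real handler
 *  - "discard": malformed, a retry can never succeed, so log it and return
 *    normally instead of throwing (throwing would requeue it)
 */
function triage(body: unknown): "process" | "discard" {
    return isEmailJob(body) ? "process" : "discard";
}

// Possible usage at the top of the consumer handler (assumption):
//   if (triage(msg.body) === "discard") { console.log("bad message", msg.body); return }
```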

Queue: Declaring a Queue in Code

You can also declare a queue from your own code with rabbit.queueDeclare():

 
// http://localhost:8181/makeQuery/hello_mate
app.get('/makeQuery/:name', async (c) => {
 
    const queryName = c.req.param("name")
    await rabbit.queueDeclare({
        queue: queryName,
 
        // Exclusive queues may only be accessed by the current connection,
        // and are deleted when that connection closes.
        exclusive: true 
    })
 
    return c.json({ status: true, name: queryName })
})
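
The route above takes the queue name straight from the URL. Names beginning with amq. are reserved by the broker, so it is worth checking the value before declaring anything. The sketch below uses a deliberately conservative rule chosen for this note (letters, digits, underscore, dot and hyphen, up to 255 characters); isSafeQueueName is a hypothetical helper and is stricter than what RabbitMQ itself accepts.

```typescript
/**
 * Conservative queue-name check (a policy chosen for this sketch):
 * 1 to 255 characters from [A-Za-z0-9_.-], not starting with the reserved "amq." prefix.
 */
function isSafeQueueName(name: string): boolean {
    if (name.startsWith("amq.")) return false;
    return /^[A-Za-z0-9_.-]{1,255}$/.test(name);
}

// Possible usage inside the route, before queueDeclare (assumption):
//   if (!isSafeQueueName(queryName)) {
//       return c.json({ status: false, error: "invalid queue name" }, 400)
//   }
```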
 

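Advanced: Publisher with Routing

Messages can also be published to an exchange with a routing key. Each consumer then binds its queue to the routing-key patterns it wants, which gives you internal control over how the load is distributed.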

import { randomUUID } from 'node:crypto'

// Let's assume we have a queue called "test_emails";
// the same name is reused here for the exchange
const MY_QUERY_NAME = "test_emails";

// Create the publisher once and reuse it across requests
const pub = rabbit.createPublisher({
    confirm: true,
    maxAttempts: 2,
    // Optionally ensure the existence of an exchange before we use it
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
})

// A route that publishes one message with a random routing key
app.get('/push1Data', async (c) => {
    const data = {
        id: randomUUID(),
        rngId: randomUUID()
    }

    const routingKey = Math.random() >= 0.5 ? "A" : "B";

    await pub.send(
        {
            exchange: MY_QUERY_NAME,
            // The exchange uses this key to decide which bound queues receive the message
            routingKey: `route.${routingKey}`
        },
        data
    )

    return c.json({ status: true, data })
})
 
const sub = rabbit.createConsumer({
    // ...

    // Optionally ensure the existence of an exchange before we use it
    exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}],

    // With a "topic" exchange, messages matching this pattern are routed to the queue
    queueBindings: [{exchange: MY_QUERY_NAME, routingKey: 'route.*'}],

    // To handle only the "A" messages, bind with the exact key instead:
    // queueBindings: [{exchange: MY_QUERY_NAME, routingKey: 'route.A'}],

}, async (msg) => {
    // ...
})
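
The route.* binding relies on topic-exchange matching, where * stands for exactly one dot-separated word and # stands for zero or more words. The sketch below re-implements that rule locally, purely to show which routing keys a pattern would accept. topicMatches is a hypothetical helper for this note; the broker performs the real matching itself.

```typescript
/**
 * Local illustration of topic-exchange matching:
 *   "*" matches exactly one word, "#" matches zero or more words,
 *   words are separated by dots.
 */
function topicMatches(pattern: string, routingKey: string): boolean {
    const p = pattern.split(".");
    const k = routingKey.split(".");

    // Try to match pattern words from index i against key words from index j
    const match = (i: number, j: number): boolean => {
        if (i === p.length) return j === k.length;
        if (p[i] === "#") {
            // "#" may absorb zero or more of the remaining key words
            for (let n = j; n <= k.length; n++) {
                if (match(i + 1, n)) return true;
            }
            return false;
        }
        if (j === k.length) return false;
        if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
        return false;
    };

    return match(0, 0);
}
```

With this rule, a queue bound with route.* receives both route.A and route.B, while a queue bound with route.A receives only the first.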