RabbitMQ (Basic)
Links
Download the docker k8s from docker
:
docker pull rabbitmq:management
Then start with http://localhost:15672/
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Default account for login:
username: guest
password: guest
Start With NodeJS
Let use rabbitmq-client
to connect for the several.
https://www.npmjs.com/package/rabbitmq-client
yarn add rabbitmq-client dotenv
import 'dotenv/config'
import { Connection } from 'rabbitmq-client'
// MQ_CONNECT_STRING=amqp://guest:guest@localhost:5672
const MQ_STRING = process.env.MQ_CONNECT_STRING || ""
const rabbit = new Connection(MQ_STRING)
rabbit.on('error', (err) => {
console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
console.log('Connection successfully (re)established')
})
In order to test the Publisher
and Consumer
sections, we will adding a hono.js
web server.
yarn add hono @hono/node-server
import 'dotenv/config'
import { Connection } from 'rabbitmq-client'
import { serve } from '@hono/node-server'
import { Hono } from 'hono'
import { showRoutes } from "hono/dev";
// MQ_CONNECT_STRING=amqp://guest:guest@localhost:5672
const MQ_STRING = process.env.MQ_CONNECT_STRING || ""
const rabbit = new Connection(MQ_STRING)
rabbit.on('error', (err) => {
console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
console.log('Connection successfully (re)established')
})
const app = new Hono();
const PORT = process.env.PORT ? +process.env.PORT : 8181;
showRoutes(app, {
verbose: process.env.NODE_ENV === "development",
colorize: true,
});
console.log(`Server is running on port http://localhost:${PORT}`);
serve({
fetch: app.fetch,
port: PORT,
});
Publisher: Make Data to Query
Assume that you have created a Query
from the RabbitMQ named test_emails
// Lets' assume we have a query call "test_emails"
const MY_QUERY_NAME = "test_emails";
// A Router to make One Data
app.get('/push1Data', async (c) => {
const pub = rabbit.createPublisher({
confirm: true,
maxAttempts: 2,
// Optionally ensure the existence of an exchange before we use it
exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
})
const data = {
id: randomUUID(),
rngId: randomUUID()
}
await pub.send(MY_QUERY_NAME, data)
return c.json({ status: true, data })
})
// A Router to make 100 Data
app.get('/push100Data', async (c) => {
const pub = rabbit.createPublisher({
confirm: true,
maxAttempts: 2,
exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
})
const makeBigMessage = Array(100).fill(null).map(_ => ({
id: randomUUID(),
rngId: randomUUID()
}))
for (let v of makeBigMessage) {
await pub.send(MY_QUERY_NAME, v)
}
return c.json({ status: true, data: makeBigMessage })
})
Consumer: Get Data from Query
// Lets' assume we have a query call "test_emails"
const MY_QUERY_NAME = "test_emails";
const sub = rabbit.createConsumer({
queue: MY_QUERY_NAME,
queueOptions: { durable: true },
// Get One data Per time
qos: { prefetchCount: 1 },
// If you set this to >= 2, it does NOT MEAN you are getting a array
// But this consumer will exec >=2 at once in common
// Requeue message when the handler throws an Error (Default is true)
requeue: true
// Optionally ensure the existence of an exchange before we use it
exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
}, async (msg) => {
console.log('received message (user-events)', msg)
const data = msg.body; // { id: string, rngId: string }
console.log(data);
// Assume this data will be process for 5 seconds
await timer(5000)
})
sub.on('error', (err) => {
// Maybe the consumer was cancelled, or the connection was reset before a
// message could be acknowledged.
console.log('consumer error (user-events)', err)
})
/** Promise Timer */
export function timer(ms: number = 1000): Promise<void> {
return new Promise(function (resolve) {
setTimeout(resolve, ms);
});
}
Query: Making query at your code
You may make yor own query at coding level with rabbit.queueDeclare()
;
// http://localhost:8181/makeQuery/hello_mate
app.get('/makeQuery/:name', async (c) => {
const queryName = c.req.param("name")
await rabbit.queueDeclare({
queue: queryName,
// Exclusive queues may only be accessed by the current connection,
// and are deleted when that connection closes.
exclusive: true
})
return c.json({ status: true, name: queryName })
})
Advance: Publisher with Routing
For the same query, you may add routing
key for internal LB control.
// Lets' assume we have a query call "test_emails"
const MY_QUERY_NAME = "test_emails";
// A Router to make One Data
app.get('/push1Data', async (c) => {
const pub = rabbit.createPublisher({
confirm: true,
maxAttempts: 2,
// Optionally ensure the existence of an exchange before we use it
exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
})
const data = {
id: randomUUID(),
rngId: randomUUID()
}
const routingKey = Math.random() >= 0.5 ? "A" : "B";
await pub.send(
{
exchange: MY_QUERY_NAME,
routingKey: `route.${routingKey}` // Adding This Data will be route to here for this query
},
data
)
return c.json({ status: true, data })
})
const sub = rabbit.createConsumer({
// ...
// Optionally ensure the existence of an exchange before we use it
exchanges: [{exchange: MY_QUERY_NAME, type: 'topic'}]
// With a "topic" exchange, messages matching this pattern are routed to the queue
queueBindings: [{exchange: 'my-events', routingKey: 'route.*'}],
// Only Handle A Data if set to this
// queueBindings: [{exchange: 'my-events', routingKey: 'route.A'}],
}, async (msg) => {
// ...
})