📝Bull Board Queue
Queue management panel.
Imports
import { queuePool } from 'niro-health'
import { getBullBoardQueues } from 'niro-health';
Implementing the queue panel
In this example, I am using Express.
main.ts
import { NestFactory } from '@nestjs/core';
import { getBullBoardQueues } from 'niro-health';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const bullBoardUserName = "USERNAME";
const bullBoardPassword = "PASSWORD";
passport.use(
new BasicStrategy((username, password, done) => {
if (username === bullBoardUserName && password === bullBoardPassword) {
done(null, true);
} else {
done(null, false);
}
}),
);
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
const queues = getBullBoardQueues();
createBullBoard({
queues,
serverAdapter,
});
app.use(
'/admin/queues',
passport.authenticate('basic', {
session: false,
}),
serverAdapter.getRouter(),
);
await app.listen(3000);
}
bootstrap();
Creating our Job
I will use the example from the user module.
Você pode ver o código oficial no Github.
The folder structure will look like this:
jobs
configs
email
accountActivate.ts
constants
email
accountActivateProcess.ts
emailNameJob.ts
types
email
accountActivate.ts
email.ts
accountActivate.ts
import { JobOptions } from 'bull';
export const AccountActivateOptions: JobOptions = {
attempts: 5, // * Number of times to attempt the job
delay: 0, // * Delay this job for 0ms
backoff: {
type: 'fixed',
delay: 300000, // ! 5 Minutes for each attempt
},
};
accountActivateProcess.ts
export default 'account/activate' as const;
emailNameJob.ts
export default 'emails/delivery' as const;
accountActivate.ts
export type AccountActivateType = {
email: string;
username: string;
token: string;
temporarypass: string | null;
};
email.ts
import { Inject, Logger } from '@nestjs/common';
import { Process, Processor, OnQueueError, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';
import SMTPTransport from 'nodemailer/lib/smtp-transport';
import type { IConfigurationService, IEmailService } from 'niro-health';
import EmailNameJob from './jobs/constants/email/emailNameJob';
import AccountActivateProcess from './jobs/constants/email/accountActivateProcess';
import type { AccountActivateType } from './jobs/types/email/accountActivate';
@Processor(EmailNameJob)
export class EmailJob {
private readonly logger = new Logger(EmailJob.name);
constructor(
@Inject('IConfigurationService')
private readonly configurationService: IConfigurationService,
@Inject('IEmailService') private readonly emailService: IEmailService,
) {}
private async _accountEmailConfirm(
email: string,
username: string,
token: string,
temporarypass: string | null,
): Promise<SMTPTransport.SentMessageInfo> {
try {
return await this.emailService.send(
{
from: '"Niro Health" <support@niro-health.com>',
to: [email],
subject: 'Niro Health - Account Activation',
priority: 'normal',
},
'email-confirm',
{
title: `Dear ${username}, please click on the link below to activate your account!`,
variables: {
username,
url: `${this.configurationService.WEBAPP_URI}/account/confirm?token=${token}`,
temporarypass,
},
},
);
} catch (error) {
throw new Error(JSON.stringify(error));
}
}
@OnQueueActive()
onActive(job: Job) {
this.logger.debug(`Processing job ${job.id} of type ${job.name}`);
}
@OnQueueError()
onError(err: Error) {
throw new Error(`Queue error: ${err.message}`);
}
@Process(AccountActivateProcess)
async handleAccountActivate(job: Job<AccountActivateType>) {
this.logger.debug(
`Email for activation of account(${job.data.username}) sent...`,
);
return await this._accountEmailConfirm(
job.data.email,
job.data.username,
job.data.token,
job.data.temporarypass,
);
}
}
Injecting the queues into the main module.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UsersModule } from './users/users.module';
@Module({
imports: [
BullModule.forRoot('users-queues', {
url: process.env.REDIS_HOST,
redis: {
password: process.env.REDIS_PASSWORD,
},
}),
UsersModule,
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
Injecting the queues into the users module.
users.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import EmailNameJob from './jobs/constants/email/emailNameJob';
import { UsersController } from './users.controller';
import { UsersService } from './users.service';
import { UsersParser } from './parsers';
import { EmailJob } from './jobs/email';
import {
CoreModule,
ConfigurationService,
DebugService,
ValidatorRegexpService,
StringExService,
RandomService,
CryptoService,
JsonWebTokenService,
SimilarityFilterService,
EmailService,
AwsCoreService,
AwsConfigurationService,
AwsStsService,
SqliteService,
RedisService,
PrismaService
} from 'niro-health';
@Module({
imports: [
BullModule.registerQueue({
configKey: 'users-queues',
name: EmailNameJob,
}),
CoreModule,
],
controllers: [UsersController],
providers: [
{ provide: 'IUsersService', useClass: UsersService },
{ provide: 'IUsersParser', useClass: UsersParser },
{ provide: 'IEmailJob', useClass: EmailJob },
{ provide: 'IConfigurationService', useClass: ConfigurationService },
{ provide: 'IDebugService', useClass: DebugService },
{ provide: 'IValidatorRegexpService', useClass: ValidatorRegexpService },
{ provide: 'IStringExService', useClass: StringExService },
{ provide: 'IRandomService', useClass: RandomService },
{ provide: 'ICryptoService', useClass: CryptoService },
{ provide: 'IJsonWebTokenService', useClass: JsonWebTokenService },
{ provide: 'ISimilarityFilterService', useClass: SimilarityFilterService },
{ provide: 'IEmailService', useClass: EmailService },
{ provide: 'IAwsCoreService', useClass: AwsCoreService },
{ provide: 'IAwsConfigurationService', useClass: AwsConfigurationService },
{ provide: 'IAwsStsService', useClass: AwsStsService },
{ provide: 'IRedisService', useClass: RedisService },
{ provide: 'ISqliteService', useClass: SqliteService },
{ provide: 'IPrismaService', useClass: PrismaService },
],
exports: [{ provide: 'IUsersService', useClass: UsersService }],
})
export class UsersModule {}
Injetando as filas no controller do modulo de usuarios
users.controller.ts
import {
Inject,
HttpException,
HttpStatus,
Controller,
UseGuards,
UsePipes,
Get,
Post,
Body,
Req,
Patch,
Query,
Param,
Delete,
} from '@nestjs/common';
import { Request } from 'express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import {
queuePool,
JoiValidationPipe,
CoreService,
AppHostService,
JsonWebTokenService
} from 'niro-health';
import EmailNameJob from './jobs/constants/email/emailNameJob';
import AccountActivateProcess from './jobs/constants/email/accountActivateProcess';
import { AccountActivateType } from './jobs/types/email/accountActivate';
import { AccountActivateOptions } from './jobs/configs/email/accountActivate';
import { RolesGuard } from './guards/roles.guard';
// import { Roles } from './guards/roles.decorator';
import { TokenGuard } from './guards/token.guard';
// import { Token } from './guards/token.decorator';
import { UsersService } from './users.service';
import { UsersParser } from './parsers';
import type { CreateUserDto } from './dto/create';
import type { ActivateUserDto } from './dto/activate';
import type { LoginUserDto } from './dto/login';
import type { SessionValidateUserDto } from './dto/sessionValidate';
import type { LogoutUserDto } from './dto/logout';
import type { UpdateUserDto } from './dto/update';
import { CreateUserSchema } from './dto/schemas/create.joi';
import { ActivateUserSchema } from './dto/schemas/activate.joi';
import { LoginUserSchema } from './dto/schemas/login.joi';
import { SessionValidateUserSchema } from './dto/schemas/sessionValidate.joi';
import { LogoutUserSchema } from './dto/schemas/logout.joi';
import { UpdateUserSchema } from './dto/schemas/update.joi';
@Controller('api/users')
@UseGuards(RolesGuard, TokenGuard)
export class UsersController {
constructor(
@InjectQueue(EmailNameJob)
private readonly emailQueue: Queue<AccountActivateType>,
private readonly coreService: CoreService,
private readonly appHostService: AppHostService,
@Inject('IUsersService') private readonly usersService: UsersService,
@Inject('IUsersParser') private readonly usersParser: UsersParser,
) {
queuePool.add(emailQueue);
}
@Post()
@UsePipes(new JoiValidationPipe(CreateUserSchema))
async create(@Body() createUserDto: CreateUserDto) {
const user = await this.usersService.create(createUserDto);
if (user instanceof Error)
throw new HttpException(user.message, HttpStatus.FORBIDDEN);
try {
await this.emailQueue.add(
AccountActivateProcess,
{
email: createUserDto.email,
username: createUserDto.username,
token: this.appHostService.app
.get<JsonWebTokenService>('IJsonWebTokenService')
.save(
{
id: user.id,
username: user.username,
timestamp: new Date().getTime(),
},
null,
`7d`,
) as string,
temporarypass: null,
},
AccountActivateOptions,
);
return this.usersParser.toJSON(user);
} catch (error) {
throw new HttpException(error.message, HttpStatus.FORBIDDEN);
}
}
@Post('activate')
@UsePipes(new JoiValidationPipe(ActivateUserSchema))
async activate(@Body() activateUserDto: ActivateUserDto) {
const token = this.appHostService.app
.get<JsonWebTokenService>('IJsonWebTokenService')
.load(activateUserDto.token, null);
if (token instanceof Error)
throw new HttpException(
'Account activation token is invalid. Please try again.',
HttpStatus.FORBIDDEN,
);
const { id } = token as { id: string };
const user = await this.usersService.activate(id);
if (user instanceof Error)
throw new HttpException(user.message, HttpStatus.FORBIDDEN);
return true;
}
@Post('auth/login')
@UsePipes(new JoiValidationPipe(LoginUserSchema))
async login(@Body() loginUserDto: LoginUserDto, @Req() request: Request) {
const clientGeoIP = await this.coreService.getClientGeoIP(request);
const user = await this.usersService.login(
loginUserDto.email,
loginUserDto.password,
clientGeoIP.device_name,
{
...clientGeoIP,
token_signature: '',
},
);
if (user instanceof Error)
throw new HttpException(user.message, HttpStatus.FORBIDDEN);
return this.usersParser.toJSON(user);
}
@Post('auth/validate')
@UsePipes(new JoiValidationPipe(SessionValidateUserSchema))
async sessionValidate(
@Body() sessionValidateUserDto: SessionValidateUserDto,
@Req() request: Request,
) {
const clientGeoIP = await this.coreService.getClientGeoIP(request);
const session = await this.usersService.sessionValidate(
sessionValidateUserDto.id,
sessionValidateUserDto.token_value,
sessionValidateUserDto.token_signature,
sessionValidateUserDto.token_revalidate_value,
sessionValidateUserDto.token_revalidate_signature,
clientGeoIP.device_name,
{
...clientGeoIP,
token_signature: sessionValidateUserDto.token_signature,
},
);
if (session instanceof Error)
throw new HttpException(session.message, HttpStatus.FORBIDDEN);
return session;
}
@Post('auth/logout')
@UsePipes(new JoiValidationPipe(LogoutUserSchema))
async logout(@Body() logoutUserDto: LogoutUserDto) {
const session = await this.usersService.logout(
logoutUserDto.id,
logoutUserDto.token_value,
);
if (session instanceof Error)
throw new HttpException(session.message, HttpStatus.FORBIDDEN);
return true;
}
@Get()
async findAll(@Query('limit') limit: string, @Query('skip') skip: string) {
return (
await this.usersService.findAll(
limit && parseInt(limit),
skip && parseInt(skip),
)
).map((user) => this.usersParser.toJSON(user));
}
@Get(':id')
async findOne(@Param('id') id: string) {
const user = await this.usersService.findOne(id);
if (user instanceof Error)
throw new HttpException(user.message, HttpStatus.FORBIDDEN);
return this.usersParser.toJSON(user);
}
@Patch(':id')
async update(
@Param('id') id: string,
@Body(new JoiValidationPipe(UpdateUserSchema))
updateUserDto: UpdateUserDto,
) {
const user = await this.usersService.update(id, updateUserDto);
if (user instanceof Error)
throw new HttpException(user.message, HttpStatus.FORBIDDEN);
return this.usersParser.toJSON(user);
}
@Delete(':id')
async delete(@Param('id') id: string) {
const deleted = await this.usersService.delete(id);
if (deleted instanceof Error)
throw new HttpException(deleted.message, HttpStatus.FORBIDDEN);
return deleted;
}
}
Constants
Constant
Description
queuePool
This is a pool of queues that will be used by BullBoard.
getBullBoardQueues
This is a function that will be used by BullBoard to get the queues.
Last updated