📝Bull Board Queue

Queue management panel.

Imports

import { queuePool } from 'niro-health'
import { getBullBoardQueues } from 'niro-health';

Implementing the queue panel

In this example, I am using Express.

main.ts
import { NestFactory } from '@nestjs/core';
import { getBullBoardQueues } from 'niro-health';
import { AppModule } from './app.module';

/**
 * Boots the Nest application and mounts the Bull Board panel under
 * `/admin/queues`, guarded by HTTP Basic authentication.
 *
 * NOTE(review): `passport`, `BasicStrategy`, `ExpressAdapter` and
 * `createBullBoard` are used below but are not imported in this snippet —
 * presumably they come from `passport`, `passport-http`,
 * `@bull-board/express` and `@bull-board/api`; confirm and add the imports.
 */
async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  // Single source for the mount point shared by the adapter and the route.
  const panelPath = '/admin/queues';

  // Placeholder credentials the Basic strategy accepts.
  const expectedUser = "USERNAME";
  const expectedPassword = "PASSWORD";

  passport.use(
    new BasicStrategy((user, pass, done) => {
      // The second argument tells passport whether the request is authenticated.
      done(null, user === expectedUser && pass === expectedPassword);
    }),
  );

  const boardAdapter = new ExpressAdapter();
  boardAdapter.setBasePath(panelPath);

  // Register every queue exposed by `getBullBoardQueues` on the board.
  createBullBoard({
    queues: getBullBoardQueues(),
    serverAdapter: boardAdapter,
  });

  // Authentication runs first; only then is the board router reached.
  const basicAuth = passport.authenticate('basic', { session: false });
  app.use(panelPath, basicAuth, boardAdapter.getRouter());

  await app.listen(3000);
}
bootstrap();

Creating our Job

I will use the example from the user module.

You can see the official code on GitHub.

The folder structure will look like this:

  • jobs

    • configs

      • email

        • accountActivate.ts

    • constants

      • email

        • accountActivateProcess.ts

        • emailNameJob.ts

    • types

      • email

        • accountActivate.ts

    • email.ts

jobs/configs/email/accountActivate.ts
import { JobOptions } from 'bull';

/** Wait between two attempts of the job, in milliseconds (5 minutes). */
const RETRY_DELAY_MS = 5 * 60 * 1000;

/**
 * Bull options applied when the account-activation job is enqueued:
 * up to five attempts, no initial delay, and a fixed back-off between
 * attempts.
 */
export const AccountActivateOptions: JobOptions = {
  // Number of times to attempt the job.
  attempts: 5,
  // Start immediately (0 ms delay).
  delay: 0,
  // Same waiting time before every retry.
  backoff: { type: 'fixed', delay: RETRY_DELAY_MS },
};
accountActivateProcess.ts
// Name of the account-activation process; used as the `@Process` name in the
// e-mail processor and as the job name when adding to the queue.
export default 'account/activate' as const;
emailNameJob.ts
// Name of the e-mail queue; used by `@Processor`, `BullModule.registerQueue`
// and `@InjectQueue`.
export default 'emails/delivery' as const;
jobs/types/email/accountActivate.ts
/** Payload carried by an account-activation e-mail job. */
export type AccountActivateType = {
  /** Recipient address of the activation e-mail. */
  email: string;
  /** User name shown in the e-mail title and passed to the template. */
  username: string;
  /** Token appended to the confirmation URL as the `token` query parameter. */
  token: string;
  /** Forwarded to the e-mail template as-is; `null` when there is none. */
  temporarypass: string | null;
};
email.ts
import { Inject, Logger } from '@nestjs/common';
import { Process, Processor, OnQueueError, OnQueueActive } from '@nestjs/bull';
import { Job } from 'bull';

import SMTPTransport from 'nodemailer/lib/smtp-transport';

import type { IConfigurationService, IEmailService } from 'niro-health';

import EmailNameJob from './jobs/constants/email/emailNameJob';
import AccountActivateProcess from './jobs/constants/email/accountActivateProcess';

import type { AccountActivateType } from './jobs/types/email/accountActivate';

/**
 * Bull processor bound to the e-mail queue (`EmailNameJob`).
 *
 * Handles the account-activation process by sending the `email-confirm`
 * template through the injected e-mail service.
 */
@Processor(EmailNameJob)
export class EmailJob {
  private readonly logger = new Logger(EmailJob.name);

  constructor(
    @Inject('IConfigurationService')
    private readonly configurationService: IConfigurationService,
    @Inject('IEmailService') private readonly emailService: IEmailService,
  ) {}

  /**
   * Sends the account-activation e-mail.
   *
   * @param email - Recipient address.
   * @param username - Name used in the title and in the template variables.
   * @param token - Token appended to the confirmation URL.
   * @param temporarypass - Forwarded to the template; may be `null`.
   * @returns The value resolved by `emailService.send`.
   * @throws Error When sending fails. An `Error` raised by the e-mail
   *   service is rethrown unchanged; any other rejection value is wrapped
   *   in an `Error` holding its JSON representation.
   */
  private async _accountEmailConfirm(
    email: string,
    username: string,
    token: string,
    temporarypass: string | null,
  ): Promise<SMTPTransport.SentMessageInfo> {
    try {
      return await this.emailService.send(
        {
          from: '"Niro Health" <support@niro-health.com>',
          to: [email],
          subject: 'Niro Health - Account Activation',
          priority: 'normal',
        },
        'email-confirm',
        {
          title: `Dear ${username}, please click on the link below to activate your account!`,
          variables: {
            username,
            url: `${this.configurationService.WEBAPP_URI}/account/confirm?token=${token}`,
            temporarypass,
          },
        },
      );
    } catch (error) {
      // `JSON.stringify` of an Error is "{}" (its fields are non-enumerable),
      // which would hide the failure reason. Keep real errors intact and
      // only serialize non-Error rejection values.
      if (error instanceof Error) throw error;
      throw new Error(JSON.stringify(error));
    }
  }

  /** Logs the id and name of every job that becomes active. */
  @OnQueueActive()
  onActive(job: Job) {
    this.logger.debug(`Processing job ${job.id} of type ${job.name}`);
  }

  /**
   * Reports queue-level errors.
   *
   * This runs as a queue event listener, so an exception thrown here has no
   * caller to catch it; the error is logged instead of being thrown.
   */
  @OnQueueError()
  onError(err: Error) {
    this.logger.error(`Queue error: ${err.message}`, err.stack);
  }

  /**
   * Processes an account-activation job by sending the activation e-mail.
   *
   * @param job - Job whose data carries the recipient, user name, token and
   *   temporary password.
   * @returns The value resolved by the e-mail service.
   * @throws Error When the e-mail could not be sent (the job then fails).
   */
  @Process(AccountActivateProcess)
  async handleAccountActivate(job: Job<AccountActivateType>) {
    const { email, username, token, temporarypass } = job.data;

    const info = await this._accountEmailConfirm(
      email,
      username,
      token,
      temporarypass,
    );

    // Logged only after the send succeeded, so the message is truthful.
    this.logger.debug(`Email for activation of account(${username}) sent...`);

    return info;
  }
}

Injecting the queues into the main module.

app.module.ts
import { Module } from '@nestjs/common';

import { BullModule } from '@nestjs/bull';

import { AppController } from './app.controller';
import { AppService } from './app.service';
import { UsersModule } from './users/users.module';

// Configuration key passed to `BullModule.forRoot`; queues select this
// configuration through the same key.
const USERS_QUEUES_CONFIG_KEY = 'users-queues';

// Connection settings for Bull, read from the environment.
const usersQueuesOptions = {
  url: process.env.REDIS_HOST,
  redis: { password: process.env.REDIS_PASSWORD },
};

/**
 * Root module: registers the shared Bull configuration and pulls in the
 * users module.
 */
@Module({
  imports: [
    BullModule.forRoot(USERS_QUEUES_CONFIG_KEY, usersQueuesOptions),
    UsersModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Injecting the queues into the users module.

users.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

import EmailNameJob from './jobs/constants/email/emailNameJob';

import { UsersController } from './users.controller';
import { UsersService } from './users.service';
import { UsersParser } from './parsers';
import { EmailJob } from './jobs/email';

import { 
  CoreModule,
  ConfigurationService,
  DebugService,
  ValidatorRegexpService,
  StringExService,
  RandomService,
  CryptoService,
  JsonWebTokenService,
  SimilarityFilterService,
  EmailService,
  AwsCoreService,
  AwsConfigurationService,
  AwsStsService,
  SqliteService,
  RedisService,
  PrismaService
} from 'niro-health';

/**
 * Builds a `{ provide, useClass }` provider that registers `useClass`
 * under the string token `provide`.
 */
const asClassProvider = <T>(provide: string, useClass: T) => ({
  provide,
  useClass,
});

/**
 * Users module: registers the e-mail queue against the `users-queues`
 * configuration and binds every service to its string token.
 */
@Module({
  imports: [
    BullModule.registerQueue({
      configKey: 'users-queues',
      name: EmailNameJob,
    }),
    CoreModule,
  ],
  controllers: [UsersController],
  providers: [
    asClassProvider('IUsersService', UsersService),
    asClassProvider('IUsersParser', UsersParser),
    asClassProvider('IEmailJob', EmailJob),
    asClassProvider('IConfigurationService', ConfigurationService),
    asClassProvider('IDebugService', DebugService),
    asClassProvider('IValidatorRegexpService', ValidatorRegexpService),
    asClassProvider('IStringExService', StringExService),
    asClassProvider('IRandomService', RandomService),
    asClassProvider('ICryptoService', CryptoService),
    asClassProvider('IJsonWebTokenService', JsonWebTokenService),
    asClassProvider('ISimilarityFilterService', SimilarityFilterService),
    asClassProvider('IEmailService', EmailService),
    asClassProvider('IAwsCoreService', AwsCoreService),
    asClassProvider('IAwsConfigurationService', AwsConfigurationService),
    asClassProvider('IAwsStsService', AwsStsService),
    asClassProvider('IRedisService', RedisService),
    asClassProvider('ISqliteService', SqliteService),
    asClassProvider('IPrismaService', PrismaService),
  ],
  exports: [asClassProvider('IUsersService', UsersService)],
})
export class UsersModule {}

Injecting the queues into the users module controller.

users.controller.ts
import {
  Inject,
  HttpException,
  HttpStatus,
  Controller,
  UseGuards,
  UsePipes,
  Get,
  Post,
  Body,
  Req,
  Patch,
  Query,
  Param,
  Delete,
} from '@nestjs/common';

import { Request } from 'express';

import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

import { 
  queuePool,
  JoiValidationPipe,
  CoreService,
  AppHostService,
  JsonWebTokenService
} from 'niro-health';

import EmailNameJob from './jobs/constants/email/emailNameJob';
import AccountActivateProcess from './jobs/constants/email/accountActivateProcess';
import { AccountActivateType } from './jobs/types/email/accountActivate';
import { AccountActivateOptions } from './jobs/configs/email/accountActivate';

import { RolesGuard } from './guards/roles.guard';
// import { Roles } from './guards/roles.decorator';

import { TokenGuard } from './guards/token.guard';
// import { Token } from './guards/token.decorator';

import { UsersService } from './users.service';
import { UsersParser } from './parsers';

import type { CreateUserDto } from './dto/create';
import type { ActivateUserDto } from './dto/activate';
import type { LoginUserDto } from './dto/login';
import type { SessionValidateUserDto } from './dto/sessionValidate';
import type { LogoutUserDto } from './dto/logout';
import type { UpdateUserDto } from './dto/update';

import { CreateUserSchema } from './dto/schemas/create.joi';
import { ActivateUserSchema } from './dto/schemas/activate.joi';
import { LoginUserSchema } from './dto/schemas/login.joi';
import { SessionValidateUserSchema } from './dto/schemas/sessionValidate.joi';
import { LogoutUserSchema } from './dto/schemas/logout.joi';
import { UpdateUserSchema } from './dto/schemas/update.joi';

/**
 * REST controller for `api/users`.
 *
 * Besides the user operations, it registers the injected e-mail queue in
 * `queuePool` and enqueues an account-activation e-mail job whenever a user
 * is created. Every failure is reported as an `HttpException` with status
 * `FORBIDDEN`.
 */
@Controller('api/users')
@UseGuards(RolesGuard, TokenGuard)
export class UsersController {
  constructor(
    @InjectQueue(EmailNameJob)
    private readonly emailQueue: Queue<AccountActivateType>,
    private readonly coreService: CoreService,
    private readonly appHostService: AppHostService,
    @Inject('IUsersService') private readonly usersService: UsersService,
    @Inject('IUsersParser') private readonly usersParser: UsersParser,
  ) {
    // Add the queue to the pool so that it is listed by Bull Board.
    queuePool.add(emailQueue);
  }

  /**
   * Converts an optional query-string value into an integer.
   *
   * @param value - Raw query parameter, possibly missing or empty.
   * @returns The base-10 integer, or `undefined` when the value is missing,
   *   empty or not numeric.
   */
  private static toOptionalInt(value?: string): number | undefined {
    if (!value) return undefined;

    const parsed = Number.parseInt(value, 10);

    return Number.isNaN(parsed) ? undefined : parsed;
  }

  /**
   * Creates a user and enqueues the account-activation e-mail job.
   *
   * @param createUserDto - Body validated against `CreateUserSchema`.
   * @returns The created user as produced by `usersParser.toJSON`.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`
   *   or when enqueueing or serialization fails.
   */
  @Post()
  @UsePipes(new JoiValidationPipe(CreateUserSchema))
  async create(@Body() createUserDto: CreateUserDto) {
    const user = await this.usersService.create(createUserDto);

    if (user instanceof Error)
      throw new HttpException(user.message, HttpStatus.FORBIDDEN);

    try {
      await this.emailQueue.add(
        AccountActivateProcess,
        {
          email: createUserDto.email,
          username: createUserDto.username,
          // Activation token produced by the JSON Web Token service ('7d').
          token: this.appHostService.app
            .get<JsonWebTokenService>('IJsonWebTokenService')
            .save(
              {
                id: user.id,
                username: user.username,
                timestamp: new Date().getTime(),
              },
              null,
              `7d`,
            ) as string,
          temporarypass: null,
        },
        AccountActivateOptions,
      );

      return this.usersParser.toJSON(user);
    } catch (error) {
      // The caught value is `unknown`; narrow it before reading `message`.
      const message = error instanceof Error ? error.message : String(error);

      throw new HttpException(message, HttpStatus.FORBIDDEN);
    }
  }

  /**
   * Activates the account referenced by an activation token.
   *
   * @param activateUserDto - Body validated against `ActivateUserSchema`.
   * @returns `true` when the account was activated.
   * @throws HttpException (FORBIDDEN) when the token cannot be loaded or the
   *   service returns an `Error`.
   */
  @Post('activate')
  @UsePipes(new JoiValidationPipe(ActivateUserSchema))
  async activate(@Body() activateUserDto: ActivateUserDto) {
    const token = this.appHostService.app
      .get<JsonWebTokenService>('IJsonWebTokenService')
      .load(activateUserDto.token, null);

    if (token instanceof Error)
      throw new HttpException(
        'Account activation token is invalid. Please try again.',
        HttpStatus.FORBIDDEN,
      );

    const { id } = token as { id: string };

    const user = await this.usersService.activate(id);

    if (user instanceof Error)
      throw new HttpException(user.message, HttpStatus.FORBIDDEN);

    return true;
  }

  /**
   * Logs a user in, passing the client's geo/device data to the service.
   *
   * @param loginUserDto - Body validated against `LoginUserSchema`.
   * @param request - Incoming request, used to resolve the client geo IP.
   * @returns The user as produced by `usersParser.toJSON`.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`.
   */
  @Post('auth/login')
  @UsePipes(new JoiValidationPipe(LoginUserSchema))
  async login(@Body() loginUserDto: LoginUserDto, @Req() request: Request) {
    const clientGeoIP = await this.coreService.getClientGeoIP(request);

    const user = await this.usersService.login(
      loginUserDto.email,
      loginUserDto.password,
      clientGeoIP.device_name,
      {
        ...clientGeoIP,
        token_signature: '',
      },
    );

    if (user instanceof Error)
      throw new HttpException(user.message, HttpStatus.FORBIDDEN);

    return this.usersParser.toJSON(user);
  }

  /**
   * Validates a user session.
   *
   * @param sessionValidateUserDto - Body validated against
   *   `SessionValidateUserSchema`.
   * @param request - Incoming request, used to resolve the client geo IP.
   * @returns The value returned by `usersService.sessionValidate`.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`.
   */
  @Post('auth/validate')
  @UsePipes(new JoiValidationPipe(SessionValidateUserSchema))
  async sessionValidate(
    @Body() sessionValidateUserDto: SessionValidateUserDto,
    @Req() request: Request,
  ) {
    const clientGeoIP = await this.coreService.getClientGeoIP(request);

    const session = await this.usersService.sessionValidate(
      sessionValidateUserDto.id,
      sessionValidateUserDto.token_value,
      sessionValidateUserDto.token_signature,
      sessionValidateUserDto.token_revalidate_value,
      sessionValidateUserDto.token_revalidate_signature,
      clientGeoIP.device_name,
      {
        ...clientGeoIP,
        token_signature: sessionValidateUserDto.token_signature,
      },
    );

    if (session instanceof Error)
      throw new HttpException(session.message, HttpStatus.FORBIDDEN);

    return session;
  }

  /**
   * Logs a user out of a session.
   *
   * @param logoutUserDto - Body validated against `LogoutUserSchema`.
   * @returns `true` when the logout succeeded.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`.
   */
  @Post('auth/logout')
  @UsePipes(new JoiValidationPipe(LogoutUserSchema))
  async logout(@Body() logoutUserDto: LogoutUserDto) {
    const session = await this.usersService.logout(
      logoutUserDto.id,
      logoutUserDto.token_value,
    );

    if (session instanceof Error)
      throw new HttpException(session.message, HttpStatus.FORBIDDEN);

    return true;
  }

  /**
   * Lists users with optional pagination.
   *
   * @param limit - Optional `limit` query parameter.
   * @param skip - Optional `skip` query parameter.
   * @returns The users, each converted with `usersParser.toJSON`.
   */
  @Get()
  async findAll(@Query('limit') limit: string, @Query('skip') skip: string) {
    // Missing, empty or non-numeric values are forwarded as `undefined`
    // rather than as '' or NaN.
    const users = await this.usersService.findAll(
      UsersController.toOptionalInt(limit),
      UsersController.toOptionalInt(skip),
    );

    return users.map((user) => this.usersParser.toJSON(user));
  }

  /**
   * Fetches a single user.
   *
   * @param id - Route parameter identifying the user.
   * @returns The user as produced by `usersParser.toJSON`.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`.
   */
  @Get(':id')
  async findOne(@Param('id') id: string) {
    const user = await this.usersService.findOne(id);

    if (user instanceof Error)
      throw new HttpException(user.message, HttpStatus.FORBIDDEN);

    return this.usersParser.toJSON(user);
  }

  /**
   * Updates a user.
   *
   * @param id - Route parameter identifying the user.
   * @param updateUserDto - Body validated against `UpdateUserSchema`.
   * @returns The updated user as produced by `usersParser.toJSON`.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`.
   */
  @Patch(':id')
  async update(
    @Param('id') id: string,
    @Body(new JoiValidationPipe(UpdateUserSchema))
    updateUserDto: UpdateUserDto,
  ) {
    const user = await this.usersService.update(id, updateUserDto);

    if (user instanceof Error)
      throw new HttpException(user.message, HttpStatus.FORBIDDEN);

    return this.usersParser.toJSON(user);
  }

  /**
   * Deletes a user.
   *
   * @param id - Route parameter identifying the user.
   * @returns The value returned by `usersService.delete`.
   * @throws HttpException (FORBIDDEN) when the service returns an `Error`.
   */
  @Delete(':id')
  async delete(@Param('id') id: string) {
    const deleted = await this.usersService.delete(id);

    if (deleted instanceof Error)
      throw new HttpException(deleted.message, HttpStatus.FORBIDDEN);

    return deleted;
  }
}

Constants

Constant
Description

queuePool

This is a pool of queues that will be used by BullBoard.

getBullBoardQueues

This is a function that will be used by BullBoard to get the queues.

Last updated