Queue and Messaging
The concept of the queue is to allow messages to be sent using connections an queue.
connection vs queue
concept of Connection is like a server/service that can handle multiple queues. Think of RabbitMQ server that can handle multiple queues.
queue is a channel inside a connection that can handle messages.
Configuration
// app/config/queues.ts
export default {
queues: {
default: {
type: "database",
},
},
};
How messaging works
unlike traditional approaches where you send string. Pashmak Queue works with objects:
import { queue } from '@devbro/pashmak/facades';
import { QueueMessageInterface } from '@devbro/pashmak/queue';
export class IncomingWebhookMessage implements QueueMessageInterface {
/**
* Method to convert Message to string.
* keep in mind this string should be able to be converted back to Message object.
*/
async getMessage(): Promise<string> {
return JSON.stringify({...this.something, ...this.somethingElse});
}
/**
* Method to set message from string or object.
*/
async setMessage(value: string | Record<string, any>): Promise<void> {
???
this.something = ???;
this.somethingElse = ???;
}
/**
* Method to validate message before processing.
* return true if message is valid, false otherwise.
* It is to prevent processing invalid messages.
*/
async validateMessage(): Promise<Boolean> {
return true;
}
async processMessage(): Promise<void> {
// process the message here
}
/**
* Send the message to a queue.
*/
async dispatch() {
queue().dispatch('stripe_incoming_webhook', this);
}
}
Available drivers:
Database
Uses database to store messages. This is the default driver and requires a table in your database.
- npm
- Yarn
- pnpm
# Create a migration for queue table
pashmak generate queue migration
# Then run migrations
pashmak migrate
# Create a migration for queue table
pashmak generate queue migration
# Then run migrations
pashmak migrate
# Create a migration for queue table
pashmak generate queue migration
# Then run migrations
pashmak migrate
// app/config/queues.ts
import { DatabaseTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
provider: "database",
config: {
???
} as DatabaseTransportConfig,
},
};
Memory (In-Memory)
The memory transport stores messages in memory and is ideal for development, testing, or simple applications that don't require persistence.
// app/config/queues.ts
export default {
queues: {
default: {
provider: "memory",
config: {
interval: 10_000, // Interval to check for new messages (ms)
},
},
},
};
Async Transport
The async transport processes messages asynchronously within the application, allowing for non-blocking operations and improved performance. Unlike memory transport, it will execute messages right away without waiting for an interval. Ideal for testing and development.
// app/config/queues.ts
import { AsyncTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
provider: "async",
config: {
???
} as AsyncTransportConfig,
},
};
SQS (Amazon Simple Queue Service)
Uses AWS SQS service for reliable, scalable message queuing. Requires AWS credentials and proper IAM permissions.
// app/config/queues.ts
import { AwsSqsTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
provider: "sqs",
config: {
???
} as AwsSqsTransportConfig,
},
};
AMQP (RabbitMQ, BullMQ)
Uses AMQP protocol to connect to RabbitMQ or other AMQP-compatible message brokers. Provides advanced routing and exchange patterns.
// app/config/queues.ts
import { AmqpTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
provider: "amqp",
config: {
???
} as AmqpTransportConfig,
},
};
Redis
Uses Redis lists and pub/sub for fast, lightweight message queuing with optional retry and dead-letter queue support.
// app/config/queues.ts
import { RedisTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
provider: "redis",
config: {
???
} as RedisTransportConfig,
},
};
Google Cloud Pub/Sub
Uses Google Cloud Pub/Sub for globally distributed message queuing with automatic scaling and high availability.
// app/config/queues.ts
import { GooglePubSubTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
type: "pubsub",
config: {
???
} as GooglePubSubTransportConfig,
},
};
Azure Service Bus
Uses Azure Service Bus for enterprise-grade message queuing with advanced features like sessions, transactions, and message scheduling.
// app/config/queues.ts
import { AzureServiceBusTransportConfig } from '@devbro/pashmak/queue';
export default {
default: {
provider: "azure_service_bus",
config: {
???
} as AzureServiceBusTransportConfig,
},
};
Registering your own Provider
You can create custom queue transports by implementing the QueueTransportInterface:
import { QueueTransportInterface } from "@devbro/pashmak/queue";
export class CustomTransport implements QueueTransportInterface {
/**
* Dispatch a message to a specific channel
*/
async dispatch(channel: string, message: string): Promise<void> {
// Your implementation
}
/**
* Register a listener callback for a channel
*/
async registerListener(
channel: string,
callback: (message: string) => Promise<void>,
): Promise<void> {
// Your implementation
}
/**
* Start listening for messages on all registered channels
*/
async startListening(): Promise<void> {
// Your implementation
}
/**
* Stop listening and cleanup resources
*/
async stopListening(): Promise<void> {
// Your implementation
}
}
Then register your custom transport in the queue factory:
import { QueueTransportFactory } from "@devbro/pashmak/queue";
import { CustomTransport } from "./CustomTransport";
// Register your transport
QueueTransportFactory.register("custom", (config) => {
return new CustomTransport(config);
});
// Use it in your config
export default {
queues: {
default: {
type: "custom",
// Your custom configuration
},
},
};
Creating your own Transport
You can create custom queue transports by implementing the QueueTransportInterface:
import { QueueTransportInterface } from "@devbro/pashmak/queue";
export class CustomTransport implements QueueTransportInterface {
/**
* Dispatch a message to a specific channel
*/
async dispatch(channel: string, message: string): Promise<void> {
// Your implementation
}
/**
* Register a listener callback for a channel
*/
async registerListener(
channel: string,
callback: (message: string) => Promise<void>,
): Promise<void> {
// Your implementation
}
/**
* Start listening for messages on all registered channels
*/
async startListening(): Promise<void> {
// Your implementation
}
/**
* Stop listening and cleanup resources
*/
async stopListening(): Promise<void> {
// Your implementation
}
}
Then register your custom transport in the queue factory:
import { QueueTransportFactory } from "@devbro/pashmak/queue";
import { CustomTransport } from "./CustomTransport";
// Register your transport
QueueTransportFactory.register("custom", (config) => {
return new CustomTransport(config);
});
and refer to it in your configuration:
export default {
default: {
provider: "custom",
config: {
// Your custom configuration
},
},
};