Event-Driven Architecture Beyond the Basics: Patterns That Actually Work
Moving from 'events everywhere' chaos to reliable event-driven systems—lessons from hipages and VMware on dual writes, idempotency, sagas, and the patterns that separate working systems from distributed disasters.
We had just finished migrating hipages to microservices, and everything was events. Job posted? Event. Tradie matched? Event. Payment processed? Event. Quote accepted? You guessed it—event.
The theory was sound: loosely coupled services, independent scaling, and an audit trail of everything that happened in our marketplace. The reality? A 3 AM page because a tradie was charged twice for the same lead, duplicate notifications blowing up our SendGrid bill, and a data inconsistency so subtle it took three days to track down.
Event-driven architecture is powerful, but the basics—"publish an event when something happens"—aren't enough. At VMware, we learned this managing SD-WAN device configurations across distributed edge locations. At hipages, we learned it the hard way with financial transactions. Here's what you need to know before your events turn into distributed chaos.
The Naive Approach (And Why It Fails)
When teams first adopt event-driven architectures, they typically start like this:
// The naive implementation
@Injectable()
export class JobService {
constructor(
private readonly jobRepository: Repository<Job>,
private readonly eventBus: EventBus,
private readonly logger: Logger
) {}
async createJob(jobData: CreateJobDto): Promise<Job> {
// Save to database
const job = await this.jobRepository.save(jobData);
// Publish event
await this.eventBus.publish(new JobCreatedEvent(job));
return job;
}
}
Looks simple, right? But this code has a fatal flaw: what happens if the database commit succeeds but the event publish fails? Or worse, what if the service crashes between the two operations?
You've created a classic dual-write problem. The job exists in the database but no one knows about it. The tradie matching service never starts looking for tradies. The notification service never sends confirmation emails. Your system is now in an inconsistent state, and you'll only find out when someone asks "Why wasn't I notified about this job?"
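The failure window is easy to see in miniature. A plain-TypeScript sketch, with in-memory arrays standing in for the database and broker (all names here are illustrative, not real infrastructure):

```typescript
// Minimal simulation of the dual-write failure window.
const db: string[] = [];
const bus: string[] = [];

function saveJob(id: string): void {
  db.push(id); // the database commit succeeds
}

function publish(id: string, brokerDown: boolean): void {
  if (brokerDown) throw new Error("broker unavailable");
  bus.push(id);
}

function createJobNaive(id: string, brokerDown: boolean): void {
  saveJob(id);
  publish(id, brokerDown); // if this throws, the job exists but no event does
}

let inconsistent = false;
try {
  createJobNaive("job-1", true); // broker happens to be down at the wrong moment
} catch {
  // the job row is committed, but no event was ever published
  inconsistent = db.includes("job-1") && !bus.includes("job-1");
}
```

A retry at this point is no better: it would duplicate the row, or publish a second event for a job that downstream services may have already seen.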
Pattern 1: The Outbox Pattern
The outbox pattern solves the dual-write problem by making the database the single source of truth for both data and events.
How It Works
Instead of publishing directly to Kafka/RabbitMQ, you write events to an "outbox" table in the same database transaction as your business data:
// Database entities
@Entity()
export class Job {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column()
title: string;
@Column()
customerId: string;
@Column({ type: 'enum', enum: JobStatus })
status: JobStatus;
@CreateDateColumn()
createdAt: Date;
}
@Entity()
export class OutboxEvent {
@PrimaryGeneratedColumn('increment')
id: number;
@Column()
aggregateType: string; // 'job', 'payment', 'tradie'
@Column()
aggregateId: string; // The job ID, payment ID, etc.
@Column()
eventType: string; // 'JobCreated', 'PaymentProcessed'
@Column('jsonb')
payload: any; // Event data
@Column({ default: 'pending' })
status: 'pending' | 'published' | 'failed';
@Column({ type: 'int', default: 0 })
retryCount: number;
@CreateDateColumn()
createdAt: Date;
@Column({ type: 'timestamp', nullable: true })
publishedAt: Date;
}
The Service Implementation
@Injectable()
export class JobService {
constructor(
private readonly dataSource: DataSource,
private readonly logger: Logger
) {}
async createJob(jobData: CreateJobDto): Promise<Job> {
return await this.dataSource.transaction(async (manager) => {
// 1. Save the job
const jobRepo = manager.getRepository(Job);
const job = await jobRepo.save(jobData);
// 2. Write event to outbox (same transaction!)
const outboxRepo = manager.getRepository(OutboxEvent);
await outboxRepo.save({
aggregateType: 'job',
aggregateId: job.id,
eventType: 'JobCreated',
payload: {
jobId: job.id,
title: job.title,
customerId: job.customerId,
status: job.status,
createdAt: job.createdAt
},
status: 'pending',
createdAt: new Date()
});
this.logger.log(`Job ${job.id} created, event written to outbox`);
return job;
});
// If either fails, the transaction rolls back. No inconsistency.
}
}
The Relay Process
A separate process polls the outbox table and publishes events to your message broker:
@Injectable()
export class OutboxRelayService {
constructor(
private readonly outboxRepository: Repository<OutboxEvent>,
private readonly kafkaProducer: KafkaProducer,
private readonly alertService: AlertService,
private readonly logger: Logger
) {}
@Interval(1000) // Runs every second; overlapping passes can double-publish, so consumers must be idempotent anyway
async relayEvents(): Promise<void> {
const pendingEvents = await this.outboxRepository.find({
where: { status: 'pending' },
order: { id: 'ASC' },
take: 100 // Batch size
});
for (const event of pendingEvents) {
try {
await this.kafkaProducer.send({
topic: `${event.aggregateType}.${event.eventType}`,
messages: [{
key: event.aggregateId,
value: JSON.stringify({
eventId: event.id,
eventType: event.eventType,
aggregateId: event.aggregateId,
timestamp: event.createdAt,
payload: event.payload
})
}]
});
// Mark as published
await this.outboxRepository.update(event.id, {
status: 'published',
publishedAt: new Date()
});
this.logger.log(`Published event ${event.id} to Kafka`);
} catch (error) {
this.logger.error(`Failed to publish event ${event.id}`, error);
// Increment retry count (use the new value, not the stale in-memory one)
const newRetryCount = event.retryCount + 1;
await this.outboxRepository.update(event.id, {
retryCount: newRetryCount
});
// If max retries exceeded, mark as failed for manual review
if (newRetryCount >= 5) {
await this.outboxRepository.update(event.id, {
status: 'failed'
});
// Alert the team
await this.alertService.sendAlert({
severity: 'high',
message: `Event ${event.id} failed to publish after 5 retries`,
eventId: event.id
});
}
}
}
}
}
Why This Works
- Atomicity: The job and event are committed in the same transaction
- Durability: Events survive service crashes
- Ordering: Events are published in creation order (by auto-increment ID); a retried event can still arrive late, so consumers shouldn't depend on strict global ordering
- Observability: The outbox table is a built-in audit log
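The whole flow can be sketched end-to-end in a few lines of plain TypeScript, with in-memory stores and a single synchronous function standing in for the real database transaction and relay (all names illustrative):

```typescript
// In-memory sketch of the outbox flow: the business row and the event
// land together, and a relay pass drains pending events oldest-first.
type OutboxRow = { id: number; eventType: string; status: "pending" | "published" };

const jobs: string[] = [];
const outbox: OutboxRow[] = [];
const broker: string[] = [];
let nextId = 1;

// "Transaction": in a real database both writes commit or roll back together.
function createJob(title: string): void {
  jobs.push(title);
  outbox.push({ id: nextId++, eventType: "JobCreated", status: "pending" });
}

// One relay pass: publish pending events in ID order, then mark them published.
function relayOnce(): void {
  const pending = outbox
    .filter((e) => e.status === "pending")
    .sort((a, b) => a.id - b.id);
  for (const e of pending) {
    broker.push(`${e.id}:${e.eventType}`);
    e.status = "published";
  }
}

createJob("Fix leaking tap");
createJob("Paint fence");
relayOnce();
```

If the relay crashes mid-pass, unpublished rows simply stay `pending` and are picked up on the next pass, which is exactly the durability property the pattern buys you.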
Pattern 2: Event Versioning and Schema Evolution
Your events are a public API. Changing them is a breaking change. Here's how we handled schema evolution at hipages:
Versioning Strategy
// Event envelope with versioning
interface DomainEvent<T> {
eventId: string;
eventType: string;
aggregateId: string;
version: number; // Schema version
timestamp: Date;
payload: T;
}
// Version 1 of the event
interface JobCreatedV1 {
jobId: string;
title: string;
customerId: string;
budget: number;
}
// Version 2 adds location
interface JobCreatedV2 {
jobId: string;
title: string;
customerId: string;
budget: number;
location: {
suburb: string;
state: string;
postcode: string;
};
}
The Versioned Publisher
@Injectable()
export class EventPublisher {
constructor(
private readonly outboxRepository: Repository<OutboxEvent>
) {}
async publishJobCreated(job: Job): Promise<void> {
const event: DomainEvent<JobCreatedV2> = {
eventId: uuid(),
eventType: 'JobCreated',
aggregateId: job.id,
version: 2, // Current version
timestamp: new Date(),
payload: {
jobId: job.id,
title: job.title,
customerId: job.customerId,
budget: job.budget,
location: {
suburb: job.location.suburb,
state: job.location.state,
postcode: job.location.postcode
}
}
};
await this.outboxRepository.save({
aggregateType: 'job',
aggregateId: job.id,
eventType: 'JobCreated',
payload: event,
status: 'pending'
});
}
}
Backwards-Compatible Consumers
@Injectable()
export class JobNotificationHandler {
async handleJobCreated(event: DomainEvent<JobCreatedV1 | JobCreatedV2>): Promise<void> {
// Handle different versions
switch (event.version) {
case 1: {
const v1Payload = event.payload as JobCreatedV1;
await this.sendNotification({
jobId: v1Payload.jobId,
title: v1Payload.title,
location: 'Location not specified' // Default for v1
});
break;
}
case 2: {
const v2Payload = event.payload as JobCreatedV2;
await this.sendNotification({
jobId: v2Payload.jobId,
title: v2Payload.title,
location: `${v2Payload.location.suburb}, ${v2Payload.location.state}`
});
break;
}
default:
throw new Error(`Unsupported event version: ${event.version}`);
}
}
}
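An alternative to branching on version in every consumer is to upcast older payloads to the latest shape at the boundary, so handlers only ever see the current version. A sketch (the placeholder location values are assumptions, not production defaults):

```typescript
interface JobCreatedV1 {
  jobId: string;
  title: string;
  customerId: string;
  budget: number;
}

interface JobCreatedV2 extends JobCreatedV1 {
  location: { suburb: string; state: string; postcode: string };
}

// Upcast any known version to the latest; consumers then handle only V2.
function upcastJobCreated(version: number, payload: unknown): JobCreatedV2 {
  switch (version) {
    case 1: {
      const v1 = payload as JobCreatedV1;
      // Placeholder location for pre-location events (illustrative values)
      return { ...v1, location: { suburb: "Unknown", state: "N/A", postcode: "0000" } };
    }
    case 2:
      return payload as JobCreatedV2;
    default:
      throw new Error(`Unsupported event version: ${version}`);
  }
}

const upcasted = upcastJobCreated(1, {
  jobId: "j-1", title: "Fix tap", customerId: "c-1", budget: 150,
});
```

The trade-off: upcasting keeps version logic in one place, but it only works when old versions can be mapped to the new shape with sensible defaults.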
Event Registry
For larger systems, maintain an event registry:
# events.yaml
JobCreated:
versions:
1:
jobId: string
title: string
customerId: string
budget: number
2:
jobId: string
title: string
customerId: string
budget: number
location:
suburb: string
state: string
postcode: string
deprecated_fields: []
added_fields:
- location
PaymentProcessed:
versions:
1:
paymentId: string
amount: number
currency: string
status: string
This registry serves as:
- Documentation for all teams
- The basis for contract tests between publishers and consumers
- The source of truth for build-time schema validation
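A minimal build-time check against such a registry can be as simple as comparing payload keys to the declared fields. A sketch, with the registry inlined as a literal (YAML loading and type checking of field values omitted):

```typescript
// Registry mirroring events.yaml: event type -> version -> declared fields.
const registry: Record<string, Record<number, string[]>> = {
  JobCreated: {
    1: ["jobId", "title", "customerId", "budget"],
    2: ["jobId", "title", "customerId", "budget", "location"],
  },
};

// True only if the payload has exactly the fields the schema declares.
function validatePayload(
  eventType: string,
  version: number,
  payload: Record<string, unknown>
): boolean {
  const fields = registry[eventType]?.[version];
  if (!fields) return false;
  const keys = Object.keys(payload);
  return fields.length === keys.length && fields.every((f) => keys.includes(f));
}

const v1Ok = validatePayload("JobCreated", 1, {
  jobId: "j", title: "t", customerId: "c", budget: 1,
});
const v2MissingLocation = validatePayload("JobCreated", 2, {
  jobId: "j", title: "t", customerId: "c", budget: 1,
});
```

Running a check like this in CI against sample payloads catches a publisher that drifts from the registry before consumers ever see a malformed event.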
Pattern 3: Idempotency
In distributed systems, messages get delivered more than once. Network hiccups, consumer retries, and broker redeliveries all cause duplicates. Your consumers must be idempotent.
The Idempotency Key Pattern
// Event with idempotency key
interface PaymentProcessedEvent {
eventId: string; // Unique event ID (from producer)
idempotencyKey: string; // Business key for deduplication
paymentId: string;
amount: number;
customerId: string;
}
@Injectable()
export class PaymentNotificationHandler {
constructor(
private readonly processedEventsRepository: Repository<ProcessedEvent>,
private readonly notificationService: NotificationService,
private readonly logger: Logger
) {}
async handlePaymentProcessed(event: PaymentProcessedEvent): Promise<void> {
// Check if we've already processed this event
const alreadyProcessed = await this.processedEventsRepository.findOne({
where: { eventId: event.eventId }
});
if (alreadyProcessed) {
this.logger.log(`Event ${event.eventId} already processed, skipping`);
return;
}
// Or check by idempotency key for business-level dedup
const existingPayment = await this.processedEventsRepository.findOne({
where: { idempotencyKey: event.idempotencyKey }
});
if (existingPayment) {
this.logger.log(`Payment ${event.idempotencyKey} already processed`);
return;
}
// Process the event
await this.notificationService.sendPaymentConfirmation({
customerId: event.customerId,
amount: event.amount,
paymentId: event.paymentId
});
// Record that we processed it
await this.processedEventsRepository.save({
eventId: event.eventId,
idempotencyKey: event.idempotencyKey,
processedAt: new Date()
});
}
}
Database-Level Idempotency
The check-then-act sequence above has a race window when two consumers process the same event concurrently. A unique constraint closes it:
@Entity()
@Unique(['idempotencyKey'])
export class PaymentNotification {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column()
@Index()
idempotencyKey: string;
@Column()
customerId: string;
@Column('decimal')
amount: number;
@CreateDateColumn()
sentAt: Date;
}
// Handler using upsert (TypeORM 0.3.0+ required)
async handlePaymentProcessed(event: PaymentProcessedEvent): Promise<void> {
try {
await this.paymentNotificationRepository.upsert({
idempotencyKey: event.idempotencyKey,
customerId: event.customerId,
amount: event.amount,
sentAt: new Date()
}, {
conflictPaths: ['idempotencyKey'],
skipUpdateIfNoValuesChanged: true
});
} catch (error) {
if (error.code === '23505') { // Unique constraint violation
this.logger.log(`Duplicate event ignored: ${event.idempotencyKey}`);
return;
}
throw error;
}
}
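The effect of either approach fits in a few lines: here a Set stands in for the unique constraint on `idempotencyKey`, and a redelivered message becomes a no-op:

```typescript
// Consumer-side dedup sketch. The Set is an in-memory stand-in for the
// database's unique constraint; the counter is the real side effect.
const seen = new Set<string>();
let notificationsSent = 0;

function handlePayment(idempotencyKey: string): void {
  if (seen.has(idempotencyKey)) return; // duplicate delivery, skip silently
  seen.add(idempotencyKey);
  notificationsSent++; // e.g. send the confirmation email exactly once
}

// The broker delivers the same message twice (at-least-once semantics)...
handlePayment("pay-123");
handlePayment("pay-123");
// ...but only one notification goes out.
```

The in-memory version is only safe within one process; the database constraint is what makes it hold across restarts and multiple consumer instances.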
Pattern 4: The Saga Pattern for Distributed Transactions
When a business process spans multiple services, you need a saga—a sequence of local transactions where each step publishes an event that triggers the next step.
The Scenario: Job Booking Flow
At hipages, booking a job involved:
- Reserve lead credits from tradie account
- Create the job in job service
- Send notifications to tradie and customer
- Update analytics
If step 3 fails, we need to undo steps 1 and 2.
Choreography vs Orchestration
Choreography (event-driven, decentralized):
CreditService ──CreditsReserved──> JobService ──JobCreated──> NotificationService
      ^                                                               │
      └───────────────CreditsReleased (on failure)────────────────────┘
Orchestration (central coordinator):
BookingOrchestrator ──ReserveCredits──> CreditService
        │
        ├──CreditsReserved──> CreateJob──> JobService
        │
        ├──JobCreated──> SendNotifications──> NotificationService
        │
        └──(on failure)──> ReleaseCredits──> CreditService
Implementation: Orchestrated Saga
@Injectable()
export class BookingSagaOrchestrator {
constructor(
private readonly sagaRepository: Repository<SagaInstance>,
private readonly eventBus: EventBus,
private readonly logger: Logger
) {}
async startBookingSaga(bookingData: BookingRequest): Promise<string> {
const sagaId = uuid();
await this.sagaRepository.save({
id: sagaId,
status: 'pending',
steps: [
{ step: 'reserve_credits', status: 'pending', compensation: 'release_credits' },
{ step: 'create_job', status: 'pending', compensation: 'delete_job' },
{ step: 'send_notifications', status: 'pending', compensation: null }
],
data: bookingData,
createdAt: new Date()
});
// Start the saga
await this.executeStep(sagaId, 'reserve_credits', {
type: 'ReserveCreditsCommand',
payload: {
tradieId: bookingData.tradieId,
amount: bookingData.creditCost,
sagaId
}
});
return sagaId;
}
async handleEvent(event: SagaEvent): Promise<void> {
const saga = await this.sagaRepository.findOne({
where: { id: event.sagaId }
});
if (!saga) {
this.logger.warn(`Saga ${event.sagaId} not found`);
return;
}
if (event.type.endsWith('Succeeded')) {
await this.handleStepSuccess(saga, event);
} else if (event.type.endsWith('Failed')) {
await this.handleStepFailure(saga, event);
}
}
private async handleStepSuccess(saga: SagaInstance, event: SagaEvent): Promise<void> {
const currentStep = saga.steps.find(s => s.status === 'pending');
if (!currentStep) return;
currentStep.status = 'completed';
await this.sagaRepository.save(saga);
// Execute next step
switch (currentStep.step) {
case 'reserve_credits':
await this.executeStep(saga.id, 'create_job', {
type: 'CreateJobCommand',
payload: { ...saga.data, sagaId: saga.id }
});
break;
case 'create_job':
// Persist the job ID so the delete_job compensation can find it later
saga.data.jobId = event.jobId;
await this.sagaRepository.save(saga);
await this.executeStep(saga.id, 'send_notifications', {
type: 'SendNotificationsCommand',
payload: { ...saga.data, sagaId: saga.id }
});
break;
case 'send_notifications':
await this.completeSaga(saga);
break;
}
}
private async handleStepFailure(saga: SagaInstance, event: SagaEvent): Promise<void> {
this.logger.error(`Step failed in saga ${saga.id}`, event);
saga.status = 'compensating';
await this.sagaRepository.save(saga);
// Execute compensations in reverse order
const completedSteps = saga.steps
.filter(s => s.status === 'completed')
.reverse();
for (const step of completedSteps) {
if (step.compensation) {
await this.executeCompensation(saga, step);
}
}
saga.status = 'compensated';
await this.sagaRepository.save(saga);
}
private async executeCompensation(saga: SagaInstance, step: SagaStep): Promise<void> {
this.logger.log(`Executing compensation for step ${step.step}`);
switch (step.compensation) {
case 'release_credits':
await this.eventBus.publish({
type: 'ReleaseCreditsCommand',
payload: {
tradieId: saga.data.tradieId,
amount: saga.data.creditCost,
sagaId: saga.id,
reason: 'booking_failed'
}
});
break;
case 'delete_job':
await this.eventBus.publish({
type: 'DeleteJobCommand',
payload: {
jobId: saga.data.jobId,
sagaId: saga.id,
reason: 'booking_failed'
}
});
break;
}
}
}
When to Use Sagas
Use orchestration when:
- Complex workflows with many steps
- Need visibility into saga state
- Steps have strict ordering
- Team prefers central coordination
Use choreography when:
- Simple workflows
- Teams want high autonomy
- Steps can execute in parallel
- Services are truly independent
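The compensation rule from the orchestrator above (undo completed steps in reverse order, skipping steps with no compensation) is the part that's easiest to get wrong, so it's worth isolating. A sketch using the booking saga's steps:

```typescript
// Given a saga's steps, produce the ordered list of compensations to run:
// only completed steps are undone, and they are undone newest-first.
type SagaStep = {
  name: string;
  compensation: string | null;
  status: "completed" | "pending";
};

function compensationPlan(steps: SagaStep[]): string[] {
  return steps
    .filter((s) => s.status === "completed" && s.compensation !== null)
    .reverse()
    .map((s) => s.compensation as string);
}

// Failure hit during send_notifications: the first two steps must unwind.
const plan = compensationPlan([
  { name: "reserve_credits", compensation: "release_credits", status: "completed" },
  { name: "create_job", compensation: "delete_job", status: "completed" },
  { name: "send_notifications", compensation: null, status: "pending" },
]);
```

Pulling this into a pure function also makes the unwinding logic trivially unit-testable, which matters for a code path that only runs when things are already going wrong.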
Pattern 5: Observability in Event-Driven Systems
Debugging event-driven systems is notoriously hard. Here's how we made them observable.
Correlation IDs and Distributed Tracing
// Middleware to extract or create correlation ID
@Injectable()
export class CorrelationIdMiddleware implements NestMiddleware {
use(req: Request, res: Response, next: NextFunction) {
const correlationId = (req.headers['x-correlation-id'] as string | undefined) || uuid();
req['correlationId'] = correlationId;
res.setHeader('X-Correlation-ID', correlationId);
// Store in AsyncLocalStorage for automatic propagation
asyncLocalStorage.run(new Map().set('correlationId', correlationId), () => {
next();
});
}
}
// Event producer includes correlation ID
async publishEvent(event: DomainEvent<any>): Promise<void> {
const correlationId = asyncLocalStorage.getStore()?.get('correlationId') || uuid();
await this.kafkaProducer.send({
topic: event.eventType,
messages: [{
key: event.aggregateId,
headers: {
'x-correlation-id': correlationId,
'x-event-id': event.eventId,
'x-timestamp': event.timestamp.toISOString()
},
value: JSON.stringify(event)
}]
});
}
// Event consumer extracts and uses correlation ID
@Injectable()
export class JobEventConsumer {
constructor(private readonly logger: Logger) {}
async consume(message: KafkaMessage): Promise<void> {
const correlationId = message.headers['x-correlation-id']?.toString() || uuid();
const eventId = message.headers['x-event-id']?.toString();
// Set up context for this processing
await asyncLocalStorage.run(new Map().set('correlationId', correlationId), async () => {
this.logger.log(`Processing event ${eventId}`, { correlationId });
// All logs within this handler will include correlationId
await this.handleEvent(JSON.parse(message.value.toString()));
});
}
}
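The snippets above assume a shared `asyncLocalStorage` instance without showing it. This is the minimal standalone wiring, using Node's built-in AsyncLocalStorage:

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Module-level instance, typically exported and imported wherever
// correlation context is read or written.
const asyncLocalStorage = new AsyncLocalStorage<Map<string, string>>();

function currentCorrelationId(): string | undefined {
  return asyncLocalStorage.getStore()?.get("correlationId");
}

let captured: string | undefined;

// Everything inside run() — including awaited and nested calls — sees
// the same store without any explicit parameter passing.
asyncLocalStorage.run(new Map([["correlationId", "abc-123"]]), () => {
  captured = currentCorrelationId();
});

// Outside run(), there is no store at all.
const outside = currentCorrelationId();
```

This is what lets the logger stamp every line with a correlation ID without threading the value through every function signature.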
Dead Letter Queues (DLQ)
Not all failures are retryable. Use DLQs to quarantine problematic messages:
@Injectable()
export class EventConsumer {
constructor(
private readonly dlqProducer: KafkaProducer,
private readonly metrics: MetricsService,
private readonly alertService: AlertService,
private readonly logger: Logger
) {}
async consume(message: KafkaMessage): Promise<void> {
try {
await this.processMessage(message);
} catch (error) {
this.logger.error('Failed to process message', error);
// Check if we should retry
const retryCount = this.getRetryCount(message);
if (retryCount < 3 && this.isRetryableError(error)) {
// Retry with delay
await this.scheduleRetry(message, retryCount + 1);
} else {
// Send to DLQ
await this.sendToDLQ(message, error, retryCount);
}
}
}
private async sendToDLQ(
message: KafkaMessage,
error: Error,
retryCount: number
): Promise<void> {
await this.dlqProducer.send({
topic: 'events.dlq',
messages: [{
key: message.key,
value: JSON.stringify({
originalMessage: message.value.toString(),
originalTopic: message.topic,
error: {
message: error.message,
stack: error.stack,
type: error.constructor.name
},
retryCount,
failedAt: new Date().toISOString(),
correlationId: message.headers['x-correlation-id']?.toString()
})
}]
});
// Alert the team
await this.alertService.sendAlert({
severity: 'high',
message: `Message sent to DLQ: ${message.topic}`,
correlationId: message.headers['x-correlation-id']?.toString(),
error: error.message
});
// Increment metric
this.metrics.increment('events.dlq.messages', {
topic: message.topic,
error_type: error.constructor.name
});
}
}
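`scheduleRetry` is left abstract above. A common implementation choice is exponential backoff with full jitter; the base delay and cap below are illustrative, not values from the original system:

```typescript
// Exponential backoff with full jitter: the deadline grows as 2^attempt,
// and the actual delay is drawn uniformly from [0, deadline) so that a
// burst of failures doesn't retry in lockstep and hammer the broker.
function retryDelayMs(attempt: number, baseMs = 1000, capMs = 60000): number {
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.floor(Math.random() * ceiling);
}

const d0 = retryDelayMs(0); // somewhere in [0, 1s)
const d3 = retryDelayMs(3); // somewhere in [0, 8s)
```

Pairing this with the retry-count check keeps transient failures cheap while still shunting persistent poison messages to the DLQ after a bounded number of attempts.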
Monitoring Dashboards
Track these metrics religiously:
// Key metrics to track
interface EventMetrics {
// Publishing metrics
eventsPublished: Counter;
publishLatency: Histogram;
publishErrors: Counter;
// Consumption metrics
eventsConsumed: Counter;
consumptionLatency: Histogram;
consumerLag: Gauge; // How far behind real-time
// Error metrics
processingErrors: Counter;
dlqMessages: Counter;
retryAttempts: Counter;
// Business metrics
sagaCompletionTime: Histogram;
sagaCompensations: Counter;
outboxLag: Gauge; // Time between creation and publishing
}
When NOT to Use Event-Driven Architecture
Event-driven isn't always the answer:
Don't use events when:
- You need immediate consistency: If the client needs to know the result right now, use synchronous calls
- Simple CRUD operations: A basic user profile update doesn't need an event
- Low volume: The overhead isn't worth it for <100 events/day
- Debugging is critical: Event-driven systems are harder to debug than synchronous ones
- Team isn't ready: Without proper monitoring and patterns, you'll create chaos
Use events when:
- You need loose coupling: Services shouldn't know about each other
- Audit trails matter: You need to know everything that happened
- Multiple consumers: Several services care about the same event
- Async processing: Work can happen in the background
- High volume: You need to buffer and process asynchronously
Lessons Learned
1. Start with Outbox, Not Events
If you only implement one pattern, make it the outbox. It's the difference between "mostly works" and "reliably works."
2. Version Your Events from Day 1
It's easier to add versioning from the start than to retrofit it later. Make it mandatory in your event schema.
3. Assume Duplicates
Your consumers WILL see duplicate events. Build idempotency in from the start, not as an afterthought.
4. Sagas Are Last Resort
Distributed transactions are complex. If you can redesign to avoid them, do. When you can't, use sagas, but document them extensively.
5. Observability Is Non-Negotiable
Without correlation IDs and DLQs, debugging event-driven systems is nearly impossible. Invest in observability early.
6. Document Your Events
Your events are APIs. Document them with the same rigor as your REST endpoints. Include examples, version history, and consumer information.
Best Practices Checklist
- Implement outbox pattern for all event publishing
- Add versioning to every event schema
- Build idempotency into all consumers (database-level preferred)
- Use correlation IDs for distributed tracing
- Implement DLQs for non-retryable failures
- Monitor consumer lag and set up alerts
- Document all events in a central registry
- Test failure scenarios (network failures, duplicates, ordering issues)
- Keep events small (<1MB) to avoid broker issues
- Use schema validation (Avro/Protobuf) for type safety
- Implement circuit breakers for external service calls within consumers
- Archive old events (don't keep everything in Kafka forever)
Conclusion
Event-driven architecture is powerful but dangerous. The patterns in this post—outbox, versioning, idempotency, sagas, and observability—are what separate systems that work from systems that fail at 3 AM.
At hipages, these patterns helped us handle millions of events daily while maintaining data consistency across our marketplace. At VMware, they ensured SD-WAN device configurations propagated reliably across distributed networks.
The key insight: events aren't just "fire and forget." They're commitments. Treat them with the same care as database transactions, and your event-driven system will be reliable, observable, and maintainable.
Want to discuss event-driven patterns? Connect with me on LinkedIn or email me.