feat: Add production-ready utilities and performance improvements

Security & Data Integrity:
- Centralized transaction helper with deadlock retry (exponential backoff)
- SafeQueryBuilder for building injection-safe parameterized queries
- Zod-based input validation middleware
- Audit logging to Outline's events table

Performance:
- Cursor-based pagination for large datasets
- Pool monitoring with configurable alerts
- Database index migrations for optimal query performance

Changes:
- Refactored bulk-operations, desk-sync, export-import to use centralized transaction helper
- Added 6 new utility modules (audit, monitoring, pagination, query-builder, transaction, validation)
- Created migrations/001_indexes.sql with 40+ recommended indexes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 15:23:32 +00:00
parent 7c83a9e168
commit b4ba42cbf1
13 changed files with 2240 additions and 57 deletions

src/tools/bulk-operations.ts

@@ -4,27 +4,9 @@
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool, PoolClient } from 'pg';
import { BaseTool, ToolResponse } from '../types/tools.js';
import { isValidUUID } from '../utils/security.js';
/**
* Execute operations within a transaction
*/
async function withTransaction<T>(pool: Pool, callback: (client: PoolClient) => Promise<T>): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
import { withTransaction } from '../utils/transaction.js';
/**
* bulk.archive_documents - Archive multiple documents

src/tools/desk-sync.ts

@@ -4,27 +4,9 @@
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool, PoolClient } from 'pg';
import { BaseTool, ToolResponse } from '../types/tools.js';
import { isValidUUID, sanitizeInput } from '../utils/security.js';
/**
* Execute operations within a transaction
*/
async function withTransaction<T>(pool: Pool, callback: (client: PoolClient) => Promise<T>): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
import { withTransaction } from '../utils/transaction.js';
interface CreateDeskProjectDocArgs {
collection_id: string;

src/tools/export-import.ts

@@ -4,27 +4,9 @@
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool, PoolClient } from 'pg';
import { BaseTool, ToolResponse } from '../types/tools.js';
import { isValidUUID, sanitizeInput } from '../utils/security.js';
/**
* Execute operations within a transaction
*/
async function withTransaction<T>(pool: Pool, callback: (client: PoolClient) => Promise<T>): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
import { withTransaction } from '../utils/transaction.js';
interface ExportCollectionArgs {
collection_id: string;

src/utils/audit.ts (new file)

@@ -0,0 +1,334 @@
/**
* MCP Outline PostgreSQL - Audit Logging
* Automatic logging of write operations to events table
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool, PoolClient } from 'pg';
import { BaseTool, ToolResponse } from '../types/tools.js';
import { logger } from './logger.js';
/**
* Audit log entry structure
*/
export interface AuditLogEntry {
/** User ID performing the action (optional for MCP) */
userId?: string;
/** Action/event name (e.g., 'documents.create', 'collections.delete') */
action: string;
/** Type of resource (e.g., 'document', 'collection', 'user') */
resourceType: string;
/** ID of the affected resource */
resourceId: string;
/** Team ID (workspace) */
teamId?: string;
/** Additional metadata */
metadata?: Record<string, any>;
/** IP address (optional) */
ip?: string;
}
/**
* Event names mapping for different operations
*/
export const AuditEvents = {
// Documents
DOCUMENT_CREATE: 'documents.create',
DOCUMENT_UPDATE: 'documents.update',
DOCUMENT_DELETE: 'documents.delete',
DOCUMENT_ARCHIVE: 'documents.archive',
DOCUMENT_RESTORE: 'documents.restore',
DOCUMENT_MOVE: 'documents.move',
DOCUMENT_PUBLISH: 'documents.publish',
DOCUMENT_UNPUBLISH: 'documents.unpublish',
// Collections
COLLECTION_CREATE: 'collections.create',
COLLECTION_UPDATE: 'collections.update',
COLLECTION_DELETE: 'collections.delete',
COLLECTION_ADD_USER: 'collections.add_user',
COLLECTION_REMOVE_USER: 'collections.remove_user',
COLLECTION_ADD_GROUP: 'collections.add_group',
COLLECTION_REMOVE_GROUP: 'collections.remove_group',
// Users
USER_CREATE: 'users.create',
USER_UPDATE: 'users.update',
USER_DELETE: 'users.delete',
USER_SUSPEND: 'users.suspend',
USER_ACTIVATE: 'users.activate',
USER_PROMOTE: 'users.promote',
USER_DEMOTE: 'users.demote',
// Groups
GROUP_CREATE: 'groups.create',
GROUP_UPDATE: 'groups.update',
GROUP_DELETE: 'groups.delete',
GROUP_ADD_USER: 'groups.add_user',
GROUP_REMOVE_USER: 'groups.remove_user',
// Comments
COMMENT_CREATE: 'comments.create',
COMMENT_UPDATE: 'comments.update',
COMMENT_DELETE: 'comments.delete',
// Shares
SHARE_CREATE: 'shares.create',
SHARE_REVOKE: 'shares.revoke',
// Bulk operations
BULK_ARCHIVE: 'bulk.archive',
BULK_DELETE: 'bulk.delete',
BULK_MOVE: 'bulk.move',
BULK_RESTORE: 'bulk.restore',
// API Keys
API_KEY_CREATE: 'api_keys.create',
API_KEY_DELETE: 'api_keys.delete',
// Webhooks
WEBHOOK_CREATE: 'webhooks.create',
WEBHOOK_UPDATE: 'webhooks.update',
WEBHOOK_DELETE: 'webhooks.delete',
// Integrations
INTEGRATION_CREATE: 'integrations.create',
INTEGRATION_UPDATE: 'integrations.update',
INTEGRATION_DELETE: 'integrations.delete',
} as const;
export type AuditEvent = typeof AuditEvents[keyof typeof AuditEvents];
/**
* Log an audit event to the database
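*
* A minimal usage sketch (the UUIDs below are illustrative placeholders):
* @example
* ```typescript
* await logAudit(pool, {
*   action: AuditEvents.DOCUMENT_CREATE,
*   resourceType: 'document',
*   resourceId: 'document-uuid',
*   teamId: 'team-uuid',
*   metadata: { title: 'Quarterly report' },
* });
* ```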
*/
export async function logAudit(
pool: Pool | PoolClient,
entry: AuditLogEntry
): Promise<void> {
try {
await pool.query(
`INSERT INTO events (
id, name, "actorId", "modelId", "teamId", data, ip, "createdAt"
) VALUES (
gen_random_uuid(), $1, $2, $3, $4, $5, $6, NOW()
)`,
[
entry.action,
entry.userId || null,
entry.resourceId,
entry.teamId || null,
JSON.stringify({
resourceType: entry.resourceType,
source: 'mcp-outline-postgresql',
...entry.metadata,
}),
entry.ip || null,
]
);
logger.debug('Audit log created', {
action: entry.action,
resourceType: entry.resourceType,
resourceId: entry.resourceId,
});
} catch (error) {
// Don't fail the operation if audit logging fails
logger.error('Failed to create audit log', {
error: error instanceof Error ? error.message : String(error),
entry,
});
}
}
/**
* Log multiple audit events in a batch
*/
export async function logAuditBatch(
pool: Pool | PoolClient,
entries: AuditLogEntry[]
): Promise<void> {
if (entries.length === 0) return;
try {
const values: any[] = [];
const placeholders: string[] = [];
let paramIndex = 1;
for (const entry of entries) {
placeholders.push(
`(gen_random_uuid(), $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, NOW())`
);
values.push(
entry.action,
entry.userId || null,
entry.resourceId,
entry.teamId || null,
JSON.stringify({
resourceType: entry.resourceType,
source: 'mcp-outline-postgresql',
...entry.metadata,
}),
entry.ip || null
);
}
await pool.query(
`INSERT INTO events (id, name, "actorId", "modelId", "teamId", data, ip, "createdAt")
VALUES ${placeholders.join(', ')}`,
values
);
logger.debug('Audit log batch created', { count: entries.length });
} catch (error) {
logger.error('Failed to create audit log batch', {
error: error instanceof Error ? error.message : String(error),
count: entries.length,
});
}
}
/**
* Extract resource type from tool name
*/
function extractResourceType(toolName: string): string {
// outline_list_documents -> document
// outline_create_collection -> collection
const parts = toolName.replace('outline_', '').split('_');
if (parts.length >= 2) {
// Last part is usually the resource type (pluralized)
const lastPart = parts[parts.length - 1];
// Remove trailing 's' for singular
return lastPart.endsWith('s') ? lastPart.slice(0, -1) : lastPart;
}
return 'unknown';
}
/**
* Extract action from tool name
*/
function extractAction(toolName: string): string {
// outline_create_document -> document.create
// Note: AuditEvents constants use plural resource names (e.g. 'documents.create');
// pass a custom action via AuditLogOptions when the exact event name matters.
const parts = toolName.replace('outline_', '').split('_');
if (parts.length >= 2) {
const action = parts[0]; // create, update, delete, list, etc.
const resource = parts.slice(1).join('_'); // document, collection, etc.
return `${resource}.${action}`;
}
return toolName;
}
/**
* Check if a tool performs a write operation
*/
function isWriteOperation(toolName: string): boolean {
const writeActions = [
'create', 'insert', 'add',
'update', 'edit', 'modify', 'set',
'delete', 'remove', 'destroy',
'archive', 'restore',
'suspend', 'activate',
'promote', 'demote',
'publish', 'unpublish',
'move', 'transfer',
'grant', 'revoke',
'bulk',
];
const lowerName = toolName.toLowerCase();
return writeActions.some((action) => lowerName.includes(action));
}
/**
* Options for audit log middleware
*/
export interface AuditLogOptions<T> {
/** Function to extract resource info from args */
getResourceInfo?: (args: T, result: any) => { type: string; id: string; teamId?: string };
/** Custom action name (defaults to extracted from tool name) */
action?: string;
/** Whether to log this operation (defaults to checking if write operation) */
shouldLog?: (args: T) => boolean;
/** Extract additional metadata */
getMetadata?: (args: T, result: any) => Record<string, any>;
}
/**
* Middleware to automatically add audit logging to a tool
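*
* A sketch of wrapping a tool; `createDocumentTool` and the shape of the
* parsed result are hypothetical, not part of this module:
* @example
* ```typescript
* const auditedTool = withAuditLog(createDocumentTool, {
*   action: AuditEvents.DOCUMENT_CREATE,
*   getResourceInfo: (_args, result) => ({ type: 'document', id: result.id }),
* });
* ```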
*/
export function withAuditLog<T extends Record<string, any>>(
tool: BaseTool<T>,
options?: AuditLogOptions<T>
): BaseTool<T> {
const originalHandler = tool.handler;
return {
...tool,
handler: async (args, pgClient): Promise<ToolResponse> => {
// Execute the original handler
const result = await originalHandler(args, pgClient);
// Determine if we should log this operation
const shouldLog = options?.shouldLog?.(args) ?? isWriteOperation(tool.name);
if (shouldLog) {
try {
// Extract resource info
let resourceInfo: { type: string; id: string; teamId?: string };
if (options?.getResourceInfo) {
const parsed = JSON.parse(result.content[0].text);
resourceInfo = options.getResourceInfo(args, parsed);
} else {
// Default: try to extract from args or result
resourceInfo = {
type: extractResourceType(tool.name),
id: (args as any).id ||
(args as any).document_id ||
(args as any).collection_id ||
(args as any).user_id ||
'unknown',
};
}
// Get additional metadata
let metadata: Record<string, any> | undefined;
if (options?.getMetadata) {
const parsed = JSON.parse(result.content[0].text);
metadata = options.getMetadata(args, parsed);
}
// Log the audit event
await logAudit(pgClient, {
action: options?.action || extractAction(tool.name),
resourceType: resourceInfo.type,
resourceId: resourceInfo.id,
teamId: resourceInfo.teamId,
metadata,
});
} catch (error) {
// Don't fail the operation if audit logging fails
logger.error('Audit log middleware failed', {
tool: tool.name,
error: error instanceof Error ? error.message : String(error),
});
}
}
return result;
},
};
}
/**
* Create an audit logger for a specific team/workspace
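*
* Usage sketch (the team and document UUIDs are illustrative):
* @example
* ```typescript
* const audit = createTeamAuditLogger(pool, 'team-uuid');
* await audit.log({
*   action: AuditEvents.DOCUMENT_ARCHIVE,
*   resourceType: 'document',
*   resourceId: 'document-uuid',
* });
* ```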
*/
export function createTeamAuditLogger(pool: Pool, teamId: string) {
return {
log: (entry: Omit<AuditLogEntry, 'teamId'>) =>
logAudit(pool, { ...entry, teamId }),
logBatch: (entries: Omit<AuditLogEntry, 'teamId'>[]) =>
logAuditBatch(pool, entries.map((e) => ({ ...e, teamId }))),
};
}

src/utils/index.ts

@@ -5,3 +5,9 @@
export * from './logger.js';
export * from './security.js';
export * from './query-builder.js';
export * from './validation.js';
export * from './audit.js';
export * from './transaction.js';
export * from './monitoring.js';
export * from './pagination.js';

src/utils/monitoring.ts (new file)

@@ -0,0 +1,285 @@
/**
* MCP Outline PostgreSQL - Pool Monitoring
* Connection pool monitoring and alerting
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool } from 'pg';
import { logger } from './logger.js';
/**
* Pool statistics snapshot
*/
export interface PoolStats {
/** Total number of clients in the pool */
totalCount: number;
/** Number of idle clients */
idleCount: number;
/** Number of clients waiting for a connection */
waitingCount: number;
/** Timestamp of the snapshot */
timestamp: Date;
/** Pool utilization percentage (0-100) */
utilization: number;
/** Whether the pool is healthy */
healthy: boolean;
}
/**
* Monitoring configuration options
*/
export interface MonitoringConfig {
/** Interval in milliseconds between stats checks (default: 60000 = 1 minute) */
interval?: number;
/** Utilization threshold for warnings (default: 80%) */
warningThreshold?: number;
/** Utilization threshold for critical alerts (default: 95%) */
criticalThreshold?: number;
/** Number of waiting connections to trigger alert (default: 5) */
maxWaitingCount?: number;
/** Enable detailed logging (default: false) */
verbose?: boolean;
/** Callback for custom alerting */
onAlert?: (stats: PoolStats, severity: 'warning' | 'critical') => void;
}
const DEFAULT_CONFIG: Required<Omit<MonitoringConfig, 'onAlert'>> = {
interval: 60000, // 1 minute
warningThreshold: 80,
criticalThreshold: 95,
maxWaitingCount: 5,
verbose: false,
};
/**
* Pool monitor class for tracking connection pool health
*/
export class PoolMonitor {
private pool: Pool;
private config: Required<Omit<MonitoringConfig, 'onAlert'>> & Pick<MonitoringConfig, 'onAlert'>;
private intervalId: NodeJS.Timeout | null = null;
private statsHistory: PoolStats[] = [];
private maxHistoryLength = 60; // Keep 1 hour of stats at 1-minute intervals
constructor(pool: Pool, config?: MonitoringConfig) {
this.pool = pool;
this.config = { ...DEFAULT_CONFIG, ...config };
}
/**
* Start monitoring the pool
*/
start(): void {
if (this.intervalId) {
logger.warn('Pool monitor already running');
return;
}
logger.info('Starting pool monitor', {
interval: this.config.interval,
warningThreshold: this.config.warningThreshold,
criticalThreshold: this.config.criticalThreshold,
});
this.intervalId = setInterval(() => {
this.checkPool();
}, this.config.interval);
// Run initial check
this.checkPool();
}
/**
* Stop monitoring the pool
*/
stop(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
logger.info('Pool monitor stopped');
}
}
/**
* Get current pool statistics
*/
getStats(): PoolStats {
const totalCount = this.pool.totalCount;
const idleCount = this.pool.idleCount;
const waitingCount = this.pool.waitingCount;
// Calculate utilization
// Active connections = total - idle
const activeCount = totalCount - idleCount;
const utilization = totalCount > 0 ? Math.round((activeCount / totalCount) * 100) : 0;
// Determine health status
const healthy = utilization < this.config.criticalThreshold && waitingCount < this.config.maxWaitingCount;
return {
totalCount,
idleCount,
waitingCount,
timestamp: new Date(),
utilization,
healthy,
};
}
/**
* Get statistics history
*/
getHistory(): PoolStats[] {
return [...this.statsHistory];
}
/**
* Get average statistics over the history period
*/
getAverageStats(): { avgUtilization: number; avgWaiting: number; healthyPercentage: number } {
if (this.statsHistory.length === 0) {
return { avgUtilization: 0, avgWaiting: 0, healthyPercentage: 100 };
}
const sum = this.statsHistory.reduce(
(acc, stat) => ({
utilization: acc.utilization + stat.utilization,
waiting: acc.waiting + stat.waitingCount,
healthy: acc.healthy + (stat.healthy ? 1 : 0),
}),
{ utilization: 0, waiting: 0, healthy: 0 }
);
return {
avgUtilization: Math.round(sum.utilization / this.statsHistory.length),
avgWaiting: Math.round((sum.waiting / this.statsHistory.length) * 10) / 10,
healthyPercentage: Math.round((sum.healthy / this.statsHistory.length) * 100),
};
}
/**
* Check pool and log/alert if needed
*/
private checkPool(): void {
const stats = this.getStats();
// Add to history
this.statsHistory.push(stats);
if (this.statsHistory.length > this.maxHistoryLength) {
this.statsHistory.shift();
}
// Verbose logging
if (this.config.verbose) {
logger.debug('Pool stats', {
total: stats.totalCount,
idle: stats.idleCount,
waiting: stats.waitingCount,
utilization: `${stats.utilization}%`,
healthy: stats.healthy,
});
}
// Check for alerts
if (stats.utilization >= this.config.criticalThreshold) {
logger.error('CRITICAL: Pool saturation detected', {
utilization: `${stats.utilization}%`,
waiting: stats.waitingCount,
total: stats.totalCount,
idle: stats.idleCount,
});
this.config.onAlert?.(stats, 'critical');
} else if (stats.utilization >= this.config.warningThreshold) {
logger.warn('WARNING: High pool utilization', {
utilization: `${stats.utilization}%`,
waiting: stats.waitingCount,
total: stats.totalCount,
idle: stats.idleCount,
});
this.config.onAlert?.(stats, 'warning');
} else if (stats.waitingCount >= this.config.maxWaitingCount) {
logger.warn('WARNING: High number of waiting connections', {
waiting: stats.waitingCount,
utilization: `${stats.utilization}%`,
total: stats.totalCount,
});
this.config.onAlert?.(stats, 'warning');
}
}
}
/**
* Create and start a pool monitor
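*
* Usage sketch (the threshold and alert handler are illustrative):
* @example
* ```typescript
* const monitor = monitorPool(pool, {
*   warningThreshold: 75,
*   onAlert: (stats, severity) => {
*     // e.g. forward to an external alerting channel
*     console.error(`pool ${severity}: utilization ${stats.utilization}%`);
*   },
* });
* // On shutdown:
* monitor.stop();
* ```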
*/
export function monitorPool(pool: Pool, config?: MonitoringConfig): PoolMonitor {
const monitor = new PoolMonitor(pool, config);
monitor.start();
return monitor;
}
/**
* Quick pool health check (no monitoring, just current status)
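*
* Usage sketch:
* @example
* ```typescript
* const { healthy, issues } = checkPoolHealth(pool);
* if (!healthy) {
*   console.warn(`Pool issues: ${issues.join('; ')}`);
* }
* ```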
*/
export function checkPoolHealth(pool: Pool, config?: Pick<MonitoringConfig, 'criticalThreshold' | 'maxWaitingCount'>): {
healthy: boolean;
stats: PoolStats;
issues: string[];
} {
const criticalThreshold = config?.criticalThreshold ?? 95;
const maxWaitingCount = config?.maxWaitingCount ?? 5;
const totalCount = pool.totalCount;
const idleCount = pool.idleCount;
const waitingCount = pool.waitingCount;
const activeCount = totalCount - idleCount;
const utilization = totalCount > 0 ? Math.round((activeCount / totalCount) * 100) : 0;
const issues: string[] = [];
if (utilization >= criticalThreshold) {
issues.push(`Pool utilization at ${utilization}% (threshold: ${criticalThreshold}%)`);
}
if (waitingCount >= maxWaitingCount) {
issues.push(`${waitingCount} connections waiting (threshold: ${maxWaitingCount})`);
}
if (totalCount === 0) {
issues.push('No connections in pool');
}
const stats: PoolStats = {
totalCount,
idleCount,
waitingCount,
timestamp: new Date(),
utilization,
healthy: issues.length === 0,
};
return {
healthy: issues.length === 0,
stats,
issues,
};
}
/**
* Log pool stats once (useful for debugging)
*/
export function logPoolStats(pool: Pool): void {
const health = checkPoolHealth(pool);
logger.info('Pool status', {
total: health.stats.totalCount,
idle: health.stats.idleCount,
active: health.stats.totalCount - health.stats.idleCount,
waiting: health.stats.waitingCount,
utilization: `${health.stats.utilization}%`,
healthy: health.healthy,
issues: health.issues.length > 0 ? health.issues : undefined,
});
}

src/utils/pagination.ts (new file)

@@ -0,0 +1,327 @@
/**
* MCP Outline PostgreSQL - Cursor-Based Pagination
* Efficient pagination using cursors instead of OFFSET
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool, QueryResultRow } from 'pg';
import { logger } from './logger.js';
/**
* Cursor pagination input arguments
*/
export interface CursorPaginationArgs {
/** Maximum number of items to return (default: 25, max: 100) */
limit?: number;
/** Cursor for next page (base64 encoded) */
cursor?: string;
/** Direction of pagination (default: 'desc') */
direction?: 'asc' | 'desc';
}
/**
* Cursor pagination result
*/
export interface CursorPaginationResult<T> {
/** Array of items */
items: T[];
/** Cursor for the next page (null if no more pages) */
nextCursor: string | null;
/** Cursor for the previous page (null if first page) */
prevCursor: string | null;
/** Whether there are more items */
hasMore: boolean;
/** Total count (optional, requires additional query) */
totalCount?: number;
}
/**
* Cursor data structure (encoded in base64)
*/
interface CursorData {
/** Value of the cursor field */
v: string | number;
/** Direction for this cursor */
d: 'asc' | 'desc';
/** Secondary sort field value (for stable sorting) */
s?: string;
}
/**
* Encode cursor data to base64
*/
export function encodeCursor(data: CursorData): string {
return Buffer.from(JSON.stringify(data)).toString('base64url');
}
/**
* Decode cursor from base64
*/
export function decodeCursor(cursor: string): CursorData | null {
try {
const decoded = Buffer.from(cursor, 'base64url').toString('utf-8');
return JSON.parse(decoded) as CursorData;
} catch (error) {
logger.warn('Invalid cursor', { cursor, error: (error as Error).message });
return null;
}
}
/**
* Options for cursor-based pagination
*/
export interface PaginateOptions {
/** The field to use for cursor (default: 'createdAt') */
cursorField?: string;
/** Secondary sort field for stability (default: 'id') */
secondaryField?: string;
/** Whether to include total count (requires extra query) */
includeTotalCount?: boolean;
/** Default limit if not specified */
defaultLimit?: number;
/** Maximum allowed limit */
maxLimit?: number;
}
const DEFAULT_OPTIONS: Required<PaginateOptions> = {
cursorField: 'createdAt',
secondaryField: 'id',
includeTotalCount: false,
defaultLimit: 25,
maxLimit: 100,
};
/**
* Build cursor-based pagination query parts
*
* @param args - Pagination arguments
* @param options - Pagination options
* @returns Query parts (WHERE clause, ORDER BY, LIMIT, parameters)
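*
* A sketch of manual use (the documents query and `args` are illustrative;
* most callers can use paginateWithCursor instead):
* @example
* ```typescript
* const parts = buildCursorQuery({ limit: 25, cursor: args.cursor });
* const where = parts.cursorCondition ? `WHERE ${parts.cursorCondition}` : '';
* const sql = `SELECT * FROM documents ${where} ORDER BY ${parts.orderBy} LIMIT ${parts.limit}`;
* const rows = (await pool.query(sql, parts.params)).rows;
* const page = processCursorResults(rows, 25);
* ```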
*/
export function buildCursorQuery(
args: CursorPaginationArgs,
options?: PaginateOptions
): {
cursorCondition: string;
orderBy: string;
limit: number;
params: any[];
paramIndex: number;
} {
const opts = { ...DEFAULT_OPTIONS, ...options };
const direction = args.direction || 'desc';
const limit = Math.min(Math.max(1, args.limit || opts.defaultLimit), opts.maxLimit);
const params: any[] = [];
let paramIndex = 1;
let cursorCondition = '';
// Parse cursor if provided
if (args.cursor) {
const cursorData = decodeCursor(args.cursor);
if (cursorData) {
// Build cursor condition with secondary field for stability
const op = direction === 'desc' ? '<' : '>';
if (cursorData.s) {
// Compound cursor: (cursorField, secondaryField) comparison
cursorCondition = `("${opts.cursorField}", "${opts.secondaryField}") ${op} ($${paramIndex}, $${paramIndex + 1})`;
params.push(cursorData.v, cursorData.s);
paramIndex += 2;
} else {
// Simple cursor
cursorCondition = `"${opts.cursorField}" ${op} $${paramIndex}`;
params.push(cursorData.v);
paramIndex += 1;
}
}
}
// Build ORDER BY
const orderDirection = direction.toUpperCase();
const orderBy = `"${opts.cursorField}" ${orderDirection}, "${opts.secondaryField}" ${orderDirection}`;
return {
cursorCondition,
orderBy,
limit: limit + 1, // Fetch one extra to detect hasMore
params,
paramIndex,
};
}
/**
* Process query results for cursor pagination
*
* @param rows - Query result rows
* @param limit - Original limit (not the +1 used in query)
* @param cursorField - Field used for cursor
* @param secondaryField - Secondary field for stable cursor
* @param direction - Pagination direction
* @returns Pagination result
*/
export function processCursorResults<T extends QueryResultRow>(
rows: T[],
limit: number,
cursorField: string = 'createdAt',
secondaryField: string = 'id',
direction: 'asc' | 'desc' = 'desc'
): CursorPaginationResult<T> {
// Check if there are more results
const hasMore = rows.length > limit;
// Remove the extra item used for hasMore detection
const items = hasMore ? rows.slice(0, limit) : rows;
// Build next cursor
let nextCursor: string | null = null;
if (hasMore && items.length > 0) {
const lastItem = items[items.length - 1];
nextCursor = encodeCursor({
v: lastItem[cursorField],
d: direction,
s: lastItem[secondaryField],
});
}
// Build prev cursor (first item, opposite direction)
let prevCursor: string | null = null;
if (items.length > 0) {
const firstItem = items[0];
prevCursor = encodeCursor({
v: firstItem[cursorField],
d: direction === 'desc' ? 'asc' : 'desc',
s: firstItem[secondaryField],
});
}
return {
items,
nextCursor,
prevCursor,
hasMore,
};
}
/**
* Execute a cursor-paginated query
*
* This is a high-level helper that combines building and processing.
*
* @param pool - PostgreSQL pool
* @param baseQuery - Base SELECT query (without WHERE/ORDER BY/LIMIT)
* @param baseParams - Parameters for the base query
* @param args - Pagination arguments
* @param options - Pagination options
* @returns Paginated results
*
* @example
* ```typescript
* const result = await paginateWithCursor(
* pool,
* 'SELECT * FROM documents WHERE "deletedAt" IS NULL AND "collectionId" = $1',
* [collectionId],
* { limit: 25, cursor: args.cursor },
* { cursorField: 'createdAt' }
* );
* ```
*/
export async function paginateWithCursor<T extends QueryResultRow>(
pool: Pool,
baseQuery: string,
baseParams: any[],
args: CursorPaginationArgs,
options?: PaginateOptions
): Promise<CursorPaginationResult<T>> {
const opts = { ...DEFAULT_OPTIONS, ...options };
const limit = Math.min(Math.max(1, args.limit || opts.defaultLimit), opts.maxLimit);
const direction = args.direction || 'desc';
// Build cursor query parts (buildCursorQuery numbers its placeholders from $1)
const { cursorCondition: rawCondition, orderBy, limit: queryLimit, params: cursorParams } =
buildCursorQuery(args, opts);
// Combine base params with cursor params
const allParams = [...baseParams, ...cursorParams];
// Shift cursor placeholders so they come after the base query params:
// base query uses $1..$N, cursor params become $N+1, $N+2, ...
const cursorCondition = rawCondition.replace(
/\$(\d+)/g,
(_match, n) => `$${Number(n) + baseParams.length}`
);
// Build final query
let query = baseQuery;
// Add cursor condition if present
if (cursorCondition) {
// Check if base query has WHERE
if (baseQuery.toUpperCase().includes('WHERE')) {
query += ` AND ${cursorCondition}`;
} else {
query += ` WHERE ${cursorCondition}`;
}
}
// Add ORDER BY and LIMIT
query += ` ORDER BY ${orderBy}`;
query += ` LIMIT ${queryLimit}`;
// Execute query
const result = await pool.query<T>(query, allParams);
// Process results
const paginationResult = processCursorResults(
result.rows,
limit,
opts.cursorField,
opts.secondaryField,
direction
);
// Optionally get total count
if (opts.includeTotalCount) {
const countQuery = baseQuery.replace(/SELECT .* FROM/, 'SELECT COUNT(*) as count FROM');
const countResult = await pool.query<{ count: string }>(countQuery, baseParams);
paginationResult.totalCount = parseInt(countResult.rows[0]?.count || '0', 10);
}
return paginationResult;
}
/**
* Convert offset-based pagination to cursor-based response format
* Useful for backwards compatibility
*/
export function offsetToCursorResult<T>(
items: T[],
offset: number,
limit: number,
totalCount?: number
): CursorPaginationResult<T> {
const hasMore = totalCount !== undefined ? offset + items.length < totalCount : items.length === limit;
return {
items,
nextCursor: hasMore ? encodeCursor({ v: offset + limit, d: 'desc' }) : null,
prevCursor: offset > 0 ? encodeCursor({ v: Math.max(0, offset - limit), d: 'desc' }) : null,
hasMore,
totalCount,
};
}
/**
* Validate pagination arguments
*/
export function validatePaginationArgs(
args: CursorPaginationArgs,
options?: Pick<PaginateOptions, 'maxLimit' | 'defaultLimit'>
): { limit: number; cursor: string | null; direction: 'asc' | 'desc' } {
const maxLimit = options?.maxLimit ?? 100;
const defaultLimit = options?.defaultLimit ?? 25;
return {
limit: Math.min(Math.max(1, args.limit ?? defaultLimit), maxLimit),
cursor: args.cursor || null,
direction: args.direction || 'desc',
};
}

src/utils/query-builder.ts (new file)

@@ -0,0 +1,277 @@
/**
* MCP Outline PostgreSQL - Safe Query Builder
* Helper for parameterized queries to prevent SQL injection
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { sanitizeInput, isValidUUID } from './security.js';
/**
* Safe query builder with automatic parameter handling
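*
* Usage sketch (the documents table and filter `args` are illustrative):
* @example
* ```typescript
* const qb = new SafeQueryBuilder();
* qb.addCondition(qb.buildIsNull('"deletedAt"'));
* if (args.collection_id) {
*   qb.addCondition(qb.buildUUIDEquals('"collectionId"', args.collection_id));
* }
* if (args.query) {
*   qb.addCondition(qb.buildILike('title', args.query));
* }
* const sql = `SELECT id, title FROM documents ${qb.buildWhereClause()}`;
* const result = await pool.query(sql, qb.getParams());
* ```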
*/
export class SafeQueryBuilder {
private params: any[] = [];
private paramIndex = 1;
private conditions: string[] = [];
/**
* Add a parameter and return its placeholder
*/
addParam(value: any): string {
this.params.push(value);
return `$${this.paramIndex++}`;
}
/**
* Get current parameter index (useful for manual queries)
*/
getNextIndex(): number {
return this.paramIndex;
}
/**
* Build ILIKE condition (case-insensitive search)
*/
buildILike(column: string, value: string): string {
const sanitized = sanitizeInput(value);
return `${column} ILIKE ${this.addParam(`%${sanitized}%`)}`;
}
/**
* Build exact ILIKE condition (no wildcards)
*/
buildILikeExact(column: string, value: string): string {
const sanitized = sanitizeInput(value);
return `${column} ILIKE ${this.addParam(sanitized)}`;
}
/**
* Build ILIKE condition with prefix match
*/
buildILikePrefix(column: string, value: string): string {
const sanitized = sanitizeInput(value);
return `${column} ILIKE ${this.addParam(`${sanitized}%`)}`;
}
/**
* Build IN clause for array of values
*/
buildIn(column: string, values: any[]): string {
if (values.length === 0) {
return 'FALSE'; // Empty IN clause
}
return `${column} = ANY(${this.addParam(values)})`;
}
/**
* Build NOT IN clause
*/
buildNotIn(column: string, values: any[]): string {
if (values.length === 0) {
return 'TRUE'; // Empty NOT IN clause
}
return `${column} != ALL(${this.addParam(values)})`;
}
/**
* Build equals condition
*/
buildEquals(column: string, value: any): string {
return `${column} = ${this.addParam(value)}`;
}
/**
* Build not equals condition
*/
buildNotEquals(column: string, value: any): string {
return `${column} != ${this.addParam(value)}`;
}
/**
* Build greater than condition
*/
buildGreaterThan(column: string, value: any): string {
return `${column} > ${this.addParam(value)}`;
}
/**
* Build greater than or equals condition
*/
buildGreaterThanOrEquals(column: string, value: any): string {
return `${column} >= ${this.addParam(value)}`;
}
/**
* Build less than condition
*/
buildLessThan(column: string, value: any): string {
return `${column} < ${this.addParam(value)}`;
}
/**
* Build less than or equals condition
*/
buildLessThanOrEquals(column: string, value: any): string {
return `${column} <= ${this.addParam(value)}`;
}
/**
* Build BETWEEN condition
*/
buildBetween(column: string, from: any, to: any): string {
return `${column} BETWEEN ${this.addParam(from)} AND ${this.addParam(to)}`;
}
/**
* Build IS NULL condition
*/
buildIsNull(column: string): string {
return `${column} IS NULL`;
}
/**
* Build IS NOT NULL condition
*/
buildIsNotNull(column: string): string {
return `${column} IS NOT NULL`;
}
/**
* Build UUID equals condition with validation
*/
buildUUIDEquals(column: string, uuid: string): string {
if (!isValidUUID(uuid)) {
throw new Error(`Invalid UUID: ${uuid}`);
}
return `${column} = ${this.addParam(uuid)}`;
}
/**
* Build UUID IN clause with validation
*/
buildUUIDIn(column: string, uuids: string[]): string {
for (const uuid of uuids) {
if (!isValidUUID(uuid)) {
throw new Error(`Invalid UUID: ${uuid}`);
}
}
return this.buildIn(column, uuids);
}
/**
* Add a condition to the internal conditions array
*/
addCondition(condition: string): this {
this.conditions.push(condition);
return this;
}
/**
* Add a condition if value is truthy
*/
addConditionIf(condition: string, value: any): this {
if (value !== undefined && value !== null && value !== '') {
this.conditions.push(condition);
}
return this;
}
/**
* Build WHERE clause from accumulated conditions
*/
buildWhereClause(separator = ' AND '): string {
if (this.conditions.length === 0) {
return '';
}
return `WHERE ${this.conditions.join(separator)}`;
}
/**
* Get all parameters
*/
getParams(): any[] {
return this.params;
}
/**
* Get conditions array
*/
getConditions(): string[] {
return this.conditions;
}
/**
* Reset builder state
*/
reset(): this {
this.params = [];
this.paramIndex = 1;
this.conditions = [];
return this;
}
/**
* Clone builder (useful for subqueries)
*/
clone(): SafeQueryBuilder {
const clone = new SafeQueryBuilder();
clone.params = [...this.params];
clone.paramIndex = this.paramIndex;
clone.conditions = [...this.conditions];
return clone;
}
}
/**
* Create a new SafeQueryBuilder instance
*/
export function createQueryBuilder(): SafeQueryBuilder {
return new SafeQueryBuilder();
}
/**
* Build a simple SELECT query
*/
export function buildSelectQuery(
table: string,
columns: string[],
builder: SafeQueryBuilder,
options?: {
orderBy?: string;
orderDirection?: 'ASC' | 'DESC';
limit?: number;
offset?: number;
}
): { query: string; params: any[] } {
const columnsStr = columns.join(', ');
const whereClause = builder.buildWhereClause();
let query = `SELECT ${columnsStr} FROM ${table} ${whereClause}`;
if (options?.orderBy) {
const direction = options.orderDirection || 'DESC';
query += ` ORDER BY ${options.orderBy} ${direction}`;
}
if (options?.limit) {
query += ` LIMIT ${builder.addParam(options.limit)}`;
}
if (options?.offset) {
query += ` OFFSET ${builder.addParam(options.offset)}`;
}
return { query, params: builder.getParams() };
}
/**
* Build a COUNT query
*/
export function buildCountQuery(
table: string,
builder: SafeQueryBuilder
): { query: string; params: any[] } {
const whereClause = builder.buildWhereClause();
const query = `SELECT COUNT(*) as count FROM ${table} ${whereClause}`;
return { query, params: builder.getParams() };
}

src/utils/transaction.ts (new file)

@@ -0,0 +1,301 @@
/**
* MCP Outline PostgreSQL - Transaction Helper
* Centralized transaction management with retry logic for deadlocks
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { Pool, PoolClient } from 'pg';
import { logger } from './logger.js';
/**
* Default retry configuration
*/
export interface TransactionRetryConfig {
/** Maximum number of retry attempts (default: 3) */
maxRetries?: number;
/** Base delay in milliseconds between retries (default: 100) */
baseDelayMs?: number;
/** Maximum delay in milliseconds (default: 2000) */
maxDelayMs?: number;
/** Timeout for the entire transaction in milliseconds (default: 30000) */
timeoutMs?: number;
}
const DEFAULT_RETRY_CONFIG: Required<TransactionRetryConfig> = {
maxRetries: 3,
baseDelayMs: 100,
maxDelayMs: 2000,
timeoutMs: 30000,
};
/**
* PostgreSQL error codes that indicate a retryable error
*/
const RETRYABLE_ERROR_CODES = [
'40001', // serialization_failure (deadlock)
'40P01', // deadlock_detected
'55P03', // lock_not_available
'57P01', // admin_shutdown (connection terminated)
'08006', // connection_failure
'08003', // connection_does_not_exist
];
/**
* Check if an error is retryable
*/
function isRetryableError(error: unknown): boolean {
if (error instanceof Error) {
// Check PostgreSQL error code
const pgError = error as Error & { code?: string };
if (pgError.code && RETRYABLE_ERROR_CODES.includes(pgError.code)) {
return true;
}
// Check error message for common patterns
const message = error.message.toLowerCase();
if (
message.includes('deadlock') ||
message.includes('lock') ||
message.includes('connection') ||
message.includes('serialization')
) {
return true;
}
}
return false;
}
/**
* Calculate delay with exponential backoff and jitter
*/
function calculateDelay(attempt: number, config: Required<TransactionRetryConfig>): number {
// Exponential backoff: baseDelay * 2^attempt
const exponentialDelay = config.baseDelayMs * Math.pow(2, attempt - 1);
// Add jitter (random variation up to 25%)
const jitter = exponentialDelay * 0.25 * Math.random();
// Cap at maxDelay
return Math.min(exponentialDelay + jitter, config.maxDelayMs);
}
/**
* Execute operations within a transaction with automatic retry for deadlocks
*
* @param pool - PostgreSQL connection pool
* @param callback - Function to execute within the transaction
* @param config - Optional retry configuration
* @returns Result of the callback function
*
* @example
* ```typescript
* const result = await withTransaction(pool, async (client) => {
* await client.query('UPDATE documents SET ...');
* await client.query('INSERT INTO events ...');
* return { success: true };
* });
* ```
*/
export async function withTransaction<T>(
pool: Pool,
callback: (client: PoolClient) => Promise<T>,
config?: TransactionRetryConfig
): Promise<T> {
const retryConfig = { ...DEFAULT_RETRY_CONFIG, ...config };
let lastError: Error | null = null;
const startTime = Date.now();
for (let attempt = 1; attempt <= retryConfig.maxRetries; attempt++) {
// Check timeout
if (Date.now() - startTime > retryConfig.timeoutMs) {
throw new Error(
`Transaction timeout after ${retryConfig.timeoutMs}ms. Last error: ${lastError?.message || 'unknown'}`
);
}
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
// Log successful retry
if (attempt > 1) {
logger.info('Transaction succeeded after retry', {
attempt,
totalTime: Date.now() - startTime,
});
}
return result;
} catch (error) {
// Always rollback on error
try {
await client.query('ROLLBACK');
} catch (rollbackError) {
logger.error('Rollback failed', {
error: rollbackError instanceof Error ? rollbackError.message : String(rollbackError),
});
}
lastError = error instanceof Error ? error : new Error(String(error));
// Check if we should retry
if (isRetryableError(error) && attempt < retryConfig.maxRetries) {
const delay = calculateDelay(attempt, retryConfig);
logger.warn('Transaction failed, retrying', {
attempt,
maxRetries: retryConfig.maxRetries,
delay,
error: lastError.message,
errorCode: (error as Error & { code?: string }).code,
});
// Wait before retry
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
// Non-retryable error or max retries exceeded
throw error;
} finally {
client.release();
}
}
// This should never be reached, but TypeScript needs it
throw lastError || new Error('Transaction failed with unknown error');
}
/**
* Execute operations within a transaction without retry
* Use this for operations that should not be retried (e.g., operations with side effects)
*
* @param pool - PostgreSQL connection pool
* @param callback - Function to execute within the transaction
* @returns Result of the callback function
*/
export async function withTransactionNoRetry<T>(
pool: Pool,
callback: (client: PoolClient) => Promise<T>
): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
try {
await client.query('ROLLBACK');
} catch (rollbackError) {
logger.error('Rollback failed', {
error: rollbackError instanceof Error ? rollbackError.message : String(rollbackError),
});
}
throw error;
} finally {
client.release();
}
}
/**
* Execute a read-only transaction (uses SERIALIZABLE isolation)
* Good for consistent reads across multiple queries
*
* @param pool - PostgreSQL connection pool
* @param callback - Function to execute within the transaction
* @returns Result of the callback function
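*
* Usage sketch (the count queries are illustrative):
* @example
* ```typescript
* const stats = await withReadOnlyTransaction(pool, async (client) => {
*   const docs = await client.query('SELECT COUNT(*) AS count FROM documents');
*   const cols = await client.query('SELECT COUNT(*) AS count FROM collections');
*   return { documents: docs.rows[0].count, collections: cols.rows[0].count };
* });
* ```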
*/
export async function withReadOnlyTransaction<T>(
pool: Pool,
callback: (client: PoolClient) => Promise<T>
): Promise<T> {
const client = await pool.connect();
try {
await client.query('BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
try {
await client.query('ROLLBACK');
} catch (rollbackError) {
logger.error('Rollback failed', {
error: rollbackError instanceof Error ? rollbackError.message : String(rollbackError),
});
}
throw error;
} finally {
client.release();
}
}
/**
* Savepoint helper for nested transaction-like behavior
*/
export class Savepoint {
private client: PoolClient;
private name: string;
private released = false;
constructor(client: PoolClient, name: string) {
this.client = client;
this.name = name;
}
/**
* Create a savepoint
*/
async save(): Promise<void> {
await this.client.query(`SAVEPOINT ${this.name}`);
}
/**
* Rollback to this savepoint
*/
async rollback(): Promise<void> {
if (!this.released) {
await this.client.query(`ROLLBACK TO SAVEPOINT ${this.name}`);
}
}
/**
* Release this savepoint (marks it as no longer needed)
*/
async release(): Promise<void> {
if (!this.released) {
await this.client.query(`RELEASE SAVEPOINT ${this.name}`);
this.released = true;
}
}
}
/**
* Create a savepoint within a transaction
*
* @example
* ```typescript
* await withTransaction(pool, async (client) => {
* await client.query('INSERT INTO ...');
*
* const savepoint = await createSavepoint(client, 'before_risky_op');
* try {
* await riskyOperation(client);
* await savepoint.release();
* } catch (error) {
* await savepoint.rollback();
* // Continue with alternative logic
* }
* });
* ```
*/
export async function createSavepoint(client: PoolClient, name: string): Promise<Savepoint> {
const savepoint = new Savepoint(client, name);
await savepoint.save();
return savepoint;
}

src/utils/validation.ts (new file)

@@ -0,0 +1,236 @@
/**
* MCP Outline PostgreSQL - Input Validation
* Automatic input validation using Zod schemas
* @author Descomplicar® | @link descomplicar.pt | @copyright 2026
*/
import { z } from 'zod';
import { BaseTool, ToolResponse } from '../types/tools.js';
// Common validation schemas
export const schemas = {
// UUID validation
uuid: z.string().uuid('Invalid UUID format'),
// UUID array validation
uuidArray: z.array(z.string().uuid('Invalid UUID format')),
// Email validation
email: z.string().email('Invalid email format'),
// Non-empty string
nonEmptyString: z.string().min(1, 'String cannot be empty'),
// Pagination
pagination: z.object({
limit: z.number().int().min(1).max(100).optional().default(25),
offset: z.number().int().min(0).optional().default(0),
}),
// Date string (ISO format)
isoDate: z.string().refine(
(val) => !isNaN(Date.parse(val)),
'Invalid date format. Use ISO format (YYYY-MM-DD or YYYY-MM-DDTHH:mm:ssZ)'
),
// Permission level
permission: z.enum(['read', 'read_write', 'admin']),
// User role
userRole: z.enum(['admin', 'member', 'viewer', 'guest']),
// Sort direction
sortDirection: z.enum(['ASC', 'DESC']).optional().default('DESC'),
// Boolean string (for query params)
booleanString: z.union([
z.boolean(),
z.string().transform((val) => val === 'true' || val === '1'),
]),
};
/**
* Validate input against a Zod schema
* Throws ZodError with detailed messages if validation fails
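*
* Usage sketch (`rawArgs` stands in for untrusted tool input):
* @example
* ```typescript
* const args = validateInput(toolSchemas.getByIdArgs, rawArgs);
* // args.id is now guaranteed to be a valid UUID string
* ```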
*/
export function validateInput<T>(schema: z.ZodSchema<T>, input: unknown): T {
return schema.parse(input);
}
/**
* Validate input and return result object (no throw)
*/
export function safeValidateInput<T>(
schema: z.ZodSchema<T>,
input: unknown
): { success: true; data: T } | { success: false; error: z.ZodError } {
const result = schema.safeParse(input);
if (result.success) {
return { success: true, data: result.data };
}
return { success: false, error: result.error };
}
/**
* Format Zod error for user-friendly message
*/
export function formatZodError(error: z.ZodError): string {
return error.errors
.map((e) => {
const path = e.path.join('.');
return path ? `${path}: ${e.message}` : e.message;
})
.join('; ');
}
/**
* Validation middleware for tools
* Validates input args against the tool's inputSchema
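*
* A sketch of wrapping a tool; `getDocumentTool` is hypothetical:
* @example
* ```typescript
* const validatedTool = withValidation(getDocumentTool, toolSchemas.getByIdArgs);
* ```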
*/
export function withValidation<T extends Record<string, any>>(
tool: BaseTool<T>,
schema: z.ZodSchema<T>
): BaseTool<T> {
const originalHandler = tool.handler;
return {
...tool,
handler: async (args, pgClient): Promise<ToolResponse> => {
try {
const validatedArgs = validateInput(schema, args);
return await originalHandler(validatedArgs, pgClient);
} catch (error) {
if (error instanceof z.ZodError) {
return {
content: [{
type: 'text',
text: JSON.stringify({
error: 'Validation Error',
message: formatZodError(error),
details: error.errors,
}, null, 2),
}],
};
}
throw error;
}
},
};
}
/**
* Common validation schemas for tool arguments
*/
export const toolSchemas = {
// List with pagination
listArgs: z.object({
limit: z.number().int().min(1).max(100).optional(),
offset: z.number().int().min(0).optional(),
}),
// Get by ID
getByIdArgs: z.object({
id: schemas.uuid,
}),
// Delete by ID
deleteByIdArgs: z.object({
id: schemas.uuid,
}),
// Document list args
documentListArgs: z.object({
collection_id: schemas.uuid.optional(),
user_id: schemas.uuid.optional(),
template: z.boolean().optional(),
include_archived: z.boolean().optional(),
query: z.string().optional(),
limit: z.number().int().min(1).max(100).optional(),
offset: z.number().int().min(0).optional(),
}),
// Bulk document IDs
bulkDocumentArgs: z.object({
document_ids: schemas.uuidArray.min(1, 'At least one document_id required').max(100, 'Maximum 100 documents'),
}),
// Collection membership
collectionMembershipArgs: z.object({
collection_id: schemas.uuid,
user_id: schemas.uuid,
permission: schemas.permission.optional(),
}),
// Date range args
dateRangeArgs: z.object({
date_from: schemas.isoDate.optional(),
date_to: schemas.isoDate.optional(),
}),
// Search args
searchArgs: z.object({
query: z.string().min(1, 'Query cannot be empty'),
collection_ids: schemas.uuidArray.optional(),
limit: z.number().int().min(1).max(100).optional(),
offset: z.number().int().min(0).optional(),
}),
};
/**
* Validate UUIDs array helper
*/
export function validateUUIDs(uuids: string[], fieldName = 'UUIDs'): void {
const result = schemas.uuidArray.safeParse(uuids);
if (!result.success) {
const invalid = uuids.filter((uuid) => !z.string().uuid().safeParse(uuid).success);
throw new Error(`Invalid ${fieldName}: ${invalid.join(', ')}`);
}
}
/**
* Validate enum value helper
*/
export function validateEnum<T extends string>(
value: string,
allowedValues: readonly T[],
fieldName: string
): T {
if (!allowedValues.includes(value as T)) {
throw new Error(
`Invalid ${fieldName}: "${value}". Allowed values: ${allowedValues.join(', ')}`
);
}
return value as T;
}
/**
* Validate string length helper
*/
export function validateStringLength(
value: string,
min: number,
max: number,
fieldName: string
): void {
if (value.length < min || value.length > max) {
throw new Error(
`${fieldName} must be between ${min} and ${max} characters (got ${value.length})`
);
}
}
/**
* Validate number range helper
*/
export function validateNumberRange(
value: number,
min: number,
max: number,
fieldName: string
): void {
if (value < min || value > max) {
throw new Error(
`${fieldName} must be between ${min} and ${max} (got ${value})`
);
}
}