Database Optimization and ORM Patterns¶
Overview¶
This document covers database performance optimization, indexing strategies, partitioning, data consistency patterns, monitoring, maintenance, security, and SQLAlchemy ORM usage patterns. For schema definitions, see Database Schema. For architecture overview, see Database Overview.
Last Updated: December 2025
Status: ✅ Core Patterns Implemented (v1.0.0)
Author: Nishant Nayar
Database Performance Considerations¶
Indexing Strategy¶
Comprehensive Indexing Design¶
-- Market Data Indexes
-- (symbol, timestamp DESC) also serves symbol-only lookups through its leading
-- column, so a separate single-column index on market_data(symbol) would only
-- add write and storage overhead on this high-volume table.
CREATE INDEX idx_market_data_symbol_timestamp ON market_data(symbol, timestamp DESC);
CREATE INDEX idx_market_data_timestamp ON market_data(timestamp DESC);
-- Trading Indexes
CREATE INDEX idx_orders_symbol_status ON orders(symbol, status);
CREATE INDEX idx_orders_strategy ON orders(strategy);
CREATE INDEX idx_orders_created_at ON orders(created_at DESC);
CREATE INDEX idx_orders_account_id ON orders(account_id);
CREATE INDEX idx_trades_symbol_executed_at ON trades(symbol, executed_at DESC);
CREATE INDEX idx_trades_strategy ON trades(strategy);
CREATE INDEX idx_trades_account_id ON trades(account_id);
-- Position Indexes
CREATE INDEX idx_positions_account_id ON positions(account_id);
CREATE INDEX idx_positions_symbol ON positions(symbol);
-- Logging Indexes
CREATE INDEX idx_system_logs_timestamp ON system_logs(timestamp DESC);
CREATE INDEX idx_system_logs_service_timestamp ON system_logs(service, timestamp DESC);
CREATE INDEX idx_system_logs_level_timestamp ON system_logs(level, timestamp DESC);
CREATE INDEX idx_system_logs_correlation ON system_logs(correlation_id);
CREATE INDEX idx_system_logs_event_type ON system_logs(event_type);
-- Performance Logs Indexes
CREATE INDEX idx_performance_logs_service_timestamp ON performance_logs(service, timestamp DESC);
CREATE INDEX idx_performance_logs_operation ON performance_logs(operation);
Partitioning Strategy¶
Time-Based Partitioning for Large Tables¶
-- Partition market_data by year (range partitioning on timestamp)
CREATE TABLE market_data (
    id BIGSERIAL,
    symbol VARCHAR(20) NOT NULL,
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
    open DECIMAL(15,4),
    high DECIMAL(15,4),
    low DECIMAL(15,4),
    close DECIMAL(15,4),
    volume BIGINT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- The partition key must be part of every unique constraint on a partitioned table
    PRIMARY KEY (id, timestamp)
) PARTITION BY RANGE (timestamp);
-- Create partitions for each year (upper bound is exclusive)
CREATE TABLE market_data_y2024 PARTITION OF market_data
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
CREATE TABLE market_data_y2025 PARTITION OF market_data
    FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');
-- Create one monthly partition covering the calendar month that contains start_date.
-- Nothing calls this automatically: invoke it ahead of time (e.g. from a scheduled
-- job) for months NOT already covered by a yearly partition above, otherwise
-- PostgreSQL rejects the overlapping range.
-- Example: SELECT create_monthly_partition('market_data', DATE '2026-01-01');
-- NOTE(review): %I quotes table_name as a single identifier, so a schema-qualified
-- name such as 'data_ingestion.market_data' will not resolve — confirm usage.
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
    -- Align to the first day of the month so bounds never straddle two months
    partition_start date := date_trunc('month', start_date)::date;
    partition_end   date := (date_trunc('month', start_date) + interval '1 month')::date;
    partition_name  text := table_name || '_' || to_char(partition_start, 'YYYY_MM');
BEGIN
    -- IF NOT EXISTS makes repeated calls for the same month a no-op
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, partition_start, partition_end);
END;
$$ LANGUAGE plpgsql;
Data Consistency and Concurrency¶
Transaction Management¶
Transaction Strategy Design¶
# src/shared/database/transaction_manager.py
from contextlib import contextmanager
from sqlalchemy.orm import Session
from sqlalchemy import text
class TransactionManager:
    """Hand out SQLAlchemy sessions that run inside one explicit connection transaction."""

    # Isolation levels accepted by PostgreSQL's SET TRANSACTION. The underscore
    # spelling ('READ_COMMITTED') is normalized to these before use.
    _ISOLATION_LEVELS = frozenset({
        'READ UNCOMMITTED',
        'READ COMMITTED',
        'REPEATABLE READ',
        'SERIALIZABLE',
    })

    def __init__(self, engine):
        self.engine = engine

    @classmethod
    def _normalize_isolation_level(cls, isolation_level):
        """Return the SQL spelling of *isolation_level*, or raise ValueError.

        The level is interpolated into SQL text (SET TRANSACTION cannot take a
        bound parameter), so it must be checked against a fixed allow-list.
        """
        level = ' '.join(str(isolation_level).replace('_', ' ').split()).upper()
        if level not in cls._ISOLATION_LEVELS:
            raise ValueError(f"Unsupported isolation level: {isolation_level!r}")
        return level

    @contextmanager
    def transaction(self, isolation_level='READ_COMMITTED', read_only=False):
        """Context manager for database transactions.

        Args:
            isolation_level: PostgreSQL isolation level, either in SQL spelling
                ('READ COMMITTED') or underscore form ('READ_COMMITTED').
            read_only: When True the transaction is declared READ ONLY, so
                PostgreSQL rejects writes issued through the yielded session.

        Yields:
            A Session bound to the transaction's connection. Its pending changes
            are flushed and the transaction committed when the ``with`` block
            exits normally; any exception rolls back and is re-raised.

        Raises:
            ValueError: if *isolation_level* is not a recognised level.
        """
        level = self._normalize_isolation_level(isolation_level)
        access_mode = ' READ ONLY' if read_only else ''
        connection = self.engine.connect()
        try:
            transaction = connection.begin()
            session = Session(bind=connection)
            try:
                # SET TRANSACTION must be the first statement of the transaction.
                connection.execute(
                    text(f"SET TRANSACTION ISOLATION LEVEL {level}{access_mode}")
                )
                yield session
                # The session writes through `connection` but does not own the
                # transaction, so its pending changes must be flushed explicitly.
                session.flush()
            except Exception:
                session.close()
                transaction.rollback()
                raise
            else:
                session.close()
                transaction.commit()
        finally:
            # Closing also rolls back anything still open (e.g. on GeneratorExit).
            connection.close()

    @contextmanager
    def read_only_transaction(self):
        """Read-only transaction for analytics queries.

        Runs at READ COMMITTED and declares the transaction READ ONLY, so an
        accidental write through the yielded session is rejected.
        """
        with self.transaction('READ_COMMITTED', read_only=True) as session:
            yield session
Optimistic Locking Implementation¶
# src/shared/database/optimistic_locking.py
from datetime import datetime

from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class OptimisticLockingError(Exception):
    """Raised when a versioned row was changed by another writer since it was loaded."""
    # NOTE(review): not defined anywhere in the original snippet; if the project
    # already defines this exception elsewhere, import that one instead.


class OptimisticLockingMixin:
    """Add a version counter used to detect concurrent modifications.

    Classes using this mixin must also map an ``id`` column, which the
    UPDATE below matches on.
    """

    version = Column(Integer, default=1, nullable=False)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def update_with_version(self, session, **kwargs):
        """Update with optimistic locking.

        Applies *kwargs* only if the stored row still has the version this
        object holds. The same UPDATE bumps the version, so a concurrent writer
        that already bumped it makes this statement match zero rows.

        Returns:
            Number of rows updated (1 on success).

        Raises:
            OptimisticLockingError: if the row's version no longer matches.
        """
        current_version = self.version
        # Bump the version in the database itself. Incrementing only the
        # in-memory attribute (as before) left the stored version unchanged,
        # so the next writer's check could never fail. The version is not
        # touched in memory on failure either.
        values = {**kwargs, 'version': current_version + 1}
        result = session.query(self.__class__).filter(
            self.__class__.id == self.id,
            self.__class__.version == current_version,
        ).update(values)
        if result == 0:
            raise OptimisticLockingError("Record was modified by another process")
        return result
Data Synchronization Patterns¶
Event-Driven Synchronization¶
Event Bus Design¶
# src/shared/events/data_sync.py
import redis
import json
from typing import Dict, Any, List
from datetime import datetime
class DataSyncEventBus:
    """Redis pub/sub bus for data-synchronization events.

    Each event is a JSON object published on a ``data_sync:<topic>`` channel
    (``trades``, ``market_data``, ``positions``). Event payloads must be
    JSON-serializable.
    """

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    def _publish(self, topic: str, event: Dict[str, Any]) -> None:
        """JSON-encode *event* and publish it on the ``data_sync:<topic>`` channel."""
        self.redis.publish(f'data_sync:{topic}', json.dumps(event))

    def publish_trade_event(self, trade_data: Dict[str, Any]):
        """Publish trade execution event (correlated by the trade's trade_id)."""
        self._publish('trades', {
            'type': 'trade_executed',
            'service': 'execution',
            'data': trade_data,
            'timestamp': datetime.utcnow().isoformat(),
            'correlation_id': trade_data.get('trade_id'),
        })

    def publish_market_data_event(self, market_data: Dict[str, Any]):
        """Publish market data update event."""
        self._publish('market_data', {
            'type': 'market_data_updated',
            'service': 'data_ingestion',
            'data': market_data,
            'timestamp': datetime.utcnow().isoformat(),
        })

    def publish_position_event(self, position_data: Dict[str, Any], service: str = 'execution'):
        """Publish position update event on the ``positions`` topic.

        Emits the ``position_updated`` event type that subscribers of
        ``data_sync:positions`` expect; previously nothing published it.

        Args:
            position_data: JSON-serializable position payload.
            service: Name of the publishing service.
        """
        self._publish('positions', {
            'type': 'position_updated',
            'service': service,
            'data': position_data,
            'timestamp': datetime.utcnow().isoformat(),
        })

    def subscribe_to_events(self, event_types: List[str], callback):
        """Subscribe to data sync events and dispatch them to *callback*.

        Blocks: iterates ``pubsub.listen()`` and calls ``callback(event)`` with
        each decoded JSON event, ignoring non-message notifications such as
        subscribe confirmations.
        """
        for event_type in event_types:
            self.pubsub.subscribe(f'data_sync:{event_type}')
        for message in self.pubsub.listen():
            if message['type'] == 'message':
                callback(json.loads(message['data']))
Data Synchronization Service¶
# src/shared/data_sync/synchronizer.py
class DataSynchronizer:
    """Apply data-sync events from a DataSyncEventBus to the analytics database."""

    def __init__(self, event_bus: "DataSyncEventBus", analytics_db: "Engine",
                 start_listening: bool = True):
        """
        Args:
            event_bus: Bus to subscribe to.
            analytics_db: SQLAlchemy engine for the analytics database.
            start_listening: When True (the default, as before) subscribe
                immediately. DataSyncEventBus.subscribe_to_events() loops over
                pubsub.listen(), so the constructor then blocks while events
                are processed. Pass False to construct without subscribing and
                call setup_event_handlers() later (e.g. from a worker thread).
        """
        self.event_bus = event_bus
        self.analytics_db = analytics_db
        if start_listening:
            self.setup_event_handlers()

    def setup_event_handlers(self):
        """Subscribe to the trade, market-data and position topics (blocks while listening)."""
        self.event_bus.subscribe_to_events(
            ['trades', 'market_data', 'positions'],
            self.handle_data_event,
        )

    def handle_data_event(self, event: Dict[str, Any]):
        """Dispatch one event to its sync method; unknown event types are ignored."""
        handlers = {
            'trade_executed': self.sync_trade_data,
            'market_data_updated': self.sync_market_data,
            'position_updated': self.sync_position_data,
        }
        handler = handlers.get(event.get('type'))
        if handler is not None:
            handler(event['data'])

    def sync_trade_data(self, trade_data: Dict[str, Any]):
        """Sync trade data to analytics database.

        *trade_data* must supply account_id, symbol, total_value, realized_pnl,
        strategy and win_rate (the bind parameters used below).
        """
        # begin() commits on success and rolls back on error. A plain connect()
        # block never commits, so with SQLAlchemy 2.0 both upserts would be
        # rolled back when the connection closes. It also makes them atomic.
        with self.analytics_db.begin() as conn:
            # Update portfolio summary
            conn.execute(text("""
                INSERT INTO portfolio_summary (account_id, symbol, total_value, realized_pnl)
                VALUES (:account_id, :symbol, :total_value, :realized_pnl)
                ON CONFLICT (account_id, symbol)
                DO UPDATE SET
                    total_value = portfolio_summary.total_value + :total_value,
                    realized_pnl = portfolio_summary.realized_pnl + :realized_pnl,
                    last_updated = NOW()
            """), trade_data)
            # Update performance metrics; win_rate is kept as a running mean
            # using the row's pre-update total_trades.
            conn.execute(text("""
                INSERT INTO performance_metrics (strategy, date, total_trades, win_rate)
                VALUES (:strategy, CURRENT_DATE, 1, :win_rate)
                ON CONFLICT (strategy, date)
                DO UPDATE SET
                    total_trades = performance_metrics.total_trades + 1,
                    win_rate = (performance_metrics.win_rate * performance_metrics.total_trades + :win_rate) / (performance_metrics.total_trades + 1)
            """), trade_data)

    def sync_market_data(self, market_data: Dict[str, Any]):
        """Sync market data to the analytics database (not yet implemented).

        Previously this method did not exist, so the first market-data event
        failed with AttributeError; fail with an explicit message instead.
        """
        raise NotImplementedError(
            "sync_market_data: analytics mapping for market data is not defined"
        )

    def sync_position_data(self, position_data: Dict[str, Any]):
        """Sync position data to the analytics database (not yet implemented).

        Previously this method did not exist, so position events failed with
        AttributeError; fail with an explicit message instead.
        """
        raise NotImplementedError(
            "sync_position_data: analytics mapping for positions is not defined"
        )
Database Monitoring and Maintenance¶
Performance Monitoring¶
# src/shared/monitoring/db_monitoring.py
class DatabaseMonitoring:
    """Collect per-service query timing statistics in memory."""

    def __init__(self, db_connections: Dict[str, "Engine"],
                 slow_query_threshold: float = 1.0,
                 max_slow_queries: "int | None" = 1000):
        """
        Args:
            db_connections: Engines keyed by service name (stored, not used here).
            slow_query_threshold: Execution time in seconds above which a query
                is recorded as slow (default 1.0, the previously fixed value).
            max_slow_queries: Number of most recent slow-query samples kept per
                service; older samples are discarded. None keeps every sample,
                which grows without bound in a long-running process.
        """
        self.connections = db_connections
        self.metrics = {}
        self.slow_query_threshold = slow_query_threshold
        self.max_slow_queries = max_slow_queries

    def track_query_performance(self, service: str, query: str, execution_time: float):
        """Track query performance metrics for one executed query."""
        stats = self.metrics.get(service)
        if stats is None:
            from collections import deque  # local import keeps this snippet self-contained
            stats = self.metrics[service] = {
                'query_count': 0,
                'total_time': 0,
                # Total slow queries seen, independent of how many samples are retained
                'slow_query_count': 0,
                'slow_queries': deque(maxlen=self.max_slow_queries),
            }
        stats['query_count'] += 1
        stats['total_time'] += execution_time
        if execution_time > self.slow_query_threshold:
            stats['slow_query_count'] += 1
            stats['slow_queries'].append({
                'query': query,
                'execution_time': execution_time,
                'timestamp': datetime.utcnow()
            })

    def get_performance_summary(self):
        """Get database performance summary: average time, total and slow query counts per service."""
        return {
            service: {
                'avg_query_time': stats['total_time'] / stats['query_count'] if stats['query_count'] > 0 else 0,
                'total_queries': stats['query_count'],
                'slow_query_count': stats['slow_query_count'],
            }
            for service, stats in self.metrics.items()
        }
Automated Maintenance¶
# src/shared/maintenance/db_maintenance.py
class DatabaseMaintenance:
    """Routine PostgreSQL maintenance (VACUUM/ANALYZE, log retention) per service."""

    def __init__(self, db_connections: Dict[str, "Engine"]):
        self.connections = db_connections

    def _execute_autocommit(self, service: str, sql: str):
        """Run *sql* on the service's database outside any transaction block.

        VACUUM refuses to run inside a transaction block, and the connection
        otherwise opens one implicitly. With AUTOCOMMIT the statement's effects
        also persist without an explicit commit.
        """
        with self.connections[service].connect() as conn:
            conn.execution_options(isolation_level="AUTOCOMMIT").execute(text(sql))

    def run_vacuum_analyze(self, service: str):
        """Run VACUUM ANALYZE on service database."""
        self._execute_autocommit(service, "VACUUM ANALYZE")

    def cleanup_old_logs(self, service: str, retention_days: int = 30):
        """Move system_logs rows older than *retention_days* into archived_system_logs.

        Both statements run in one transaction: NOW() is fixed for the
        transaction, so exactly the archived rows are deleted, and a failure
        leaves neither half applied.
        NOTE(review): filters on created_at while the log indexes cover
        system_logs(timestamp) — confirm which column is intended.
        """
        # The day count is a real bind parameter; '%s' inside a quoted
        # INTERVAL literal is never bound by text().
        params = {'retention_days': retention_days}
        with self.connections[service].begin() as conn:
            # Archive old logs
            conn.execute(text("""
                INSERT INTO archived_system_logs
                SELECT * FROM system_logs
                WHERE created_at < NOW() - make_interval(days => :retention_days)
            """), params)
            # Delete archived logs
            conn.execute(text("""
                DELETE FROM system_logs
                WHERE created_at < NOW() - make_interval(days => :retention_days)
            """), params)

    def update_statistics(self, service: str):
        """Update database statistics (ANALYZE), committed immediately."""
        self._execute_autocommit(service, "ANALYZE")
Security Considerations¶
Database Security Implementation¶
# src/shared/security/db_security.py
class DatabaseSecurity:
    """Row-level security setup and data-access auditing for service databases."""

    # Tables scoped per account; each has an account_id column (see the
    # idx_*_account_id indexes in the indexing strategy).
    _RLS_TABLES = ('trades', 'positions', 'orders')

    def __init__(self, db_connections: Dict[str, "Engine"]):
        self.connections = db_connections

    def setup_row_level_security(self, service: str):
        """Setup row-level security for multi-tenant data.

        Enables RLS and creates a per-account policy for trading_app on every
        table in _RLS_TABLES. Previously positions and orders had RLS enabled
        with no policy, which denies trading_app every row of those tables.
        Safe to re-run: existing policies are dropped and recreated. Runs in a
        single committed transaction.
        """
        with self.connections[service].begin() as conn:
            for table in self._RLS_TABLES:
                policy = f"{table}_account_policy"
                # Identifiers come from the constant tuple above, never from
                # callers, so interpolating them into DDL is safe.
                conn.execute(text(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY"))
                conn.execute(text(f"DROP POLICY IF EXISTS {policy} ON {table}"))
                conn.execute(text(f"""
                    CREATE POLICY {policy} ON {table}
                    FOR ALL TO trading_app
                    USING (account_id = current_setting('app.current_account_id'))
                """))

    def audit_data_access(self, service: str, user_id: str, table_name: str, operation: str):
        """Audit data access for compliance (the audit row is committed)."""
        with self.connections[service].begin() as conn:
            conn.execute(text("""
                INSERT INTO audit_log (user_id, table_name, operation, timestamp)
                VALUES (:user_id, :table_name, :operation, NOW())
            """), {
                'user_id': user_id,
                'table_name': table_name,
                'operation': operation
            })
SQLAlchemy ORM and Session Management¶
Overview¶
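The trading system uses the SQLAlchemy ORM (Object-Relational Mapping) to interact with PostgreSQL. It provides a Pythonic interface to the database and maps columns to typed Python attributes. Because values are sent as bound parameters, it also prevents SQL injection, provided queries are not assembled by string formatting.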
Declarative Base¶
All database models inherit from a common declarative base:
# sqlalchemy.orm.declarative_base (SQLAlchemy 1.4+) replaces the deprecated
# sqlalchemy.ext.declarative import path.
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# All models inherit from Base
class MarketData(Base):
    __tablename__ = "market_data"
    __table_args__ = {'schema': 'data_ingestion'}
    # ... column definitions
Purpose:
- Provides a metadata registry for all tables
- Enables ORM mapping between Python classes and database tables
- Tracks relationships and foreign keys
- Provides the query interface
Session Management Design¶
Automated Session Management¶
The system uses context managers for automatic session lifecycle management:
# Write operations: db_transaction() commits when the block completes
# and rolls back if it raises.
with db_transaction() as session:
    order = Order(symbol='AAPL', quantity=100)
    session.add(order)

# Read operations
with db_readonly_session() as session:
    results = (
        session.query(MarketData)
        .filter(MarketData.symbol == 'AAPL')
        .all()
    )
Two Session Types¶
1. Transaction Session (db_transaction)
For write operations that modify data:
@contextmanager
def db_transaction() -> Generator[Session, None, None]:
    """
    Database transaction context manager with automatic commit/rollback.

    Only the signature and contract are shown here; the implementation is
    imported from src.shared.database.base.

    Features:
    - Commits when the ``with`` block completes without an exception
    - Rolls back, logs, and re-raises on any exception (see Error Handling Strategy)
    - Sessions draw connections from the engine's pool (src/config/database.py)
    - Always closes the session, even on error

    Usage:
        with db_transaction() as session:
            order = Order(symbol='AAPL', quantity=100)
            session.add(order)

    Use cases:
    - Creating orders, trades, positions
    - Updating risk limits
    - Recording strategy signals
    - Any data modification
    """
2. Read-Only Session (db_readonly_session)
For read operations that don't modify data:
@contextmanager
def db_readonly_session() -> Generator[Session, None, None]:
"""
Read-only database session for analytics and reporting
Features:
- No write operations allowed
- Optimized for read performance
- No transaction overhead
- Connection pooling
Usage:
with db_readonly_session() as session:
data = session.query(MarketData).filter(...).all()
Use cases:
- Analytics queries
- Dashboard data
- Reporting
- Backtesting
"""
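Performance Benefits of Read-Only Sessions:
- No Write-Ahead Log (WAL) overhead from the session's own work, since pure reads generate essentially no WAL records
- Reduced locking overhead: plain SELECTs take only lightweight ACCESS SHARE locks and no row locks
- PostgreSQL can optimize transactions declared `READ ONLY`; for example, `SERIALIZABLE READ ONLY DEFERRABLE` avoids serialization-failure overhead
- Lower resource consumption
- Can leverage read replicas (future scaling)

These benefits assume the session only reads. Declaring the transaction `READ ONLY` additionally makes PostgreSQL reject accidental writes.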
Schema Handling¶
Schemas are specified in model definitions using __table_args__:
class MarketData(Base):
    """ORM mapping for the data_ingestion.market_data table (columns abridged)."""
    __tablename__ = "market_data"
    # The schema argument makes SQLAlchemy emit schema-qualified table names,
    # so the mapping does not depend on the connection's search_path.
    __table_args__ = {'schema': 'data_ingestion'}  # Explicit schema
    # NOTE(review): the partitioned market_data DDL declares
    # PRIMARY KEY (id, timestamp), but only id is mapped as the primary key
    # here — confirm id is unique on its own.
    id = Column(BigInteger, primary_key=True)
    symbol = Column(String(20), nullable=False)  # matches VARCHAR(20) NOT NULL in the DDL
    # ...
Benefits:
- Schema is explicit in code
- No connection-string magic
- Works with automated session management
- Maintains schema isolation
Error Handling Strategy¶
The session manager implements comprehensive error handling:
try:
yield session
session.commit()
logger.debug("Transaction committed successfully")
except IntegrityError as e:
session.rollback()
logger.error(f"Database integrity error: {e}")
raise # Constraint violations, duplicate keys
except OperationalError as e:
session.rollback()
logger.error(f"Database operational error: {e}")
raise # Connection issues, timeouts
except DataError as e:
session.rollback()
logger.error(f"Database data error: {e}")
raise # Invalid data types, out of range
except Exception as e:
session.rollback()
logger.error(f"Unexpected database error: {e}")
raise
finally:
session.close() # Always close
Error Categories:
- `IntegrityError` – constraint violations, duplicate keys
- `OperationalError` – connection problems, timeouts
- `DataError` – invalid data types, out-of-range values
- `ProgrammingError` – SQL syntax errors, missing tables or columns
Error Handling Benefits:
- Centralized error logging (all DB errors captured)
- Categorized errors for different handling strategies
- Exceptions re-raised for the caller to handle
- Stack traces preserved for debugging
- Monitoring and alerting ready
Connection Pooling¶
Connection pooling is configured in src/config/database.py:
engine = create_engine(
    database_url,
    poolclass=QueuePool,
    pool_recycle=3600,  # retire connections once they are an hour old
    pool_timeout=30,    # max seconds a checkout waits for a free connection
    pool_size=10,       # steady-state connections held open by the pool
    max_overflow=20,    # extra connections permitted above pool_size at peak
)
Benefits:
- Reuses existing connections (faster)
- Prevents connection exhaustion
- Automatic connection management
- Configurable pool size per workload
Usage Examples¶
Example 1: Create Order (Write Operation)¶
# NOTE(review): OrderStatus is assumed to be defined alongside OrderSide — confirm.
from src.services.execution.models import Order, OrderSide, OrderStatus
from src.shared.database.base import db_transaction


def create_order(symbol: str, quantity: int, price: float, *, account_id: str = 'ACC123'):
    """Create a new trading order (pending BUY) and return its order_id.

    Args:
        symbol: Ticker symbol, e.g. 'AAPL'.
        quantity: Number of shares.
        price: Order price.
        account_id: Owning account (keyword-only; defaults to the example
            account 'ACC123' that was previously hard-coded).

    Returns:
        The generated order_id.
    """
    # Capture the id before committing. db_transaction() commits and closes the
    # session, after which `order` is detached, and with SQLAlchemy's default
    # expire_on_commit=True its attributes can no longer be loaded.
    order_id = generate_id()  # NOTE(review): generate_id is not imported in this example
    with db_transaction() as session:
        order = Order(
            order_id=order_id,
            account_id=account_id,
            symbol=symbol,
            quantity=quantity,
            price=price,
            side=OrderSide.BUY,
            status=OrderStatus.PENDING
        )
        session.add(order)
        # Auto-commit on success
    return order_id
Example 2: Query Market Data (Read Operation)¶
from datetime import datetime, timedelta, timezone

from src.services.data_ingestion.models import MarketData
from src.shared.database.base import db_readonly_session


def get_market_history(symbol: str, days: int):
    """Get historical market data for *symbol* over the last *days* days, newest first."""
    # market_data.timestamp is TIMESTAMP WITH TIME ZONE, so compare against an
    # aware UTC cutoff. A naive datetime.now() is interpreted in the database
    # session's time zone and can shift the window by the server's UTC offset.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    with db_readonly_session() as session:
        return (
            session.query(MarketData)
            .filter(MarketData.symbol == symbol)
            .filter(MarketData.timestamp >= cutoff)
            .order_by(MarketData.timestamp.desc())
            .all()
        )
Example 3: Complex Transaction (Multiple Operations)¶
from datetime import datetime, timezone

from src.shared.database.base import db_transaction
# NOTE(review): Order, OrderStatus, Trade, Position and generate_id must also be
# imported from their project modules; their paths are not shown in this example.


def execute_trade(order_id: str, execution_price: float):
    """Execute a trade and update related records in one transaction.

    Marks the order FILLED, records a Trade at *execution_price*, and creates
    or extends the account's position for the symbol.

    Returns:
        The new trade's trade_id.

    Raises:
        ValueError: if the order does not exist or is already filled.
    """
    # Capture the id before commit: once db_transaction() commits and closes
    # the session, `trade` is detached and its expired attributes cannot load.
    trade_id = generate_id()
    now = datetime.now(timezone.utc)
    with db_transaction() as session:
        # 1. Get and update order
        order = session.query(Order).filter_by(order_id=order_id).first()
        if not order:
            raise ValueError(f"Order {order_id} not found")
        if order.status == OrderStatus.FILLED:
            # Executing twice would record a second trade and double the position
            raise ValueError(f"Order {order_id} is already filled")
        order.status = OrderStatus.FILLED
        order.updated_at = now

        # 2. Create trade record
        trade = Trade(
            trade_id=trade_id,
            order_id=order_id,
            account_id=order.account_id,
            symbol=order.symbol,
            quantity=order.quantity,
            price=execution_price,
            executed_at=now
        )
        session.add(trade)

        # 3. Update or create position
        position = session.query(Position).filter_by(
            account_id=order.account_id,
            symbol=order.symbol
        ).first()
        if position:
            # Quantity-weighted average of the existing holding and this fill.
            # It must use the pre-fill quantity and the actual fill price; the
            # previous helper call ran after the quantity had been incremented
            # and never saw execution_price.
            # NOTE(review): assumes a long position being added to; sells and
            # shorts need side-aware handling.
            prev_quantity = position.quantity
            new_quantity = prev_quantity + order.quantity
            if new_quantity:
                position.avg_price = (
                    float(position.avg_price) * prev_quantity
                    + execution_price * order.quantity
                ) / new_quantity
            position.quantity = new_quantity
            position.last_updated = now
        else:
            # Create new position
            position = Position(
                account_id=order.account_id,
                symbol=order.symbol,
                quantity=order.quantity,
                avg_price=execution_price
            )
            session.add(position)
    # All operations committed together or all rolled back
    return trade_id
Example 4: Analytics Query (Aggregation)¶
from src.shared.database.base import db_readonly_session
from sqlalchemy import func
# NOTE(review): Position must also be imported from its project module.


def get_portfolio_summary(account_id: str):
    """Get aggregated portfolio metrics: one row per symbol with quantity > 0.

    Rows expose symbol, quantity, avg_price, unrealized_pnl and total_value.
    """
    # PostgreSQL rejects selecting columns that are neither grouped nor
    # aggregated, so every non-key column is aggregated. The labels keep the
    # original attribute names on the result rows.
    total_quantity = func.sum(Position.quantity)
    with db_readonly_session() as session:
        return (
            session.query(
                Position.symbol,
                total_quantity.label('quantity'),
                # Quantity-weighted average price; the quantity > 0 filter
                # below keeps the divisor non-zero.
                (func.sum(Position.quantity * Position.avg_price) / total_quantity).label('avg_price'),
                func.sum(Position.unrealized_pnl).label('unrealized_pnl'),
                func.sum(Position.market_value).label('total_value'),
            )
            .filter(Position.account_id == account_id)
            .filter(Position.quantity > 0)
            .group_by(Position.symbol)
            .all()
        )
Design Principles¶
| Principle | Implementation | Benefit |
|---|---|---|
| Automation | Context managers | Prevents connection leaks |
| Separation | Read/Write sessions | Performance optimization |
| Explicitness | Schema in models | Clear and maintainable |
| Simplicity | Single database | No distributed transactions |
| Observability | Error logging | Monitoring and debugging |
| Safety | Auto-rollback | Data consistency |
| Efficiency | Connection pooling | Resource optimization |
Transaction Isolation¶
The system uses PostgreSQL's default isolation level:
- Isolation Level: READ COMMITTED
- Behavior: Queries see only committed data
- Concurrency: High (minimal locking)
- Use Case: Suitable for most trading operations
For operations requiring stronger guarantees:
from sqlalchemy import text

with db_transaction() as session:
    # Set higher isolation level if needed.
    # SET TRANSACTION applies only to the current transaction and must run
    # before any other query in it. Here it is the first statement, so the
    # level covers everything that follows in this block.
    # Under SERIALIZABLE, PostgreSQL may abort with a serialization failure;
    # callers should be prepared to retry the whole transaction.
    session.execute(text("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"))
    # Your operations here
Best Practices¶
DO:
- ✅ Use db_transaction() for all write operations
- ✅ Use db_readonly_session() for analytics and reporting
- ✅ Keep transactions short and focused
- ✅ Handle exceptions at the application level
- ✅ Use connection pooling (already configured)
- ✅ Specify schema in model definitions
DON'T:
- ❌ Mix read and write sessions (use a write session if needed)
- ❌ Keep transactions open for long periods
- ❌ Catch and ignore database errors
- ❌ Create sessions manually (use context managers)
- ❌ Use distributed transactions (not needed)
- ❌ Modify read-only session data
Advanced Features¶
Manual Session Management (Advanced)¶
For cases requiring explicit control:
from src.shared.database.base import get_session
def advanced_operation():
    """Advanced use case with manual session management.

    Commits the session's work, rolls back and re-raises if anything fails,
    and always closes the session.
    """
    db = get_session()
    try:
        try:
            # Your operations
            db.commit()
        except Exception:
            db.rollback()
            raise
    finally:
        db.close()
⚠️ Warning: Manual session management requires careful handling. Prefer context managers.
Nested Transactions (Savepoints)¶
For complex operations requiring partial rollback:
import logging

with db_transaction() as session:
    order = Order(...)
    session.add(order)

    # Create savepoint. begin_nested() flushes pending objects first, so the
    # order above belongs to the outer transaction and survives a rollback
    # to this savepoint.
    savepoint = session.begin_nested()
    try:
        # Risky operation
        risky_update()
        savepoint.commit()
    except Exception:
        # Rollback to savepoint, main transaction continues
        savepoint.rollback()
        # The failure is deliberately tolerated, but record it rather than
        # discarding it silently.
        logging.getLogger(__name__).warning(
            "Risky update failed; rolled back to savepoint", exc_info=True
        )
See Also:
- Database Overview – architecture overview and setup
- Database Schema – detailed schema definitions