Trading System Logging Architecture¶
Overview¶
This document outlines the logging architecture for the trading system, including design decisions, implementation strategy, and configuration management.
Last Updated: December 2025
Status: ✅ Implemented
Author: Nishant Nayar
Design Decisions¶
1. Logging Framework¶
- Primary: Loguru (consolidated logging across all services)
- Secondary: PostgreSQL tables for structured log storage
- Rationale: Loguru provides excellent performance and ergonomics, while PostgreSQL enables queryable log analysis
2. Log Retention Strategy¶
- Approach: Simple two-tier retention
- Active Logs: 30 days (real-time monitoring, hot queries)
- Archived Logs: 30-90 days (historical analysis, reports)
- Cleanup: Automatic daily cleanup at 2 AM
- Storage: Same database, different tables
3. Compliance Requirements¶
- Status: No compliance requirements
- Focus: Debugging, performance monitoring, system health
- Audit Trail: Basic audit logging for system operations
4. Log Aggregation¶
- Method: PostgreSQL tables for structured log storage
- Benefits: SQL queries, indexing, correlation with trading data
- Integration: Seamless with existing database infrastructure
Architecture Components¶
1. Log Categories¶
📊 Trading Logs: Trade executions, orders, positions
🔧 System Logs: Service health, errors, startup/shutdown
⚡ Performance Logs: Execution times, memory usage
🧠 Strategy Logs: Signal generation, backtesting results
⚠️ Risk Logs: Risk calculations, violations, alerts
2. Log Levels¶
ERROR: System failures, API errors, trading failures
WARNING: Risk violations, performance issues, retries
INFO: Normal operations, trade executions, data flow
DEBUG: Detailed execution (development only)
3. Service-Specific Logging¶
| Service | Log Level | Focus Area |
|---|---|---|
| Data Ingestion | INFO | Market data fetch, validation |
| Strategy Engine | DEBUG | Signal generation, calculations |
| Execution | INFO | Order placement, trade execution |
| Risk Management | WARNING | Risk calculations, violations |
| Analytics | INFO | Performance calculations, reports |
| Notification | INFO | Alert delivery, communication |
Implementation Architecture¶
1. Dual Logging System¶
┌────────────────────────────────────────────────────────────┐
│                      Application Code                      │
├────────────────────────────────────────────────────────────┤
│           Logging Module (src/shared/logging/)             │
│  ├── logger.py            # Main logger setup              │
│  ├── database_handler.py  # Async queue-based DB           │
│  ├── database_sink.py     # Loguru sink for DB             │
│  ├── formatters.py        # Log formatting                 │
│  ├── config.py            # Configuration loader           │
│  └── correlation.py       # Correlation ID tracking        │
├────────────────────────────────────────────────────────────┤
│  Loguru (File)                PostgreSQL (DB)              │
│  └── logs/errors.log          ├── logging.system_logs      │
│      (minimal fallback)       └── logging.performance_logs │
└────────────────────────────────────────────────────────────┘
2. Key Features¶
Feature 1: Automatic Service Detection¶
# Automatically detects which service is logging
from src.shared.logging import get_logger
logger = get_logger(__name__) # Detects service from module name
logger.info("Data ingestion started")
# → Logs to: logs/data_ingestion.log AND logging.system_logs
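The detection logic itself is not shown above; a minimal sketch of one plausible approach, assuming the src.services.&lt;service&gt;.&lt;module&gt; layout used elsewhere in this document (the helper name _detect_service and its fallback are illustrative, not the actual implementation):

# A sketch of service detection from the module path.
def _detect_service(module_name: str) -> str:
    """Map 'src.services.execution.order_manager' -> 'execution'."""
    parts = module_name.split(".")
    if "services" in parts and parts.index("services") + 1 < len(parts):
        return parts[parts.index("services") + 1]
    return "system"  # assumed default for shared/non-service modules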
Feature 2: Dual Output (File + Database)¶
# Single log statement goes to BOTH:
logger.info("Order created", order_id="ORD123", symbol="AAPL")
# File output (logs/execution.log):
# 2025-10-01 10:30:45.123 | INFO | execution:create_order:42 | Order created
# Database output (logging.system_logs):
# {
#   timestamp: 2025-10-01 10:30:45.123,
#   service: 'execution',
#   level: 'INFO',
#   message: 'Order created',
#   metadata: {'order_id': 'ORD123', 'symbol': 'AAPL'}
# }
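A plausible wiring for this dual output, assuming the DatabaseSink class described under Database Handler Design below (its constructor arguments are illustrative):

from loguru import logger
from src.shared.logging.database_sink import DatabaseSink

logger.remove()  # drop loguru's default stderr sink
# File sink: minimal fallback, errors only
logger.add("logs/errors.log", level="ERROR", rotation="10 MB")
# Database sink: loguru accepts any callable and invokes it per record
logger.add(DatabaseSink(), level="INFO")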
Feature 3: Correlation ID Tracking¶
# Track related operations across services
from src.shared.logging import correlation_context

with correlation_context("trade-12345"):
    logger.info("Order placed")
    # ... order execution ...
    logger.info("Position updated")
    # ... position update ...

# All logs have correlation_id='trade-12345'
# Easy to trace the complete flow!
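A minimal sketch of how correlation_context could be implemented with contextvars (the real code lives in src/shared/logging/correlation.py and may differ in detail):

import contextvars
from contextlib import contextmanager

_correlation_id = contextvars.ContextVar("correlation_id", default=None)

@contextmanager
def correlation_context(correlation_id: str):
    """Attach a correlation ID to every log emitted inside the block."""
    token = _correlation_id.set(correlation_id)
    try:
        yield
    finally:
        _correlation_id.reset(token)

The database sink can then call _correlation_id.get() when building each row, so the ID lands in the correlation_id column without being passed explicitly.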
Feature 4: Performance Tracking¶
from src.shared.logging import log_performance
@log_performance
def execute_trade(order_id):
    # Your code here
    pass
# Automatically logs:
# - Execution time
# - Memory usage (optional)
# - Function arguments (optional)
# Stored in logging.performance_logs
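A condensed sketch of what the decorator could do: time the call and emit a structured record that the database sink can route to the performance table. Loguru captures the keyword arguments into the record's extra dict by default; memory tracking is omitted here.

import functools
import time
from loguru import logger

def log_performance(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            logger.info(
                "operation completed",
                operation=func.__name__,
                execution_time_ms=round(elapsed_ms, 3),
            )
    return wrapper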
Feature 5: Structured Logging¶
# Pass structured data
logger.info(
    "Trade executed",
    trade_id="TRD123",
    symbol="AAPL",
    quantity=100,
    price=150.25,
    commission=1.50
)
# Stored as JSONB in database
# Easy to query and analyze!
3. Logger Setup Options¶
Option A: Simple Setup (Recommended)¶
from src.shared.logging import setup_logging
# Setup once at application startup
setup_logging()
# Use everywhere
from loguru import logger
logger.info("Application started")
Pros:
- Simple to use
- One-time setup
- Works everywhere
- No boilerplate

Cons:
- Less control per module
- Global configuration
Option B: Service-Specific Setup¶
from src.shared.logging import get_service_logger
# Each service gets its own logger
logger = get_service_logger("data_ingestion")
logger.info("Market data fetched")
# → Logs to logs/data_ingestion.log
Pros:
- Service-specific configuration
- Isolated log files
- Different log levels per service

Cons:
- More setup code
- Need to specify service name
Option C: Context-Aware Logger (Advanced)¶
from src.shared.logging import get_logger
# Automatically detects service from module name
logger = get_logger(__name__)
# __name__ = 'src.services.execution.order_manager'
# → Detected service: 'execution'
# → Logs to: logs/execution.log
Pros:
- Automatic service detection
- No hardcoded service names
- Clean code
- Recommended approach!

Cons:
- Relies on module structure
4. Database Handler Design¶
Implementation Details¶
The database logging system uses an async queue-based architecture:
- Queue Manager (LogQueueManager): Manages the background thread for async log processing
- Database Sink (DatabaseSink): Loguru sink that receives log records and enqueues them
- Batch Processing: Logs are batched for efficient database writes
- Thread Safety: All operations are thread-safe using locks
Batching Strategy¶
- Method: Async queue-based batching with a background worker thread
- Batch Size: 100 records (configurable via batch_size)
- Batch Timeout: 10 seconds (configurable via batch_timeout)
- Queue Size: Maximum 10,000 logs before blocking
- Fallback: Write to file if the database fails
- Benefits: Non-blocking writes, efficient bulk inserts, graceful degradation (see the sketch after this list)
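A condensed sketch of this queue-plus-worker design. The actual classes live in database_sink.py and database_handler.py; the _bulk_insert helper here is illustrative.

import queue
import threading

class DatabaseSink:
    """Loguru sink: enqueue records, batch-insert from a background thread."""

    def __init__(self, batch_size=100, batch_timeout=10.0, max_queue=10_000):
        self._queue = queue.Queue(maxsize=max_queue)  # blocks producers when full
        self._batch_size = batch_size
        self._batch_timeout = batch_timeout
        threading.Thread(target=self._worker, daemon=True).start()

    def __call__(self, message):
        # Loguru invokes the sink with a Message; message.record is a dict
        # carrying time, level, message text, extra fields, etc.
        self._queue.put(message.record)

    def _worker(self):
        while True:
            batch = []
            try:
                # Wait up to batch_timeout for the first record, then drain
                # whatever else is already queued, up to batch_size.
                batch.append(self._queue.get(timeout=self._batch_timeout))
                while len(batch) < self._batch_size:
                    batch.append(self._queue.get_nowait())
            except queue.Empty:
                pass
            if batch:
                _bulk_insert(batch)  # illustrative: one multi-row INSERT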
Log Levels for Database¶
- Files: ERROR only (minimal fallback for critical failures)
- Database: INFO+ (all logs INFO and above stored in database)
- Rationale: Database is primary storage, files are minimal fallback only
Error Handling¶
# Graceful degradation strategy
import sys

try:
    write_to_database(log)
except DatabaseError:
    try:
        write_to_fallback_file(log)
    except IOError:
        print(f"CRITICAL: Failed to log: {log}", file=sys.stderr)

# Never fail the application due to logging!
Database Schema¶
The log tables live in the dedicated logging schema of the PostgreSQL database. Tables are created automatically via SQLAlchemy models or SQL scripts.
1. System Logs Table¶
CREATE TABLE logging.system_logs (
id BIGSERIAL PRIMARY KEY,
service VARCHAR(50) NOT NULL,
level VARCHAR(20) NOT NULL, -- 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
message TEXT NOT NULL,
data JSONB, -- Structured metadata
correlation_id VARCHAR(100), -- For request tracking
event_type VARCHAR(50), -- Event classification
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indexes for performance
CREATE INDEX idx_system_logs_timestamp ON logging.system_logs(timestamp);
CREATE INDEX idx_system_logs_service ON logging.system_logs(service);
CREATE INDEX idx_system_logs_level ON logging.system_logs(level);
CREATE INDEX idx_system_logs_correlation ON logging.system_logs(correlation_id);
CREATE INDEX idx_system_logs_event_type ON logging.system_logs(event_type);
CREATE INDEX idx_system_logs_service_timestamp ON logging.system_logs(service, timestamp);
2. Performance Logs Table¶
CREATE TABLE logging.performance_logs (
id BIGSERIAL PRIMARY KEY,
service VARCHAR(50) NOT NULL,
operation VARCHAR(100) NOT NULL,
execution_time_ms DECIMAL(10,3) NOT NULL,
memory_usage_mb DECIMAL(10,3),
cpu_usage_percent DECIMAL(5,2),
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Indexes for performance
CREATE INDEX idx_performance_logs_timestamp ON logging.performance_logs(timestamp);
CREATE INDEX idx_performance_logs_service ON logging.performance_logs(service);
CREATE INDEX idx_performance_logs_operation ON logging.performance_logs(operation);
CREATE INDEX idx_performance_logs_service_timestamp ON logging.performance_logs(service, timestamp);
3. SQLAlchemy Models¶
The database models are defined in src/shared/database/models/logging_models.py:
- SystemLog: Represents general system logs with structured data
- PerformanceLog: Represents performance metrics with execution times and resource usage
Both models use:
- Timezone-aware timestamps (UTC)
- Proper indexing for query performance
- JSONB for flexible metadata storage
- Automatic timestamp defaults
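Based on the schema above, the SystemLog model plausibly looks like this (SQLAlchemy 2.x declarative style assumed; the actual file may differ in detail):

from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import BigInteger, DateTime, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class SystemLog(Base):
    __tablename__ = "system_logs"
    __table_args__ = {"schema": "logging"}

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    service: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
    level: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
    message: Mapped[str] = mapped_column(Text, nullable=False)
    data: Mapped[Optional[dict]] = mapped_column(JSONB)
    correlation_id: Mapped[Optional[str]] = mapped_column(String(100), index=True)
    event_type: Mapped[Optional[str]] = mapped_column(String(50), index=True)
    timestamp: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        index=True,
    )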
Configuration¶
1. Logging Configuration (config/logging.yaml)¶
logging:
  # Log Levels
  level: "INFO"
  root_level: "INFO"

  # Log Rotation (for minimal file fallback)
  rotation:
    size: "10 MB"
    time: "daily"
    retention: "30 days"
    compression: false

  # Log Format
  format: "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}"

  # Log Files (minimal - only for error fallback)
  files:
    main: "logs/trading.log"
    errors: "logs/errors.log"
    system: "logs/system.log"
    trades: "logs/trades.log"
    performance: "logs/performance.log"

  # Service-specific Logging
  services:
    data_ingestion:
      level: "INFO"
      file: "logs/data_ingestion.log"
    strategy_engine:
      level: "DEBUG"
      file: "logs/strategy_engine.log"
    execution:
      level: "INFO"
      file: "logs/execution.log"
    risk_management:
      level: "WARNING"
      file: "logs/risk_management.log"
    analytics:
      level: "INFO"
      file: "logs/analytics.log"
    notification:
      level: "INFO"
      file: "logs/notification.log"

  # Structured Logging
  structured: true
  json_format: false

  # Performance Logging
  performance:
    enabled: true
    log_execution_time: true
    log_memory_usage: true
    log_database_queries: false

  # Database Logging (Primary Storage)
  database:
    enabled: true
    active_table: "system_logs"
    archive_table: "archived_system_logs"
    batch_size: 100         # Write when 100 logs are queued
    batch_timeout: 10       # Write every 10 seconds (or when the batch is full)
    async_logging: true     # Use async queue-based processing
    fallback_to_file: true  # Fall back to file if the database fails
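A minimal sketch of how config.py could load this file and wire up loguru; the function body is illustrative and assumes the DatabaseSink sketched earlier:

import yaml
from loguru import logger

from src.shared.logging.database_sink import DatabaseSink

def setup_logging(config_path: str = "config/logging.yaml") -> None:
    with open(config_path) as f:
        cfg = yaml.safe_load(f)["logging"]

    logger.remove()
    # Minimal file fallback: errors only
    logger.add(
        cfg["files"]["errors"],
        level="ERROR",
        format=cfg["format"],
        rotation=cfg["rotation"]["size"],
        retention=cfg["rotation"]["retention"],
    )
    # Primary storage: database sink for INFO and above
    if cfg["database"]["enabled"]:
        logger.add(
            DatabaseSink(
                batch_size=cfg["database"]["batch_size"],
                batch_timeout=cfg["database"]["batch_timeout"],
            ),
            level=cfg["level"],
        )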
2. Environment Settings¶
# src/config/settings.py
from pydantic_settings import BaseSettings  # pydantic v2 (in v1: from pydantic import BaseSettings)

class LoggingSettings(BaseSettings):
    # File retention
    file_retention_days: int = 30
    file_archive_after_days: int = 7

    # Database retention
    database_retention_days: int = 30
    database_archive_after_days: int = 7

    # Cleanup schedule
    cleanup_schedule: str = "0 2 * * *"  # daily at 2 AM (cron syntax)
    cleanup_batch_size: int = 1000

    # Retention policies (days)
    trading_logs_retention: int = 90
    performance_logs_retention: int = 30
    system_logs_retention: int = 30
    error_logs_retention: int = 90
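Because LoggingSettings extends pydantic's BaseSettings, each field can be overridden through environment variables using the default name mapping, e.g.:

# shell: export DATABASE_RETENTION_DAYS=60
settings = LoggingSettings()
print(settings.database_retention_days)  # 60 if the env var is set, else 30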
Usage Patterns¶
Pattern 1: Simple Logging¶
from loguru import logger
logger.info("Application started")
logger.error("Failed to connect to API", error=str(e))
Pattern 2: Structured Logging¶
logger.info(
    "Order executed",
    order_id="ORD123",
    symbol="AAPL",
    quantity=100,
    price=150.25,
    execution_time_ms=45
)
Pattern 3: Performance Tracking¶
from src.shared.logging import log_performance
@log_performance
def calculate_indicators(symbol: str, period: int):
    # Heavy calculation
    pass
Pattern 4: Correlation Tracking¶
from src.shared.logging import correlation_context
async def process_trade_flow(trade_id):
    with correlation_context(trade_id):
        await validate_risk()      # Logged with trade_id
        await execute_order()      # Logged with trade_id
        await update_position()    # Logged with trade_id
Implementation Strategy¶
1. Dual Logging Approach¶
Service → Loguru → File + PostgreSQL
                       ↓
           Structured Data → Database
                       ↓
            SQL Queries → Analysis
2. Automatic Cleanup Process¶
- Schedule: Daily at 2 AM via Prefect flows
- Process:
- Archive logs older than 30 days
- Delete archived logs older than 90 days
- Log cleanup results
- Monitoring: Cleanup operation logs (future enhancement)
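A sketch of this cleanup as a Prefect flow. The table names follow the config above, but the run_sql helper and the assumption that the archive table mirrors the active table's columns are illustrative:

from prefect import flow, task

@task
def archive_old_logs():
    # Move logs older than 30 days into the archive table
    run_sql("""
        WITH moved AS (
            DELETE FROM logging.system_logs
            WHERE timestamp < NOW() - INTERVAL '30 days'
            RETURNING *
        )
        INSERT INTO logging.archived_system_logs SELECT * FROM moved;
    """)

@task
def purge_archived_logs():
    # Drop archived logs older than 90 days
    run_sql("""
        DELETE FROM logging.archived_system_logs
        WHERE timestamp < NOW() - INTERVAL '90 days';
    """)

@flow
def log_cleanup_flow():
    archive_old_logs()
    purge_archived_logs()

Deployed on the cleanup_schedule cron expression (0 2 * * *), this matches the daily 2 AM window described above.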
3. Log Analysis Capabilities¶
-- Find all logs related to a specific trade
SELECT * FROM logging.system_logs
WHERE correlation_id = 'trade-12345'
ORDER BY timestamp;

-- Link trading logs with system logs
SELECT t.*, s.message, s.level
FROM trading_logs t
JOIN logging.system_logs s ON t.trade_id = s.correlation_id
WHERE t.symbol = 'AAPL';

-- Performance analysis
SELECT service, AVG(execution_time_ms) AS avg_time
FROM logging.performance_logs
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY service;
Implementation Status¶
✅ Phase 1: Core Logging (Completed)¶
- [x] Basic Loguru setup
- [x] File logging with rotation (minimal fallback)
- [x] Service detection
- [x] Configuration loading
✅ Phase 2: Database Integration (Completed)¶
- [x] Database handler with async queue-based processing
- [x] Database sink for loguru integration
- [x] Async batching with configurable batch size and timeout
- [x] Fallback mechanism to file logging
- [x] Structured logging with JSONB metadata
- [x] SQLAlchemy models for SystemLog and PerformanceLog
- [x] Database schema creation
✅ Phase 3: Advanced Features (Completed)¶
- [x] Correlation ID tracking
- [x] Performance logging support
- [x] Thread-safe queue management
- [x] Graceful shutdown with log flushing
🚧 Phase 4: Monitoring (Future)¶
- [ ] Real-time log streaming
- [ ] Error alerting
- [ ] Performance dashboards
- [ ] Anomaly detection
- [ ] Log retention and cleanup automation
Future Enhancements¶
1. Phase 2 (Post-MVP)¶
- [ ] Real-time log monitoring dashboard
- [ ] Log analysis tools with charts
- [ ] Automated alerting based on log patterns
- [ ] Log correlation analysis tools
2. Phase 3 (Advanced)¶
- [ ] Machine learning for log pattern detection
- [ ] Predictive alerting based on log trends
- [ ] Advanced log visualization
- [ ] Integration with external monitoring tools
Open Questions¶
1. Implementation Details¶
- [x] Logging implementation in shared utilities
- [x] Prefect flow for automatic cleanup
- [x] Log correlation ID generation strategy
- [x] Performance optimization for high-volume logging
2. Monitoring & Alerts¶
- [ ] Real-time log monitoring requirements
- [ ] Alert thresholds for error rates
- [ ] Performance degradation detection
- [ ] System health indicators
3. Development vs Production¶
- [ ] Different logging levels for environments
- [ ] Development debugging tools
- [ ] Production log optimization
- [ ] Testing log configurations
Usage Examples¶
Basic Logging¶
from loguru import logger
# Simple logging - automatically goes to database
logger.info("Application started")
logger.error("Failed to connect to API", error=str(e))
Structured Logging¶
# Structured data automatically stored in JSONB
logger.info(
    "Order executed",
    order_id="ORD123",
    symbol="AAPL",
    quantity=100,
    price=150.25,
    execution_time_ms=45
)
Correlation Tracking¶
from src.shared.logging.correlation import set_correlation_id
# Set correlation ID for request tracking
set_correlation_id("trade-12345")
logger.info("Order placed") # Automatically includes correlation_id
logger.info("Position updated") # Same correlation_id
Performance Logging¶
from src.shared.logging import log_performance
@log_performance
def calculate_indicators(symbol: str, period: int):
    # Heavy calculation
    pass
# Automatically logs execution time to performance_logs table
Querying Logs¶
Find logs by correlation ID¶
SELECT * FROM logging.system_logs
WHERE correlation_id = 'trade-12345'
ORDER BY timestamp;
Performance analysis¶
SELECT
service,
operation,
AVG(execution_time_ms) as avg_time,
MAX(execution_time_ms) as max_time,
COUNT(*) as call_count
FROM logging.performance_logs
WHERE timestamp > NOW() - INTERVAL '1 day'
GROUP BY service, operation
ORDER BY avg_time DESC;
Error analysis¶
SELECT
service,
level,
COUNT(*) as error_count,
MAX(timestamp) as last_error
FROM logging.system_logs
WHERE level IN ('ERROR', 'CRITICAL')
AND timestamp > NOW() - INTERVAL '7 days'
GROUP BY service, level
ORDER BY error_count DESC;
Next Steps¶
- ✅ Database Schema: Log tables implemented in PostgreSQL
- ✅ Logging Utilities: Shared logging utilities created
- ✅ Service Integration: Logging integrated across services
- 🚧 Cleanup Automation: Implement Prefect cleanup flows for log retention
- ✅ Testing: Logging configuration tested and working
Note: This document will be updated as we make more architectural decisions and implement the logging system.