Add system health and system event logging.

This commit is contained in:
2025-07-04 08:46:06 +02:00
parent e31401a939
commit 52d3400728
6 changed files with 878 additions and 6 deletions

111
models.py
View File

@@ -612,4 +612,113 @@ class Announcement(db.Model):
def get_active_announcements_for_user(user):
"""Get all active announcements visible to a specific user"""
announcements = Announcement.query.filter_by(is_active=True).all()
return [ann for ann in announcements if ann.is_visible_to_user(user)]
return [ann for ann in announcements if ann.is_visible_to_user(user)]
# System Event model for logging system activities
class SystemEvent(db.Model):
id = db.Column(db.Integer, primary_key=True)
event_type = db.Column(db.String(50), nullable=False) # e.g., 'login', 'logout', 'user_created', 'system_error'
event_category = db.Column(db.String(30), nullable=False) # e.g., 'auth', 'user_management', 'system', 'error'
description = db.Column(db.Text, nullable=False)
severity = db.Column(db.String(20), default='info') # 'info', 'warning', 'error', 'critical'
timestamp = db.Column(db.DateTime, default=datetime.now, nullable=False)
# Optional associations
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
company_id = db.Column(db.Integer, db.ForeignKey('company.id'), nullable=True)
# Additional metadata (JSON string)
event_metadata = db.Column(db.Text, nullable=True) # Store additional event data as JSON
# IP address and user agent for security tracking
ip_address = db.Column(db.String(45), nullable=True) # IPv6 compatible
user_agent = db.Column(db.Text, nullable=True)
# Relationships
user = db.relationship('User', backref='system_events')
company = db.relationship('Company', backref='system_events')
def __repr__(self):
return f'<SystemEvent {self.event_type}: {self.description[:50]}>'
@staticmethod
def log_event(event_type, description, event_category='system', severity='info',
user_id=None, company_id=None, event_metadata=None, ip_address=None, user_agent=None):
"""Helper method to log system events"""
event = SystemEvent(
event_type=event_type,
event_category=event_category,
description=description,
severity=severity,
user_id=user_id,
company_id=company_id,
event_metadata=event_metadata,
ip_address=ip_address,
user_agent=user_agent
)
db.session.add(event)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
# Log to application logger if DB logging fails
import logging
logging.error(f"Failed to log system event: {e}")
@staticmethod
def get_recent_events(days=7, limit=100):
"""Get recent system events from the last N days"""
from datetime import datetime, timedelta
since = datetime.now() - timedelta(days=days)
return SystemEvent.query.filter(
SystemEvent.timestamp >= since
).order_by(SystemEvent.timestamp.desc()).limit(limit).all()
@staticmethod
def get_events_by_severity(severity, days=7, limit=50):
"""Get events by severity level"""
from datetime import datetime, timedelta
since = datetime.now() - timedelta(days=days)
return SystemEvent.query.filter(
SystemEvent.timestamp >= since,
SystemEvent.severity == severity
).order_by(SystemEvent.timestamp.desc()).limit(limit).all()
@staticmethod
def get_system_health_summary():
"""Get a summary of system health based on recent events"""
from datetime import datetime, timedelta
from sqlalchemy import func
now = datetime.now()
last_24h = now - timedelta(hours=24)
last_week = now - timedelta(days=7)
# Count events by severity in last 24h
recent_errors = SystemEvent.query.filter(
SystemEvent.timestamp >= last_24h,
SystemEvent.severity.in_(['error', 'critical'])
).count()
recent_warnings = SystemEvent.query.filter(
SystemEvent.timestamp >= last_24h,
SystemEvent.severity == 'warning'
).count()
# Count total events in last week
weekly_events = SystemEvent.query.filter(
SystemEvent.timestamp >= last_week
).count()
# Get most recent error
last_error = SystemEvent.query.filter(
SystemEvent.severity.in_(['error', 'critical'])
).order_by(SystemEvent.timestamp.desc()).first()
return {
'errors_24h': recent_errors,
'warnings_24h': recent_warnings,
'total_events_week': weekly_events,
'last_error': last_error,
'health_status': 'healthy' if recent_errors == 0 else 'issues' if recent_errors < 5 else 'critical'
}