Cleanup imports in app.py

This commit is contained in:
2025-07-06 22:54:17 +02:00
parent f4b8664fd5
commit 60020e32f6

134
app.py
View File

@@ -1,27 +1,59 @@
from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file, abort
from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, CompanySettings, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, TaskDependency, Sprint, SprintStatus, Announcement, SystemEvent, WidgetType, UserDashboard, DashboardWidget, WidgetTemplate, Comment, CommentVisibility, BrandingSettings, Note, NoteLink, NoteVisibility, NoteFolder
from data_formatting import (
format_duration, prepare_export_data, prepare_team_hours_export_data,
format_table_data, format_graph_data, format_team_data, format_burndown_data
)
from data_export import (
export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel,
export_analytics_csv, export_analytics_excel
)
from time_utils import apply_time_rounding, round_duration_to_interval, get_user_rounding_settings
import logging
from datetime import datetime, time, timedelta
import os
# Standard library imports
import base64
import csv
import io
import json
import logging
import os
import re
import pandas as pd
from sqlalchemy import func
import tempfile
import time as time_module
import uuid
import zipfile
from datetime import datetime, time, timedelta
from functools import wraps
from flask_mail import Mail, Message
from urllib.parse import unquote
# Third-party imports
import markdown
import pandas as pd
import qrcode
from dotenv import load_dotenv
from password_utils import PasswordValidator
from flask import (Flask, Response, abort, flash, g, jsonify, redirect,
render_template, request, send_file, session, url_for)
from flask_mail import Mail, Message
from sqlalchemy import and_, func, or_
from werkzeug.security import check_password_hash
from werkzeug.utils import secure_filename
# Local application imports
from data_export import (export_analytics_csv, export_analytics_excel,
export_team_hours_to_csv, export_team_hours_to_excel,
export_to_csv, export_to_excel)
from data_formatting import (format_burndown_data, format_duration,
format_graph_data, format_table_data,
format_team_data, prepare_export_data,
prepare_team_hours_export_data)
from frontmatter_utils import parse_frontmatter
from migrate_db import (get_db_path, migrate_data as migrate_data_from_db,
migrate_postgresql_schema, migrate_task_system,
migrate_to_company_model, migrate_work_config_data,
run_all_migrations)
from models import (AccountType, Announcement, BrandingSettings, Comment,
CommentVisibility, Company, CompanySettings,
CompanyWorkConfig, DashboardWidget, Note, NoteFolder,
NoteLink, NoteVisibility, Project, ProjectCategory, Role,
Sprint, SprintStatus, SubTask, SystemEvent, SystemSettings,
Task, TaskDependency, TaskPriority, TaskStatus, Team,
TimeEntry, User, UserDashboard, UserPreferences,
WidgetTemplate, WidgetType, WorkConfig, WorkRegion, db)
from password_utils import PasswordValidator
from time_utils import (apply_time_rounding, format_date_by_preference,
format_datetime_by_preference, format_duration_readable,
format_time_by_preference, format_time_short_by_preference,
get_available_date_formats, get_available_rounding_options,
get_user_format_settings, get_user_rounding_settings,
round_duration_to_interval)
# Load environment variables from .env file
load_dotenv()
@@ -74,7 +106,6 @@ def run_migrations():
# Run PostgreSQL-specific migrations
try:
from migrate_db import migrate_postgresql_schema
migrate_postgresql_schema()
except ImportError:
print("PostgreSQL migration function not available")
@@ -84,7 +115,6 @@ def run_migrations():
else:
print("Using SQLite - running SQLite migrations...")
try:
from migrate_db import run_all_migrations
run_all_migrations()
print("SQLite database migrations completed successfully!")
except ImportError as e:
@@ -100,10 +130,9 @@ def run_migrations():
db.create_all()
init_system_settings()
def migrate_to_company_model():
def migrate_to_company_model_wrapper():
"""Migrate existing data to support company model (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_to_company_model, get_db_path
db_path = get_db_path()
migrate_to_company_model(db_path)
except ImportError:
@@ -137,18 +166,16 @@ def init_system_settings():
def migrate_data():
"""Handle data migrations and setup (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_data
migrate_data()
migrate_data_from_db()
except ImportError:
print("migrate_db module not available - skipping data migration")
except Exception as e:
print(f"Error during data migration: {e}")
raise
def migrate_work_config_data():
def migrate_work_config_data_wrapper():
"""Migrate existing WorkConfig data to new architecture (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_work_config_data, get_db_path
db_path = get_db_path()
migrate_work_config_data(db_path)
except ImportError:
@@ -157,10 +184,9 @@ def migrate_work_config_data():
print(f"Error during work config migration: {e}")
raise
def migrate_task_system():
def migrate_task_system_wrapper():
"""Create tables for the task management system (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_task_system, get_db_path
db_path = get_db_path()
migrate_task_system(db_path)
except ImportError:
@@ -213,7 +239,6 @@ def from_json_filter(json_str):
if not json_str:
return []
try:
import json
return json.loads(json_str)
except (json.JSONDecodeError, TypeError):
return []
@@ -223,8 +248,6 @@ def format_date_filter(dt):
"""Format date according to user preferences."""
if not dt or not g.user:
return dt.strftime('%Y-%m-%d') if dt else ''
from time_utils import format_date_by_preference, get_user_format_settings
date_format, _ = get_user_format_settings(g.user)
return format_date_by_preference(dt, date_format)
@@ -233,8 +256,6 @@ def format_time_filter(dt):
"""Format time according to user preferences."""
if not dt or not g.user:
return dt.strftime('%H:%M:%S') if dt else ''
from time_utils import format_time_by_preference, get_user_format_settings
_, time_format_24h = get_user_format_settings(g.user)
return format_time_by_preference(dt, time_format_24h)
@@ -243,8 +264,6 @@ def format_time_short_filter(dt):
"""Format time without seconds according to user preferences."""
if not dt or not g.user:
return dt.strftime('%H:%M') if dt else ''
from time_utils import format_time_short_by_preference, get_user_format_settings
_, time_format_24h = get_user_format_settings(g.user)
return format_time_short_by_preference(dt, time_format_24h)
@@ -253,8 +272,6 @@ def format_datetime_filter(dt):
"""Format datetime according to user preferences."""
if not dt or not g.user:
return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ''
from time_utils import format_datetime_by_preference, get_user_format_settings
date_format, time_format_24h = get_user_format_settings(g.user)
return format_datetime_by_preference(dt, date_format, time_format_24h)
@@ -263,8 +280,6 @@ def format_duration_filter(duration_seconds):
"""Format duration in readable format."""
if duration_seconds is None:
return '00:00:00'
from time_utils import format_duration_readable
return format_duration_readable(duration_seconds)
# Authentication decorator
@@ -719,7 +734,6 @@ def register_freelancer():
company_name = business_name if business_name else f"{username}'s Workspace"
# Generate unique company slug
import re
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
@@ -813,7 +827,6 @@ def setup_company():
if error is None:
try:
# Generate company slug
import re
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
@@ -1337,7 +1350,6 @@ def update_avatar():
# Validate URL if provided
if avatar_url:
# Basic URL validation
import re
url_pattern = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain...
@@ -1383,9 +1395,6 @@ def update_avatar():
@login_required
def upload_avatar():
"""Handle avatar file upload"""
import os
from werkzeug.utils import secure_filename
import uuid
user = User.query.get(session['user_id'])
@@ -1491,9 +1500,6 @@ def setup_2fa():
db.session.commit()
# Generate QR code
import qrcode
import io
import base64
qr_uri = g.user.get_2fa_uri(issuer_name=g.branding.app_name)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
@@ -1634,7 +1640,6 @@ def notes_list():
query = query.filter_by(visibility=NoteVisibility.COMPANY)
else: # 'all' - show all accessible notes
# Complex filter for visibility
from sqlalchemy import or_, and_
conditions = [
# User's own notes
Note.created_by_id == g.user.id,
@@ -1978,7 +1983,6 @@ def download_note(slug, format):
elif format == 'txt':
# Download as plain text
from frontmatter_utils import parse_frontmatter
metadata, body = parse_frontmatter(note.content)
# Create plain text version
@@ -2023,8 +2027,6 @@ def download_note(slug, format):
@company_required
def download_notes_bulk():
"""Download multiple notes as a zip file"""
import zipfile
import tempfile
note_ids = request.form.getlist('note_ids[]')
format = request.form.get('format', 'md')
@@ -2068,7 +2070,6 @@ def download_notes_bulk():
</html>"""
filename = f"{safe_filename}.html"
else: # txt
from frontmatter_utils import parse_frontmatter
metadata, body = parse_frontmatter(note.content)
content = f"{note.title}\n{'=' * len(note.title)}\n\n{body}"
filename = f"{safe_filename}.txt"
@@ -2089,7 +2090,6 @@ def download_notes_bulk():
finally:
# Clean up temp file after sending
import os
os.unlink(temp_file.name)
@@ -2098,11 +2098,8 @@ def download_notes_bulk():
@company_required
def download_folder(folder_path, format):
"""Download all notes in a folder as a zip file"""
import zipfile
import tempfile
# Decode folder path (replace URL encoding)
from urllib.parse import unquote
folder_path = unquote(folder_path)
# Get all notes in this folder
@@ -2156,7 +2153,6 @@ def download_folder(folder_path, format):
</html>"""
filename = f"{safe_filename}.html"
else: # txt
from frontmatter_utils import parse_frontmatter
metadata, body = parse_frontmatter(note.content)
# Remove markdown formatting
text_body = body
@@ -2454,7 +2450,6 @@ def api_create_folder():
return jsonify({'success': False, 'message': 'Folder name is required'}), 400
# Validate folder name (no special characters except dash and underscore)
import re
if not re.match(r'^[a-zA-Z0-9_\- ]+$', folder_name):
return jsonify({'success': False, 'message': 'Folder name can only contain letters, numbers, spaces, dashes, and underscores'}), 400
@@ -2511,7 +2506,6 @@ def api_rename_folder():
return jsonify({'success': False, 'message': 'Old path and new name are required'}), 400
# Validate folder name
import re
if not re.match(r'^[a-zA-Z0-9_\- ]+$', new_name):
return jsonify({'success': False, 'message': 'Folder name can only contain letters, numbers, spaces, dashes, and underscores'}), 400
@@ -2771,7 +2765,6 @@ def arrive():
db.session.commit()
# Format response with user preferences
from time_utils import format_datetime_by_preference, get_user_format_settings
date_format, time_format_24h = get_user_format_settings(g.user)
return jsonify({
@@ -2893,7 +2886,6 @@ def config():
company_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
# Import time utils for display options
from time_utils import get_available_rounding_options, get_available_date_formats
rounding_options = get_available_rounding_options()
date_format_options = get_available_date_formats()
@@ -3225,7 +3217,6 @@ def system_admin_dashboard():
regular_admins = User.query.filter_by(role=Role.ADMIN).count()
# Recent activity (last 7 days)
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
recent_users = User.query.filter(User.created_at >= week_ago).count()
@@ -3480,7 +3471,6 @@ def system_admin_company_detail(company_id):
projects = Project.query.filter_by(company_id=company.id).all()
# Recent activity
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
recent_time_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
@@ -3652,7 +3642,6 @@ def system_admin_branding():
os.makedirs(upload_dir, exist_ok=True)
# Save the file with a timestamp to avoid conflicts
import time
filename = f"logo_{int(time.time())}_{logo_file.filename}"
logo_path = os.path.join(upload_dir, filename)
logo_file.save(logo_path)
@@ -3667,7 +3656,6 @@ def system_admin_branding():
os.makedirs(upload_dir, exist_ok=True)
# Save the file with a timestamp to avoid conflicts
import time
filename = f"favicon_{int(time.time())}_{favicon_file.filename}"
favicon_path = os.path.join(upload_dir, filename)
favicon_file.save(favicon_path)
@@ -3699,7 +3687,6 @@ def system_admin_health():
warnings = SystemEvent.get_events_by_severity('warning', days=7, limit=20)
# System metrics
from datetime import datetime, timedelta
now = datetime.now()
# Database connection test
@@ -3804,11 +3791,9 @@ def system_admin_announcement_new():
selected_companies = request.form.getlist('target_companies')
if selected_roles:
import json
target_roles = json.dumps(selected_roles)
if selected_companies:
import json
target_companies = json.dumps([int(c) for c in selected_companies])
announcement = Announcement(
@@ -3882,13 +3867,11 @@ def system_admin_announcement_edit(id):
selected_companies = request.form.getlist('target_companies')
if selected_roles:
import json
announcement.target_roles = json.dumps(selected_roles)
else:
announcement.target_roles = None
if selected_companies:
import json
announcement.target_companies = json.dumps([int(c) for c in selected_companies])
else:
announcement.target_companies = None
@@ -4773,7 +4756,6 @@ def api_company_teams(company_id):
@system_admin_required
def api_system_admin_stats():
"""API: Get real-time system statistics for dashboard"""
from datetime import datetime, timedelta
# Get basic counts
total_companies = Company.query.count()
@@ -4893,7 +4875,6 @@ def api_company_stats(company_id):
active_projects = Project.query.filter_by(company_id=company.id, is_active=True).count()
# Time entries statistics
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
month_ago = datetime.now() - timedelta(days=30)
@@ -6432,7 +6413,6 @@ def get_dashboard():
# Parse config JSON
try:
import json
config = json.loads(widget.config) if widget.config else {}
except (json.JSONDecodeError, TypeError):
config = {}
@@ -6528,7 +6508,6 @@ def create_or_update_widget():
# Store config as JSON string
if config:
import json
widget.config = json.dumps(config)
else:
widget.config = None
@@ -6563,7 +6542,6 @@ def create_or_update_widget():
# Parse config for response
try:
import json
config_dict = json.loads(widget.config) if widget.config else {}
except (json.JSONDecodeError, TypeError):
config_dict = {}
@@ -6681,7 +6659,6 @@ def get_widget_data(widget_id):
widget_data = {}
if widget.widget_type == WidgetType.DAILY_SUMMARY:
from datetime import datetime, timedelta
config = widget.config or {}
period = config.get('summary_period', 'daily')
@@ -6794,7 +6771,6 @@ def get_widget_data(widget_id):
} for t in tasks]
elif widget.widget_type == WidgetType.WEEKLY_CHART:
from datetime import datetime, timedelta
# Get weekly data for chart
now = datetime.now()
@@ -6886,7 +6862,6 @@ def get_widget_data(widget_id):
widget_data['project_progress'] = project_progress
elif widget.widget_type == WidgetType.PRODUCTIVITY_METRICS:
from datetime import datetime, timedelta
# Calculate productivity metrics
now = datetime.now()
@@ -7089,7 +7064,6 @@ def render_markdown():
return jsonify({'html': ''})
# Import markdown here to avoid issues if not installed
import markdown
html = markdown.markdown(content, extensions=['extra', 'codehilite', 'toc'])
return jsonify({'html': html})