From 60020e32f6f83aeaa6ca458d2a027694e86e42d5 Mon Sep 17 00:00:00 2001 From: Jens Luedicke Date: Sun, 6 Jul 2025 22:54:17 +0200 Subject: [PATCH] Cleanup imports in app.py --- app.py | 134 +++++++++++++++++++++++---------------------------------- 1 file changed, 54 insertions(+), 80 deletions(-) diff --git a/app.py b/app.py index 36fcf31..4124c5a 100644 --- a/app.py +++ b/app.py @@ -1,27 +1,59 @@ -from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file, abort -from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, CompanySettings, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, TaskDependency, Sprint, SprintStatus, Announcement, SystemEvent, WidgetType, UserDashboard, DashboardWidget, WidgetTemplate, Comment, CommentVisibility, BrandingSettings, Note, NoteLink, NoteVisibility, NoteFolder -from data_formatting import ( - format_duration, prepare_export_data, prepare_team_hours_export_data, - format_table_data, format_graph_data, format_team_data, format_burndown_data -) -from data_export import ( - export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel, - export_analytics_csv, export_analytics_excel -) -from time_utils import apply_time_rounding, round_duration_to_interval, get_user_rounding_settings -import logging -from datetime import datetime, time, timedelta -import os +# Standard library imports +import base64 import csv import io +import json +import logging +import os import re -import pandas as pd -from sqlalchemy import func +import tempfile +import time as time_module +import uuid +import zipfile +from datetime import datetime, time, timedelta from functools import wraps -from flask_mail import Mail, Message +from urllib.parse import unquote + +# Third-party imports +import markdown +import pandas as pd +import qrcode from dotenv import load_dotenv -from password_utils import PasswordValidator +from flask import (Flask, Response, abort, flash, g, jsonify, redirect, + render_template, request, send_file, session, url_for) +from flask_mail import Mail, Message +from sqlalchemy import and_, func, or_ from werkzeug.security import check_password_hash +from werkzeug.utils import secure_filename + +# Local application imports +from data_export import (export_analytics_csv, export_analytics_excel, + export_team_hours_to_csv, export_team_hours_to_excel, + export_to_csv, export_to_excel) +from data_formatting import (format_burndown_data, format_duration, + format_graph_data, format_table_data, + format_team_data, prepare_export_data, + prepare_team_hours_export_data) +from frontmatter_utils import parse_frontmatter +from migrate_db import (get_db_path, migrate_data as migrate_data_from_db, + migrate_postgresql_schema, migrate_task_system, + migrate_to_company_model, migrate_work_config_data, + run_all_migrations) +from models import (AccountType, Announcement, BrandingSettings, Comment, + CommentVisibility, Company, CompanySettings, + CompanyWorkConfig, DashboardWidget, Note, NoteFolder, + NoteLink, NoteVisibility, Project, ProjectCategory, Role, + Sprint, SprintStatus, SubTask, SystemEvent, SystemSettings, + Task, TaskDependency, TaskPriority, TaskStatus, Team, + TimeEntry, User, UserDashboard, UserPreferences, + WidgetTemplate, WidgetType, WorkConfig, WorkRegion, db) +from password_utils import PasswordValidator +from time_utils import (apply_time_rounding, format_date_by_preference, + format_datetime_by_preference, format_duration_readable, + format_time_by_preference, format_time_short_by_preference, + get_available_date_formats, get_available_rounding_options, + get_user_format_settings, get_user_rounding_settings, + round_duration_to_interval) # Load environment variables from .env file load_dotenv() @@ -74,7 +106,6 @@ def run_migrations(): # Run PostgreSQL-specific migrations try: - from migrate_db import migrate_postgresql_schema migrate_postgresql_schema() except ImportError: print("PostgreSQL migration function not available") @@ -84,7 +115,6 @@ def run_migrations(): else: print("Using SQLite - running SQLite migrations...") try: - from migrate_db import run_all_migrations run_all_migrations() print("SQLite database migrations completed successfully!") except ImportError as e: @@ -100,10 +130,9 @@ def run_migrations(): db.create_all() init_system_settings() -def migrate_to_company_model(): +def migrate_to_company_model_wrapper(): """Migrate existing data to support company model (stub - handled by migrate_db)""" try: - from migrate_db import migrate_to_company_model, get_db_path db_path = get_db_path() migrate_to_company_model(db_path) except ImportError: @@ -137,18 +166,16 @@ def init_system_settings(): def migrate_data(): """Handle data migrations and setup (stub - handled by migrate_db)""" try: - from migrate_db import migrate_data - migrate_data() + migrate_data_from_db() except ImportError: print("migrate_db module not available - skipping data migration") except Exception as e: print(f"Error during data migration: {e}") raise -def migrate_work_config_data(): +def migrate_work_config_data_wrapper(): """Migrate existing WorkConfig data to new architecture (stub - handled by migrate_db)""" try: - from migrate_db import migrate_work_config_data, get_db_path db_path = get_db_path() migrate_work_config_data(db_path) except ImportError: @@ -157,10 +184,9 @@ def migrate_work_config_data(): print(f"Error during work config migration: {e}") raise -def migrate_task_system(): +def migrate_task_system_wrapper(): """Create tables for the task management system (stub - handled by migrate_db)""" try: - from migrate_db import migrate_task_system, get_db_path db_path = get_db_path() migrate_task_system(db_path) except ImportError: @@ -213,7 +239,6 @@ def from_json_filter(json_str): if not json_str: return [] try: - import json return json.loads(json_str) except (json.JSONDecodeError, TypeError): return [] @@ -223,8 +248,6 @@ def format_date_filter(dt): """Format date according to user preferences.""" if not dt or not g.user: return dt.strftime('%Y-%m-%d') if dt else '' - - from time_utils import format_date_by_preference, get_user_format_settings date_format, _ = get_user_format_settings(g.user) return format_date_by_preference(dt, date_format) @@ -233,8 +256,6 @@ def format_time_filter(dt): """Format time according to user preferences.""" if not dt or not g.user: return dt.strftime('%H:%M:%S') if dt else '' - - from time_utils import format_time_by_preference, get_user_format_settings _, time_format_24h = get_user_format_settings(g.user) return format_time_by_preference(dt, time_format_24h) @@ -243,8 +264,6 @@ def format_time_short_filter(dt): """Format time without seconds according to user preferences.""" if not dt or not g.user: return dt.strftime('%H:%M') if dt else '' - - from time_utils import format_time_short_by_preference, get_user_format_settings _, time_format_24h = get_user_format_settings(g.user) return format_time_short_by_preference(dt, time_format_24h) @@ -253,8 +272,6 @@ def format_datetime_filter(dt): """Format datetime according to user preferences.""" if not dt or not g.user: return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else '' - - from time_utils import format_datetime_by_preference, get_user_format_settings date_format, time_format_24h = get_user_format_settings(g.user) return format_datetime_by_preference(dt, date_format, time_format_24h) @@ -263,8 +280,6 @@ def format_duration_filter(duration_seconds): """Format duration in readable format.""" if duration_seconds is None: return '00:00:00' - - from time_utils import format_duration_readable return format_duration_readable(duration_seconds) # Authentication decorator @@ -719,7 +734,6 @@ def register_freelancer(): company_name = business_name if business_name else f"{username}'s Workspace" # Generate unique company slug - import re slug = re.sub(r'[^\w\s-]', '', company_name.lower()) slug = re.sub(r'[-\s]+', '-', slug).strip('-') @@ -813,7 +827,6 @@ def setup_company(): if error is None: try: # Generate company slug - import re slug = re.sub(r'[^\w\s-]', '', company_name.lower()) slug = re.sub(r'[-\s]+', '-', slug).strip('-') @@ -1337,7 +1350,6 @@ def update_avatar(): # Validate URL if provided if avatar_url: # Basic URL validation - import re url_pattern = re.compile( r'^https?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... @@ -1383,9 +1395,6 @@ def update_avatar(): @login_required def upload_avatar(): """Handle avatar file upload""" - import os - from werkzeug.utils import secure_filename - import uuid user = User.query.get(session['user_id']) @@ -1491,9 +1500,6 @@ def setup_2fa(): db.session.commit() # Generate QR code - import qrcode - import io - import base64 qr_uri = g.user.get_2fa_uri(issuer_name=g.branding.app_name) qr = qrcode.QRCode(version=1, box_size=10, border=5) @@ -1634,7 +1640,6 @@ def notes_list(): query = query.filter_by(visibility=NoteVisibility.COMPANY) else: # 'all' - show all accessible notes # Complex filter for visibility - from sqlalchemy import or_, and_ conditions = [ # User's own notes Note.created_by_id == g.user.id, @@ -1978,7 +1983,6 @@ def download_note(slug, format): elif format == 'txt': # Download as plain text - from frontmatter_utils import parse_frontmatter metadata, body = parse_frontmatter(note.content) # Create plain text version @@ -2023,8 +2027,6 @@ def download_note(slug, format): @company_required def download_notes_bulk(): """Download multiple notes as a zip file""" - import zipfile - import tempfile note_ids = request.form.getlist('note_ids[]') format = request.form.get('format', 'md') @@ -2068,7 +2070,6 @@ def download_notes_bulk(): """ filename = f"{safe_filename}.html" else: # txt - from frontmatter_utils import parse_frontmatter metadata, body = parse_frontmatter(note.content) content = f"{note.title}\n{'=' * len(note.title)}\n\n{body}" filename = f"{safe_filename}.txt" @@ -2089,7 +2090,6 @@ def download_notes_bulk(): finally: # Clean up temp file after sending - import os os.unlink(temp_file.name) @@ -2098,11 +2098,8 @@ def download_notes_bulk(): @company_required def download_folder(folder_path, format): """Download all notes in a folder as a zip file""" - import zipfile - import tempfile # Decode folder path (replace URL encoding) - from urllib.parse import unquote folder_path = unquote(folder_path) # Get all notes in this folder @@ -2156,7 +2153,6 @@ def download_folder(folder_path, format): """ filename = f"{safe_filename}.html" else: # txt - from frontmatter_utils import parse_frontmatter metadata, body = parse_frontmatter(note.content) # Remove markdown formatting text_body = body @@ -2454,7 +2450,6 @@ def api_create_folder(): return jsonify({'success': False, 'message': 'Folder name is required'}), 400 # Validate folder name (no special characters except dash and underscore) - import re if not re.match(r'^[a-zA-Z0-9_\- ]+$', folder_name): return jsonify({'success': False, 'message': 'Folder name can only contain letters, numbers, spaces, dashes, and underscores'}), 400 @@ -2511,7 +2506,6 @@ def api_rename_folder(): return jsonify({'success': False, 'message': 'Old path and new name are required'}), 400 # Validate folder name - import re if not re.match(r'^[a-zA-Z0-9_\- ]+$', new_name): return jsonify({'success': False, 'message': 'Folder name can only contain letters, numbers, spaces, dashes, and underscores'}), 400 @@ -2771,7 +2765,6 @@ def arrive(): db.session.commit() # Format response with user preferences - from time_utils import format_datetime_by_preference, get_user_format_settings date_format, time_format_24h = get_user_format_settings(g.user) return jsonify({ @@ -2893,7 +2886,6 @@ def config(): company_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first() # Import time utils for display options - from time_utils import get_available_rounding_options, get_available_date_formats rounding_options = get_available_rounding_options() date_format_options = get_available_date_formats() @@ -3225,7 +3217,6 @@ def system_admin_dashboard(): regular_admins = User.query.filter_by(role=Role.ADMIN).count() # Recent activity (last 7 days) - from datetime import datetime, timedelta week_ago = datetime.now() - timedelta(days=7) recent_users = User.query.filter(User.created_at >= week_ago).count() @@ -3480,7 +3471,6 @@ def system_admin_company_detail(company_id): projects = Project.query.filter_by(company_id=company.id).all() # Recent activity - from datetime import datetime, timedelta week_ago = datetime.now() - timedelta(days=7) recent_time_entries = TimeEntry.query.join(User).filter( User.company_id == company.id, @@ -3652,7 +3642,6 @@ def system_admin_branding(): os.makedirs(upload_dir, exist_ok=True) # Save the file with a timestamp to avoid conflicts - import time filename = f"logo_{int(time.time())}_{logo_file.filename}" logo_path = os.path.join(upload_dir, filename) logo_file.save(logo_path) @@ -3667,7 +3656,6 @@ def system_admin_branding(): os.makedirs(upload_dir, exist_ok=True) # Save the file with a timestamp to avoid conflicts - import time filename = f"favicon_{int(time.time())}_{favicon_file.filename}" favicon_path = os.path.join(upload_dir, filename) favicon_file.save(favicon_path) @@ -3699,7 +3687,6 @@ def system_admin_health(): warnings = SystemEvent.get_events_by_severity('warning', days=7, limit=20) # System metrics - from datetime import datetime, timedelta now = datetime.now() # Database connection test @@ -3804,11 +3791,9 @@ def system_admin_announcement_new(): selected_companies = request.form.getlist('target_companies') if selected_roles: - import json target_roles = json.dumps(selected_roles) if selected_companies: - import json target_companies = json.dumps([int(c) for c in selected_companies]) announcement = Announcement( @@ -3882,13 +3867,11 @@ def system_admin_announcement_edit(id): selected_companies = request.form.getlist('target_companies') if selected_roles: - import json announcement.target_roles = json.dumps(selected_roles) else: announcement.target_roles = None if selected_companies: - import json announcement.target_companies = json.dumps([int(c) for c in selected_companies]) else: announcement.target_companies = None @@ -4773,7 +4756,6 @@ def api_company_teams(company_id): @system_admin_required def api_system_admin_stats(): """API: Get real-time system statistics for dashboard""" - from datetime import datetime, timedelta # Get basic counts total_companies = Company.query.count() @@ -4893,7 +4875,6 @@ def api_company_stats(company_id): active_projects = Project.query.filter_by(company_id=company.id, is_active=True).count() # Time entries statistics - from datetime import datetime, timedelta week_ago = datetime.now() - timedelta(days=7) month_ago = datetime.now() - timedelta(days=30) @@ -6432,7 +6413,6 @@ def get_dashboard(): # Parse config JSON try: - import json config = json.loads(widget.config) if widget.config else {} except (json.JSONDecodeError, TypeError): config = {} @@ -6528,7 +6508,6 @@ def create_or_update_widget(): # Store config as JSON string if config: - import json widget.config = json.dumps(config) else: widget.config = None @@ -6563,7 +6542,6 @@ def create_or_update_widget(): # Parse config for response try: - import json config_dict = json.loads(widget.config) if widget.config else {} except (json.JSONDecodeError, TypeError): config_dict = {} @@ -6681,7 +6659,6 @@ def get_widget_data(widget_id): widget_data = {} if widget.widget_type == WidgetType.DAILY_SUMMARY: - from datetime import datetime, timedelta config = widget.config or {} period = config.get('summary_period', 'daily') @@ -6794,7 +6771,6 @@ def get_widget_data(widget_id): } for t in tasks] elif widget.widget_type == WidgetType.WEEKLY_CHART: - from datetime import datetime, timedelta # Get weekly data for chart now = datetime.now() @@ -6886,7 +6862,6 @@ def get_widget_data(widget_id): widget_data['project_progress'] = project_progress elif widget.widget_type == WidgetType.PRODUCTIVITY_METRICS: - from datetime import datetime, timedelta # Calculate productivity metrics now = datetime.now() @@ -7089,7 +7064,6 @@ def render_markdown(): return jsonify({'html': ''}) # Import markdown here to avoid issues if not installed - import markdown html = markdown.markdown(content, extensions=['extra', 'codehilite', 'toc']) return jsonify({'html': html})