Files
TimeTrack/app.py

5021 lines
188 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, session, g, Response, send_file
from models import db, TimeEntry, WorkConfig, User, SystemSettings, Team, Role, Project, Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType, ProjectCategory, Task, SubTask, TaskStatus, TaskPriority, Announcement, SystemEvent, KanbanBoard, KanbanColumn, KanbanCard, WidgetType, UserDashboard, DashboardWidget, WidgetTemplate
from data_formatting import (
format_duration, prepare_export_data, prepare_team_hours_export_data,
format_table_data, format_graph_data, format_team_data
)
from data_export import (
export_to_csv, export_to_excel, export_team_hours_to_csv, export_team_hours_to_excel,
export_analytics_csv, export_analytics_excel
)
from time_utils import apply_time_rounding, round_duration_to_interval, get_user_rounding_settings
import logging
from datetime import datetime, time, timedelta
import os
import csv
import io
import pandas as pd
from sqlalchemy import func
from functools import wraps
from flask_mail import Mail, Message
from dotenv import load_dotenv
from werkzeug.security import check_password_hash
# Load environment variables from .env file
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:////data/timetrack.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_timetrack')
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=7) # Session lasts for 7 days
# Configure Flask-Mail
app.config['MAIL_SERVER'] = os.environ.get('MAIL_SERVER', 'smtp.example.com')
app.config['MAIL_PORT'] = int(os.environ.get('MAIL_PORT') or 587)
app.config['MAIL_USE_TLS'] = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME', 'your-email@example.com')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD', 'your-password')
app.config['MAIL_DEFAULT_SENDER'] = os.environ.get('MAIL_DEFAULT_SENDER', 'TimeTrack <noreply@timetrack.com>')
# Log mail configuration (without password)
logger.info(f"Mail server: {app.config['MAIL_SERVER']}")
logger.info(f"Mail port: {app.config['MAIL_PORT']}")
logger.info(f"Mail use TLS: {app.config['MAIL_USE_TLS']}")
logger.info(f"Mail username: {app.config['MAIL_USERNAME']}")
logger.info(f"Mail default sender: {app.config['MAIL_DEFAULT_SENDER']}")
mail = Mail(app)
# Initialize the database with the app
db.init_app(app)
# Consolidated migration using migrate_db module
def run_migrations():
"""Run all database migrations using the consolidated migrate_db module."""
# Check if we're using PostgreSQL or SQLite
database_url = app.config['SQLALCHEMY_DATABASE_URI']
print(f"DEBUG: Database URL: {database_url}")
is_postgresql = 'postgresql://' in database_url or 'postgres://' in database_url
print(f"DEBUG: Is PostgreSQL: {is_postgresql}")
if is_postgresql:
print("Using PostgreSQL - skipping SQLite migrations, ensuring tables exist...")
with app.app_context():
db.create_all()
init_system_settings()
print("PostgreSQL setup completed successfully!")
else:
print("Using SQLite - running SQLite migrations...")
try:
from migrate_db import run_all_migrations
run_all_migrations()
print("SQLite database migrations completed successfully!")
except ImportError as e:
print(f"Error importing migrate_db: {e}")
print("Falling back to basic table creation...")
with app.app_context():
db.create_all()
init_system_settings()
except Exception as e:
print(f"Error during SQLite migration: {e}")
print("Falling back to basic table creation...")
with app.app_context():
db.create_all()
init_system_settings()
def migrate_to_company_model():
"""Migrate existing data to support company model (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_to_company_model, get_db_path
db_path = get_db_path()
migrate_to_company_model(db_path)
except ImportError:
print("migrate_db module not available - skipping company model migration")
except Exception as e:
print(f"Error during company migration: {e}")
raise
def init_system_settings():
"""Initialize system settings with default values if they don't exist"""
if not SystemSettings.query.filter_by(key='registration_enabled').first():
print("Adding registration_enabled system setting...")
reg_setting = SystemSettings(
key='registration_enabled',
value='true',
description='Controls whether new user registration is allowed'
)
db.session.add(reg_setting)
db.session.commit()
if not SystemSettings.query.filter_by(key='email_verification_required').first():
print("Adding email_verification_required system setting...")
email_setting = SystemSettings(
key='email_verification_required',
value='true',
description='Controls whether email verification is required for new user accounts'
)
db.session.add(email_setting)
db.session.commit()
def migrate_data():
"""Handle data migrations and setup (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_data
migrate_data()
except ImportError:
print("migrate_db module not available - skipping data migration")
except Exception as e:
print(f"Error during data migration: {e}")
raise
def migrate_work_config_data():
"""Migrate existing WorkConfig data to new architecture (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_work_config_data, get_db_path
db_path = get_db_path()
migrate_work_config_data(db_path)
except ImportError:
print("migrate_db module not available - skipping work config data migration")
except Exception as e:
print(f"Error during work config migration: {e}")
raise
def migrate_task_system():
"""Create tables for the task management system (stub - handled by migrate_db)"""
try:
from migrate_db import migrate_task_system, get_db_path
db_path = get_db_path()
migrate_task_system(db_path)
except ImportError:
print("migrate_db module not available - skipping task system migration")
except Exception as e:
print(f"Error during task system migration: {e}")
raise
# Call this function during app initialization
@app.before_first_request
def initialize_app():
run_migrations() # This handles all migrations including work config data
# Add this after initializing the app but before defining routes
@app.context_processor
def inject_globals():
"""Make certain variables available to all templates."""
# Get active announcements for current user
active_announcements = []
if g.user:
active_announcements = Announcement.get_active_announcements_for_user(g.user)
# Get tracking script settings
tracking_script_enabled = False
tracking_script_code = ''
try:
tracking_enabled_setting = SystemSettings.query.filter_by(key='tracking_script_enabled').first()
if tracking_enabled_setting:
tracking_script_enabled = tracking_enabled_setting.value == 'true'
tracking_code_setting = SystemSettings.query.filter_by(key='tracking_script_code').first()
if tracking_code_setting:
tracking_script_code = tracking_code_setting.value
except Exception:
pass # In case database isn't available yet
return {
'Role': Role,
'AccountType': AccountType,
'current_year': datetime.now().year,
'active_announcements': active_announcements,
'tracking_script_enabled': tracking_script_enabled,
'tracking_script_code': tracking_script_code
}
# Template filters for date/time formatting
@app.template_filter('from_json')
def from_json_filter(json_str):
"""Parse JSON string to Python object."""
if not json_str:
return []
try:
import json
return json.loads(json_str)
except (json.JSONDecodeError, TypeError):
return []
@app.template_filter('format_date')
def format_date_filter(dt):
"""Format date according to user preferences."""
if not dt or not g.user:
return dt.strftime('%Y-%m-%d') if dt else ''
from time_utils import format_date_by_preference, get_user_format_settings
date_format, _ = get_user_format_settings(g.user)
return format_date_by_preference(dt, date_format)
@app.template_filter('format_time')
def format_time_filter(dt):
"""Format time according to user preferences."""
if not dt or not g.user:
return dt.strftime('%H:%M:%S') if dt else ''
from time_utils import format_time_by_preference, get_user_format_settings
_, time_format_24h = get_user_format_settings(g.user)
return format_time_by_preference(dt, time_format_24h)
@app.template_filter('format_time_short')
def format_time_short_filter(dt):
"""Format time without seconds according to user preferences."""
if not dt or not g.user:
return dt.strftime('%H:%M') if dt else ''
from time_utils import format_time_short_by_preference, get_user_format_settings
_, time_format_24h = get_user_format_settings(g.user)
return format_time_short_by_preference(dt, time_format_24h)
@app.template_filter('format_datetime')
def format_datetime_filter(dt):
"""Format datetime according to user preferences."""
if not dt or not g.user:
return dt.strftime('%Y-%m-%d %H:%M:%S') if dt else ''
from time_utils import format_datetime_by_preference, get_user_format_settings
date_format, time_format_24h = get_user_format_settings(g.user)
return format_datetime_by_preference(dt, date_format, time_format_24h)
@app.template_filter('format_duration')
def format_duration_filter(duration_seconds):
"""Format duration in readable format."""
if duration_seconds is None:
return '00:00:00'
from time_utils import format_duration_readable
return format_duration_readable(duration_seconds)
# Authentication decorator
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for('login', next=request.url))
return f(*args, **kwargs)
return decorated_function
# Admin-only decorator
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None or (g.user.role != Role.ADMIN and g.user.role != Role.SYSTEM_ADMIN):
flash('You need administrator privileges to access this page.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
# System Admin-only decorator
def system_admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None or g.user.role != Role.SYSTEM_ADMIN:
flash('You need system administrator privileges to access this page.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
def get_system_setting(key, default='false'):
"""Helper function to get system setting value"""
setting = SystemSettings.query.filter_by(key=key).first()
return setting.value if setting else default
def is_system_admin(user=None):
"""Helper function to check if user is system admin"""
if user is None:
user = g.user
return user and user.role == Role.SYSTEM_ADMIN
def can_access_system_settings(user=None):
"""Helper function to check if user can access system-wide settings"""
return is_system_admin(user)
def get_available_roles():
"""Get roles available for assignment, excluding SYSTEM_ADMIN unless one already exists"""
roles = list(Role)
# Only show SYSTEM_ADMIN role if at least one system admin already exists
# This prevents accidental creation of system admins
system_admin_exists = User.query.filter_by(role=Role.SYSTEM_ADMIN).count() > 0
if not system_admin_exists:
roles = [role for role in roles if role != Role.SYSTEM_ADMIN]
return roles
# Add this decorator function after your existing decorators
def role_required(min_role):
"""
Decorator to restrict access based on user role.
min_role should be a Role enum value (e.g., Role.TEAM_LEADER)
"""
def role_decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for('login', next=request.url))
# Admin and System Admin always have access
if g.user.role == Role.ADMIN or g.user.role == Role.SYSTEM_ADMIN:
return f(*args, **kwargs)
# Check role hierarchy
role_hierarchy = {
Role.TEAM_MEMBER: 1,
Role.TEAM_LEADER: 2,
Role.SUPERVISOR: 3,
Role.ADMIN: 4,
Role.SYSTEM_ADMIN: 5
}
if role_hierarchy.get(g.user.role, 0) < role_hierarchy.get(min_role, 0):
flash('You do not have sufficient permissions to access this page.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
return role_decorator
def company_required(f):
"""
Decorator to ensure user has a valid company association and set company context.
"""
@wraps(f)
def decorated_function(*args, **kwargs):
if g.user is None:
return redirect(url_for('login', next=request.url))
# System admins can access without company association
if g.user.role == Role.SYSTEM_ADMIN:
return f(*args, **kwargs)
if g.user.company_id is None:
flash('You must be associated with a company to access this page.', 'error')
return redirect(url_for('setup_company'))
# Set company context
g.company = Company.query.get(g.user.company_id)
if not g.company or not g.company.is_active:
flash('Your company is not active. Please contact support.', 'error')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
@app.before_request
def load_logged_in_user():
user_id = session.get('user_id')
if user_id is None:
g.user = None
g.company = None
else:
g.user = User.query.get(user_id)
if g.user:
# Set company context
if g.user.company_id:
g.company = Company.query.get(g.user.company_id)
else:
g.company = None
# Check if user is verified
if not g.user.is_verified and request.endpoint not in ['verify_email', 'static', 'logout', 'setup_company']:
# Allow unverified users to access only verification and static resources
if request.endpoint not in ['login', 'register']:
flash('Please verify your email address before accessing this page.', 'warning')
session.clear()
return redirect(url_for('login'))
else:
g.company = None
@app.route('/')
def home():
if g.user:
# Get active entry (no departure time)
active_entry = TimeEntry.query.filter_by(
user_id=g.user.id,
departure_time=None
).first()
# Get today's completed entries for history
today = datetime.now().date()
history = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.departure_time.isnot(None),
TimeEntry.arrival_time >= datetime.combine(today, time.min),
TimeEntry.arrival_time <= datetime.combine(today, time.max)
).order_by(TimeEntry.arrival_time.desc()).all()
# Get available projects for this user (company-scoped)
available_projects = []
if g.user.company_id:
all_projects = Project.query.filter_by(
company_id=g.user.company_id,
is_active=True
).all()
for project in all_projects:
if project.is_user_allowed(g.user):
available_projects.append(project)
return render_template('index.html', title='Home',
active_entry=active_entry,
history=history,
available_projects=available_projects)
else:
return render_template('about.html', title='Home')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user:
# Fix role if it's a string or None
if isinstance(user.role, str) or user.role is None:
# Map string role values to enum values
role_mapping = {
'Team Member': Role.TEAM_MEMBER,
'TEAM_MEMBER': Role.TEAM_MEMBER,
'Team Leader': Role.TEAM_LEADER,
'TEAM_LEADER': Role.TEAM_LEADER,
'Supervisor': Role.SUPERVISOR,
'SUPERVISOR': Role.SUPERVISOR,
'Administrator': Role.ADMIN,
'ADMIN': Role.ADMIN
}
if isinstance(user.role, str):
user.role = role_mapping.get(user.role, Role.TEAM_MEMBER)
else:
user.role = Role.ADMIN if user.role == Role.ADMIN else Role.TEAM_MEMBER
db.session.commit()
# Now proceed with password check
if user.check_password(password):
# Check if user is blocked
if user.is_blocked:
flash('Your account has been disabled. Please contact an administrator.', 'error')
return render_template('login.html')
# Check if 2FA is enabled
if user.two_factor_enabled:
# Store user ID for 2FA verification
session['2fa_user_id'] = user.id
return redirect(url_for('verify_2fa'))
else:
# Continue with normal login process
session['user_id'] = user.id
session['username'] = user.username
session['role'] = user.role.value
# Log successful login
SystemEvent.log_event(
'user_login',
f'User {user.username} logged in successfully',
'auth',
'info',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Login successful!', 'success')
return redirect(url_for('home'))
# Log failed login attempt
SystemEvent.log_event(
'login_failed',
f'Failed login attempt for username: {username}',
'auth',
'warning',
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Invalid username or password', 'error')
return render_template('login.html', title='Login')
@app.route('/logout')
def logout():
# Log logout event before clearing session
if 'user_id' in session:
user = User.query.get(session['user_id'])
if user:
SystemEvent.log_event(
'user_logout',
f'User {user.username} logged out',
'auth',
'info',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
session.clear()
flash('You have been logged out.', 'info')
return redirect(url_for('login'))
@app.route('/register', methods=['GET', 'POST'])
def register():
# Check if registration is enabled
registration_enabled = get_system_setting('registration_enabled', 'true') == 'true'
if not registration_enabled:
flash('Registration is currently disabled by the administrator.', 'error')
return redirect(url_for('login'))
# Check if companies exist, if not redirect to company setup
if Company.query.count() == 0:
flash('No companies exist yet. Please set up your company first.', 'info')
return redirect(url_for('setup_company'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
company_code = request.form.get('company_code', '').strip()
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif not password:
error = 'Password is required'
elif password != confirm_password:
error = 'Passwords do not match'
elif not company_code:
error = 'Company code is required'
# Find company by code
company = None
if company_code:
company = Company.query.filter_by(slug=company_code.lower()).first()
if not company:
error = 'Invalid company code'
# Check for existing users within the company
if company and not error:
if User.query.filter_by(username=username, company_id=company.id).first():
error = 'Username already exists in this company'
elif User.query.filter_by(email=email, company_id=company.id).first():
error = 'Email already registered in this company'
if error is None and company:
try:
# Check if this is the first user account in this company
is_first_user_in_company = User.query.filter_by(company_id=company.id).count() == 0
# Check if email verification is required
email_verification_required = get_system_setting('email_verification_required', 'true') == 'true'
new_user = User(
username=username,
email=email,
company_id=company.id,
is_verified=False
)
new_user.set_password(password)
# Make first user in company an admin with full privileges
if is_first_user_in_company:
new_user.role = Role.ADMIN
new_user.is_verified = True # Auto-verify first user in company
elif not email_verification_required:
# If email verification is disabled, auto-verify new users
new_user.is_verified = True
# Generate verification token (even if not needed, for consistency)
token = new_user.generate_verification_token()
db.session.add(new_user)
db.session.commit()
if is_first_user_in_company:
# First user in company gets admin privileges and is auto-verified
logger.info(f"First user account created in company {company.name}: {username} with admin privileges")
flash(f'Welcome! You are the first user in {company.name} and have been granted administrator privileges. You can now log in.', 'success')
elif not email_verification_required:
# Email verification is disabled, user can log in immediately
logger.info(f"User account created with auto-verification in company {company.name}: {username}")
flash('Registration successful! You can now log in.', 'success')
else:
# Send verification email for regular users when verification is required
verification_url = url_for('verify_email', token=token, _external=True)
msg = Message('Verify your TimeTrack account', recipients=[email])
msg.body = f'''Hello {username},
Thank you for registering with TimeTrack. To complete your registration, please click on the link below:
{verification_url}
This link will expire in 24 hours.
If you did not register for TimeTrack, please ignore this email.
Best regards,
The TimeTrack Team
'''
mail.send(msg)
logger.info(f"Verification email sent to {email}")
flash('Registration initiated! Please check your email to verify your account.', 'success')
return redirect(url_for('login'))
except Exception as e:
db.session.rollback()
logger.error(f"Error during registration: {str(e)}")
error = f"An error occurred during registration: {str(e)}"
flash(error, 'error')
return render_template('register.html', title='Register')
@app.route('/register/freelancer', methods=['GET', 'POST'])
def register_freelancer():
"""Freelancer registration route - creates user without company token"""
# Check if registration is enabled
registration_enabled = get_system_setting('registration_enabled', 'true') == 'true'
if not registration_enabled:
flash('Registration is currently disabled by the administrator.', 'error')
return redirect(url_for('login'))
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
confirm_password = request.form.get('confirm_password')
business_name = request.form.get('business_name', '').strip()
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif not password:
error = 'Password is required'
elif password != confirm_password:
error = 'Passwords do not match'
# Check for existing users globally (freelancers get unique usernames/emails)
if not error:
if User.query.filter_by(username=username).first():
error = 'Username already exists'
elif User.query.filter_by(email=email).first():
error = 'Email already registered'
if error is None:
try:
# Create personal company for freelancer
company_name = business_name if business_name else f"{username}'s Workspace"
# Generate unique company slug
import re
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
# Ensure slug uniqueness
base_slug = slug
counter = 1
while Company.query.filter_by(slug=slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
# Create personal company
personal_company = Company(
name=company_name,
slug=slug,
description=f"Personal workspace for {username}",
is_personal=True,
max_users=1 # Limit to single user
)
db.session.add(personal_company)
db.session.flush() # Get company ID
# Create freelancer user
new_user = User(
username=username,
email=email,
company_id=personal_company.id,
account_type=AccountType.FREELANCER,
business_name=business_name if business_name else None,
role=Role.ADMIN, # Freelancers are admins of their personal company
is_verified=True # Auto-verify freelancers
)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
logger.info(f"Freelancer account created: {username} with personal company: {company_name}")
flash(f'Welcome {username}! Your freelancer account has been created successfully. You can now log in.', 'success')
return redirect(url_for('login'))
except Exception as e:
db.session.rollback()
logger.error(f"Error during freelancer registration: {str(e)}")
error = f"An error occurred during registration: {str(e)}"
if error:
flash(error, 'error')
return render_template('register_freelancer.html', title='Register as Freelancer')
@app.route('/setup_company', methods=['GET', 'POST'])
def setup_company():
"""Company setup route for creating new companies with admin users"""
existing_companies = Company.query.count()
# Determine access level
is_initial_setup = existing_companies == 0
is_super_admin = g.user and g.user.role == Role.ADMIN and existing_companies > 0
is_authorized = is_initial_setup or is_super_admin
# Check authorization for non-initial setups
if not is_initial_setup and not is_super_admin:
flash('You do not have permission to create new companies.', 'error')
return redirect(url_for('home') if g.user else url_for('login'))
if request.method == 'POST':
company_name = request.form.get('company_name')
company_description = request.form.get('company_description', '')
admin_username = request.form.get('admin_username')
admin_email = request.form.get('admin_email')
admin_password = request.form.get('admin_password')
confirm_password = request.form.get('confirm_password')
# Validate input
error = None
if not company_name:
error = 'Company name is required'
elif not admin_username:
error = 'Admin username is required'
elif not admin_email:
error = 'Admin email is required'
elif not admin_password:
error = 'Admin password is required'
elif admin_password != confirm_password:
error = 'Passwords do not match'
elif len(admin_password) < 6:
error = 'Password must be at least 6 characters long'
if error is None:
try:
# Generate company slug
import re
slug = re.sub(r'[^\w\s-]', '', company_name.lower())
slug = re.sub(r'[-\s]+', '-', slug).strip('-')
# Ensure slug uniqueness
base_slug = slug
counter = 1
while Company.query.filter_by(slug=slug).first():
slug = f"{base_slug}-{counter}"
counter += 1
# Create company
company = Company(
name=company_name,
slug=slug,
description=company_description,
is_active=True
)
db.session.add(company)
db.session.flush() # Get company.id without committing
# Check if username/email already exists in this company context
existing_user_by_username = User.query.filter_by(
username=admin_username,
company_id=company.id
).first()
existing_user_by_email = User.query.filter_by(
email=admin_email,
company_id=company.id
).first()
if existing_user_by_username:
error = 'Username already exists in this company'
elif existing_user_by_email:
error = 'Email already registered in this company'
if error is None:
# Create admin user
admin_user = User(
username=admin_username,
email=admin_email,
company_id=company.id,
role=Role.ADMIN,
is_verified=True # Auto-verify company admin
)
admin_user.set_password(admin_password)
db.session.add(admin_user)
db.session.commit()
if is_initial_setup:
# Auto-login the admin user for initial setup
session['user_id'] = admin_user.id
session['username'] = admin_user.username
session['role'] = admin_user.role.value
flash(f'Company "{company_name}" created successfully! You are now logged in as the administrator.', 'success')
return redirect(url_for('home'))
else:
# For super admin creating additional companies, don't auto-login
flash(f'Company "{company_name}" created successfully! Admin user "{admin_username}" has been created with the company code "{slug}".', 'success')
return redirect(url_for('admin_company') if g.user else url_for('login'))
else:
db.session.rollback()
except Exception as e:
db.session.rollback()
logger.error(f"Error during company setup: {str(e)}")
error = f"An error occurred during setup: {str(e)}"
if error:
flash(error, 'error')
return render_template('setup_company.html',
title='Company Setup',
existing_companies=existing_companies,
is_initial_setup=is_initial_setup,
is_super_admin=is_super_admin)
@app.route('/verify_email/<token>')
def verify_email(token):
user = User.query.filter_by(verification_token=token).first()
if not user:
flash('Invalid or expired verification link.', 'error')
return redirect(url_for('login'))
if user.verify_token(token):
db.session.commit()
flash('Email verified successfully! You can now log in.', 'success')
else:
flash('Invalid or expired verification link.', 'error')
return redirect(url_for('login'))
@app.route('/dashboard')
@role_required(Role.TEAM_MEMBER)
@company_required
def dashboard():
"""User dashboard with configurable widgets."""
return render_template('dashboard.html', title='Dashboard')
# Redirect old admin dashboard URL to new dashboard
@app.route('/admin/users')
@admin_required
@company_required
def admin_users():
users = User.query.filter_by(company_id=g.user.company_id).all()
return render_template('admin_users.html', title='User Management', users=users)
@app.route('/admin/users/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_user():
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
auto_verify = 'auto_verify' in request.form
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif not password:
error = 'Password is required'
elif User.query.filter_by(username=username, company_id=g.user.company_id).first():
error = 'Username already exists in your company'
elif User.query.filter_by(email=email, company_id=g.user.company_id).first():
error = 'Email already registered in your company'
if error is None:
# Convert role string to enum
try:
role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
role = Role.TEAM_MEMBER
# Create new user with role and team
new_user = User(
username=username,
email=email,
company_id=g.user.company_id,
is_verified=auto_verify,
role=role,
team_id=team_id if team_id else None
)
new_user.set_password(password)
if not auto_verify:
# Generate verification token and send email
token = new_user.generate_verification_token()
verification_url = url_for('verify_email', token=token, _external=True)
msg = Message('Verify your TimeTrack account', recipients=[email])
msg.body = f'''Hello {username},
An administrator has created an account for you on TimeTrack. To activate your account, please click on the link below:
{verification_url}
This link will expire in 24 hours.
Best regards,
The TimeTrack Team
'''
mail.send(msg)
db.session.add(new_user)
db.session.commit()
if auto_verify:
flash(f'User {username} created and automatically verified!', 'success')
else:
flash(f'User {username} created! Verification email sent.', 'success')
return redirect(url_for('admin_users'))
flash(error, 'error')
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('create_user.html', title='Create User', teams=teams, roles=roles)
@app.route('/admin/users/edit/<int:user_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_user(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
username = request.form.get('username')
email = request.form.get('email')
password = request.form.get('password')
# Get role and team
role_name = request.form.get('role')
team_id = request.form.get('team_id')
# Validate input
error = None
if not username:
error = 'Username is required'
elif not email:
error = 'Email is required'
elif username != user.username and User.query.filter_by(username=username, company_id=g.user.company_id).first():
error = 'Username already exists in your company'
elif email != user.email and User.query.filter_by(email=email, company_id=g.user.company_id).first():
error = 'Email already registered in your company'
if error is None:
user.username = username
user.email = email
# Convert role string to enum
try:
user.role = Role[role_name] if role_name else Role.TEAM_MEMBER
except KeyError:
user.role = Role.TEAM_MEMBER
user.team_id = team_id if team_id else None
if password:
user.set_password(password)
db.session.commit()
flash(f'User {username} updated successfully!', 'success')
return redirect(url_for('admin_users'))
flash(error, 'error')
# Get all teams for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).all()
roles = get_available_roles()
return render_template('edit_user.html', title='Edit User', user=user, teams=teams, roles=roles)
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
@admin_required
@company_required
def delete_user(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent deleting yourself
if user.id == session.get('user_id'):
flash('You cannot delete your own account', 'error')
return redirect(url_for('admin_users'))
username = user.username
db.session.delete(user)
db.session.commit()
flash(f'User {username} deleted successfully', 'success')
return redirect(url_for('admin_users'))
@app.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
user = User.query.get(session['user_id'])
if request.method == 'POST':
email = request.form.get('email')
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
# Validate input
error = None
if not email:
error = 'Email is required'
elif email != user.email and User.query.filter_by(email=email).first():
error = 'Email already registered'
# Password change validation
if new_password:
if not current_password:
error = 'Current password is required to set a new password'
elif not user.check_password(current_password):
error = 'Current password is incorrect'
elif new_password != confirm_password:
error = 'New passwords do not match'
if error is None:
user.email = email
if new_password:
user.set_password(new_password)
db.session.commit()
flash('Profile updated successfully!', 'success')
return redirect(url_for('profile'))
flash(error, 'error')
return render_template('profile.html', title='My Profile', user=user)
@app.route('/2fa/setup', methods=['GET', 'POST'])
@login_required
def setup_2fa():
if request.method == 'POST':
# Verify the TOTP code before enabling 2FA
totp_code = request.form.get('totp_code')
if not totp_code:
flash('Please enter the verification code from your authenticator app.', 'error')
return redirect(url_for('setup_2fa'))
try:
if g.user.verify_2fa_token(totp_code, allow_setup=True):
g.user.two_factor_enabled = True
db.session.commit()
flash('Two-factor authentication has been successfully enabled!', 'success')
return redirect(url_for('profile'))
else:
flash('Invalid verification code. Please make sure your device time is synchronized and try again.', 'error')
return redirect(url_for('setup_2fa'))
except Exception as e:
logger.error(f"2FA setup error: {str(e)}")
flash('An error occurred during 2FA setup. Please try again.', 'error')
return redirect(url_for('setup_2fa'))
# GET request - show setup page
if g.user.two_factor_enabled:
flash('Two-factor authentication is already enabled.', 'info')
return redirect(url_for('profile'))
# Generate secret if not exists
if not g.user.two_factor_secret:
g.user.generate_2fa_secret()
db.session.commit()
# Generate QR code
import qrcode
import io
import base64
qr_uri = g.user.get_2fa_uri()
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(qr_uri)
qr.make(fit=True)
# Create QR code image
qr_img = qr.make_image(fill_color="black", back_color="white")
img_buffer = io.BytesIO()
qr_img.save(img_buffer, format='PNG')
img_buffer.seek(0)
qr_code_b64 = base64.b64encode(img_buffer.getvalue()).decode()
return render_template('setup_2fa.html',
title='Setup Two-Factor Authentication',
secret=g.user.two_factor_secret,
qr_code=qr_code_b64)
@app.route('/2fa/disable', methods=['POST'])
@login_required
def disable_2fa():
password = request.form.get('password')
if not password or not g.user.check_password(password):
flash('Please enter your correct password to disable 2FA.', 'error')
return redirect(url_for('profile'))
g.user.two_factor_enabled = False
g.user.two_factor_secret = None
db.session.commit()
flash('Two-factor authentication has been disabled.', 'success')
return redirect(url_for('profile'))
@app.route('/2fa/verify', methods=['GET', 'POST'])
def verify_2fa():
# Check if user is in 2FA verification state
user_id = session.get('2fa_user_id')
if not user_id:
return redirect(url_for('login'))
user = User.query.get(user_id)
if not user or not user.two_factor_enabled:
session.pop('2fa_user_id', None)
return redirect(url_for('login'))
if request.method == 'POST':
totp_code = request.form.get('totp_code')
if user.verify_2fa_token(totp_code):
# Complete login process
session.pop('2fa_user_id', None)
session['user_id'] = user.id
session['username'] = user.username
session['role'] = user.role.value
# Log successful 2FA login
SystemEvent.log_event(
'user_login_2fa',
f'User {user.username} logged in successfully with 2FA',
'auth',
'info',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Login successful!', 'success')
return redirect(url_for('home'))
else:
# Log failed 2FA attempt
SystemEvent.log_event(
'2fa_failed',
f'Failed 2FA verification for user {user.username}',
'auth',
'warning',
user_id=user.id,
company_id=user.company_id,
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
flash('Invalid verification code. Please try again.', 'error')
return render_template('verify_2fa.html', title='Two-Factor Authentication')
@app.route('/about')
def about():
return render_template('about.html', title='About')
@app.route('/contact', methods=['GET', 'POST'])
@login_required
def contact():
# redacted
return render_template('contact.html', title='Contact')
# We can keep this route as a redirect to home for backward compatibility
@app.route('/timetrack')
@login_required
def timetrack():
return redirect(url_for('home'))
@app.route('/api/arrive', methods=['POST'])
@login_required
def arrive():
# Get project and notes from request
project_id = request.json.get('project_id') if request.json else None
notes = request.json.get('notes') if request.json else None
# Validate project access if project is specified
if project_id:
project = Project.query.get(project_id)
if not project or not project.is_user_allowed(g.user):
return jsonify({'error': 'Invalid or unauthorized project'}), 403
# Create a new time entry with arrival time for the current user
new_entry = TimeEntry(
user_id=g.user.id,
arrival_time=datetime.now(),
project_id=int(project_id) if project_id else None,
notes=notes
)
db.session.add(new_entry)
db.session.commit()
# Format response with user preferences
from time_utils import format_datetime_by_preference, get_user_format_settings
date_format, time_format_24h = get_user_format_settings(g.user)
return jsonify({
'id': new_entry.id,
'arrival_time': format_datetime_by_preference(new_entry.arrival_time, date_format, time_format_24h),
'project': {
'id': new_entry.project.id,
'code': new_entry.project.code,
'name': new_entry.project.name
} if new_entry.project else None,
'notes': new_entry.notes
})
@app.route('/api/leave/<int:entry_id>', methods=['POST'])
@login_required
def leave(entry_id):
# Find the time entry for the current user
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
# Set the departure time
departure_time = datetime.now()
# Apply time rounding if enabled
rounded_arrival, rounded_departure = apply_time_rounding(entry.arrival_time, departure_time, g.user)
entry.arrival_time = rounded_arrival
entry.departure_time = rounded_departure
# If currently paused, add the final break duration
if entry.is_paused and entry.pause_start_time:
final_break_duration = int((rounded_departure - entry.pause_start_time).total_seconds())
entry.total_break_duration += final_break_duration
entry.is_paused = False
entry.pause_start_time = None
# Apply rounding to break duration if enabled
interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
if interval_minutes > 0:
entry.total_break_duration = round_duration_to_interval(
entry.total_break_duration, interval_minutes, round_to_nearest
)
# Calculate work duration considering breaks
entry.duration, effective_break = calculate_work_duration(
rounded_arrival,
rounded_departure,
entry.total_break_duration,
g.user
)
db.session.commit()
return jsonify({
'id': entry.id,
'arrival_time': entry.arrival_time.isoformat(),
'departure_time': entry.departure_time.isoformat(),
'duration': entry.duration,
'total_break_duration': entry.total_break_duration,
'effective_break_duration': effective_break
})
# Add this new route to handle pausing/resuming
@app.route('/api/toggle-pause/<int:entry_id>', methods=['POST'])
@login_required
def toggle_pause(entry_id):
# Find the time entry for the current user
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
now = datetime.now()
if entry.is_paused:
# Resuming work - calculate break duration
break_duration = int((now - entry.pause_start_time).total_seconds())
entry.total_break_duration += break_duration
entry.is_paused = False
entry.pause_start_time = None
message = "Work resumed"
else:
# Pausing work
entry.is_paused = True
entry.pause_start_time = now
message = "Work paused"
db.session.commit()
return jsonify({
'id': entry.id,
'is_paused': entry.is_paused,
'total_break_duration': entry.total_break_duration,
'message': message
})
@app.route('/config', methods=['GET', 'POST'])
@login_required
def config():
# Get user preferences or create default if none exists
preferences = UserPreferences.query.filter_by(user_id=g.user.id).first()
if not preferences:
preferences = UserPreferences(user_id=g.user.id)
db.session.add(preferences)
db.session.commit()
if request.method == 'POST':
try:
# Update only user preferences (no company policies)
preferences.time_format_24h = 'time_format_24h' in request.form
preferences.date_format = request.form.get('date_format', 'ISO')
preferences.time_rounding_minutes = int(request.form.get('time_rounding_minutes', 0))
preferences.round_to_nearest = 'round_to_nearest' in request.form
db.session.commit()
flash('Preferences updated successfully!', 'success')
return redirect(url_for('config'))
except ValueError:
flash('Please enter valid values for all fields', 'error')
# Get company work policies for display (read-only)
company_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
# Import time utils for display options
from time_utils import get_available_rounding_options, get_available_date_formats
rounding_options = get_available_rounding_options()
date_format_options = get_available_date_formats()
return render_template('config.html', title='User Preferences',
preferences=preferences,
company_config=company_config,
rounding_options=rounding_options,
date_format_options=date_format_options)
@app.route('/api/delete/<int:entry_id>', methods=['DELETE'])
@login_required
def delete_entry(entry_id):
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
db.session.delete(entry)
db.session.commit()
return jsonify({'success': True, 'message': 'Entry deleted successfully'})
@app.route('/api/update/<int:entry_id>', methods=['PUT'])
@login_required
def update_entry(entry_id):
entry = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
data = request.json
if not data:
return jsonify({'success': False, 'message': 'No JSON data provided'}), 400
if 'arrival_time' in data:
try:
# Accept only ISO 8601 format
arrival_time_str = data['arrival_time']
entry.arrival_time = datetime.fromisoformat(arrival_time_str.replace('Z', '+00:00'))
except (ValueError, AttributeError) as e:
return jsonify({'success': False, 'message': f'Invalid arrival time format. Expected ISO 8601: {arrival_time_str}'}), 400
if 'departure_time' in data and data['departure_time']:
try:
# Accept only ISO 8601 format
departure_time_str = data['departure_time']
entry.departure_time = datetime.fromisoformat(departure_time_str.replace('Z', '+00:00'))
# Recalculate duration if both times are present
if entry.arrival_time and entry.departure_time:
# Calculate work duration considering breaks
entry.duration, _ = calculate_work_duration(
entry.arrival_time,
entry.departure_time,
entry.total_break_duration,
g.user
)
except (ValueError, AttributeError) as e:
return jsonify({'success': False, 'message': f'Invalid departure time format. Expected ISO 8601: {departure_time_str}'}), 400
db.session.commit()
return jsonify({
'success': True,
'message': 'Entry updated successfully',
'entry': {
'id': entry.id,
'arrival_time': entry.arrival_time.isoformat(),
'departure_time': entry.departure_time.isoformat() if entry.departure_time else None,
'duration': entry.duration,
'is_paused': entry.is_paused,
'total_break_duration': entry.total_break_duration
}
})
def calculate_work_duration(arrival_time, departure_time, total_break_duration, user):
"""
Calculate work duration considering both configured and actual break times.
Args:
arrival_time: Datetime of arrival
departure_time: Datetime of departure
total_break_duration: Actual logged break duration in seconds
user: User object to get company configuration
Returns:
tuple: (work_duration_in_seconds, effective_break_duration_in_seconds)
"""
# Calculate raw duration
raw_duration = (departure_time - arrival_time).total_seconds()
# Get company work configuration for break rules
company_config = CompanyWorkConfig.query.filter_by(company_id=user.company_id).first()
if not company_config:
# Use Germany defaults if no company config exists
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
break_threshold_hours = preset['break_threshold_hours']
mandatory_break_minutes = preset['mandatory_break_minutes']
additional_break_threshold_hours = preset['additional_break_threshold_hours']
additional_break_minutes = preset['additional_break_minutes']
else:
break_threshold_hours = company_config.break_threshold_hours
mandatory_break_minutes = company_config.mandatory_break_minutes
additional_break_threshold_hours = company_config.additional_break_threshold_hours
additional_break_minutes = company_config.additional_break_minutes
# Calculate mandatory breaks based on work duration
work_hours = raw_duration / 3600 # Convert seconds to hours
configured_break_seconds = 0
# Apply primary break if work duration exceeds threshold
if work_hours > break_threshold_hours:
configured_break_seconds += mandatory_break_minutes * 60
# Apply additional break if work duration exceeds additional threshold
if work_hours > additional_break_threshold_hours:
configured_break_seconds += additional_break_minutes * 60
# Use the greater of configured breaks or actual logged breaks
effective_break_duration = max(configured_break_seconds, total_break_duration)
# Calculate final work duration
work_duration = int(raw_duration - effective_break_duration)
return work_duration, effective_break_duration
@app.route('/api/resume/<int:entry_id>', methods=['POST'])
@login_required
def resume_entry(entry_id):
# Find the entry to resume for the current user
entry_to_resume = TimeEntry.query.filter_by(id=entry_id, user_id=session['user_id']).first_or_404()
# Check if there's already an active entry
active_entry = TimeEntry.query.filter_by(user_id=session['user_id'], departure_time=None).first()
if active_entry:
return jsonify({
'success': False,
'message': 'Cannot resume this entry. Another session is already active.'
}), 400
# Clear the departure time to make this entry active again
entry_to_resume.departure_time = None
# Reset pause state if it was paused
entry_to_resume.is_paused = False
entry_to_resume.pause_start_time = None
db.session.commit()
return jsonify({
'success': True,
'message': 'Work resumed on existing entry',
'id': entry_to_resume.id,
'arrival_time': entry_to_resume.arrival_time.isoformat(),
'total_break_duration': entry_to_resume.total_break_duration
})
@app.route('/api/manual-entry', methods=['POST'])
@login_required
def manual_entry():
try:
data = request.get_json()
# Extract data from request
project_id = data.get('project_id')
start_date = data.get('start_date')
start_time = data.get('start_time')
end_date = data.get('end_date')
end_time = data.get('end_time')
break_minutes = int(data.get('break_minutes', 0))
notes = data.get('notes', '')
# Validate required fields
if not all([start_date, start_time, end_date, end_time]):
return jsonify({'error': 'Start and end date/time are required'}), 400
# Parse datetime strings
try:
arrival_datetime = datetime.strptime(f"{start_date} {start_time}", '%Y-%m-%d %H:%M:%S')
departure_datetime = datetime.strptime(f"{end_date} {end_time}", '%Y-%m-%d %H:%M:%S')
except ValueError:
try:
# Try without seconds if parsing fails
arrival_datetime = datetime.strptime(f"{start_date} {start_time}:00", '%Y-%m-%d %H:%M:%S')
departure_datetime = datetime.strptime(f"{end_date} {end_time}:00", '%Y-%m-%d %H:%M:%S')
except ValueError:
return jsonify({'error': 'Invalid date/time format'}), 400
# Validate that end time is after start time
if departure_datetime <= arrival_datetime:
return jsonify({'error': 'End time must be after start time'}), 400
# Apply time rounding if enabled
rounded_arrival, rounded_departure = apply_time_rounding(arrival_datetime, departure_datetime, g.user)
# Validate project access if project is specified
if project_id:
project = Project.query.get(project_id)
if not project or not project.is_user_allowed(g.user):
return jsonify({'error': 'Invalid or unauthorized project'}), 403
# Check for overlapping entries for this user (using rounded times)
overlapping_entry = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.departure_time.isnot(None),
TimeEntry.arrival_time < rounded_departure,
TimeEntry.departure_time > rounded_arrival
).first()
if overlapping_entry:
return jsonify({
'error': 'This time entry overlaps with an existing entry'
}), 400
# Calculate total duration in seconds (using rounded times)
total_duration = int((rounded_departure - rounded_arrival).total_seconds())
break_duration_seconds = break_minutes * 60
# Apply rounding to break duration if enabled
interval_minutes, round_to_nearest = get_user_rounding_settings(g.user)
if interval_minutes > 0:
break_duration_seconds = round_duration_to_interval(
break_duration_seconds, interval_minutes, round_to_nearest
)
# Validate break duration doesn't exceed total duration
if break_duration_seconds >= total_duration:
return jsonify({'error': 'Break duration cannot exceed total work duration'}), 400
# Calculate work duration (total duration minus breaks)
work_duration = total_duration - break_duration_seconds
# Create the manual time entry (using rounded times)
new_entry = TimeEntry(
user_id=g.user.id,
arrival_time=rounded_arrival,
departure_time=rounded_departure,
duration=work_duration,
total_break_duration=break_duration_seconds,
project_id=int(project_id) if project_id else None,
notes=notes,
is_paused=False,
pause_start_time=None
)
db.session.add(new_entry)
db.session.commit()
return jsonify({
'success': True,
'message': 'Manual time entry added successfully',
'entry_id': new_entry.id
})
except Exception as e:
logger.error(f"Error creating manual time entry: {str(e)}")
db.session.rollback()
return jsonify({'error': 'An error occurred while creating the time entry'}), 500
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template('500.html'), 500
@app.route('/test')
def test():
return "App is working!"
@app.route('/admin/users/toggle-status/<int:user_id>')
@admin_required
@company_required
def toggle_user_status(user_id):
user = User.query.filter_by(id=user_id, company_id=g.user.company_id).first_or_404()
# Prevent blocking yourself
if user.id == session.get('user_id'):
flash('You cannot block your own account', 'error')
return redirect(url_for('admin_users'))
# Toggle the blocked status
user.is_blocked = not user.is_blocked
db.session.commit()
if user.is_blocked:
flash(f'User {user.username} has been blocked', 'success')
else:
flash(f'User {user.username} has been unblocked', 'success')
return redirect(url_for('admin_users'))
# Add this route to manage system settings
@app.route('/admin/settings', methods=['GET', 'POST'])
@admin_required
def admin_settings():
if request.method == 'POST':
# Update registration setting
registration_enabled = 'registration_enabled' in request.form
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
if reg_setting:
reg_setting.value = 'true' if registration_enabled else 'false'
# Update email verification setting
email_verification_required = 'email_verification_required' in request.form
email_setting = SystemSettings.query.filter_by(key='email_verification_required').first()
if email_setting:
email_setting.value = 'true' if email_verification_required else 'false'
db.session.commit()
flash('System settings updated successfully!', 'success')
# Get current settings
settings = {}
for setting in SystemSettings.query.all():
if setting.key == 'registration_enabled':
settings['registration_enabled'] = setting.value == 'true'
elif setting.key == 'email_verification_required':
settings['email_verification_required'] = setting.value == 'true'
return render_template('admin_settings.html', title='System Settings', settings=settings)
@app.route('/system-admin/dashboard')
@system_admin_required
def system_admin_dashboard():
"""System Administrator Dashboard - view all data across companies"""
# Global statistics
total_companies = Company.query.count()
total_users = User.query.count()
total_teams = Team.query.count()
total_projects = Project.query.count()
total_time_entries = TimeEntry.query.count()
# System admin count
system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
regular_admins = User.query.filter_by(role=Role.ADMIN).count()
# Recent activity (last 7 days)
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
recent_users = User.query.filter(User.created_at >= week_ago).count()
recent_companies = Company.query.filter(Company.created_at >= week_ago).count()
recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= week_ago).count()
# Top companies by user count
top_companies = db.session.query(
Company.name,
Company.id,
db.func.count(User.id).label('user_count')
).join(User).group_by(Company.id).order_by(db.func.count(User.id).desc()).limit(5).all()
# Recent companies
recent_companies_list = Company.query.order_by(Company.created_at.desc()).limit(5).all()
# System health checks
orphaned_users = User.query.filter_by(company_id=None).count()
orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count()
blocked_users = User.query.filter_by(is_blocked=True).count()
return render_template('system_admin_dashboard.html',
title='System Administrator Dashboard',
total_companies=total_companies,
total_users=total_users,
total_teams=total_teams,
total_projects=total_projects,
total_time_entries=total_time_entries,
system_admins=system_admins,
regular_admins=regular_admins,
recent_users=recent_users,
recent_companies=recent_companies,
recent_time_entries=recent_time_entries,
top_companies=top_companies,
recent_companies_list=recent_companies_list,
orphaned_users=orphaned_users,
orphaned_time_entries=orphaned_time_entries,
blocked_users=blocked_users)
@app.route('/system-admin/users')
@system_admin_required
def system_admin_users():
"""System Admin: View all users across all companies"""
filter_type = request.args.get('filter', '')
page = request.args.get('page', 1, type=int)
per_page = 50
# Build query based on filter
query = User.query
if filter_type == 'blocked':
query = query.filter_by(is_blocked=True)
elif filter_type == 'system_admins':
query = query.filter_by(role=Role.SYSTEM_ADMIN)
elif filter_type == 'admins':
query = query.filter_by(role=Role.ADMIN)
elif filter_type == 'unverified':
query = query.filter_by(is_verified=False)
elif filter_type == 'freelancers':
query = query.filter_by(account_type=AccountType.FREELANCER)
# Add company join for display
query = query.join(Company).add_columns(Company.name.label('company_name'))
# Order by creation date (newest first)
query = query.order_by(User.created_at.desc())
# Paginate results
users = query.paginate(page=page, per_page=per_page, error_out=False)
return render_template('system_admin_users.html',
title='System Admin - All Users',
users=users,
current_filter=filter_type)
@app.route('/system-admin/users/<int:user_id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_edit_user(user_id):
"""System Admin: Edit any user across companies"""
user = User.query.get_or_404(user_id)
if request.method == 'POST':
# Get form data
username = request.form.get('username')
email = request.form.get('email')
role = request.form.get('role')
is_blocked = request.form.get('is_blocked') == 'on'
is_verified = request.form.get('is_verified') == 'on'
company_id = request.form.get('company_id')
team_id = request.form.get('team_id') or None
# Validation
error = None
# Check if username is unique within the company
existing_user = User.query.filter(
User.username == username,
User.company_id == company_id,
User.id != user_id
).first()
if existing_user:
error = f'Username "{username}" is already taken in this company.'
# Check if email is unique within the company
existing_email = User.query.filter(
User.email == email,
User.company_id == company_id,
User.id != user_id
).first()
if existing_email:
error = f'Email "{email}" is already registered in this company.'
if not error:
# Update user
user.username = username
user.email = email
user.role = Role(role)
user.is_blocked = is_blocked
user.is_verified = is_verified
user.company_id = company_id
user.team_id = team_id
db.session.commit()
flash(f'User {username} updated successfully.', 'success')
return redirect(url_for('system_admin_users'))
flash(error, 'error')
# Get all companies and teams for form dropdowns
companies = Company.query.order_by(Company.name).all()
teams = Team.query.filter_by(company_id=user.company_id).order_by(Team.name).all()
roles = get_available_roles()
return render_template('system_admin_edit_user.html',
title=f'Edit User: {user.username}',
user=user,
companies=companies,
teams=teams,
roles=roles)
@app.route('/system-admin/users/<int:user_id>/delete', methods=['POST'])
@system_admin_required
def system_admin_delete_user(user_id):
"""System Admin: Delete any user (with safety checks)"""
user = User.query.get_or_404(user_id)
# Safety check: prevent deleting the last system admin
if user.role == Role.SYSTEM_ADMIN:
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
if system_admin_count <= 1:
flash('Cannot delete the last system administrator.', 'error')
return redirect(url_for('system_admin_users'))
# Safety check: prevent deleting yourself
if user.id == g.user.id:
flash('Cannot delete your own account.', 'error')
return redirect(url_for('system_admin_users'))
username = user.username
company_name = user.company.name if user.company else 'Unknown'
# Delete related data first
TimeEntry.query.filter_by(user_id=user.id).delete()
WorkConfig.query.filter_by(user_id=user.id).delete()
# Delete the user
db.session.delete(user)
db.session.commit()
flash(f'User "{username}" from company "{company_name}" has been deleted.', 'success')
return redirect(url_for('system_admin_users'))
@app.route('/system-admin/companies')
@system_admin_required
def system_admin_companies():
"""System Admin: View all companies"""
page = request.args.get('page', 1, type=int)
per_page = 20
companies = Company.query.order_by(Company.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False)
# Get user counts for each company
company_stats = {}
for company in companies.items:
user_count = User.query.filter_by(company_id=company.id).count()
admin_count = User.query.filter(
User.company_id == company.id,
User.role.in_([Role.ADMIN, Role.SYSTEM_ADMIN])
).count()
company_stats[company.id] = {
'user_count': user_count,
'admin_count': admin_count
}
return render_template('system_admin_companies.html',
title='System Admin - All Companies',
companies=companies,
company_stats=company_stats)
@app.route('/system-admin/companies/<int:company_id>')
@system_admin_required
def system_admin_company_detail(company_id):
"""System Admin: View detailed company information"""
company = Company.query.get_or_404(company_id)
# Get company statistics
users = User.query.filter_by(company_id=company.id).all()
teams = Team.query.filter_by(company_id=company.id).all()
projects = Project.query.filter_by(company_id=company.id).all()
# Recent activity
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
recent_time_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.arrival_time >= week_ago
).count()
# Role distribution
role_counts = {}
for role in Role:
count = User.query.filter_by(company_id=company.id, role=role).count()
if count > 0:
role_counts[role.value] = count
return render_template('system_admin_company_detail.html',
title=f'Company: {company.name}',
company=company,
users=users,
teams=teams,
projects=projects,
recent_time_entries=recent_time_entries,
role_counts=role_counts)
@app.route('/system-admin/time-entries')
@system_admin_required
def system_admin_time_entries():
"""System Admin: View time entries across all companies"""
page = request.args.get('page', 1, type=int)
company_filter = request.args.get('company', '')
per_page = 50
# Build query
query = TimeEntry.query.join(User).join(Company)
if company_filter:
query = query.filter(Company.id == company_filter)
# Add columns for display
query = query.add_columns(
User.username,
Company.name.label('company_name'),
Project.name.label('project_name')
).outerjoin(Project)
# Order by arrival time (newest first)
query = query.order_by(TimeEntry.arrival_time.desc())
# Paginate
entries = query.paginate(page=page, per_page=per_page, error_out=False)
# Get companies for filter dropdown
companies = Company.query.order_by(Company.name).all()
return render_template('system_admin_time_entries.html',
title='System Admin - Time Entries',
entries=entries,
companies=companies,
current_company=company_filter)
@app.route('/system-admin/settings', methods=['GET', 'POST'])
@system_admin_required
def system_admin_settings():
"""System Admin: Global system settings"""
if request.method == 'POST':
# Update system settings
registration_enabled = request.form.get('registration_enabled') == 'on'
email_verification = request.form.get('email_verification_required') == 'on'
tracking_script_enabled = request.form.get('tracking_script_enabled') == 'on'
tracking_script_code = request.form.get('tracking_script_code', '')
# Update or create settings
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
if reg_setting:
reg_setting.value = 'true' if registration_enabled else 'false'
else:
reg_setting = SystemSettings(
key='registration_enabled',
value='true' if registration_enabled else 'false',
description='Controls whether new user registration is allowed'
)
db.session.add(reg_setting)
email_setting = SystemSettings.query.filter_by(key='email_verification_required').first()
if email_setting:
email_setting.value = 'true' if email_verification else 'false'
else:
email_setting = SystemSettings(
key='email_verification_required',
value='true' if email_verification else 'false',
description='Controls whether email verification is required for new accounts'
)
db.session.add(email_setting)
tracking_enabled_setting = SystemSettings.query.filter_by(key='tracking_script_enabled').first()
if tracking_enabled_setting:
tracking_enabled_setting.value = 'true' if tracking_script_enabled else 'false'
else:
tracking_enabled_setting = SystemSettings(
key='tracking_script_enabled',
value='true' if tracking_script_enabled else 'false',
description='Controls whether custom tracking script is enabled'
)
db.session.add(tracking_enabled_setting)
tracking_code_setting = SystemSettings.query.filter_by(key='tracking_script_code').first()
if tracking_code_setting:
tracking_code_setting.value = tracking_script_code
else:
tracking_code_setting = SystemSettings(
key='tracking_script_code',
value=tracking_script_code,
description='Custom tracking script code (HTML/JavaScript)'
)
db.session.add(tracking_code_setting)
db.session.commit()
flash('System settings updated successfully.', 'success')
return redirect(url_for('system_admin_settings'))
# Get current settings
settings = {}
all_settings = SystemSettings.query.all()
for setting in all_settings:
if setting.key == 'registration_enabled':
settings['registration_enabled'] = setting.value == 'true'
elif setting.key == 'email_verification_required':
settings['email_verification_required'] = setting.value == 'true'
elif setting.key == 'tracking_script_enabled':
settings['tracking_script_enabled'] = setting.value == 'true'
elif setting.key == 'tracking_script_code':
settings['tracking_script_code'] = setting.value
# System statistics
total_companies = Company.query.count()
total_users = User.query.count()
total_system_admins = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
return render_template('system_admin_settings.html',
title='System Administrator Settings',
settings=settings,
total_companies=total_companies,
total_users=total_users,
total_system_admins=total_system_admins)
@app.route('/system-admin/health')
@system_admin_required
def system_admin_health():
"""System Admin: System health check and event log"""
# Get system health summary
health_summary = SystemEvent.get_system_health_summary()
# Get recent events (last 7 days)
recent_events = SystemEvent.get_recent_events(days=7, limit=100)
# Get events by severity for quick stats
errors = SystemEvent.get_events_by_severity('error', days=7, limit=20)
warnings = SystemEvent.get_events_by_severity('warning', days=7, limit=20)
# System metrics
from datetime import datetime, timedelta
now = datetime.now()
# Database connection test
db_healthy = True
db_error = None
try:
db.session.execute('SELECT 1')
except Exception as e:
db_healthy = False
db_error = str(e)
SystemEvent.log_event(
'database_check_failed',
f'Database health check failed: {str(e)}',
'system',
'error'
)
# Application uptime (approximate based on first event)
first_event = SystemEvent.query.order_by(SystemEvent.timestamp.asc()).first()
uptime_start = first_event.timestamp if first_event else now
uptime_duration = now - uptime_start
# Recent activity stats
today = now.date()
today_events = SystemEvent.query.filter(
func.date(SystemEvent.timestamp) == today
).count()
# Log the health check
SystemEvent.log_event(
'system_health_check',
f'System health check performed by {session.get("username", "unknown")}',
'system',
'info',
user_id=session.get('user_id'),
ip_address=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
return render_template('system_admin_health.html',
title='System Health Check',
health_summary=health_summary,
recent_events=recent_events,
errors=errors,
warnings=warnings,
db_healthy=db_healthy,
db_error=db_error,
uptime_duration=uptime_duration,
today_events=today_events)
@app.route('/system-admin/announcements')
@system_admin_required
def system_admin_announcements():
"""System Admin: Manage announcements"""
page = request.args.get('page', 1, type=int)
per_page = 20
announcements = Announcement.query.order_by(Announcement.created_at.desc()).paginate(
page=page, per_page=per_page, error_out=False)
return render_template('system_admin_announcements.html',
title='System Admin - Announcements',
announcements=announcements)
@app.route('/system-admin/announcements/new', methods=['GET', 'POST'])
@system_admin_required
def system_admin_announcement_new():
"""System Admin: Create new announcement"""
if request.method == 'POST':
title = request.form.get('title')
content = request.form.get('content')
announcement_type = request.form.get('announcement_type', 'info')
is_urgent = request.form.get('is_urgent') == 'on'
is_active = request.form.get('is_active') == 'on'
# Handle date fields
start_date = request.form.get('start_date')
end_date = request.form.get('end_date')
start_datetime = None
end_datetime = None
if start_date:
try:
start_datetime = datetime.strptime(start_date, '%Y-%m-%dT%H:%M')
except ValueError:
pass
if end_date:
try:
end_datetime = datetime.strptime(end_date, '%Y-%m-%dT%H:%M')
except ValueError:
pass
# Handle targeting
target_all_users = request.form.get('target_all_users') == 'on'
target_roles = None
target_companies = None
if not target_all_users:
selected_roles = request.form.getlist('target_roles')
selected_companies = request.form.getlist('target_companies')
if selected_roles:
import json
target_roles = json.dumps(selected_roles)
if selected_companies:
import json
target_companies = json.dumps([int(c) for c in selected_companies])
announcement = Announcement(
title=title,
content=content,
announcement_type=announcement_type,
is_urgent=is_urgent,
is_active=is_active,
start_date=start_datetime,
end_date=end_datetime,
target_all_users=target_all_users,
target_roles=target_roles,
target_companies=target_companies,
created_by_id=g.user.id
)
db.session.add(announcement)
db.session.commit()
flash('Announcement created successfully.', 'success')
return redirect(url_for('system_admin_announcements'))
# Get roles and companies for targeting options
roles = [role.value for role in Role]
companies = Company.query.order_by(Company.name).all()
return render_template('system_admin_announcement_form.html',
title='Create Announcement',
announcement=None,
roles=roles,
companies=companies)
@app.route('/system-admin/announcements/<int:id>/edit', methods=['GET', 'POST'])
@system_admin_required
def system_admin_announcement_edit(id):
"""System Admin: Edit announcement"""
announcement = Announcement.query.get_or_404(id)
if request.method == 'POST':
announcement.title = request.form.get('title')
announcement.content = request.form.get('content')
announcement.announcement_type = request.form.get('announcement_type', 'info')
announcement.is_urgent = request.form.get('is_urgent') == 'on'
announcement.is_active = request.form.get('is_active') == 'on'
# Handle date fields
start_date = request.form.get('start_date')
end_date = request.form.get('end_date')
if start_date:
try:
announcement.start_date = datetime.strptime(start_date, '%Y-%m-%dT%H:%M')
except ValueError:
announcement.start_date = None
else:
announcement.start_date = None
if end_date:
try:
announcement.end_date = datetime.strptime(end_date, '%Y-%m-%dT%H:%M')
except ValueError:
announcement.end_date = None
else:
announcement.end_date = None
# Handle targeting
announcement.target_all_users = request.form.get('target_all_users') == 'on'
if not announcement.target_all_users:
selected_roles = request.form.getlist('target_roles')
selected_companies = request.form.getlist('target_companies')
if selected_roles:
import json
announcement.target_roles = json.dumps(selected_roles)
else:
announcement.target_roles = None
if selected_companies:
import json
announcement.target_companies = json.dumps([int(c) for c in selected_companies])
else:
announcement.target_companies = None
else:
announcement.target_roles = None
announcement.target_companies = None
announcement.updated_at = datetime.now()
db.session.commit()
flash('Announcement updated successfully.', 'success')
return redirect(url_for('system_admin_announcements'))
# Get roles and companies for targeting options
roles = [role.value for role in Role]
companies = Company.query.order_by(Company.name).all()
return render_template('system_admin_announcement_form.html',
title='Edit Announcement',
announcement=announcement,
roles=roles,
companies=companies)
@app.route('/system-admin/announcements/<int:id>/delete', methods=['POST'])
@system_admin_required
def system_admin_announcement_delete(id):
"""System Admin: Delete announcement"""
announcement = Announcement.query.get_or_404(id)
db.session.delete(announcement)
db.session.commit()
flash('Announcement deleted successfully.', 'success')
return redirect(url_for('system_admin_announcements'))
@app.route('/admin/work-policies', methods=['GET', 'POST'])
@admin_required
@company_required
def admin_work_policies():
# Get or create company work config
work_config = CompanyWorkConfig.query.filter_by(company_id=g.user.company_id).first()
if not work_config:
# Create default config for the company
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
work_config = CompanyWorkConfig(
company_id=g.user.company_id,
work_hours_per_day=preset['work_hours_per_day'],
mandatory_break_minutes=preset['mandatory_break_minutes'],
break_threshold_hours=preset['break_threshold_hours'],
additional_break_minutes=preset['additional_break_minutes'],
additional_break_threshold_hours=preset['additional_break_threshold_hours'],
region=WorkRegion.GERMANY,
region_name=preset['region_name'],
created_by_id=g.user.id
)
db.session.add(work_config)
db.session.commit()
if request.method == 'POST':
try:
# Handle regional preset selection
if request.form.get('action') == 'apply_preset':
region_code = request.form.get('region_preset')
if region_code:
region = WorkRegion(region_code)
preset = CompanyWorkConfig.get_regional_preset(region)
work_config.work_hours_per_day = preset['work_hours_per_day']
work_config.mandatory_break_minutes = preset['mandatory_break_minutes']
work_config.break_threshold_hours = preset['break_threshold_hours']
work_config.additional_break_minutes = preset['additional_break_minutes']
work_config.additional_break_threshold_hours = preset['additional_break_threshold_hours']
work_config.region = region
work_config.region_name = preset['region_name']
db.session.commit()
flash(f'Applied {preset["region_name"]} work policy preset', 'success')
return redirect(url_for('admin_work_policies'))
# Handle manual configuration update
else:
work_config.work_hours_per_day = float(request.form.get('work_hours_per_day', 8.0))
work_config.mandatory_break_minutes = int(request.form.get('mandatory_break_minutes', 30))
work_config.break_threshold_hours = float(request.form.get('break_threshold_hours', 6.0))
work_config.additional_break_minutes = int(request.form.get('additional_break_minutes', 15))
work_config.additional_break_threshold_hours = float(request.form.get('additional_break_threshold_hours', 9.0))
work_config.region = WorkRegion.CUSTOM
work_config.region_name = 'Custom Configuration'
db.session.commit()
flash('Work policies updated successfully!', 'success')
return redirect(url_for('admin_work_policies'))
except ValueError:
flash('Please enter valid numbers for all fields', 'error')
# Get available regional presets
regional_presets = []
for region in WorkRegion:
preset = CompanyWorkConfig.get_regional_preset(region)
regional_presets.append({
'code': region.value,
'name': preset['region_name'],
'description': f"{preset['work_hours_per_day']}h/day, {preset['mandatory_break_minutes']}min break after {preset['break_threshold_hours']}h"
})
return render_template('admin_work_policies.html',
title='Work Policies',
work_config=work_config,
regional_presets=regional_presets,
WorkRegion=WorkRegion)
# Company Management Routes
@app.route('/admin/company')
@admin_required
@company_required
def admin_company():
"""View and manage company settings"""
company = g.company
# Get company statistics
stats = {
'total_users': User.query.filter_by(company_id=company.id).count(),
'total_teams': Team.query.filter_by(company_id=company.id).count(),
'total_projects': Project.query.filter_by(company_id=company.id).count(),
'active_projects': Project.query.filter_by(company_id=company.id, is_active=True).count(),
}
return render_template('admin_company.html', title='Company Management', company=company, stats=stats)
@app.route('/admin/company/edit', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_company():
"""Edit company details"""
company = g.company
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description', '')
max_users = request.form.get('max_users')
is_active = 'is_active' in request.form
# Validate input
error = None
if not name:
error = 'Company name is required'
elif name != company.name and Company.query.filter_by(name=name).first():
error = 'Company name already exists'
if max_users:
try:
max_users = int(max_users)
if max_users < 1:
error = 'Maximum users must be at least 1'
except ValueError:
error = 'Maximum users must be a valid number'
else:
max_users = None
if error is None:
company.name = name
company.description = description
company.max_users = max_users
company.is_active = is_active
db.session.commit()
flash('Company details updated successfully!', 'success')
return redirect(url_for('admin_company'))
else:
flash(error, 'error')
return render_template('edit_company.html', title='Edit Company', company=company)
@app.route('/admin/company/users')
@admin_required
@company_required
def company_users():
"""List all users in the company with detailed information"""
users = User.query.filter_by(company_id=g.company.id).order_by(User.created_at.desc()).all()
# Calculate user statistics
user_stats = {
'total': len(users),
'verified': len([u for u in users if u.is_verified]),
'unverified': len([u for u in users if not u.is_verified]),
'blocked': len([u for u in users if u.is_blocked]),
'active': len([u for u in users if not u.is_blocked and u.is_verified]),
'admins': len([u for u in users if u.role == Role.ADMIN]),
'supervisors': len([u for u in users if u.role == Role.SUPERVISOR]),
'team_leaders': len([u for u in users if u.role == Role.TEAM_LEADER]),
'team_members': len([u for u in users if u.role == Role.TEAM_MEMBER]),
}
return render_template('company_users.html', title='Company Users',
users=users, stats=user_stats, company=g.company)
# Add these routes for team management
@app.route('/admin/teams')
@admin_required
@company_required
def admin_teams():
teams = Team.query.filter_by(company_id=g.user.company_id).all()
return render_template('admin_teams.html', title='Team Management', teams=teams)
@app.route('/admin/teams/create', methods=['GET', 'POST'])
@admin_required
@company_required
def create_team():
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
# Validate input
error = None
if not name:
error = 'Team name is required'
elif Team.query.filter_by(name=name, company_id=g.user.company_id).first():
error = 'Team name already exists in your company'
if error is None:
new_team = Team(name=name, description=description, company_id=g.user.company_id)
db.session.add(new_team)
db.session.commit()
flash(f'Team "{name}" created successfully!', 'success')
return redirect(url_for('admin_teams'))
flash(error, 'error')
return render_template('create_team.html', title='Create Team')
@app.route('/admin/teams/edit/<int:team_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def edit_team(team_id):
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
# Validate input
error = None
if not name:
error = 'Team name is required'
elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first():
error = 'Team name already exists in your company'
if error is None:
team.name = name
team.description = description
db.session.commit()
flash(f'Team "{name}" updated successfully!', 'success')
return redirect(url_for('admin_teams'))
flash(error, 'error')
return render_template('edit_team.html', title='Edit Team', team=team)
@app.route('/admin/teams/delete/<int:team_id>', methods=['POST'])
@admin_required
@company_required
def delete_team(team_id):
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
# Check if team has members
if team.users:
flash('Cannot delete team with members. Remove all members first.', 'error')
return redirect(url_for('admin_teams'))
team_name = team.name
db.session.delete(team)
db.session.commit()
flash(f'Team "{team_name}" deleted successfully!', 'success')
return redirect(url_for('admin_teams'))
@app.route('/admin/teams/<int:team_id>', methods=['GET', 'POST'])
@admin_required
@company_required
def manage_team(team_id):
team = Team.query.filter_by(id=team_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
action = request.form.get('action')
if action == 'update_team':
# Update team details
name = request.form.get('name')
description = request.form.get('description')
# Validate input
error = None
if not name:
error = 'Team name is required'
elif name != team.name and Team.query.filter_by(name=name, company_id=g.user.company_id).first():
error = 'Team name already exists in your company'
if error is None:
team.name = name
team.description = description
db.session.commit()
flash(f'Team "{name}" updated successfully!', 'success')
else:
flash(error, 'error')
elif action == 'add_member':
# Add user to team
user_id = request.form.get('user_id')
if user_id:
user = User.query.get(user_id)
if user:
user.team_id = team.id
db.session.commit()
flash(f'User {user.username} added to team!', 'success')
else:
flash('User not found', 'error')
else:
flash('No user selected', 'error')
elif action == 'remove_member':
# Remove user from team
user_id = request.form.get('user_id')
if user_id:
user = User.query.get(user_id)
if user and user.team_id == team.id:
user.team_id = None
db.session.commit()
flash(f'User {user.username} removed from team!', 'success')
else:
flash('User not found or not in this team', 'error')
else:
flash('No user selected', 'error')
# Get team members
team_members = User.query.filter_by(team_id=team.id).all()
# Get users not in this team for the add member form (company-scoped)
available_users = User.query.filter(
User.company_id == g.user.company_id,
(User.team_id != team.id) | (User.team_id == None)
).all()
return render_template(
'manage_team.html',
title=f'Manage Team: {team.name}',
team=team,
team_members=team_members,
available_users=available_users
)
# Project Management Routes
@app.route('/admin/projects')
@role_required(Role.SUPERVISOR) # Supervisors and Admins can manage projects
@company_required
def admin_projects():
projects = Project.query.filter_by(company_id=g.user.company_id).order_by(Project.created_at.desc()).all()
categories = ProjectCategory.query.filter_by(company_id=g.user.company_id).order_by(ProjectCategory.name).all()
return render_template('admin_projects.html', title='Project Management', projects=projects, categories=categories)
@app.route('/admin/projects/create', methods=['GET', 'POST'])
@role_required(Role.SUPERVISOR)
@company_required
def create_project():
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
code = request.form.get('code')
team_id = request.form.get('team_id') or None
category_id = request.form.get('category_id') or None
start_date_str = request.form.get('start_date')
end_date_str = request.form.get('end_date')
# Validate input
error = None
if not name:
error = 'Project name is required'
elif not code:
error = 'Project code is required'
elif Project.query.filter_by(code=code, company_id=g.user.company_id).first():
error = 'Project code already exists in your company'
# Parse dates
start_date = None
end_date = None
if start_date_str:
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid start date format'
if end_date_str:
try:
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid end date format'
if start_date and end_date and start_date > end_date:
error = 'Start date cannot be after end date'
if error is None:
project = Project(
name=name,
description=description,
code=code.upper(),
company_id=g.user.company_id,
team_id=int(team_id) if team_id else None,
category_id=int(category_id) if category_id else None,
start_date=start_date,
end_date=end_date,
created_by_id=g.user.id
)
db.session.add(project)
db.session.commit()
flash(f'Project "{name}" created successfully!', 'success')
return redirect(url_for('admin_projects'))
else:
flash(error, 'error')
# Get available teams and categories for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all()
categories = ProjectCategory.query.filter_by(company_id=g.user.company_id).order_by(ProjectCategory.name).all()
return render_template('create_project.html', title='Create Project', teams=teams, categories=categories)
@app.route('/admin/projects/edit/<int:project_id>', methods=['GET', 'POST'])
@role_required(Role.SUPERVISOR)
@company_required
def edit_project(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
if request.method == 'POST':
name = request.form.get('name')
description = request.form.get('description')
code = request.form.get('code')
team_id = request.form.get('team_id') or None
category_id = request.form.get('category_id') or None
is_active = request.form.get('is_active') == 'on'
start_date_str = request.form.get('start_date')
end_date_str = request.form.get('end_date')
# Validate input
error = None
if not name:
error = 'Project name is required'
elif not code:
error = 'Project code is required'
elif code != project.code and Project.query.filter_by(code=code, company_id=g.user.company_id).first():
error = 'Project code already exists in your company'
# Parse dates
start_date = None
end_date = None
if start_date_str:
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid start date format'
if end_date_str:
try:
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
except ValueError:
error = 'Invalid end date format'
if start_date and end_date and start_date > end_date:
error = 'Start date cannot be after end date'
if error is None:
project.name = name
project.description = description
project.code = code.upper()
project.team_id = int(team_id) if team_id else None
project.category_id = int(category_id) if category_id else None
project.is_active = is_active
project.start_date = start_date
project.end_date = end_date
db.session.commit()
flash(f'Project "{name}" updated successfully!', 'success')
return redirect(url_for('admin_projects'))
else:
flash(error, 'error')
# Get available teams and categories for the form (company-scoped)
teams = Team.query.filter_by(company_id=g.user.company_id).order_by(Team.name).all()
categories = ProjectCategory.query.filter_by(company_id=g.user.company_id).order_by(ProjectCategory.name).all()
return render_template('edit_project.html', title='Edit Project', project=project, teams=teams, categories=categories)
@app.route('/admin/projects/delete/<int:project_id>', methods=['POST'])
@role_required(Role.ADMIN) # Only admins can delete projects
@company_required
def delete_project(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
# Check if there are time entries associated with this project
time_entries_count = TimeEntry.query.filter_by(project_id=project_id).count()
if time_entries_count > 0:
flash(f'Cannot delete project "{project.name}" - it has {time_entries_count} time entries associated with it. Deactivate the project instead.', 'error')
else:
project_name = project.name
db.session.delete(project)
db.session.commit()
flash(f'Project "{project_name}" deleted successfully!', 'success')
return redirect(url_for('admin_projects'))
@app.route('/api/team/hours_data', methods=['GET'])
@login_required
@role_required(Role.TEAM_LEADER) # Only team leaders and above can access
@company_required
def team_hours_data():
# Get the current user's team
team = Team.query.get(g.user.team_id)
if not team:
return jsonify({
'success': False,
'message': 'You are not assigned to any team.'
}), 400
# Get date range from query parameters or use current week as default
today = datetime.now().date()
start_of_week = today - timedelta(days=today.weekday())
end_of_week = start_of_week + timedelta(days=6)
start_date_str = request.args.get('start_date', start_of_week.strftime('%Y-%m-%d'))
end_date_str = request.args.get('end_date', end_of_week.strftime('%Y-%m-%d'))
include_self = request.args.get('include_self', 'false') == 'true'
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
except ValueError:
return jsonify({
'success': False,
'message': 'Invalid date format.'
}), 400
# Get all team members
team_members = User.query.filter_by(team_id=team.id).all()
# Prepare data structure for team members' hours
team_data = []
for member in team_members:
# Skip if the member is the current user (team leader) and include_self is False
if member.id == g.user.id and not include_self:
continue
# Get time entries for this member in the date range
entries = TimeEntry.query.filter(
TimeEntry.user_id == member.id,
TimeEntry.arrival_time >= datetime.combine(start_date, time.min),
TimeEntry.arrival_time <= datetime.combine(end_date, time.max)
).order_by(TimeEntry.arrival_time).all()
# Calculate daily and total hours
daily_hours = {}
total_seconds = 0
for entry in entries:
if entry.duration: # Only count completed entries
entry_date = entry.arrival_time.date()
date_str = entry_date.strftime('%Y-%m-%d')
if date_str not in daily_hours:
daily_hours[date_str] = 0
daily_hours[date_str] += entry.duration
total_seconds += entry.duration
# Convert seconds to hours for display
for date_str in daily_hours:
daily_hours[date_str] = round(daily_hours[date_str] / 3600, 2) # Convert to hours
total_hours = round(total_seconds / 3600, 2) # Convert to hours
# Format entries for JSON response
formatted_entries = []
for entry in entries:
formatted_entries.append({
'id': entry.id,
'arrival_time': entry.arrival_time.isoformat(),
'departure_time': entry.departure_time.isoformat() if entry.departure_time else None,
'duration': entry.duration,
'total_break_duration': entry.total_break_duration
})
# Add member data to team data
team_data.append({
'user': {
'id': member.id,
'username': member.username,
'email': member.email
},
'daily_hours': daily_hours,
'total_hours': total_hours,
'entries': formatted_entries
})
# Generate a list of dates in the range for the table header
date_range = []
current_date = start_date
while current_date <= end_date:
date_range.append(current_date.strftime('%Y-%m-%d'))
current_date += timedelta(days=1)
return jsonify({
'success': True,
'team': {
'id': team.id,
'name': team.name,
'description': team.description
},
'team_data': team_data,
'date_range': date_range,
'start_date': start_date.isoformat(),
'end_date': end_date.isoformat()
})
@app.route('/export')
def export():
return render_template('export.html', title='Export Data')
def get_date_range(period, start_date_str=None, end_date_str=None):
"""Get start and end date based on period or custom date range."""
today = datetime.now().date()
if period:
if period == 'today':
return today, today
elif period == 'week':
start_date = today - timedelta(days=today.weekday())
return start_date, today
elif period == 'month':
start_date = today.replace(day=1)
return start_date, today
elif period == 'all':
earliest_entry = TimeEntry.query.order_by(TimeEntry.arrival_time).first()
start_date = earliest_entry.arrival_time.date() if earliest_entry else today
return start_date, today
else:
# Custom date range
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
return start_date, end_date
except (ValueError, TypeError):
raise ValueError('Invalid date format')
@app.route('/download_export')
def download_export():
"""Handle export download requests."""
export_format = request.args.get('format', 'csv')
period = request.args.get('period')
try:
start_date, end_date = get_date_range(
period,
request.args.get('start_date'),
request.args.get('end_date')
)
except ValueError:
flash('Invalid date format. Please use YYYY-MM-DD format.')
return redirect(url_for('export'))
# Query entries within the date range
start_datetime = datetime.combine(start_date, time.min)
end_datetime = datetime.combine(end_date, time.max)
entries = TimeEntry.query.filter(
TimeEntry.arrival_time >= start_datetime,
TimeEntry.arrival_time <= end_datetime
).order_by(TimeEntry.arrival_time).all()
if not entries:
flash('No entries found for the selected date range.')
return redirect(url_for('export'))
# Prepare data and filename
data = prepare_export_data(entries)
filename = f"timetrack_export_{start_date.strftime('%Y%m%d')}_to_{end_date.strftime('%Y%m%d')}"
# Export based on format
if export_format == 'csv':
return export_to_csv(data, filename)
elif export_format == 'excel':
return export_to_excel(data, filename)
else:
flash('Invalid export format.')
return redirect(url_for('export'))
@app.route('/analytics')
@app.route('/analytics/<mode>')
@login_required
def analytics(mode='personal'):
"""Unified analytics view combining history, team hours, and graphs"""
# Validate mode parameter
if mode not in ['personal', 'team']:
mode = 'personal'
# Check team access for team mode
if mode == 'team':
if not g.user.team_id:
flash('You must be assigned to a team to view team analytics.', 'warning')
return redirect(url_for('analytics', mode='personal'))
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
flash('You do not have permission to view team analytics.', 'error')
return redirect(url_for('analytics', mode='personal'))
# Get available projects for filtering
available_projects = []
all_projects = Project.query.filter_by(is_active=True).all()
for project in all_projects:
if project.is_user_allowed(g.user):
available_projects.append(project)
# Get team members if in team mode
team_members = []
if mode == 'team' and g.user.team_id:
team_members = User.query.filter_by(team_id=g.user.team_id).all()
# Default date range (current week)
today = datetime.now().date()
start_of_week = today - timedelta(days=today.weekday())
end_of_week = start_of_week + timedelta(days=6)
return render_template('analytics.html',
title='Time Analytics',
mode=mode,
available_projects=available_projects,
team_members=team_members,
default_start_date=start_of_week.strftime('%Y-%m-%d'),
default_end_date=end_of_week.strftime('%Y-%m-%d'))
@app.route('/api/analytics/data')
@login_required
def analytics_data():
"""API endpoint for analytics data"""
mode = request.args.get('mode', 'personal')
view_type = request.args.get('view', 'table')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
project_filter = request.args.get('project_id')
granularity = request.args.get('granularity', 'daily')
# Validate mode
if mode not in ['personal', 'team']:
return jsonify({'error': 'Invalid mode'}), 400
# Check permissions for team mode
if mode == 'team':
if not g.user.team_id:
return jsonify({'error': 'No team assigned'}), 403
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
return jsonify({'error': 'Insufficient permissions'}), 403
try:
# Parse dates
if start_date:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
if end_date:
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
# Get filtered data
data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)
# Format data based on view type
if view_type == 'graph':
formatted_data = format_graph_data(data, granularity)
elif view_type == 'team':
formatted_data = format_team_data(data, granularity)
else:
formatted_data = format_table_data(data)
return jsonify(formatted_data)
except Exception as e:
logger.error(f"Error in analytics_data: {str(e)}")
return jsonify({'error': 'Internal server error'}), 500
def get_filtered_analytics_data(user, mode, start_date=None, end_date=None, project_filter=None):
"""Get filtered time entry data for analytics"""
# Base query
query = TimeEntry.query
# Apply user/team filter
if mode == 'personal':
query = query.filter(TimeEntry.user_id == user.id)
elif mode == 'team' and user.team_id:
team_user_ids = [u.id for u in User.query.filter_by(team_id=user.team_id).all()]
query = query.filter(TimeEntry.user_id.in_(team_user_ids))
# Apply date filters
if start_date:
query = query.filter(func.date(TimeEntry.arrival_time) >= start_date)
if end_date:
query = query.filter(func.date(TimeEntry.arrival_time) <= end_date)
# Apply project filter
if project_filter:
if project_filter == 'none':
query = query.filter(TimeEntry.project_id.is_(None))
else:
try:
project_id = int(project_filter)
query = query.filter(TimeEntry.project_id == project_id)
except ValueError:
pass
return query.order_by(TimeEntry.arrival_time.desc()).all()
@app.route('/api/companies/<int:company_id>/teams')
@system_admin_required
def api_company_teams(company_id):
"""API: Get teams for a specific company (System Admin only)"""
teams = Team.query.filter_by(company_id=company_id).order_by(Team.name).all()
return jsonify([{
'id': team.id,
'name': team.name,
'description': team.description
} for team in teams])
@app.route('/api/system-admin/stats')
@system_admin_required
def api_system_admin_stats():
"""API: Get real-time system statistics for dashboard"""
from datetime import datetime, timedelta
# Get basic counts
total_companies = Company.query.count()
total_users = User.query.count()
total_teams = Team.query.count()
total_projects = Project.query.count()
total_time_entries = TimeEntry.query.count()
# Active sessions
active_sessions = TimeEntry.query.filter_by(departure_time=None, is_paused=False).count()
paused_sessions = TimeEntry.query.filter_by(is_paused=True).count()
# Recent activity (last 24 hours)
yesterday = datetime.now() - timedelta(days=1)
recent_users = User.query.filter(User.created_at >= yesterday).count()
recent_companies = Company.query.filter(Company.created_at >= yesterday).count()
recent_time_entries = TimeEntry.query.filter(TimeEntry.arrival_time >= yesterday).count()
# System health
orphaned_users = User.query.filter_by(company_id=None).count()
orphaned_time_entries = TimeEntry.query.filter_by(user_id=None).count()
blocked_users = User.query.filter_by(is_blocked=True).count()
unverified_users = User.query.filter_by(is_verified=False).count()
return jsonify({
'totals': {
'companies': total_companies,
'users': total_users,
'teams': total_teams,
'projects': total_projects,
'time_entries': total_time_entries
},
'active': {
'sessions': active_sessions,
'paused_sessions': paused_sessions
},
'recent': {
'users': recent_users,
'companies': recent_companies,
'time_entries': recent_time_entries
},
'health': {
'orphaned_users': orphaned_users,
'orphaned_time_entries': orphaned_time_entries,
'blocked_users': blocked_users,
'unverified_users': unverified_users
}
})
@app.route('/api/system-admin/companies/<int:company_id>/users')
@system_admin_required
def api_company_users(company_id):
"""API: Get users for a specific company (System Admin only)"""
company = Company.query.get_or_404(company_id)
users = User.query.filter_by(company_id=company.id).order_by(User.username).all()
return jsonify({
'company': {
'id': company.id,
'name': company.name,
'is_personal': company.is_personal
},
'users': [{
'id': user.id,
'username': user.username,
'email': user.email,
'role': user.role.value,
'is_blocked': user.is_blocked,
'is_verified': user.is_verified,
'created_at': user.created_at.isoformat(),
'team_id': user.team_id
} for user in users]
})
@app.route('/api/system-admin/users/<int:user_id>/toggle-block', methods=['POST'])
@system_admin_required
def api_toggle_user_block(user_id):
"""API: Toggle user blocked status (System Admin only)"""
user = User.query.get_or_404(user_id)
# Safety check: prevent blocking yourself
if user.id == g.user.id:
return jsonify({'error': 'Cannot block your own account'}), 400
# Safety check: prevent blocking the last system admin
if user.role == Role.SYSTEM_ADMIN and not user.is_blocked:
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN, is_blocked=False).count()
if system_admin_count <= 1:
return jsonify({'error': 'Cannot block the last system administrator'}), 400
user.is_blocked = not user.is_blocked
db.session.commit()
return jsonify({
'id': user.id,
'username': user.username,
'is_blocked': user.is_blocked,
'message': f'User {"blocked" if user.is_blocked else "unblocked"} successfully'
})
@app.route('/api/system-admin/companies/<int:company_id>/stats')
@system_admin_required
def api_company_stats(company_id):
"""API: Get detailed statistics for a specific company"""
company = Company.query.get_or_404(company_id)
# User counts by role
role_counts = {}
for role in Role:
count = User.query.filter_by(company_id=company.id, role=role).count()
if count > 0:
role_counts[role.value] = count
# Team and project counts
team_count = Team.query.filter_by(company_id=company.id).count()
project_count = Project.query.filter_by(company_id=company.id).count()
active_projects = Project.query.filter_by(company_id=company.id, is_active=True).count()
# Time entries statistics
from datetime import datetime, timedelta
week_ago = datetime.now() - timedelta(days=7)
month_ago = datetime.now() - timedelta(days=30)
weekly_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.arrival_time >= week_ago
).count()
monthly_entries = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.arrival_time >= month_ago
).count()
# Active sessions
active_sessions = TimeEntry.query.join(User).filter(
User.company_id == company.id,
TimeEntry.departure_time == None,
TimeEntry.is_paused == False
).count()
return jsonify({
'company': {
'id': company.id,
'name': company.name,
'is_personal': company.is_personal,
'is_active': company.is_active
},
'users': {
'total': sum(role_counts.values()),
'by_role': role_counts
},
'structure': {
'teams': team_count,
'projects': project_count,
'active_projects': active_projects
},
'activity': {
'weekly_entries': weekly_entries,
'monthly_entries': monthly_entries,
'active_sessions': active_sessions
}
})
@app.route('/api/analytics/export')
@login_required
def analytics_export():
"""Export analytics data in various formats"""
export_format = request.args.get('format', 'csv')
view_type = request.args.get('view', 'table')
mode = request.args.get('mode', 'personal')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
project_filter = request.args.get('project_id')
# Validate permissions
if mode == 'team':
if not g.user.team_id:
flash('No team assigned', 'error')
return redirect(url_for('analytics'))
if g.user.role not in [Role.TEAM_LEADER, Role.SUPERVISOR, Role.ADMIN]:
flash('Insufficient permissions', 'error')
return redirect(url_for('analytics'))
try:
# Parse dates
if start_date:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
if end_date:
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
# Get data
data = get_filtered_analytics_data(g.user, mode, start_date, end_date, project_filter)
if export_format == 'csv':
return export_analytics_csv(data, view_type, mode)
elif export_format == 'excel':
return export_analytics_excel(data, view_type, mode)
else:
flash('Invalid export format', 'error')
return redirect(url_for('analytics'))
except Exception as e:
logger.error(f"Error in analytics export: {str(e)}")
flash('Error generating export', 'error')
return redirect(url_for('analytics'))
# Task Management Routes
@app.route('/admin/projects/<int:project_id>/tasks')
@role_required(Role.TEAM_MEMBER) # All authenticated users can view tasks
@company_required
def manage_project_tasks(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
# Check if user has access to this project
if not project.is_user_allowed(g.user):
flash('You do not have access to this project.', 'error')
return redirect(url_for('admin_projects'))
# Get all tasks for this project
tasks = Task.query.filter_by(project_id=project_id).order_by(Task.created_at.desc()).all()
# Get team members for assignment dropdown
if project.team_id:
# If project is assigned to a specific team, only show team members
team_members = User.query.filter_by(team_id=project.team_id, company_id=g.user.company_id).all()
else:
# If project is available to all teams, show all company users
team_members = User.query.filter_by(company_id=g.user.company_id).all()
return render_template('manage_project_tasks.html',
title=f'Tasks - {project.name}',
project=project,
tasks=tasks,
team_members=team_members)
@app.route('/admin/projects/<int:project_id>/kanban')
@role_required(Role.TEAM_MEMBER)
@company_required
def project_kanban(project_id):
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first_or_404()
# Check if user has access to this project
if not project.is_user_allowed(g.user):
flash('You do not have access to this project.', 'error')
return redirect(url_for('admin_projects'))
# Get all Kanban boards for this project
boards = KanbanBoard.query.filter_by(project_id=project_id, is_active=True).order_by(KanbanBoard.created_at.desc()).all()
# Get team members for assignment dropdown
if project.team_id:
team_members = User.query.filter_by(team_id=project.team_id, company_id=g.user.company_id).all()
else:
team_members = User.query.filter_by(company_id=g.user.company_id).all()
# Get tasks for task assignment dropdown
tasks = Task.query.filter_by(project_id=project_id).order_by(Task.name).all()
return render_template('project_kanban.html',
title=f'Kanban - {project.name}',
project=project,
boards=boards,
team_members=team_members,
tasks=tasks)
@app.route('/kanban')
@role_required(Role.TEAM_MEMBER)
@company_required
def kanban_overview():
# Get all projects the user has access to
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
# Admins and Supervisors can see all company projects
projects = Project.query.filter_by(company_id=g.user.company_id, is_active=True).order_by(Project.name).all()
elif g.user.team_id:
# Team members see team projects + unassigned projects
projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).order_by(Project.name).all()
else:
# Unassigned users see only unassigned projects
projects = Project.query.filter_by(
company_id=g.user.company_id,
team_id=None,
is_active=True
).order_by(Project.name).all()
# Get Kanban boards for each project
project_boards = {}
for project in projects:
boards = KanbanBoard.query.filter_by(
project_id=project.id,
is_active=True
).order_by(KanbanBoard.created_at.desc()).all()
if boards: # Only include projects that have Kanban boards
project_boards[project] = boards
return render_template('kanban_overview.html',
title='Kanban Overview',
project_boards=project_boards)
# Task API Routes
@app.route('/api/tasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_task():
try:
data = request.get_json()
project_id = data.get('project_id')
# Verify project access
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
# Validate required fields
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Task name is required'})
# Parse dates
start_date = None
due_date = None
if data.get('start_date'):
start_date = datetime.strptime(data.get('start_date'), '%Y-%m-%d').date()
if data.get('due_date'):
due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date()
# Create task
task = Task(
name=name,
description=data.get('description', ''),
status=TaskStatus(data.get('status', 'Not Started')),
priority=TaskPriority(data.get('priority', 'Medium')),
estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None,
project_id=project_id,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
start_date=start_date,
due_date=due_date,
created_by_id=g.user.id
)
db.session.add(task)
db.session.commit()
return jsonify({'success': True, 'message': 'Task created successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_task(task_id):
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
task_data = {
'id': task.id,
'name': task.name,
'description': task.description,
'status': task.status.value,
'priority': task.priority.value,
'estimated_hours': task.estimated_hours,
'assigned_to_id': task.assigned_to_id,
'start_date': task.start_date.isoformat() if task.start_date else None,
'due_date': task.due_date.isoformat() if task.due_date else None
}
return jsonify({'success': True, 'task': task_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_task(task_id):
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
data = request.get_json()
# Update task fields
if 'name' in data:
task.name = data['name']
if 'description' in data:
task.description = data['description']
if 'status' in data:
task.status = TaskStatus(data['status'])
if data['status'] == 'Completed':
task.completed_date = datetime.now().date()
else:
task.completed_date = None
if 'priority' in data:
task.priority = TaskPriority(data['priority'])
if 'estimated_hours' in data:
task.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None
if 'assigned_to_id' in data:
task.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None
if 'start_date' in data:
task.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date() if data['start_date'] else None
if 'due_date' in data:
task.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None
db.session.commit()
return jsonify({'success': True, 'message': 'Task updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER) # Only team leaders and above can delete tasks
@company_required
def delete_task(task_id):
try:
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
db.session.delete(task)
db.session.commit()
return jsonify({'success': True, 'message': 'Task deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Subtask API Routes
@app.route('/api/subtasks', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_subtask():
try:
data = request.get_json()
task_id = data.get('task_id')
# Verify task access
task = Task.query.join(Project).filter(
Task.id == task_id,
Project.company_id == g.user.company_id
).first()
if not task or not task.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Task not found or access denied'})
# Validate required fields
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Subtask name is required'})
# Parse dates
start_date = None
due_date = None
if data.get('start_date'):
start_date = datetime.strptime(data.get('start_date'), '%Y-%m-%d').date()
if data.get('due_date'):
due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date()
# Create subtask
subtask = SubTask(
name=name,
description=data.get('description', ''),
status=TaskStatus(data.get('status', 'Not Started')),
priority=TaskPriority(data.get('priority', 'Medium')),
estimated_hours=float(data.get('estimated_hours')) if data.get('estimated_hours') else None,
task_id=task_id,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
start_date=start_date,
due_date=due_date,
created_by_id=g.user.id
)
db.session.add(subtask)
db.session.commit()
return jsonify({'success': True, 'message': 'Subtask created successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/subtasks/<int:subtask_id>', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_subtask(subtask_id):
try:
subtask = SubTask.query.join(Task).join(Project).filter(
SubTask.id == subtask_id,
Project.company_id == g.user.company_id
).first()
if not subtask or not subtask.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Subtask not found or access denied'})
subtask_data = {
'id': subtask.id,
'name': subtask.name,
'description': subtask.description,
'status': subtask.status.value,
'priority': subtask.priority.value,
'estimated_hours': subtask.estimated_hours,
'assigned_to_id': subtask.assigned_to_id,
'start_date': subtask.start_date.isoformat() if subtask.start_date else None,
'due_date': subtask.due_date.isoformat() if subtask.due_date else None
}
return jsonify({'success': True, 'subtask': subtask_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/subtasks/<int:subtask_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_subtask(subtask_id):
try:
subtask = SubTask.query.join(Task).join(Project).filter(
SubTask.id == subtask_id,
Project.company_id == g.user.company_id
).first()
if not subtask or not subtask.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Subtask not found or access denied'})
data = request.get_json()
# Update subtask fields
if 'name' in data:
subtask.name = data['name']
if 'description' in data:
subtask.description = data['description']
if 'status' in data:
subtask.status = TaskStatus(data['status'])
if data['status'] == 'Completed':
subtask.completed_date = datetime.now().date()
else:
subtask.completed_date = None
if 'priority' in data:
subtask.priority = TaskPriority(data['priority'])
if 'estimated_hours' in data:
subtask.estimated_hours = float(data['estimated_hours']) if data['estimated_hours'] else None
if 'assigned_to_id' in data:
subtask.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None
if 'start_date' in data:
subtask.start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date() if data['start_date'] else None
if 'due_date' in data:
subtask.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None
db.session.commit()
return jsonify({'success': True, 'message': 'Subtask updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/subtasks/<int:subtask_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER) # Only team leaders and above can delete subtasks
@company_required
def delete_subtask(subtask_id):
try:
subtask = SubTask.query.join(Task).join(Project).filter(
SubTask.id == subtask_id,
Project.company_id == g.user.company_id
).first()
if not subtask or not subtask.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Subtask not found or access denied'})
db.session.delete(subtask)
db.session.commit()
return jsonify({'success': True, 'message': 'Subtask deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Category Management API Routes
@app.route('/api/admin/categories', methods=['POST'])
@role_required(Role.ADMIN)
@company_required
def create_category():
try:
data = request.get_json()
name = data.get('name')
description = data.get('description', '')
color = data.get('color', '#007bff')
icon = data.get('icon', '')
if not name:
return jsonify({'success': False, 'message': 'Category name is required'})
# Check if category already exists
existing = ProjectCategory.query.filter_by(
name=name,
company_id=g.user.company_id
).first()
if existing:
return jsonify({'success': False, 'message': 'Category name already exists'})
category = ProjectCategory(
name=name,
description=description,
color=color,
icon=icon,
company_id=g.user.company_id,
created_by_id=g.user.id
)
db.session.add(category)
db.session.commit()
return jsonify({'success': True, 'message': 'Category created successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/admin/categories/<int:category_id>', methods=['PUT'])
@role_required(Role.ADMIN)
@company_required
def update_category(category_id):
try:
category = ProjectCategory.query.filter_by(
id=category_id,
company_id=g.user.company_id
).first()
if not category:
return jsonify({'success': False, 'message': 'Category not found'})
data = request.get_json()
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Category name is required'})
# Check if name conflicts with another category
existing = ProjectCategory.query.filter(
ProjectCategory.name == name,
ProjectCategory.company_id == g.user.company_id,
ProjectCategory.id != category_id
).first()
if existing:
return jsonify({'success': False, 'message': 'Category name already exists'})
category.name = name
category.description = data.get('description', '')
category.color = data.get('color', category.color)
category.icon = data.get('icon', '')
db.session.commit()
return jsonify({'success': True, 'message': 'Category updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/admin/categories/<int:category_id>', methods=['DELETE'])
@role_required(Role.ADMIN)
@company_required
def delete_category(category_id):
try:
category = ProjectCategory.query.filter_by(
id=category_id,
company_id=g.user.company_id
).first()
if not category:
return jsonify({'success': False, 'message': 'Category not found'})
# Unassign projects from this category
projects = Project.query.filter_by(category_id=category_id).all()
for project in projects:
project.category_id = None
db.session.delete(category)
db.session.commit()
return jsonify({'success': True, 'message': 'Category deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Kanban API Routes
@app.route('/api/kanban/stats')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_kanban_stats():
try:
# Get all projects the user has access to
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
projects = Project.query.filter_by(company_id=g.user.company_id, is_active=True).all()
elif g.user.team_id:
projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).all()
else:
projects = Project.query.filter_by(
company_id=g.user.company_id,
team_id=None,
is_active=True
).all()
# Count boards and cards
total_boards = 0
total_cards = 0
projects_with_boards = 0
for project in projects:
boards = KanbanBoard.query.filter_by(project_id=project.id, is_active=True).all()
if boards:
projects_with_boards += 1
total_boards += len(boards)
for board in boards:
for column in board.columns:
total_cards += len([card for card in column.cards if card.is_active])
return jsonify({
'success': True,
'stats': {
'projects_with_boards': projects_with_boards,
'total_boards': total_boards,
'total_cards': total_cards
}
})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/boards', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_kanban_boards():
try:
project_id = request.args.get('project_id')
# Verify project access
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
boards = KanbanBoard.query.filter_by(project_id=project_id, is_active=True).all()
boards_data = []
for board in boards:
boards_data.append({
'id': board.id,
'name': board.name,
'description': board.description,
'is_default': board.is_default,
'created_at': board.created_at.isoformat(),
'column_count': len(board.columns)
})
return jsonify({'success': True, 'boards': boards_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/boards', methods=['POST'])
@role_required(Role.TEAM_LEADER)
@company_required
def create_kanban_board():
try:
data = request.get_json()
project_id = int(data.get('project_id'))
# Verify project access
project = Project.query.filter_by(id=project_id, company_id=g.user.company_id).first()
if not project or not project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Project not found or access denied'})
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Board name is required'})
# Check if board name already exists in project
existing = KanbanBoard.query.filter_by(project_id=project_id, name=name).first()
if existing:
return jsonify({'success': False, 'message': 'Board name already exists in this project'})
# Create board
board = KanbanBoard(
name=name,
description=data.get('description', ''),
project_id=project_id,
is_default=data.get('is_default') in ['true', 'on', True],
created_by_id=g.user.id
)
db.session.add(board)
db.session.flush() # Get board ID for columns
# Create default columns
default_columns = [
{'name': 'To Do', 'position': 1, 'color': '#6c757d'},
{'name': 'In Progress', 'position': 2, 'color': '#007bff'},
{'name': 'Done', 'position': 3, 'color': '#28a745'}
]
for col_data in default_columns:
column = KanbanColumn(
name=col_data['name'],
position=col_data['position'],
color=col_data['color'],
board_id=board.id
)
db.session.add(column)
db.session.commit()
return jsonify({'success': True, 'message': 'Board created successfully', 'board_id': board.id})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/boards/<int:board_id>', methods=['GET'])
@role_required(Role.TEAM_MEMBER)
@company_required
def get_kanban_board(board_id):
try:
board = KanbanBoard.query.join(Project).filter(
KanbanBoard.id == board_id,
Project.company_id == g.user.company_id
).first()
if not board or not board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Board not found or access denied'})
columns_data = []
for column in board.columns:
if not column.is_active:
continue
cards_data = []
for card in column.cards:
if not card.is_active:
continue
cards_data.append({
'id': card.id,
'title': card.title,
'description': card.description,
'position': card.position,
'color': card.color,
'assigned_to': {
'id': card.assigned_to.id,
'username': card.assigned_to.username
} if card.assigned_to else None,
'task_id': card.task_id,
'task_name': card.task.name if card.task else None,
'due_date': card.due_date.isoformat() if card.due_date else None,
'created_at': card.created_at.isoformat()
})
columns_data.append({
'id': column.id,
'name': column.name,
'description': column.description,
'position': column.position,
'color': column.color,
'wip_limit': column.wip_limit,
'card_count': column.card_count,
'is_over_wip_limit': column.is_over_wip_limit,
'cards': cards_data
})
board_data = {
'id': board.id,
'name': board.name,
'description': board.description,
'project': {
'id': board.project.id,
'name': board.project.name,
'code': board.project.code
},
'columns': columns_data
}
return jsonify({'success': True, 'board': board_data})
except Exception as e:
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_kanban_card():
try:
data = request.get_json()
column_id = data.get('column_id')
# Verify column access
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
title = data.get('title')
if not title:
return jsonify({'success': False, 'message': 'Card title is required'})
# Calculate position (add to end of column)
max_position = db.session.query(func.max(KanbanCard.position)).filter_by(
column_id=column_id, is_active=True
).scalar() or 0
# Parse due date
due_date = None
if data.get('due_date'):
due_date = datetime.strptime(data.get('due_date'), '%Y-%m-%d').date()
# Create card
card = KanbanCard(
title=title,
description=data.get('description', ''),
position=max_position + 1,
color=data.get('color'),
column_id=column_id,
task_id=int(data.get('task_id')) if data.get('task_id') else None,
assigned_to_id=int(data.get('assigned_to_id')) if data.get('assigned_to_id') else None,
due_date=due_date,
created_by_id=g.user.id
)
db.session.add(card)
db.session.commit()
return jsonify({'success': True, 'message': 'Card created successfully', 'card_id': card.id})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards/<int:card_id>/move', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def move_kanban_card(card_id):
try:
data = request.get_json()
new_column_id = data.get('column_id')
new_position = data.get('position')
# Verify card access
card = KanbanCard.query.join(KanbanColumn).join(KanbanBoard).join(Project).filter(
KanbanCard.id == card_id,
Project.company_id == g.user.company_id
).first()
if not card or not card.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Card not found or access denied'})
# Verify new column access
new_column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == new_column_id,
Project.company_id == g.user.company_id
).first()
if not new_column or not new_column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Target column not found or access denied'})
old_column_id = card.column_id
old_position = card.position
# Update positions in old column (if moving to different column)
if old_column_id != new_column_id:
# Shift cards down in old column
cards_to_shift = KanbanCard.query.filter(
KanbanCard.column_id == old_column_id,
KanbanCard.position > old_position,
KanbanCard.is_active == True
).all()
for c in cards_to_shift:
c.position -= 1
# Update positions in new column
cards_to_shift = KanbanCard.query.filter(
KanbanCard.column_id == new_column_id,
KanbanCard.position >= new_position,
KanbanCard.is_active == True,
KanbanCard.id != card_id
).all()
for c in cards_to_shift:
c.position += 1
# Update card
card.column_id = new_column_id
card.position = new_position
card.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Card moved successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards/<int:card_id>', methods=['PUT'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_kanban_card(card_id):
try:
card = KanbanCard.query.join(KanbanColumn).join(KanbanBoard).join(Project).filter(
KanbanCard.id == card_id,
Project.company_id == g.user.company_id
).first()
if not card or not card.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Card not found or access denied'})
data = request.get_json()
# Update card fields
if 'title' in data:
card.title = data['title']
if 'description' in data:
card.description = data['description']
if 'color' in data:
card.color = data['color']
if 'assigned_to_id' in data:
card.assigned_to_id = int(data['assigned_to_id']) if data['assigned_to_id'] else None
if 'task_id' in data:
card.task_id = int(data['task_id']) if data['task_id'] else None
if 'due_date' in data:
card.due_date = datetime.strptime(data['due_date'], '%Y-%m-%d').date() if data['due_date'] else None
card.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Card updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/cards/<int:card_id>', methods=['DELETE'])
@role_required(Role.TEAM_MEMBER)
@company_required
def delete_kanban_card(card_id):
try:
card = KanbanCard.query.join(KanbanColumn).join(KanbanBoard).join(Project).filter(
KanbanCard.id == card_id,
Project.company_id == g.user.company_id
).first()
if not card or not card.can_user_access(g.user):
return jsonify({'success': False, 'message': 'Card not found or access denied'})
column_id = card.column_id
position = card.position
# Soft delete
card.is_active = False
card.updated_at = datetime.now()
# Shift remaining cards up
cards_to_shift = KanbanCard.query.filter(
KanbanCard.column_id == column_id,
KanbanCard.position > position,
KanbanCard.is_active == True
).all()
for c in cards_to_shift:
c.position -= 1
db.session.commit()
return jsonify({'success': True, 'message': 'Card deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Kanban Column Management API
@app.route('/api/kanban/columns', methods=['POST'])
@role_required(Role.TEAM_LEADER)
@company_required
def create_kanban_column():
try:
data = request.get_json()
board_id = int(data.get('board_id'))
# Verify board access
board = KanbanBoard.query.join(Project).filter(
KanbanBoard.id == board_id,
Project.company_id == g.user.company_id
).first()
if not board or not board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Board not found or access denied'})
name = data.get('name')
if not name:
return jsonify({'success': False, 'message': 'Column name is required'})
# Check if column name already exists in board
existing = KanbanColumn.query.filter_by(board_id=board_id, name=name).first()
if existing:
return jsonify({'success': False, 'message': 'Column name already exists in this board'})
# Calculate position (add to end)
max_position = db.session.query(func.max(KanbanColumn.position)).filter_by(
board_id=board_id, is_active=True
).scalar() or 0
# Create column
column = KanbanColumn(
name=name,
description=data.get('description', ''),
position=max_position + 1,
color=data.get('color', '#6c757d'),
wip_limit=int(data.get('wip_limit')) if data.get('wip_limit') else None,
board_id=board_id
)
db.session.add(column)
db.session.commit()
return jsonify({'success': True, 'message': 'Column created successfully', 'column_id': column.id})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/columns/<int:column_id>', methods=['PUT'])
@role_required(Role.TEAM_LEADER)
@company_required
def update_kanban_column(column_id):
try:
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
data = request.get_json()
# Check for name conflicts (excluding current column)
if 'name' in data:
existing = KanbanColumn.query.filter(
KanbanColumn.board_id == column.board_id,
KanbanColumn.name == data['name'],
KanbanColumn.id != column_id
).first()
if existing:
return jsonify({'success': False, 'message': 'Column name already exists in this board'})
# Update column fields
if 'name' in data:
column.name = data['name']
if 'description' in data:
column.description = data['description']
if 'color' in data:
column.color = data['color']
if 'wip_limit' in data:
column.wip_limit = int(data['wip_limit']) if data['wip_limit'] else None
column.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Column updated successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/columns/<int:column_id>/move', methods=['PUT'])
@role_required(Role.TEAM_LEADER)
@company_required
def move_kanban_column(column_id):
try:
data = request.get_json()
new_position = int(data.get('position'))
# Verify column access
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
old_position = column.position
board_id = column.board_id
if old_position == new_position:
return jsonify({'success': True, 'message': 'Column position unchanged'})
# Update positions of other columns
if old_position < new_position:
# Moving right: shift columns left
columns_to_shift = KanbanColumn.query.filter(
KanbanColumn.board_id == board_id,
KanbanColumn.position > old_position,
KanbanColumn.position <= new_position,
KanbanColumn.is_active == True,
KanbanColumn.id != column_id
).all()
for c in columns_to_shift:
c.position -= 1
else:
# Moving left: shift columns right
columns_to_shift = KanbanColumn.query.filter(
KanbanColumn.board_id == board_id,
KanbanColumn.position >= new_position,
KanbanColumn.position < old_position,
KanbanColumn.is_active == True,
KanbanColumn.id != column_id
).all()
for c in columns_to_shift:
c.position += 1
# Update the moved column
column.position = new_position
column.updated_at = datetime.now()
db.session.commit()
return jsonify({'success': True, 'message': 'Column moved successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
@app.route('/api/kanban/columns/<int:column_id>', methods=['DELETE'])
@role_required(Role.TEAM_LEADER)
@company_required
def delete_kanban_column(column_id):
try:
column = KanbanColumn.query.join(KanbanBoard).join(Project).filter(
KanbanColumn.id == column_id,
Project.company_id == g.user.company_id
).first()
if not column or not column.board.project.is_user_allowed(g.user):
return jsonify({'success': False, 'message': 'Column not found or access denied'})
# Check if column has active cards
active_cards = KanbanCard.query.filter_by(column_id=column_id, is_active=True).count()
if active_cards > 0:
return jsonify({'success': False, 'message': f'Cannot delete column with {active_cards} active cards. Move or delete cards first.'})
board_id = column.board_id
position = column.position
# Soft delete the column
column.is_active = False
column.updated_at = datetime.now()
# Shift remaining columns left
columns_to_shift = KanbanColumn.query.filter(
KanbanColumn.board_id == board_id,
KanbanColumn.position > position,
KanbanColumn.is_active == True
).all()
for c in columns_to_shift:
c.position -= 1
db.session.commit()
return jsonify({'success': True, 'message': 'Column deleted successfully'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'message': str(e)})
# Dashboard API Endpoints
@app.route('/api/dashboard')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_dashboard():
"""Get user's dashboard configuration and widgets."""
try:
# Get or create user dashboard
dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first()
if not dashboard:
dashboard = UserDashboard(user_id=g.user.id)
db.session.add(dashboard)
db.session.commit()
logger.info(f"Created new dashboard {dashboard.id} for user {g.user.id}")
else:
logger.info(f"Using existing dashboard {dashboard.id} for user {g.user.id}")
# Get user's widgets
widgets = DashboardWidget.query.filter_by(dashboard_id=dashboard.id).order_by(DashboardWidget.grid_y, DashboardWidget.grid_x).all()
logger.info(f"Found {len(widgets)} widgets for dashboard {dashboard.id}")
# Convert to JSON format
widget_data = []
for widget in widgets:
# Convert grid size to simple size names
if widget.grid_width == 1 and widget.grid_height == 1:
size = 'small'
elif widget.grid_width == 2 and widget.grid_height == 1:
size = 'medium'
elif widget.grid_width == 2 and widget.grid_height == 2:
size = 'large'
elif widget.grid_width == 3 and widget.grid_height == 1:
size = 'wide'
else:
size = 'small'
# Parse config JSON
try:
import json
config = json.loads(widget.config) if widget.config else {}
except (json.JSONDecodeError, TypeError):
config = {}
widget_data.append({
'id': widget.id,
'type': widget.widget_type.value,
'title': widget.title,
'size': size,
'grid_x': widget.grid_x,
'grid_y': widget.grid_y,
'grid_width': widget.grid_width,
'grid_height': widget.grid_height,
'config': config
})
return jsonify({
'success': True,
'dashboard': {
'id': dashboard.id,
'layout_config': dashboard.layout_config,
'grid_columns': dashboard.grid_columns,
'theme': dashboard.theme
},
'widgets': widget_data
})
except Exception as e:
logger.error(f"Error loading dashboard: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/dashboard/widgets', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def create_or_update_widget():
"""Create or update a dashboard widget."""
try:
data = request.get_json()
# Get or create user dashboard
dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first()
if not dashboard:
dashboard = UserDashboard(user_id=g.user.id)
db.session.add(dashboard)
db.session.flush() # Get the ID
logger.info(f"Created new dashboard {dashboard.id} for user {g.user.id} in widget creation")
else:
logger.info(f"Using existing dashboard {dashboard.id} for user {g.user.id} in widget creation")
# Check if updating existing widget
widget_id = data.get('widget_id')
if widget_id:
widget = DashboardWidget.query.filter_by(
id=widget_id,
dashboard_id=dashboard.id
).first()
if not widget:
return jsonify({'success': False, 'error': 'Widget not found'})
else:
# Create new widget
widget = DashboardWidget(dashboard_id=dashboard.id)
# Find next available position
max_y = db.session.query(func.max(DashboardWidget.grid_y)).filter_by(
dashboard_id=dashboard.id
).scalar() or 0
widget.grid_y = max_y + 1
widget.grid_x = 0
# Update widget properties
widget.widget_type = WidgetType(data['type'])
widget.title = data['title']
# Convert size to grid dimensions
size = data.get('size', 'small')
if size == 'small':
widget.grid_width = 1
widget.grid_height = 1
elif size == 'medium':
widget.grid_width = 2
widget.grid_height = 1
elif size == 'large':
widget.grid_width = 2
widget.grid_height = 2
elif size == 'wide':
widget.grid_width = 3
widget.grid_height = 1
# Build config from form data
config = {}
for key, value in data.items():
if key not in ['type', 'title', 'size', 'widget_id']:
config[key] = value
# Store config as JSON string
if config:
import json
widget.config = json.dumps(config)
else:
widget.config = None
if not widget_id:
db.session.add(widget)
logger.info(f"Creating new widget: {widget.widget_type.value} for dashboard {dashboard.id}")
else:
logger.info(f"Updating existing widget {widget_id}")
db.session.commit()
logger.info(f"Widget saved successfully with ID: {widget.id}")
# Verify the widget was actually saved
saved_widget = DashboardWidget.query.filter_by(id=widget.id).first()
if saved_widget:
logger.info(f"Verification: Widget {widget.id} exists in database with dashboard_id: {saved_widget.dashboard_id}")
else:
logger.error(f"Verification failed: Widget {widget.id} not found in database")
# Convert grid size back to simple size name for response
if widget.grid_width == 1 and widget.grid_height == 1:
size_name = 'small'
elif widget.grid_width == 2 and widget.grid_height == 1:
size_name = 'medium'
elif widget.grid_width == 2 and widget.grid_height == 2:
size_name = 'large'
elif widget.grid_width == 3 and widget.grid_height == 1:
size_name = 'wide'
else:
size_name = 'small'
# Parse config for response
try:
import json
config_dict = json.loads(widget.config) if widget.config else {}
except (json.JSONDecodeError, TypeError):
config_dict = {}
return jsonify({
'success': True,
'message': 'Widget saved successfully',
'widget': {
'id': widget.id,
'type': widget.widget_type.value,
'title': widget.title,
'size': size_name,
'grid_x': widget.grid_x,
'grid_y': widget.grid_y,
'grid_width': widget.grid_width,
'grid_height': widget.grid_height,
'config': config_dict
}
})
except Exception as e:
db.session.rollback()
logger.error(f"Error saving widget: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/dashboard/widgets/<int:widget_id>', methods=['DELETE'])
@role_required(Role.TEAM_MEMBER)
@company_required
def delete_widget(widget_id):
"""Delete a dashboard widget."""
try:
# Get user dashboard
dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first()
if not dashboard:
return jsonify({'success': False, 'error': 'Dashboard not found'})
# Find and delete widget
widget = DashboardWidget.query.filter_by(
id=widget_id,
dashboard_id=dashboard.id
).first()
if not widget:
return jsonify({'success': False, 'error': 'Widget not found'})
# No need to update positions for grid-based layout
db.session.delete(widget)
db.session.commit()
return jsonify({'success': True, 'message': 'Widget deleted successfully'})
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting widget: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/dashboard/positions', methods=['POST'])
@role_required(Role.TEAM_MEMBER)
@company_required
def update_widget_positions():
"""Update widget grid positions after drag and drop."""
try:
data = request.get_json()
positions = data.get('positions', [])
# Get user dashboard
dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first()
if not dashboard:
return jsonify({'success': False, 'error': 'Dashboard not found'})
# Update grid positions
for pos_data in positions:
widget = DashboardWidget.query.filter_by(
id=pos_data['id'],
dashboard_id=dashboard.id
).first()
if widget:
# For now, just assign sequential grid positions
# In a more advanced implementation, we'd calculate actual grid coordinates
widget.grid_x = pos_data.get('grid_x', 0)
widget.grid_y = pos_data.get('grid_y', pos_data.get('position', 0))
db.session.commit()
return jsonify({'success': True, 'message': 'Positions updated successfully'})
except Exception as e:
db.session.rollback()
logger.error(f"Error updating positions: {e}")
return jsonify({'success': False, 'error': str(e)})
# Widget data endpoints
@app.route('/api/dashboard/widgets/<int:widget_id>/data')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_widget_data(widget_id):
"""Get data for a specific widget."""
try:
# Get user dashboard
dashboard = UserDashboard.query.filter_by(user_id=g.user.id).first()
if not dashboard:
return jsonify({'success': False, 'error': 'Dashboard not found'})
# Find widget
widget = DashboardWidget.query.filter_by(
id=widget_id,
dashboard_id=dashboard.id
).first()
if not widget:
return jsonify({'success': False, 'error': 'Widget not found'})
# Get widget-specific data based on type
widget_data = {}
if widget.widget_type == WidgetType.DAILY_SUMMARY:
from datetime import datetime, timedelta
config = widget.config or {}
period = config.get('summary_period', 'daily')
# Calculate time summaries
now = datetime.now()
# Today's summary
start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
today_entries = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.arrival_time >= start_of_today,
TimeEntry.departure_time.isnot(None)
).all()
today_seconds = sum(entry.duration or 0 for entry in today_entries)
# This week's summary
start_of_week = start_of_today - timedelta(days=start_of_today.weekday())
week_entries = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.arrival_time >= start_of_week,
TimeEntry.departure_time.isnot(None)
).all()
week_seconds = sum(entry.duration or 0 for entry in week_entries)
# This month's summary
start_of_month = start_of_today.replace(day=1)
month_entries = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.arrival_time >= start_of_month,
TimeEntry.departure_time.isnot(None)
).all()
month_seconds = sum(entry.duration or 0 for entry in month_entries)
widget_data.update({
'today': f"{today_seconds // 3600}h {(today_seconds % 3600) // 60}m",
'week': f"{week_seconds // 3600}h {(week_seconds % 3600) // 60}m",
'month': f"{month_seconds // 3600}h {(month_seconds % 3600) // 60}m",
'entries_today': len(today_entries),
'entries_week': len(week_entries),
'entries_month': len(month_entries)
})
elif widget.widget_type == WidgetType.ACTIVE_PROJECTS:
config = widget.config or {}
project_filter = config.get('project_filter', 'all')
max_projects = int(config.get('max_projects', 5))
# Get user's projects
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
projects = Project.query.filter_by(
company_id=g.user.company_id,
is_active=True
).limit(max_projects).all()
elif g.user.team_id:
projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).limit(max_projects).all()
else:
projects = []
widget_data['projects'] = [{
'id': p.id,
'name': p.name,
'code': p.code,
'description': p.description
} for p in projects]
elif widget.widget_type == WidgetType.ASSIGNED_TASKS:
config = widget.config or {}
task_filter = config.get('task_filter', 'assigned')
task_status = config.get('task_status', 'active')
# Get user's tasks based on filter
if task_filter == 'assigned':
tasks = Task.query.filter_by(assigned_to_id=g.user.id)
elif task_filter == 'created':
tasks = Task.query.filter_by(created_by_id=g.user.id)
else:
# Get tasks from user's projects
if g.user.team_id:
project_ids = [p.id for p in Project.query.filter(
Project.company_id == g.user.company_id,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).all()]
tasks = Task.query.filter(Task.project_id.in_(project_ids))
else:
tasks = Task.query.join(Project).filter(Project.company_id == g.user.company_id)
# Filter by status if specified
if task_status != 'all':
if task_status == 'active':
tasks = tasks.filter(Task.status.in_([TaskStatus.PENDING, TaskStatus.IN_PROGRESS]))
elif task_status == 'pending':
tasks = tasks.filter_by(status=TaskStatus.PENDING)
elif task_status == 'completed':
tasks = tasks.filter_by(status=TaskStatus.COMPLETED)
tasks = tasks.limit(10).all()
widget_data['tasks'] = [{
'id': t.id,
'name': t.name,
'description': t.description,
'status': t.status.value if t.status else 'Pending',
'priority': t.priority.value if t.priority else 'Medium',
'project_name': t.project.name if t.project else 'No Project'
} for t in tasks]
elif widget.widget_type == WidgetType.WEEKLY_CHART:
from datetime import datetime, timedelta
# Get weekly data for chart
now = datetime.now()
start_of_week = now - timedelta(days=now.weekday())
weekly_data = []
for i in range(7):
day = start_of_week + timedelta(days=i)
day_start = day.replace(hour=0, minute=0, second=0, microsecond=0)
day_end = day_start + timedelta(days=1)
day_entries = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.arrival_time >= day_start,
TimeEntry.arrival_time < day_end,
TimeEntry.departure_time.isnot(None)
).all()
total_seconds = sum(entry.duration or 0 for entry in day_entries)
weekly_data.append({
'day': day.strftime('%A'),
'date': day.strftime('%Y-%m-%d'),
'hours': round(total_seconds / 3600, 2),
'entries': len(day_entries)
})
widget_data['weekly_data'] = weekly_data
elif widget.widget_type == WidgetType.TASK_PRIORITY:
# Get tasks by priority
if g.user.team_id:
project_ids = [p.id for p in Project.query.filter(
Project.company_id == g.user.company_id,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).all()]
tasks = Task.query.filter(
Task.project_id.in_(project_ids),
Task.assigned_to_id == g.user.id
).order_by(Task.priority.desc(), Task.created_at.desc()).limit(10).all()
else:
tasks = Task.query.filter_by(assigned_to_id=g.user.id).order_by(
Task.priority.desc(), Task.created_at.desc()
).limit(10).all()
widget_data['priority_tasks'] = [{
'id': t.id,
'name': t.name,
'description': t.description,
'priority': t.priority.value if t.priority else 'Medium',
'status': t.status.value if t.status else 'Pending',
'project_name': t.project.name if t.project else 'No Project'
} for t in tasks]
elif widget.widget_type == WidgetType.KANBAN_SUMMARY:
# Get kanban data summary
if g.user.team_id:
project_ids = [p.id for p in Project.query.filter(
Project.company_id == g.user.company_id,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).all()]
boards = KanbanBoard.query.filter(
KanbanBoard.project_id.in_(project_ids),
KanbanBoard.is_active == True
).limit(3).all()
else:
boards = []
board_summaries = []
for board in boards:
columns = KanbanColumn.query.filter_by(board_id=board.id).order_by(KanbanColumn.position).all()
total_cards = sum(len([c for c in col.cards if c.is_active]) for col in columns)
board_summaries.append({
'id': board.id,
'name': board.name,
'project_name': board.project.name,
'total_cards': total_cards,
'columns': len(columns)
})
widget_data['kanban_boards'] = board_summaries
elif widget.widget_type == WidgetType.PROJECT_PROGRESS:
# Get project progress data
if g.user.role in [Role.ADMIN, Role.SUPERVISOR]:
projects = Project.query.filter_by(
company_id=g.user.company_id,
is_active=True
).limit(5).all()
elif g.user.team_id:
projects = Project.query.filter(
Project.company_id == g.user.company_id,
Project.is_active == True,
db.or_(Project.team_id == g.user.team_id, Project.team_id == None)
).limit(5).all()
else:
projects = []
project_progress = []
for project in projects:
total_tasks = Task.query.filter_by(project_id=project.id).count()
completed_tasks = Task.query.filter_by(
project_id=project.id,
status=TaskStatus.COMPLETED
).count()
progress = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0
project_progress.append({
'id': project.id,
'name': project.name,
'code': project.code,
'progress': round(progress, 1),
'completed_tasks': completed_tasks,
'total_tasks': total_tasks
})
widget_data['project_progress'] = project_progress
elif widget.widget_type == WidgetType.PRODUCTIVITY_METRICS:
from datetime import datetime, timedelta
# Calculate productivity metrics
now = datetime.now()
today = now.replace(hour=0, minute=0, second=0, microsecond=0)
week_ago = today - timedelta(days=7)
# This week vs last week comparison
this_week_entries = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.arrival_time >= week_ago,
TimeEntry.departure_time.isnot(None)
).all()
last_week_entries = TimeEntry.query.filter(
TimeEntry.user_id == g.user.id,
TimeEntry.arrival_time >= week_ago - timedelta(days=7),
TimeEntry.arrival_time < week_ago,
TimeEntry.departure_time.isnot(None)
).all()
this_week_hours = sum(entry.duration or 0 for entry in this_week_entries) / 3600
last_week_hours = sum(entry.duration or 0 for entry in last_week_entries) / 3600
productivity_change = ((this_week_hours - last_week_hours) / last_week_hours * 100) if last_week_hours > 0 else 0
widget_data.update({
'this_week_hours': round(this_week_hours, 1),
'last_week_hours': round(last_week_hours, 1),
'productivity_change': round(productivity_change, 1),
'avg_daily_hours': round(this_week_hours / 7, 1),
'total_entries': len(this_week_entries)
})
return jsonify({
'success': True,
'data': widget_data
})
except Exception as e:
logger.error(f"Error getting widget data: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/current-timer-status')
@role_required(Role.TEAM_MEMBER)
@company_required
def get_current_timer_status():
"""Get current timer status for dashboard widgets."""
try:
# Get the user's current active time entry
active_entry = TimeEntry.query.filter_by(
user_id=g.user.id,
departure_time=None
).first()
if active_entry:
# Calculate current duration
now = datetime.now()
elapsed_seconds = int((now - active_entry.arrival_time).total_seconds())
return jsonify({
'success': True,
'isActive': True,
'startTime': active_entry.arrival_time.isoformat(),
'currentDuration': elapsed_seconds,
'entryId': active_entry.id
})
else:
return jsonify({
'success': True,
'isActive': False,
'message': 'No active timer'
})
except Exception as e:
logger.error(f"Error getting timer status: {e}")
return jsonify({'success': False, 'error': str(e)})
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5000))
app.run(debug=True, host='0.0.0.0', port=port)