Improve db migrations.

Move all migration code to python module and use it from app.py. Use Enum values to avoid problems with Enum names in DB.
This commit is contained in:
2025-07-03 12:04:03 +02:00
committed by Jens Luedicke
parent 387243d516
commit d4e56c5cde
7 changed files with 1440 additions and 1445 deletions

View File

@@ -91,12 +91,6 @@ The integrated migration system handles:
- Sample data initialization
- Data integrity maintenance during upgrades
**Legacy Migration Files**: The following files are maintained for reference but are no longer needed:
- `migrate_db.py`: Legacy core database migration (now integrated)
- `migrate_roles_teams.py`: Legacy role and team migration (now integrated)
- `migrate_projects.py`: Legacy project migration (now integrated)
- `repair_roles.py`: Legacy role repair utility (functionality now integrated)
### Configuration
The application can be configured through:

1113
app.py

File diff suppressed because it is too large Load Diff

View File

@@ -1,52 +1,126 @@
from app import app, db
#!/usr/bin/env python3
"""
Database Migration Script for TimeTrack
Consolidates all database migrations and provides command line interface.
"""
import sqlite3
import os
from models import User, TimeEntry, WorkConfig, SystemSettings, Team, Role, Project, Company, AccountType
from werkzeug.security import generate_password_hash
import sys
import argparse
from datetime import datetime
def migrate_database():
db_path = 'timetrack.db'
# Try to import from Flask app context if available
try:
from app import app, db
from models import (User, TimeEntry, WorkConfig, SystemSettings, Team, Role, Project,
Company, CompanyWorkConfig, UserPreferences, WorkRegion, AccountType,
ProjectCategory, Task, SubTask, TaskStatus, TaskPriority)
from werkzeug.security import generate_password_hash
FLASK_AVAILABLE = True
except ImportError:
print("Flask app not available. Running in standalone mode.")
FLASK_AVAILABLE = False
# Define Role and AccountType enums for standalone mode
import enum
class Role(enum.Enum):
TEAM_MEMBER = "Team Member"
TEAM_LEADER = "Team Leader"
SUPERVISOR = "Supervisor"
ADMIN = "Administrator"
SYSTEM_ADMIN = "System Administrator"
class AccountType(enum.Enum):
COMPANY_USER = "Company User"
FREELANCER = "Freelancer"
def get_db_path(db_file=None):
"""Determine database path based on environment or provided file."""
if db_file:
return db_file
# Check for Docker environment
if os.path.exists('/data'):
return '/data/timetrack.db'
return 'timetrack.db'
def run_all_migrations(db_path=None):
"""Run all database migrations in sequence."""
db_path = get_db_path(db_path)
print(f"Running migrations on database: {db_path}")
# Check if database exists
if not os.path.exists(db_path):
print("Database doesn't exist. Creating new database.")
if FLASK_AVAILABLE:
with app.app_context():
db.create_all()
# Initialize system settings
init_system_settings()
else:
create_new_database(db_path)
return
print("Migrating existing database...")
print("Running database migrations...")
# Connect to the database
# Run migrations in sequence
run_basic_migrations(db_path)
migrate_to_company_model(db_path)
migrate_work_config_data(db_path)
migrate_task_system(db_path)
if FLASK_AVAILABLE:
with app.app_context():
# Handle company migration and admin user setup
migrate_data()
print("Database migrations completed successfully!")
def run_basic_migrations(db_path):
"""Run basic table structure migrations."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if the time_entry columns already exist
try:
# Check if time_entry table exists first
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='time_entry'")
if not cursor.fetchone():
print("time_entry table doesn't exist. Creating all tables...")
if FLASK_AVAILABLE:
with app.app_context():
db.create_all()
init_system_settings()
else:
create_all_tables(cursor)
conn.commit()
conn.close()
return
# Migrate time_entry table
cursor.execute("PRAGMA table_info(time_entry)")
time_entry_columns = [column[1] for column in cursor.fetchall()]
# Add new columns to time_entry if they don't exist
if 'is_paused' not in time_entry_columns:
print("Adding is_paused column to time_entry...")
cursor.execute("ALTER TABLE time_entry ADD COLUMN is_paused BOOLEAN DEFAULT 0")
migrations = [
('is_paused', "ALTER TABLE time_entry ADD COLUMN is_paused BOOLEAN DEFAULT 0"),
('pause_start_time', "ALTER TABLE time_entry ADD COLUMN pause_start_time TIMESTAMP"),
('total_break_duration', "ALTER TABLE time_entry ADD COLUMN total_break_duration INTEGER DEFAULT 0"),
('user_id', "ALTER TABLE time_entry ADD COLUMN user_id INTEGER"),
('project_id', "ALTER TABLE time_entry ADD COLUMN project_id INTEGER"),
('notes', "ALTER TABLE time_entry ADD COLUMN notes TEXT"),
('task_id', "ALTER TABLE time_entry ADD COLUMN task_id INTEGER"),
('subtask_id', "ALTER TABLE time_entry ADD COLUMN subtask_id INTEGER")
]
if 'pause_start_time' not in time_entry_columns:
print("Adding pause_start_time column to time_entry...")
cursor.execute("ALTER TABLE time_entry ADD COLUMN pause_start_time TIMESTAMP")
for column_name, sql_command in migrations:
if column_name not in time_entry_columns:
print(f"Adding {column_name} column to time_entry...")
cursor.execute(sql_command)
if 'total_break_duration' not in time_entry_columns:
print("Adding total_break_duration column to time_entry...")
cursor.execute("ALTER TABLE time_entry ADD COLUMN total_break_duration INTEGER DEFAULT 0")
# Add user_id column if it doesn't exist
if 'user_id' not in time_entry_columns:
print("Adding user_id column to time_entry...")
cursor.execute("ALTER TABLE time_entry ADD COLUMN user_id INTEGER")
# Check if the work_config table exists
# Migrate work_config table
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='work_config'")
if not cursor.fetchone():
print("Creating work_config table...")
@@ -58,103 +132,88 @@ def migrate_database():
break_threshold_hours FLOAT DEFAULT 6.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id INTEGER
user_id INTEGER,
additional_break_minutes INTEGER DEFAULT 15,
additional_break_threshold_hours FLOAT DEFAULT 9.0
)
""")
# Insert default config
cursor.execute("""
INSERT INTO work_config (work_hours_per_day, mandatory_break_minutes, break_threshold_hours)
VALUES (8.0, 30, 6.0)
""")
else:
# Check if the work_config columns already exist
cursor.execute("PRAGMA table_info(work_config)")
work_config_columns = [column[1] for column in cursor.fetchall()]
# Add new columns to work_config if they don't exist
if 'additional_break_minutes' not in work_config_columns:
print("Adding additional_break_minutes column to work_config...")
cursor.execute("ALTER TABLE work_config ADD COLUMN additional_break_minutes INTEGER DEFAULT 15")
work_config_migrations = [
('additional_break_minutes', "ALTER TABLE work_config ADD COLUMN additional_break_minutes INTEGER DEFAULT 15"),
('additional_break_threshold_hours', "ALTER TABLE work_config ADD COLUMN additional_break_threshold_hours FLOAT DEFAULT 9.0"),
('user_id', "ALTER TABLE work_config ADD COLUMN user_id INTEGER")
]
if 'additional_break_threshold_hours' not in work_config_columns:
print("Adding additional_break_threshold_hours column to work_config...")
cursor.execute("ALTER TABLE work_config ADD COLUMN additional_break_threshold_hours FLOAT DEFAULT 9.0")
for column_name, sql_command in work_config_migrations:
if column_name not in work_config_columns:
print(f"Adding {column_name} column to work_config...")
cursor.execute(sql_command)
# Add user_id column to work_config if it doesn't exist
if 'user_id' not in work_config_columns:
print("Adding user_id column to work_config...")
cursor.execute("ALTER TABLE work_config ADD COLUMN user_id INTEGER")
# Check if the user table exists and has the verification columns
# Migrate user table
cursor.execute("PRAGMA table_info(user)")
user_columns = [column[1] for column in cursor.fetchall()]
# Add new columns to user table for email verification
if 'is_verified' not in user_columns:
print("Adding is_verified column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN is_verified BOOLEAN DEFAULT 0")
user_migrations = [
('is_verified', "ALTER TABLE user ADD COLUMN is_verified BOOLEAN DEFAULT 0"),
('verification_token', "ALTER TABLE user ADD COLUMN verification_token VARCHAR(100)"),
('token_expiry', "ALTER TABLE user ADD COLUMN token_expiry TIMESTAMP"),
('is_blocked', "ALTER TABLE user ADD COLUMN is_blocked BOOLEAN DEFAULT 0"),
('role', "ALTER TABLE user ADD COLUMN role VARCHAR(50) DEFAULT 'Team Member'"),
('team_id', "ALTER TABLE user ADD COLUMN team_id INTEGER"),
('account_type', f"ALTER TABLE user ADD COLUMN account_type VARCHAR(20) DEFAULT '{AccountType.COMPANY_USER.value}'"),
('business_name', "ALTER TABLE user ADD COLUMN business_name VARCHAR(100)"),
('company_id', "ALTER TABLE user ADD COLUMN company_id INTEGER"),
('two_factor_enabled', "ALTER TABLE user ADD COLUMN two_factor_enabled BOOLEAN DEFAULT 0"),
('two_factor_secret', "ALTER TABLE user ADD COLUMN two_factor_secret VARCHAR(32)")
]
if 'verification_token' not in user_columns:
print("Adding verification_token column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN verification_token VARCHAR(100)")
for column_name, sql_command in user_migrations:
if column_name not in user_columns:
print(f"Adding {column_name} column to user...")
cursor.execute(sql_command)
if 'token_expiry' not in user_columns:
print("Adding token_expiry column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN token_expiry TIMESTAMP")
# Handle is_admin to role migration
if 'is_admin' in user_columns and 'role' in user_columns:
print("Migrating is_admin column to role...")
cursor.execute("UPDATE user SET role = ? WHERE is_admin = 1 AND (role IS NULL OR role = '')", (Role.ADMIN.value,))
cursor.execute("UPDATE user SET role = ? WHERE is_admin = 0 AND (role IS NULL OR role = '')", (Role.TEAM_MEMBER.value,))
# Add is_blocked column to user table if it doesn't exist
if 'is_blocked' not in user_columns:
print("Adding is_blocked column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN is_blocked BOOLEAN DEFAULT 0")
# Create other tables if they don't exist
create_missing_tables(cursor)
# Add role column to user table if it doesn't exist
if 'role' not in user_columns:
print("Adding role column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN role VARCHAR(50) DEFAULT 'Team Member'")
conn.commit()
# Add team_id column to user table if it doesn't exist
if 'team_id' not in user_columns:
print("Adding team_id column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN team_id INTEGER")
except Exception as e:
print(f"Error during basic migrations: {e}")
conn.rollback()
raise
finally:
conn.close()
# Add freelancer support columns to user table
if 'account_type' not in user_columns:
print("Adding account_type column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN account_type VARCHAR(20) DEFAULT 'COMPANY_USER'")
if 'business_name' not in user_columns:
print("Adding business_name column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN business_name VARCHAR(100)")
def create_missing_tables(cursor):
"""Create missing tables."""
# Add company_id to user table for multi-tenancy
if 'company_id' not in user_columns:
print("Adding company_id column to user table...")
# Note: We can't add NOT NULL constraint to existing table, so allow NULL initially
cursor.execute("ALTER TABLE user ADD COLUMN company_id INTEGER")
# Add 2FA columns to user table if they don't exist
if 'two_factor_enabled' not in user_columns:
print("Adding two_factor_enabled column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN two_factor_enabled BOOLEAN DEFAULT 0")
if 'two_factor_secret' not in user_columns:
print("Adding two_factor_secret column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN two_factor_secret VARCHAR(32)")
# Check if the team table exists
# Team table
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='team'")
if not cursor.fetchone():
print("Creating team table...")
cursor.execute("""
CREATE TABLE team (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
description VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
company_id INTEGER NOT NULL,
FOREIGN KEY (company_id) REFERENCES company (id),
UNIQUE(company_id, name)
)
""")
# Check if the system_settings table exists
# System settings table
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='system_settings'")
if not cursor.fetchone():
print("Creating system_settings table...")
@@ -168,7 +227,7 @@ def migrate_database():
)
""")
# Check if the project table exists
# Project table
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='project'")
if not cursor.fetchone():
print("Creating project table...")
@@ -177,184 +236,424 @@ def migrate_database():
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL,
description TEXT,
code VARCHAR(20) NOT NULL UNIQUE,
code VARCHAR(20) NOT NULL,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
company_id INTEGER NOT NULL,
created_by_id INTEGER NOT NULL,
team_id INTEGER,
category_id INTEGER,
start_date DATE,
end_date DATE,
FOREIGN KEY (company_id) REFERENCES company (id),
FOREIGN KEY (created_by_id) REFERENCES user (id),
FOREIGN KEY (team_id) REFERENCES team (id)
FOREIGN KEY (team_id) REFERENCES team (id),
FOREIGN KEY (category_id) REFERENCES project_category (id),
UNIQUE(company_id, code)
)
""")
# Check if the company table exists
# Company table
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company'")
if not cursor.fetchone():
print("Creating company table...")
cursor.execute("""
CREATE TABLE company (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
slug VARCHAR(50) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_personal BOOLEAN DEFAULT 0,
is_active BOOLEAN DEFAULT 1,
max_users INTEGER DEFAULT 100
max_users INTEGER DEFAULT 100,
UNIQUE(name)
)
""")
else:
# Check if company table has freelancer columns
def migrate_to_company_model(db_path):
"""Migrate to company-based multi-tenancy model."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Check if company table exists, create if not
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='company'")
if not cursor.fetchone():
create_missing_tables(cursor)
# Check and add missing columns to existing company table
cursor.execute("PRAGMA table_info(company)")
company_columns = [column[1] for column in cursor.fetchall()]
if 'is_personal' not in company_columns:
print("Adding is_personal column to company table...")
cursor.execute("ALTER TABLE company ADD COLUMN is_personal BOOLEAN DEFAULT 0")
company_migrations = [
('is_personal', "ALTER TABLE company ADD COLUMN is_personal BOOLEAN DEFAULT 0")
]
# Add project-related columns to time_entry table
for column_name, sql_command in company_migrations:
if column_name not in company_columns:
print(f"Adding {column_name} column to company...")
cursor.execute(sql_command)
# Add company_id to tables that need it
add_company_id_to_tables(cursor)
# Handle user role enum migration
migrate_user_roles(cursor)
conn.commit()
except Exception as e:
print(f"Error during company model migration: {e}")
conn.rollback()
raise
finally:
conn.close()
def add_company_id_to_tables(cursor):
"""Add company_id columns to tables that need multi-tenancy."""
tables_needing_company = ['project', 'team']
for table_name in tables_needing_company:
cursor.execute(f"PRAGMA table_info({table_name})")
columns = [column[1] for column in cursor.fetchall()]
if 'company_id' not in columns:
print(f"Adding company_id column to {table_name}...")
cursor.execute(f"ALTER TABLE {table_name} ADD COLUMN company_id INTEGER")
def migrate_user_roles(cursor):
"""Handle user role enum migration with constraint updates."""
cursor.execute("PRAGMA table_info(user)")
user_columns = cursor.fetchall()
# Check if we need to migrate the role enum constraint
cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name='user'")
create_table_sql = cursor.fetchone()
if create_table_sql and 'System Administrator' not in create_table_sql[0]:
print("Updating role enum constraint to include SYSTEM_ADMIN...")
# Check existing role values
cursor.execute("SELECT DISTINCT role FROM user WHERE role IS NOT NULL")
existing_roles = [row[0] for row in cursor.fetchall()]
print(f"Found existing roles: {existing_roles}")
# First normalize role values in the existing table
print("Normalizing role values before table recreation...")
role_mapping = {
'TEAM_MEMBER': Role.TEAM_MEMBER.value,
'TEAM_LEADER': Role.TEAM_LEADER.value,
'SUPERVISOR': Role.SUPERVISOR.value,
'ADMIN': Role.ADMIN.value,
'SYSTEM_ADMIN': Role.SYSTEM_ADMIN.value
}
for old_role, new_role in role_mapping.items():
cursor.execute("UPDATE user SET role = ? WHERE role = ?", (new_role, old_role))
updated_count = cursor.rowcount
if updated_count > 0:
print(f"Updated {updated_count} users from role '{old_role}' to '{new_role}'")
# Set any NULL or invalid roles to defaults
cursor.execute("UPDATE user SET role = ? WHERE role IS NULL OR role NOT IN (?, ?, ?, ?, ?)",
(Role.TEAM_MEMBER.value, Role.TEAM_MEMBER.value, Role.TEAM_LEADER.value,
Role.SUPERVISOR.value, Role.ADMIN.value, Role.SYSTEM_ADMIN.value))
null_roles = cursor.rowcount
if null_roles > 0:
print(f"Set {null_roles} NULL/invalid roles to 'Team Member'")
# Drop user_new table if it exists from previous failed migration
cursor.execute("DROP TABLE IF EXISTS user_new")
# Create a backup table with the new enum constraint
cursor.execute("""
CREATE TABLE user_new (
id INTEGER PRIMARY KEY,
username VARCHAR(80) NOT NULL,
email VARCHAR(120) NOT NULL,
password_hash VARCHAR(128),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
company_id INTEGER NOT NULL,
is_verified BOOLEAN DEFAULT 0,
verification_token VARCHAR(100),
token_expiry TIMESTAMP,
is_blocked BOOLEAN DEFAULT 0,
role VARCHAR(50) DEFAULT 'Team Member' CHECK (role IN ('Team Member', 'Team Leader', 'Supervisor', 'Administrator', 'System Administrator')),
team_id INTEGER,
account_type VARCHAR(20) DEFAULT 'Company User' CHECK (account_type IN ('Company User', 'Freelancer')),
business_name VARCHAR(100),
two_factor_enabled BOOLEAN DEFAULT 0,
two_factor_secret VARCHAR(32),
FOREIGN KEY (company_id) REFERENCES company (id),
FOREIGN KEY (team_id) REFERENCES team (id)
)
""")
# Copy all data from old table to new table with validation
cursor.execute("""
INSERT INTO user_new
SELECT id, username, email, password_hash, created_at, company_id,
is_verified, verification_token, token_expiry, is_blocked,
CASE
WHEN role IN (?, ?, ?, ?, ?) THEN role
ELSE ?
END as role,
team_id,
CASE
WHEN account_type IN (?, ?) THEN account_type
ELSE ?
END as account_type,
business_name, two_factor_enabled, two_factor_secret
FROM user
""", (Role.TEAM_MEMBER.value, Role.TEAM_LEADER.value, Role.SUPERVISOR.value,
Role.ADMIN.value, Role.SYSTEM_ADMIN.value, Role.TEAM_MEMBER.value,
AccountType.COMPANY_USER.value, AccountType.FREELANCER.value,
AccountType.COMPANY_USER.value))
# Drop the old table and rename the new one
cursor.execute("DROP TABLE user")
cursor.execute("ALTER TABLE user_new RENAME TO user")
print("✓ Role enum constraint updated successfully")
# Additional normalization for account_type values
print("Normalizing account_type values...")
account_type_mapping = {
'COMPANY_USER': AccountType.COMPANY_USER.value,
'FREELANCER': AccountType.FREELANCER.value
}
for old_type, new_type in account_type_mapping.items():
cursor.execute("UPDATE user SET account_type = ? WHERE account_type = ?", (new_type, old_type))
updated_count = cursor.rowcount
if updated_count > 0:
print(f"Updated {updated_count} users account_type from '{old_type}' to '{new_type}'")
# Set any remaining NULL values to defaults
cursor.execute("UPDATE user SET account_type = ? WHERE account_type IS NULL", (AccountType.COMPANY_USER.value,))
null_accounts = cursor.rowcount
if null_accounts > 0:
print(f"Set {null_accounts} NULL account_types to 'Company User'")
def migrate_work_config_data(db_path):
"""Migrate work configuration data to new company-based model."""
if not FLASK_AVAILABLE:
print("Skipping work config data migration - Flask not available")
return
with app.app_context():
try:
# Create CompanyWorkConfig for all companies that don't have one
companies = Company.query.all()
for company in companies:
existing_config = CompanyWorkConfig.query.filter_by(company_id=company.id).first()
if not existing_config:
print(f"Creating CompanyWorkConfig for {company.name}")
# Use Germany defaults (existing system default)
preset = CompanyWorkConfig.get_regional_preset(WorkRegion.GERMANY)
company_config = CompanyWorkConfig(
company_id=company.id,
work_hours_per_day=preset['work_hours_per_day'],
mandatory_break_minutes=preset['mandatory_break_minutes'],
break_threshold_hours=preset['break_threshold_hours'],
additional_break_minutes=preset['additional_break_minutes'],
additional_break_threshold_hours=preset['additional_break_threshold_hours'],
region=preset['region'],
region_name=preset['region_name']
)
db.session.add(company_config)
# Migrate existing WorkConfig user preferences to UserPreferences
old_configs = WorkConfig.query.filter(WorkConfig.user_id.isnot(None)).all()
for old_config in old_configs:
user = User.query.get(old_config.user_id)
if user:
existing_prefs = UserPreferences.query.filter_by(user_id=user.id).first()
if not existing_prefs:
print(f"Migrating preferences for user {user.username}")
user_prefs = UserPreferences(
user_id=user.id,
time_format_24h=getattr(old_config, 'time_format_24h', True),
date_format=getattr(old_config, 'date_format', 'YYYY-MM-DD'),
round_minutes_interval=getattr(old_config, 'round_minutes_interval', 0),
round_to_nearest=getattr(old_config, 'round_to_nearest', True)
)
db.session.add(user_prefs)
db.session.commit()
print("Work config data migration completed successfully")
except Exception as e:
print(f"Error during work config migration: {e}")
db.session.rollback()
def migrate_task_system(db_path):
"""Create tables for the task management system."""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Check if project_category table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='project_category'")
if not cursor.fetchone():
print("Creating project_category table...")
cursor.execute("""
CREATE TABLE project_category (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL,
description TEXT,
color VARCHAR(7) DEFAULT '#007bff',
icon VARCHAR(50),
company_id INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by_id INTEGER NOT NULL,
FOREIGN KEY (company_id) REFERENCES company (id),
FOREIGN KEY (created_by_id) REFERENCES user (id),
UNIQUE(company_id, name)
)
""")
# Check if task table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='task'")
if not cursor.fetchone():
print("Creating task table...")
cursor.execute("""
CREATE TABLE task (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(50) DEFAULT 'Not Started',
priority VARCHAR(50) DEFAULT 'Medium',
estimated_hours FLOAT,
project_id INTEGER NOT NULL,
assigned_to_id INTEGER,
start_date DATE,
due_date DATE,
completed_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by_id INTEGER NOT NULL,
FOREIGN KEY (project_id) REFERENCES project (id),
FOREIGN KEY (assigned_to_id) REFERENCES user (id),
FOREIGN KEY (created_by_id) REFERENCES user (id)
)
""")
# Check if sub_task table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sub_task'")
if not cursor.fetchone():
print("Creating sub_task table...")
cursor.execute("""
CREATE TABLE sub_task (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(50) DEFAULT 'Not Started',
priority VARCHAR(50) DEFAULT 'Medium',
estimated_hours FLOAT,
task_id INTEGER NOT NULL,
assigned_to_id INTEGER,
start_date DATE,
due_date DATE,
completed_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by_id INTEGER NOT NULL,
FOREIGN KEY (task_id) REFERENCES task (id),
FOREIGN KEY (assigned_to_id) REFERENCES user (id),
FOREIGN KEY (created_by_id) REFERENCES user (id)
)
""")
# Add category_id to project table if it doesn't exist
cursor.execute("PRAGMA table_info(project)")
project_columns = [column[1] for column in cursor.fetchall()]
if 'category_id' not in project_columns:
print("Adding category_id column to project table...")
cursor.execute("ALTER TABLE project ADD COLUMN category_id INTEGER")
# Add task_id and subtask_id to time_entry table if they don't exist
cursor.execute("PRAGMA table_info(time_entry)")
time_entry_columns = [column[1] for column in cursor.fetchall()]
if 'project_id' not in time_entry_columns:
print("Adding project_id column to time_entry...")
cursor.execute("ALTER TABLE time_entry ADD COLUMN project_id INTEGER")
task_migrations = [
('task_id', "ALTER TABLE time_entry ADD COLUMN task_id INTEGER"),
('subtask_id', "ALTER TABLE time_entry ADD COLUMN subtask_id INTEGER")
]
if 'notes' not in time_entry_columns:
print("Adding notes column to time_entry...")
cursor.execute("ALTER TABLE time_entry ADD COLUMN notes TEXT")
for column_name, sql_command in task_migrations:
if column_name not in time_entry_columns:
print(f"Adding {column_name} column to time_entry...")
cursor.execute(sql_command)
# Commit changes and close connection
conn.commit()
print("Task system migration completed successfully!")
except Exception as e:
print(f"Error during task system migration: {e}")
conn.rollback()
raise
finally:
conn.close()
with app.app_context():
# Create tables if they don't exist
db.create_all()
# Initialize system settings
init_system_settings()
def migrate_data():
"""Handle data migration with Flask app context."""
if not FLASK_AVAILABLE:
print("Skipping data migration - Flask not available")
return
# Check if admin user exists
admin = User.query.filter_by(username='admin').first()
if not admin:
# Create admin user
admin = User(
username='admin',
email='admin@timetrack.local',
is_verified=True, # Admin is automatically verified
role=Role.ADMIN,
two_factor_enabled=False
)
admin.set_password('admin') # Default password, should be changed
db.session.add(admin)
db.session.commit()
print("Created admin user with username 'admin' and password 'admin'")
print("Please change the admin password after first login!")
else:
# Make sure existing admin user is verified and has correct role
if not hasattr(admin, 'is_verified') or not admin.is_verified:
admin.is_verified = True
if not hasattr(admin, 'role') or admin.role is None:
admin.role = Role.ADMIN
if not hasattr(admin, 'two_factor_enabled') or admin.two_factor_enabled is None:
admin.two_factor_enabled = False
db.session.commit()
print("Updated existing admin user with new fields")
# Update existing time entries to associate with admin user
orphan_entries = TimeEntry.query.filter_by(user_id=None).all()
for entry in orphan_entries:
entry.user_id = admin.id
# Update existing work configs to associate with admin user
orphan_configs = WorkConfig.query.filter_by(user_id=None).all()
for config in orphan_configs:
config.user_id = admin.id
# Mark all existing users as verified for backward compatibility
existing_users = User.query.filter_by(is_verified=None).all()
for user in existing_users:
user.is_verified = True
# Update existing users with default role and 2FA settings
users_to_update = User.query.all()
updated_count = 0
for user in users_to_update:
updated = False
if not hasattr(user, 'role') or user.role is None:
try:
# Update existing users with null/invalid data
users = User.query.all()
for user in users:
if user.role is None:
user.role = Role.TEAM_MEMBER
updated = True
if not hasattr(user, 'two_factor_enabled') or user.two_factor_enabled is None:
if user.two_factor_enabled is None:
user.two_factor_enabled = False
updated = True
if updated:
updated_count += 1
# Check if any system admin users exist
system_admin_count = User.query.filter_by(role=Role.SYSTEM_ADMIN).count()
if system_admin_count == 0:
print("No system administrators found. Consider promoting a user to SYSTEM_ADMIN role manually.")
print("Use: UPDATE user SET role = 'System Administrator' WHERE username = 'your_username';")
print(f"To promote a user: UPDATE user SET role = '{Role.SYSTEM_ADMIN.value}' WHERE username = 'your_username';")
else:
print(f"Found {system_admin_count} system administrator(s)")
db.session.commit()
print(f"Associated {len(orphan_entries)} existing time entries with admin user")
print(f"Associated {len(orphan_configs)} existing work configs with admin user")
print(f"Marked {len(existing_users)} existing users as verified")
print(f"Updated {updated_count} users with default role and 2FA settings")
print("Data migration completed successfully")
# Create sample projects if none exist
existing_projects = Project.query.count()
if existing_projects == 0 and admin:
sample_projects = [
{
'name': 'General Administration',
'code': 'ADMIN001',
'description': 'General administrative tasks and meetings',
'team_id': None,
},
{
'name': 'Development Project',
'code': 'DEV001',
'description': 'Software development and maintenance tasks',
'team_id': None,
},
{
'name': 'Customer Support',
'code': 'SUPPORT001',
'description': 'Customer service and technical support activities',
'team_id': None,
}
]
except Exception as e:
print(f"Error during data migration: {e}")
db.session.rollback()
for proj_data in sample_projects:
project = Project(
name=proj_data['name'],
code=proj_data['code'],
description=proj_data['description'],
team_id=proj_data['team_id'],
created_by_id=admin.id,
is_active=True
)
db.session.add(project)
db.session.commit()
print(f"Created {len(sample_projects)} sample projects")
def init_system_settings():
"""Initialize system settings with default values if they don't exist"""
"""Initialize system settings with default values if they don't exist."""
if not FLASK_AVAILABLE:
print("Skipping system settings initialization - Flask not available")
return
# Check if registration_enabled setting exists
reg_setting = SystemSettings.query.filter_by(key='registration_enabled').first()
if not reg_setting:
print("Adding registration_enabled system setting...")
reg_setting = SystemSettings(
key='registration_enabled',
value='true', # Default to enabled
value='true',
description='Controls whether new user registration is allowed'
)
db.session.add(reg_setting)
@@ -367,13 +666,132 @@ def init_system_settings():
print("Adding email_verification_required system setting...")
email_verification_setting = SystemSettings(
key='email_verification_required',
value='true', # Default to enabled for security
value='true',
description='Controls whether email verification is required for new user accounts'
)
db.session.add(email_verification_setting)
db.session.commit()
print("Email verification setting initialized to enabled")
def create_new_database(db_path):
"""Create a new database with all tables."""
print(f"Creating new database at {db_path}")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
create_all_tables(cursor)
conn.commit()
print("New database created successfully")
except Exception as e:
print(f"Error creating new database: {e}")
conn.rollback()
raise
finally:
conn.close()
def create_all_tables(cursor):
"""Create all tables from scratch."""
# This would contain all CREATE TABLE statements
# For brevity, showing key tables only
cursor.execute("""
CREATE TABLE company (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL,
slug VARCHAR(50) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_personal BOOLEAN DEFAULT 0,
is_active BOOLEAN DEFAULT 1,
max_users INTEGER DEFAULT 100,
UNIQUE(name)
)
""")
cursor.execute("""
CREATE TABLE user (
id INTEGER PRIMARY KEY,
username VARCHAR(80) NOT NULL,
email VARCHAR(120) NOT NULL,
password_hash VARCHAR(128),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
company_id INTEGER NOT NULL,
is_verified BOOLEAN DEFAULT 0,
verification_token VARCHAR(100),
token_expiry TIMESTAMP,
is_blocked BOOLEAN DEFAULT 0,
role VARCHAR(50) DEFAULT 'Team Member' CHECK (role IN ('Team Member', 'Team Leader', 'Supervisor', 'Administrator', 'System Administrator')),
team_id INTEGER,
account_type VARCHAR(20) DEFAULT 'Company User' CHECK (account_type IN ('Company User', 'Freelancer')),
business_name VARCHAR(100),
two_factor_enabled BOOLEAN DEFAULT 0,
two_factor_secret VARCHAR(32),
FOREIGN KEY (company_id) REFERENCES company (id),
FOREIGN KEY (team_id) REFERENCES team (id)
)
""")
# Add other table creation statements as needed
print("All tables created")
def main():
"""Main function with command line interface."""
parser = argparse.ArgumentParser(description='TimeTrack Database Migration Tool')
parser.add_argument('--db-file', '-d', help='Path to SQLite database file')
parser.add_argument('--create-new', '-c', action='store_true',
help='Create a new database (will overwrite existing)')
parser.add_argument('--migrate-all', '-m', action='store_true',
help='Run all migrations (default action)')
parser.add_argument('--task-system', '-t', action='store_true',
help='Run only task system migration')
parser.add_argument('--company-model', '-p', action='store_true',
help='Run only company model migration')
parser.add_argument('--basic', '-b', action='store_true',
help='Run only basic table migrations')
args = parser.parse_args()
db_path = get_db_path(args.db_file)
print(f"TimeTrack Database Migration Tool")
print(f"Database: {db_path}")
print(f"Flask available: {FLASK_AVAILABLE}")
print("-" * 50)
try:
if args.create_new:
if os.path.exists(db_path):
response = input(f"Database {db_path} exists. Overwrite? (y/N): ")
if response.lower() != 'y':
print("Operation cancelled")
return
os.remove(db_path)
create_new_database(db_path)
elif args.task_system:
migrate_task_system(db_path)
elif args.company_model:
migrate_to_company_model(db_path)
elif args.basic:
run_basic_migrations(db_path)
else:
# Default: run all migrations
run_all_migrations(db_path)
print("\nMigration completed successfully!")
except Exception as e:
print(f"\nError during migration: {e}")
sys.exit(1)
if __name__ == "__main__":
migrate_database()
print("Database migration completed")
main()

View File

@@ -1,172 +0,0 @@
#!/usr/bin/env python3
"""
Migration script for freelancer support in TimeTrack.
This migration adds:
1. AccountType enum support (handled by SQLAlchemy)
2. account_type column to user table
3. business_name column to user table
4. is_personal column to company table
Usage:
python migrate_freelancers.py # Run migration
python migrate_freelancers.py rollback # Rollback migration
"""
from app import app, db
import sqlite3
import os
import sys
from models import User, Company, AccountType
from datetime import datetime
def migrate_freelancer_support():
"""Add freelancer support to existing database"""
db_path = 'timetrack.db'
# Check if database exists
if not os.path.exists(db_path):
print("Database doesn't exist. Please run main migration first.")
return False
print("Migrating database for freelancer support...")
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Check company table structure
cursor.execute("PRAGMA table_info(company)")
company_columns = [column[1] for column in cursor.fetchall()]
# Add is_personal column to company table if it doesn't exist
if 'is_personal' not in company_columns:
print("Adding is_personal column to company table...")
cursor.execute("ALTER TABLE company ADD COLUMN is_personal BOOLEAN DEFAULT 0")
# Check user table structure
cursor.execute("PRAGMA table_info(user)")
user_columns = [column[1] for column in cursor.fetchall()]
# Add account_type column to user table if it doesn't exist
if 'account_type' not in user_columns:
print("Adding account_type column to user table...")
# Default to 'COMPANY_USER' for existing users
cursor.execute("ALTER TABLE user ADD COLUMN account_type VARCHAR(20) DEFAULT 'COMPANY_USER'")
# Add business_name column to user table if it doesn't exist
if 'business_name' not in user_columns:
print("Adding business_name column to user table...")
cursor.execute("ALTER TABLE user ADD COLUMN business_name VARCHAR(100)")
# Commit changes
conn.commit()
print("✓ Freelancer migration completed successfully!")
# Update existing users to have explicit account_type
print("Updating existing users to COMPANY_USER account type...")
cursor.execute("UPDATE user SET account_type = 'COMPANY_USER' WHERE account_type IS NULL OR account_type = ''")
conn.commit()
return True
except Exception as e:
print(f"✗ Migration failed: {str(e)}")
conn.rollback()
return False
finally:
conn.close()
def rollback_freelancer_support():
"""Rollback freelancer support migration"""
db_path = 'timetrack.db'
if not os.path.exists(db_path):
print("Database doesn't exist.")
return False
print("Rolling back freelancer support migration...")
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
print("WARNING: SQLite doesn't support dropping columns directly.")
print("To fully rollback, you would need to:")
print("1. Create new tables without the freelancer columns")
print("2. Copy data from old tables to new tables")
print("3. Drop old tables and rename new ones")
print("\nFor safety, leaving columns in place but marking rollback as complete.")
print("The application will work without issues with the extra columns present.")
return True
except Exception as e:
print(f"✗ Rollback failed: {str(e)}")
return False
finally:
conn.close()
def verify_migration():
"""Verify that the migration was applied correctly"""
db_path = 'timetrack.db'
if not os.path.exists(db_path):
print("Database doesn't exist.")
return False
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Check company table
cursor.execute("PRAGMA table_info(company)")
company_columns = [column[1] for column in cursor.fetchall()]
# Check user table
cursor.execute("PRAGMA table_info(user)")
user_columns = [column[1] for column in cursor.fetchall()]
print("\n=== Migration Verification ===")
print("Company table columns:", company_columns)
print("User table columns:", user_columns)
# Verify required columns exist
missing_columns = []
if 'is_personal' not in company_columns:
missing_columns.append('company.is_personal')
if 'account_type' not in user_columns:
missing_columns.append('user.account_type')
if 'business_name' not in user_columns:
missing_columns.append('user.business_name')
if missing_columns:
print(f"✗ Missing columns: {', '.join(missing_columns)}")
return False
else:
print("✓ All required columns present")
return True
except Exception as e:
print(f"✗ Verification failed: {str(e)}")
return False
finally:
conn.close()
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == 'rollback':
success = rollback_freelancer_support()
elif len(sys.argv) > 1 and sys.argv[1] == 'verify':
success = verify_migration()
else:
success = migrate_freelancer_support()
if success:
verify_migration()
if success:
print("\n✓ Operation completed successfully!")
else:
print("\n✗ Operation failed!")
sys.exit(1)

View File

@@ -1,182 +0,0 @@
from app import app, db
from models import User, TimeEntry, Project, Team, Role
from sqlalchemy import text
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def migrate_projects():
"""Migration script to add project time logging functionality"""
with app.app_context():
logger.info("Starting migration for project time logging...")
# Check if the project table exists
try:
# Create the project table if it doesn't exist
db.engine.execute(text("""
CREATE TABLE IF NOT EXISTS project (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL,
description TEXT,
code VARCHAR(20) NOT NULL UNIQUE,
is_active BOOLEAN DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
created_by_id INTEGER NOT NULL,
team_id INTEGER,
start_date DATE,
end_date DATE,
FOREIGN KEY (created_by_id) REFERENCES user (id),
FOREIGN KEY (team_id) REFERENCES team (id)
)
"""))
logger.info("Project table created or already exists")
except Exception as e:
logger.error(f"Error creating project table: {e}")
return
# Check if the time_entry table has the project-related columns
try:
# Check if project_id and notes columns exist in time_entry
result = db.engine.execute(text("PRAGMA table_info(time_entry)"))
columns = [row[1] for row in result]
if 'project_id' not in columns:
db.engine.execute(text("ALTER TABLE time_entry ADD COLUMN project_id INTEGER REFERENCES project(id)"))
logger.info("Added project_id column to time_entry table")
if 'notes' not in columns:
db.engine.execute(text("ALTER TABLE time_entry ADD COLUMN notes TEXT"))
logger.info("Added notes column to time_entry table")
except Exception as e:
logger.error(f"Error updating time_entry table: {e}")
return
# Create some default projects for demonstration
try:
# Check if any projects exist
existing_projects = Project.query.count()
if existing_projects == 0:
# Find an admin or supervisor user to be the creator
admin_user = User.query.filter(User.role.in_([Role.ADMIN, Role.SUPERVISOR])).first()
if admin_user:
# Create some sample projects
sample_projects = [
{
'name': 'General Administration',
'code': 'ADMIN001',
'description': 'General administrative tasks and meetings',
'team_id': None, # Available to all teams
},
{
'name': 'Development Project',
'code': 'DEV001',
'description': 'Software development and maintenance tasks',
'team_id': None, # Available to all teams
},
{
'name': 'Customer Support',
'code': 'SUPPORT001',
'description': 'Customer service and technical support activities',
'team_id': None, # Available to all teams
}
]
for proj_data in sample_projects:
project = Project(
name=proj_data['name'],
code=proj_data['code'],
description=proj_data['description'],
team_id=proj_data['team_id'],
created_by_id=admin_user.id,
is_active=True
)
db.session.add(project)
db.session.commit()
logger.info(f"Created {len(sample_projects)} sample projects")
else:
logger.warning("No admin or supervisor user found to create sample projects")
else:
logger.info(f"Found {existing_projects} existing projects, skipping sample creation")
except Exception as e:
logger.error(f"Error creating sample projects: {e}")
db.session.rollback()
# Update database schema to match the current models
try:
db.create_all()
logger.info("Database schema updated successfully")
except Exception as e:
logger.error(f"Error updating database schema: {e}")
return
# Verify the migration
try:
# Check if we can query the new tables and columns
project_count = Project.query.count()
logger.info(f"Project table accessible with {project_count} projects")
# Check if time_entry has the new columns
result = db.engine.execute(text("PRAGMA table_info(time_entry)"))
columns = [row[1] for row in result]
required_columns = ['project_id', 'notes']
missing_columns = [col for col in required_columns if col not in columns]
if missing_columns:
logger.error(f"Missing columns in time_entry: {missing_columns}")
return
else:
logger.info("All required columns present in time_entry table")
except Exception as e:
logger.error(f"Error verifying migration: {e}")
return
logger.info("Project time logging migration completed successfully!")
print("\n" + "="*60)
print("PROJECT TIME LOGGING FEATURE ENABLED")
print("="*60)
print("✅ Project management interface available for Admins/Supervisors")
print("✅ Time tracking with optional project selection")
print("✅ Project-based reporting and filtering")
print("✅ Enhanced export functionality with project data")
print("\nAccess project management via:")
print("- Admin dropdown → Manage Projects")
print("- Supervisor dropdown → Manage Projects")
print("="*60)
def rollback_projects():
"""Rollback migration (removes project functionality)"""
with app.app_context():
logger.warning("Rolling back project time logging migration...")
try:
# Drop the project table
db.engine.execute(text("DROP TABLE IF EXISTS project"))
logger.info("Dropped project table")
# Note: SQLite doesn't support dropping columns, so we can't remove
# project_id and notes columns from time_entry table
logger.warning("Note: project_id and notes columns in time_entry table cannot be removed due to SQLite limitations")
logger.warning("These columns will remain but will not be used")
except Exception as e:
logger.error(f"Error during rollback: {e}")
return
logger.info("Project time logging rollback completed")
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "rollback":
rollback_projects()
else:
migrate_projects()

View File

@@ -1,89 +0,0 @@
from app import app, db
from models import User, Team, Role, SystemSettings
from sqlalchemy import text
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def migrate_roles_teams():
with app.app_context():
logger.info("Starting migration for roles and teams...")
# Check if the team table exists
try:
# Create the team table if it doesn't exist
db.engine.execute(text("""
CREATE TABLE IF NOT EXISTS team (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(100) NOT NULL UNIQUE,
description VARCHAR(255),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""))
logger.info("Team table created or already exists")
except Exception as e:
logger.error(f"Error creating team table: {e}")
return
# Check if the user table has the role and team_id columns
try:
# Check if role column exists
result = db.engine.execute(text("PRAGMA table_info(user)"))
columns = [row[1] for row in result]
if 'role' not in columns:
# Use the enum name instead of the value
db.engine.execute(text("ALTER TABLE user ADD COLUMN role VARCHAR(20) DEFAULT 'TEAM_MEMBER'"))
logger.info("Added role column to user table")
if 'team_id' not in columns:
db.engine.execute(text("ALTER TABLE user ADD COLUMN team_id INTEGER REFERENCES team(id)"))
logger.info("Added team_id column to user table")
# Create a default team for existing users
default_team = Team.query.filter_by(name="Default Team").first()
if not default_team:
default_team = Team(name="Default Team", description="Default team for existing users")
db.session.add(default_team)
db.session.commit()
logger.info("Created default team")
# Map string role values to enum values
role_mapping = {
'Team Member': Role.TEAM_MEMBER,
'TEAM_MEMBER': Role.TEAM_MEMBER,
'Team Leader': Role.TEAM_LEADER,
'TEAM_LEADER': Role.TEAM_LEADER,
'Supervisor': Role.SUPERVISOR,
'SUPERVISOR': Role.SUPERVISOR,
'Administrator': Role.ADMIN,
'admin': Role.ADMIN,
'ADMIN': Role.ADMIN
}
# Assign all existing users to the default team and set role based on admin status
users = User.query.all()
for user in users:
if user.team_id is None:
user.team_id = default_team.id
# Handle role conversion properly
if isinstance(user.role, str):
# Try to map the string to an enum value
user.role = role_mapping.get(user.role, Role.TEAM_MEMBER)
elif user.role is None:
# Set default role
user.role = Role.TEAM_MEMBER
db.session.commit()
logger.info(f"Assigned {len(users)} existing users to default team and updated roles")
except Exception as e:
logger.error(f"Error updating user table: {e}")
return
logger.info("Migration completed successfully")
if __name__ == "__main__":
migrate_roles_teams()

149
models.py
View File

@@ -86,6 +86,9 @@ class Project(db.Model):
# Optional team assignment - if set, only team members can log time to this project
team_id = db.Column(db.Integer, db.ForeignKey('team.id'), nullable=True)
# Project categorization
category_id = db.Column(db.Integer, db.ForeignKey('project_category.id'), nullable=True)
# Project dates
start_date = db.Column(db.Date, nullable=True)
end_date = db.Column(db.Date, nullable=True)
@@ -94,6 +97,7 @@ class Project(db.Model):
created_by = db.relationship('User', foreign_keys=[created_by_id], backref='created_projects')
team = db.relationship('Team', backref='projects')
time_entries = db.relationship('TimeEntry', backref='project', lazy=True)
category = db.relationship('ProjectCategory', back_populates='projects')
# Unique constraint per company
__table_args__ = (db.UniqueConstraint('company_id', 'code', name='uq_project_code_per_company'),)
@@ -237,6 +241,10 @@ class TimeEntry(db.Model):
# Project association - nullable for backward compatibility
project_id = db.Column(db.Integer, db.ForeignKey('project.id'), nullable=True)
# Task/SubTask associations - nullable for backward compatibility
task_id = db.Column(db.Integer, db.ForeignKey('task.id'), nullable=True)
subtask_id = db.Column(db.Integer, db.ForeignKey('sub_task.id'), nullable=True)
# Optional notes/description for the time entry
notes = db.Column(db.Text, nullable=True)
@@ -379,3 +387,144 @@ class UserPreferences(db.Model):
def __repr__(self):
return f'<UserPreferences {self.user.username}: {self.date_format}, {"24h" if self.time_format_24h else "12h"}>'
# Project Category model for organizing projects
class ProjectCategory(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), nullable=False)
description = db.Column(db.Text, nullable=True)
color = db.Column(db.String(7), default='#007bff') # Hex color for UI
icon = db.Column(db.String(50), nullable=True) # Icon name/emoji
# Company association for multi-tenancy
company_id = db.Column(db.Integer, db.ForeignKey('company.id'), nullable=False)
# Metadata
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
created_by_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
# Relationships
company = db.relationship('Company', backref='project_categories')
created_by = db.relationship('User', foreign_keys=[created_by_id])
projects = db.relationship('Project', back_populates='category', lazy=True)
# Unique constraint per company
__table_args__ = (db.UniqueConstraint('company_id', 'name', name='uq_category_name_per_company'),)
def __repr__(self):
return f'<ProjectCategory {self.name}>'
# Task status enumeration
class TaskStatus(enum.Enum):
NOT_STARTED = "Not Started"
IN_PROGRESS = "In Progress"
ON_HOLD = "On Hold"
COMPLETED = "Completed"
CANCELLED = "Cancelled"
# Task priority enumeration
class TaskPriority(enum.Enum):
LOW = "Low"
MEDIUM = "Medium"
HIGH = "High"
URGENT = "Urgent"
# Task model for project breakdown
class Task(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), nullable=False)
description = db.Column(db.Text, nullable=True)
# Task properties
status = db.Column(db.Enum(TaskStatus), default=TaskStatus.NOT_STARTED)
priority = db.Column(db.Enum(TaskPriority), default=TaskPriority.MEDIUM)
estimated_hours = db.Column(db.Float, nullable=True) # Estimated time to complete
# Project association
project_id = db.Column(db.Integer, db.ForeignKey('project.id'), nullable=False)
# Task assignment
assigned_to_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
# Task dates
start_date = db.Column(db.Date, nullable=True)
due_date = db.Column(db.Date, nullable=True)
completed_date = db.Column(db.Date, nullable=True)
# Metadata
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
created_by_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
# Relationships
project = db.relationship('Project', backref='tasks')
assigned_to = db.relationship('User', foreign_keys=[assigned_to_id], backref='assigned_tasks')
created_by = db.relationship('User', foreign_keys=[created_by_id])
subtasks = db.relationship('SubTask', backref='parent_task', lazy=True, cascade='all, delete-orphan')
time_entries = db.relationship('TimeEntry', backref='task', lazy=True)
def __repr__(self):
return f'<Task {self.name} ({self.status.value})>'
@property
def progress_percentage(self):
"""Calculate task progress based on subtasks completion"""
if not self.subtasks:
return 100 if self.status == TaskStatus.COMPLETED else 0
completed_subtasks = sum(1 for subtask in self.subtasks if subtask.status == TaskStatus.COMPLETED)
return int((completed_subtasks / len(self.subtasks)) * 100)
@property
def total_time_logged(self):
"""Calculate total time logged to this task (in seconds)"""
return sum(entry.duration or 0 for entry in self.time_entries if entry.duration)
def can_user_access(self, user):
"""Check if a user can access this task"""
return self.project.is_user_allowed(user)
# SubTask model for task breakdown
class SubTask(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), nullable=False)
description = db.Column(db.Text, nullable=True)
# SubTask properties
status = db.Column(db.Enum(TaskStatus), default=TaskStatus.NOT_STARTED)
priority = db.Column(db.Enum(TaskPriority), default=TaskPriority.MEDIUM)
estimated_hours = db.Column(db.Float, nullable=True)
# Parent task association
task_id = db.Column(db.Integer, db.ForeignKey('task.id'), nullable=False)
# Assignment
assigned_to_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
# Dates
start_date = db.Column(db.Date, nullable=True)
due_date = db.Column(db.Date, nullable=True)
completed_date = db.Column(db.Date, nullable=True)
# Metadata
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
created_by_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
# Relationships
assigned_to = db.relationship('User', foreign_keys=[assigned_to_id], backref='assigned_subtasks')
created_by = db.relationship('User', foreign_keys=[created_by_id])
time_entries = db.relationship('TimeEntry', backref='subtask', lazy=True)
def __repr__(self):
return f'<SubTask {self.name} ({self.status.value})>'
@property
def total_time_logged(self):
"""Calculate total time logged to this subtask (in seconds)"""
return sum(entry.duration or 0 for entry in self.time_entries if entry.duration)
def can_user_access(self, user):
"""Check if a user can access this subtask"""
return self.parent_task.can_user_access(user)