Add missing cron scripts.

This commit is contained in:
2025-11-22 11:12:40 +01:00
parent c375b9ee3d
commit 8de4378ad9
3 changed files with 483 additions and 0 deletions

327
cleanup_inactive_accounts.py Executable file
View File

@@ -0,0 +1,327 @@
#!/usr/bin/env python
"""
Cleanup verified but inactive user accounts.
Identifies and removes bot-created or unused accounts that:
- Are verified (is_verified = True)
- Have never logged in (no SystemEvent login records)
- Have never created time entries
- Are older than a specified number of days
This script can be run manually or scheduled via cron.
"""
import os
import sys
from datetime import datetime, timedelta
from sqlalchemy import and_, or_
# Add the application path to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import app, db
from models import User, Company, SystemEvent, TimeEntry
from models.enums import Role
def find_inactive_verified_accounts(min_age_days=30, dry_run=False):
"""
Find verified user accounts that have never been used.
Args:
min_age_days (int): Minimum age in days for accounts to be considered
dry_run (bool): If True, only show what would be deleted without actually deleting.
Returns:
tuple: (accounts_to_delete, details_list)
"""
with app.app_context():
cutoff_time = datetime.utcnow() - timedelta(days=min_age_days)
# Find verified accounts older than cutoff
verified_old_users = User.query.filter(
and_(
User.is_verified == True,
User.created_at < cutoff_time
)
).all()
inactive_accounts = []
for user in verified_old_users:
# Check if user has any time entries
time_entry_count = TimeEntry.query.filter_by(user_id=user.id).count()
# Check if user has any login events
login_event_count = SystemEvent.query.filter(
and_(
SystemEvent.user_id == user.id,
SystemEvent.event_type == 'user_login'
)
).count()
# If no time entries and no logins, this is an inactive account
if time_entry_count == 0 and login_event_count == 0:
account_age_days = (datetime.utcnow() - user.created_at).days
# Get company info
company = Company.query.get(user.company_id) if user.company_id else None
company_name = company.name if company else "Unknown"
# Check if user is the only admin in their company
is_only_admin = False
if user.role in [Role.ADMIN, Role.SYSTEM_ADMIN]:
other_admins = User.query.filter(
and_(
User.company_id == user.company_id,
User.id != user.id,
User.role.in_([Role.ADMIN, Role.SYSTEM_ADMIN])
)
).count()
is_only_admin = (other_admins == 0)
inactive_accounts.append({
'user': user,
'company_name': company_name,
'age_days': account_age_days,
'is_only_admin': is_only_admin,
'company': company
})
return inactive_accounts
def cleanup_inactive_accounts(min_age_days=30, dry_run=False, skip_admins=True):
"""
Delete inactive verified accounts.
Args:
min_age_days (int): Minimum age in days for accounts to be considered
dry_run (bool): If True, only show what would be deleted without actually deleting
skip_admins (bool): If True, skip accounts that are the only admin in their company
Returns:
int: Number of accounts deleted (or would be deleted in dry run mode)
"""
with app.app_context():
inactive_accounts = find_inactive_verified_accounts(min_age_days, dry_run)
deleted_count = 0
skipped_count = 0
companies_deleted = 0
print(f"\nFound {len(inactive_accounts)} inactive verified account(s) older than {min_age_days} days")
print("=" * 80)
for account_info in inactive_accounts:
user = account_info['user']
company_name = account_info['company_name']
age_days = account_info['age_days']
is_only_admin = account_info['is_only_admin']
company = account_info['company']
# Skip if this is the only admin and skip_admins is True
if is_only_admin and skip_admins:
print(f"\nSKIPPED (only admin): {user.username}")
print(f" Email: {user.email}")
print(f" Company: {company_name} (ID: {user.company_id})")
print(f" Role: {user.role.value}")
print(f" Created: {user.created_at} ({age_days} days ago)")
skipped_count += 1
continue
if dry_run:
print(f"\nWOULD DELETE: {user.username}")
print(f" Email: {user.email}")
print(f" Company: {company_name} (ID: {user.company_id})")
print(f" Role: {user.role.value}")
print(f" Created: {user.created_at} ({age_days} days ago)")
# Check if company would be deleted too
if company:
other_users = User.query.filter(
and_(
User.company_id == user.company_id,
User.id != user.id
)
).count()
if other_users == 0:
print(f" -> Would also delete empty company: {company_name}")
deleted_count += 1
else:
print(f"\nDELETING: {user.username}")
print(f" Email: {user.email}")
print(f" Company: {company_name} (ID: {user.company_id})")
print(f" Role: {user.role.value}")
print(f" Created: {user.created_at} ({age_days} days ago)")
# Log the deletion as a system event
try:
SystemEvent.log_event(
event_type='user_deleted',
event_category='system',
description=f'Inactive verified user {user.username} deleted (no activity for {age_days} days)',
user_id=None, # No user context for system cleanup
company_id=user.company_id
)
except Exception as e:
print(f" Warning: Could not log deletion event: {e}")
# Check if company should be deleted
if company:
other_users = User.query.filter(
and_(
User.company_id == user.company_id,
User.id != user.id
)
).count()
# Delete the user
db.session.delete(user)
deleted_count += 1
# If no other users, delete the company too
if other_users == 0:
print(f" -> Also deleting empty company: {company_name}")
db.session.delete(company)
companies_deleted += 1
else:
# Delete the user even if company doesn't exist
db.session.delete(user)
deleted_count += 1
if not dry_run and deleted_count > 0:
db.session.commit()
print("\n" + "=" * 80)
print(f"Successfully deleted {deleted_count} inactive account(s)")
if companies_deleted > 0:
print(f"Successfully deleted {companies_deleted} empty company/companies")
if skipped_count > 0:
print(f"Skipped {skipped_count} account(s) (only admin in company)")
elif dry_run:
print("\n" + "=" * 80)
print(f"DRY RUN: Would delete {deleted_count} inactive account(s)")
if skipped_count > 0:
print(f"Would skip {skipped_count} account(s) (only admin in company)")
else:
print("\n" + "=" * 80)
print("No inactive accounts found to delete")
if skipped_count > 0:
print(f"Skipped {skipped_count} account(s) (only admin in company)")
return deleted_count
def generate_report(min_age_days=30):
"""
Generate a detailed report of inactive accounts without deleting anything.
Args:
min_age_days (int): Minimum age in days for accounts to be considered
"""
with app.app_context():
inactive_accounts = find_inactive_verified_accounts(min_age_days, dry_run=True)
print(f"\n{'='*80}")
print(f"INACTIVE VERIFIED ACCOUNTS REPORT")
print(f"Accounts older than {min_age_days} days with no activity")
print(f"Generated: {datetime.utcnow()}")
print(f"{'='*80}\n")
if not inactive_accounts:
print("No inactive verified accounts found.")
return
# Group by company
by_company = {}
for account_info in inactive_accounts:
company_name = account_info['company_name']
if company_name not in by_company:
by_company[company_name] = []
by_company[company_name].append(account_info)
# Print summary
print(f"Total inactive accounts: {len(inactive_accounts)}")
print(f"Companies affected: {len(by_company)}\n")
# Print details by company
for company_name, accounts in sorted(by_company.items()):
print(f"\nCompany: {company_name}")
print("-" * 80)
for account_info in accounts:
user = account_info['user']
age_days = account_info['age_days']
is_only_admin = account_info['is_only_admin']
admin_warning = " [ONLY ADMIN]" if is_only_admin else ""
print(f"{user.username}{admin_warning}")
print(f" Email: {user.email or 'N/A'}")
print(f" Role: {user.role.value}")
print(f" Created: {user.created_at} ({age_days} days ago)")
print()
def main():
"""Main function to handle command line arguments"""
import argparse
parser = argparse.ArgumentParser(
description='Cleanup verified but inactive user accounts (no logins, no time entries)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be deleted without actually deleting'
)
parser.add_argument(
'--min-age',
type=int,
default=30,
help='Minimum account age in days (default: 30)'
)
parser.add_argument(
'--include-admins',
action='store_true',
help='Include accounts that are the only admin in their company (default: skip them)'
)
parser.add_argument(
'--report-only',
action='store_true',
help='Generate a detailed report only, do not delete anything'
)
parser.add_argument(
'--quiet',
action='store_true',
help='Suppress output except for errors'
)
args = parser.parse_args()
if not args.quiet:
print(f"Starting cleanup of inactive verified accounts at {datetime.utcnow()}")
print("-" * 80)
try:
if args.report_only:
generate_report(min_age_days=args.min_age)
else:
deleted_count = cleanup_inactive_accounts(
min_age_days=args.min_age,
dry_run=args.dry_run,
skip_admins=not args.include_admins
)
if not args.quiet:
print("-" * 80)
print(f"Cleanup completed at {datetime.utcnow()}")
# Exit with 0 for success
sys.exit(0)
except Exception as e:
print(f"Error during cleanup: {str(e)}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()

152
cleanup_unverified_accounts.py Executable file
View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python
"""
Cleanup unverified user accounts older than 24 hours.
This script can be run manually or scheduled via cron.
"""
import os
import sys
from datetime import datetime, timedelta
from sqlalchemy import and_
# Add the application path to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import app, db
from models import User, Company, SystemEvent
from models.enums import Role
def cleanup_unverified_accounts(dry_run=False):
"""
Delete unverified user accounts that are older than 24 hours.
Args:
dry_run (bool): If True, only show what would be deleted without actually deleting.
Returns:
int: Number of accounts deleted (or would be deleted in dry run mode)
"""
with app.app_context():
# Find unverified accounts older than 24 hours
cutoff_time = datetime.utcnow() - timedelta(hours=24)
unverified_users = User.query.filter(
and_(
User.is_verified == False,
User.created_at < cutoff_time
)
).all()
deleted_count = 0
for user in unverified_users:
# Check if this user is the only admin in their company
# We shouldn't delete them if they are, as it would orphan the company
if user.role in [Role.ADMIN, Role.SYSTEM_ADMIN]:
other_admins = User.query.filter(
and_(
User.company_id == user.company_id,
User.id != user.id,
User.role.in_([Role.ADMIN, Role.SYSTEM_ADMIN])
)
).count()
if other_admins == 0:
print(f"Skipping {user.username} (ID: {user.id}) - only admin in company {user.company_id}")
continue
if dry_run:
print(f"Would delete unverified user: {user.username} (ID: {user.id}, Email: {user.email}, Created: {user.created_at})")
deleted_count += 1
# Check if company would be deleted too (for dry run)
company = Company.query.get(user.company_id)
if company:
other_users = User.query.filter(
and_(
User.company_id == user.company_id,
User.id != user.id
)
).count()
if other_users == 0:
print(f" Would also delete empty company: {company.name} (ID: {company.id})")
else:
print(f"Deleting unverified user: {user.username} (ID: {user.id}, Email: {user.email}, Created: {user.created_at})")
# Log the deletion as a system event if SystemEvent exists
try:
SystemEvent.log_event(
event_type='user_deleted',
event_category='system',
description=f'Unverified user {user.username} deleted after 24 hours',
user_id=None, # No user context for system cleanup
company_id=user.company_id
)
except:
# SystemEvent might not exist, continue without logging
pass
# Check if the company should be deleted (if it was created with this user and has no other users)
company = Company.query.get(user.company_id)
if company:
other_users = User.query.filter(
and_(
User.company_id == user.company_id,
User.id != user.id
)
).count()
# Delete the user
db.session.delete(user)
deleted_count += 1
# If no other users, delete the company too
if other_users == 0:
print(f" Also deleting empty company: {company.name} (ID: {company.id})")
db.session.delete(company)
else:
# Delete the user even if company doesn't exist
db.session.delete(user)
deleted_count += 1
if not dry_run and deleted_count > 0:
db.session.commit()
print(f"\nSuccessfully deleted {deleted_count} unverified account(s)")
elif dry_run:
print(f"\nDry run: Would delete {deleted_count} unverified account(s)")
else:
print("No unverified accounts older than 24 hours found")
return deleted_count
def main():
"""Main function to handle command line arguments"""
import argparse
parser = argparse.ArgumentParser(description='Cleanup unverified user accounts older than 24 hours')
parser.add_argument('--dry-run', action='store_true',
help='Show what would be deleted without actually deleting')
parser.add_argument('--quiet', action='store_true',
help='Suppress output except for errors')
args = parser.parse_args()
if not args.quiet:
print(f"Starting cleanup of unverified accounts at {datetime.utcnow()}")
print("-" * 60)
try:
deleted_count = cleanup_unverified_accounts(dry_run=args.dry_run)
if not args.quiet:
print("-" * 60)
print(f"Cleanup completed at {datetime.utcnow()}")
# Exit with 0 for success
sys.exit(0)
except Exception as e:
print(f"Error during cleanup: {str(e)}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

View File

@@ -2,4 +2,8 @@
# Run every hour at minute 0
0 * * * * cd /app && /usr/local/bin/python /app/cleanup_unverified_accounts.py --quiet >> /var/log/cron.log 2>&1
# Cron job for cleaning up inactive verified accounts (no logins, no time entries)
# Run monthly on the 1st at 2:00 AM, cleaning accounts older than 90 days
0 2 1 * * cd /app && /usr/local/bin/python /app/cleanup_inactive_accounts.py --min-age 90 --quiet >> /var/log/cron.log 2>&1
# Empty line required at the end