mirror of
https://gitlab.science.ru.nl/technicie/MarietjeDjango.git
synced 2025-12-13 19:02:21 +01:00
91 lines
3.5 KiB
Python
91 lines
3.5 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
from django.core.management.base import BaseCommand
|
|
from django.contrib.auth import get_user_model
|
|
from django.conf import settings
|
|
from django.db.models import Q
|
|
from django.utils.timezone import get_default_timezone
|
|
|
|
from queues.models import PlaylistSong
|
|
from songs.models import Song
|
|
|
|
_IMPORTANT_USERNAMES = ["root", "admin", "postmaster", "dsprenkels", "gmulder", "bwesterb"]
|
|
|
|
|
|
class Command(BaseCommand):
|
|
help = "Get the list of users that has not logged in for quite a while"
|
|
|
|
def add_arguments(self, parser):
|
|
parser.add_argument(
|
|
"--purge", action="store_const", default=False, const=True, help="interactively purge the old users"
|
|
)
|
|
parser.add_argument(
|
|
"days_old",
|
|
nargs="?",
|
|
type=float,
|
|
default=365.25,
|
|
help="amount of days after which a user is considered old",
|
|
)
|
|
|
|
def handle(self, purge, *args, **kwargs):
|
|
if purge:
|
|
return self.handle_purge_users(*args, **kwargs)
|
|
else:
|
|
return self.handle_get_users(*args, **kwargs)
|
|
|
|
def handle_purge_users(self, days_old, *args, **kwargs):
|
|
users = get_old_users(days_old).all()
|
|
if not users:
|
|
self.stdout.write(self.style.NOTICE("No users to be deleted"))
|
|
return
|
|
|
|
self.stdout.write("{} {} {}\n".format("username".ljust(19), "name".ljust(29), "last_login"))
|
|
for user in users:
|
|
self.stdout.write("{} {} {}\n".format(user.username.ljust(19), user.name.ljust(29), user.last_login))
|
|
self.stdout.write("\n")
|
|
self.stdout.write(self.style.WARNING("I will be removing {} users, please check".format(len(users))))
|
|
confirmation = input("Type 'YES' to confirm: ")
|
|
|
|
for i in range(1, 3):
|
|
if confirmation == "YES":
|
|
break
|
|
confirmation = input("Please type 'YES' to confirm (or ^C to abort): ")
|
|
else:
|
|
self.stdout.write(self.style.ERROR("Aborting purge operation after {} prompts".format(i + 1)))
|
|
return
|
|
|
|
deleted = 0
|
|
for i, user in enumerate(users):
|
|
print("[{: 4.0f}% ] {}".format(100 * i / len(users), user))
|
|
|
|
if user.username in _IMPORTANT_USERNAMES:
|
|
self.stdout.write(self.style.WARNING("Not deleting user '{}': username looks important".format(user)))
|
|
continue
|
|
|
|
if Song.objects.filter(deleted=False, user=user).count() > 500:
|
|
self.stdout.write(self.style.WARNING("Not deleting user '{}': has high upload score".format(user)))
|
|
continue
|
|
|
|
if PlaylistSong.objects.filter(user=user).count() > 1000:
|
|
self.stdout.write(self.style.WARNING("Not deleting user '{}': has high queue score".format(user)))
|
|
continue
|
|
|
|
user.delete()
|
|
deleted += 1
|
|
self.stdout.write(self.style.SUCCESS("Deleted {} users".format(deleted)))
|
|
|
|
def handle_get_users(self, days_old, *args, **kwargs):
|
|
for user in get_old_users(days_old):
|
|
self.stdout.write("{}\n".format(user))
|
|
|
|
|
|
def get_old_users(days_old):
|
|
"""Return a queryset with all users older than an amount of days"""
|
|
User = get_user_model()
|
|
tz = get_default_timezone()
|
|
return (
|
|
User.objects.filter(is_staff=False, is_superuser=False, is_active=True)
|
|
.filter(Q(last_login__lt=datetime.now(tz) - timedelta(days_old)) | Q(last_login=None))
|
|
.order_by("username")
|
|
)
|