anti-abuse/utils/WatchdogHandler.py
2025-03-25 14:06:27 +00:00

199 lines
6.5 KiB
Python

"""
Directory watching helpers built on watchdog.

Credit: context manager for basic directory watching, from
<https://github.com/gorakhargosh/watchdog/issues/346>.
"""
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
import threading
import time
from typing import Callable, Self
from utils.Logger import Log
import tomllib
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
from utils.Scanner import scan
from utils.integration.Discord import webhook
from utils.integration.AI import ai_analyse
# Module import time; DirWatcher.__enter__ subtracts it from the current time
# to report how long loading took.
t = time.time()

# The configuration is read once, at import, from the working directory.
with open("config.toml", "rb") as f:
    data = tomllib.load(f)


def _as_list(value):
    """Return ``value`` unchanged if it is a list, otherwise wrap it in one."""
    return value if isinstance(value, list) else [value]


# Every path option may be written as a single value or as a list in the TOML
# file. The ignore options are normalised like ``watchdogPath``: iterating a
# bare string would yield single characters, and a prefix such as "/" would
# then match (and ignore) every absolute path.
paths = _as_list(data['DETECTION']['watchdogPath'])
ignore_paths = _as_list(data['DETECTION'].get('watchdogIgnorePath', []))
ignore_files = _as_list(data['DETECTION'].get('watchdogIgnoreFile', []))
def s(input_dict):
    """Convert a mapping of name -> iterable into a list of name/value dicts.

    Each resulting ``value`` is the items rendered one per line, every line
    prefixed with ``' - '``.
    """
    fields = []
    for name, entries in input_dict.items():
        bullet_lines = [' - ' + str(entry) for entry in entries]
        fields.append({"name": name, "value": '\n'.join(bullet_lines)})
    return fields
def c(d):
    """Return the combined length of every list-valued entry in ``d``.

    Entries whose value is not a ``list`` contribute nothing to the total.
    """
    return sum(len(d[key]) for key in d if isinstance(d[key], list))
def analysis(event_path: str, file_content: str, flag_type: str):
    """
    Scan file content and report it when the scanner flags something.

    Used as the target of the worker threads started by the file handler.
    The first element of the ``scan`` result decides whether anything was
    flagged; if so, the content is passed to ``ai_analyse`` and a webhook
    notification is sent. ``flag_type`` is accepted but not used here.
    """
    results = scan(file_content)
    if not results[0]:
        # Nothing flagged: no log line, no AI call, no notification.
        return

    Log.s(f"Flagged {event_path}")
    analysis = ai_analyse(file_content)
    msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis}"
    fields = s(results[0])
    webhook(event_path, fields, msg)
class DirWatcher:
"""Run a function when a directory changes."""
min_cooldown = 0.1
def __init__(
self,
watch_dir: Path,
interval: float = 0.2,
cooldown: float = 0.1,
):
if interval < self.min_cooldown:
raise ValueError(
f"Interval of {interval} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds."
)
if cooldown < self.min_cooldown:
raise ValueError(
f"Cooldown of {cooldown} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds."
)
self.watch_dir = watch_dir
self.interval = interval
self.cooldown = cooldown
def __enter__(self) -> Self:
self.observer = Observer()
self.observer.schedule(
ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True
)
Log.s(data['LANGUGAE']['english']['novelLoaded'].format(str(round(time.time() - t, 5))))
self.observer.start()
return self
def __exit__(self, exc_type: Exception | None, *_) -> bool:
if exc_type and exc_type is KeyboardInterrupt:
self.observer.stop()
handled_exception = True
elif exc_type:
handled_exception = False
else:
handled_exception = True
self.observer.join()
return handled_exception
def run(self):
"""Check for changes on an interval."""
try:
while True:
sleep(self.interval)
except KeyboardInterrupt:
self.observer.stop()
exit()
exit()
class ModifiedFileHandler(FileSystemEventHandler):
    """Watchdog handler that scans created, modified and moved files.

    File content is read inside the event callback and then handed to
    ``analysis`` on a new thread. One cooldown timestamp, shared by every
    event type and every path, suppresses events that arrive too soon after
    the last handled one.
    """

    def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float):
        """
        Args:
            func: Accepted for interface compatibility; this handler does not
                use it.
            cooldown: Minimum number of seconds between two handled events.
        """
        super().__init__()
        self.cooldown = timedelta(seconds=cooldown)
        # datetime.min puts the very first event outside the cooldown window.
        self.triggered_time = datetime.min

    def ignore_event(self, event: FileSystemEvent) -> bool:
        """Return True when the event's source path must not be processed.

        A path is ignored when it starts with an entry of ``ignore_paths``,
        ends with an entry of ``ignore_files``, or is exactly ``"."``.
        """
        src_path = event.src_path
        if any(src_path.startswith(prefix) for prefix in ignore_paths):
            return True
        if any(src_path.endswith(suffix) for suffix in ignore_files):
            return True
        return src_path == "."

    def _should_handle(self, event: FileSystemEvent) -> bool:
        """Return True if the event is not ignored and the cooldown has elapsed."""
        if self.ignore_event(event):
            return False
        # NOTE(review): the cooldown is global, not per path, so events for
        # *different* files arriving within the window are dropped unscanned.
        # Confirm whether that is intended for a detection tool.
        return (datetime.now() - self.triggered_time) > self.cooldown

    def _scan_file(self, path: str, flag_type: str, log_message: str) -> None:
        """Log the event, read ``path`` and start ``analysis`` on a new thread.

        The cooldown timestamp is only refreshed when the file was read and
        the thread was started. Failures are logged rather than raised so
        that an unreadable file does not propagate out of the event callback.
        """
        try:
            Log.v(log_message)
            with open(path, "r") as handle:
                content = handle.read()
            # Process in a separate thread
            threading.Thread(target=analysis, args=(path, content, flag_type)).start()
            self.triggered_time = datetime.now()
        except Exception as exc:
            Log.v(f"FILE ERR | {path} ({flag_type}): {exc!r}")

    def on_any_event(self, event: FileSystemEvent):
        """Return True for ignored events, otherwise None; no other effect."""
        if self.ignore_event(event):
            return True

    def on_modified(self, event: FileSystemEvent):
        """Scan a modified file; directory events are skipped."""
        if event.is_directory or not self._should_handle(event):
            return
        self._scan_file(event.src_path, "modification", f"FILE MODF | {event.src_path}")

    def on_moved(self, event: FileSystemEvent):
        """Scan a moved file at its destination; directory events are skipped."""
        if event.is_directory or not self._should_handle(event):
            return
        # After a move the content lives at dest_path; opening src_path fails
        # because that path no longer exists, which used to skip the scan.
        self._scan_file(
            event.dest_path,
            "moved",
            f"FILE MOV | {event.src_path} > {event.dest_path}",
        )

    def on_deleted(self, event: FileSystemEvent):
        """Log a deletion and refresh the cooldown; nothing is scanned."""
        if not self._should_handle(event):
            return
        try:
            Log.v(f"FILE DEL | {event.src_path}")
            self.triggered_time = datetime.now()
        except Exception:
            # Only logging can fail here; it is best-effort and must not
            # raise out of the event callback.
            pass

    def on_created(self, event: FileSystemEvent):
        """Scan a newly created file; directory events are skipped."""
        if event.is_directory or not self._should_handle(event):
            return
        self._scan_file(event.src_path, "creation", f"file created: {event.src_path}")