"""Context manager for basic directory watching.

Includes a workaround (a per-handler cooldown) for bursts of file-system
events fired in quick succession.
"""

from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
from typing import Callable, Self
import time
import tomllib

from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer

from utils.Logger import Log
from utils.Scanner import scan
from utils.integration.Discord import webhook
from utils.integration.AI import ai_analyse

# Captured at import time so the start-up log line can report elapsed time.
t = time.time()

with open("config.toml", "rb") as f:
    data = tomllib.load(f)


def _as_list(value):
    """Return *value* unchanged if it is a list, else wrap it in one."""
    return value if isinstance(value, list) else [value]


# Each setting may be a single value or a list in the config file; normalise
# all of them so a lone string is never iterated character by character.
paths = _as_list(data['DETECTION']['watchdogPath'])
ignore_paths = _as_list(data['DETECTION'].get('watchdogIgnorePath', []))
ignore_files = _as_list(data['DETECTION'].get('watchdogIgnoreFile', []))


def s(input_dict):
    """Convert a ``{name: items}`` mapping into a list of name/value fields.

    Each entry becomes ``{"name": key, "value": text}`` where ``text`` holds
    one `` - item`` line per element of ``items``.
    """
    return [
        {"name": key, "value": '\n'.join(' - ' + str(item) for item in items)}
        for key, items in input_dict.items()
    ]


def c(d):
    """Return the total number of elements across all list values of *d*.

    Values that are not lists are ignored.
    """
    return sum(len(value) for value in d.values() if isinstance(value, list))


class DirWatcher:
    """Run a function when a directory changes.

    Intended to be used as a context manager: entering starts a recursive
    watchdog observer on ``watch_dir``; leaving stops and joins it.
    """

    # Smallest value accepted for both ``interval`` and ``cooldown`` (seconds).
    min_cooldown = 0.1

    def __init__(
        self,
        watch_dir: Path,
        interval: float = 0.2,
        cooldown: float = 0.1,
    ):
        """Validate and store the watch settings.

        Args:
            watch_dir: Directory to watch recursively.
            interval: Seconds slept per iteration of :meth:`run`.
            cooldown: Minimum seconds between two handled events.

        Raises:
            ValueError: If ``interval`` or ``cooldown`` is below
                ``min_cooldown``.
        """
        if interval < self.min_cooldown:
            raise ValueError(
                f"Interval of {interval} seconds is less than the minimum cooldown of"
                f" {self.min_cooldown} seconds."
            )
        if cooldown < self.min_cooldown:
            raise ValueError(
                f"Cooldown of {cooldown} seconds is less than the minimum cooldown of"
                f" {self.min_cooldown} seconds."
            )
        self.watch_dir = watch_dir
        self.interval = interval
        self.cooldown = cooldown

    def __enter__(self) -> Self:
        """Create, schedule and start the observer, then return ``self``."""
        self.observer = Observer()
        self.observer.schedule(
            ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True
        )
        # NOTE(review): 'LANGUGAE' looks like a misspelling of 'LANGUAGE', but
        # it is a runtime config key and must match config.toml - confirm
        # before renaming.
        Log.s(data['LANGUGAE']['english']['radarStarted'].format(str(round(time.time() - t, 5))))
        self.observer.start()
        return self

    def __exit__(self, exc_type: type[BaseException] | None, *_) -> bool:
        """Stop and join the observer.

        Returns:
            ``True`` (suppress) when leaving normally or on
            ``KeyboardInterrupt``; ``False`` for any other exception so that
            it propagates to the caller.
        """
        # Always stop before joining: joining an observer that was never told
        # to stop would block forever.
        self.observer.stop()
        self.observer.join()
        return exc_type is None or exc_type is KeyboardInterrupt

    def run(self):
        """Check for changes on an interval.

        Sleeps in a loop until interrupted with Ctrl-C, then stops the
        observer and exits the interpreter.

        Raises:
            SystemExit: After a ``KeyboardInterrupt`` has been received.
        """
        try:
            while True:
                sleep(self.interval)
        except KeyboardInterrupt:
            self.observer.stop()
            raise SystemExit from None


class ModifiedFileHandler(FileSystemEventHandler):
    """Handle modified files.

    Events closer together than ``cooldown`` are dropped, and events whose
    path matches the configured ignore lists are never dispatched.
    """

    def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: int):
        """Store the cooldown and reset the last-trigger timestamp.

        Args:
            func: Callback supplied by the caller. It is kept on the instance
                but the handlers below call the module-level ``scan``
                directly.
            cooldown: Minimum number of seconds between two handled events.
        """
        super().__init__()
        self.func = func
        self.cooldown = timedelta(seconds=cooldown)
        # datetime.min guarantees the very first event passes the cooldown.
        self.triggered_time = datetime.min

    @staticmethod
    def _is_ignored(event) -> bool:
        """Return True if the event path is ``"."`` or matches an ignore list."""
        src_path = event.src_path
        if src_path == ".":
            return True
        # str.startswith / str.endswith accept a tuple of candidates.
        return src_path.startswith(tuple(ignore_paths)) or src_path.endswith(
            tuple(ignore_files)
        )

    def _cooled_down(self) -> bool:
        """Return True once more than ``cooldown`` has passed since the last trigger."""
        return (datetime.now() - self.triggered_time) > self.cooldown

    @staticmethod
    def _scan_and_report(payload, path):
        """Scan *payload* and, if anything is flagged, log it and send the webhook.

        Args:
            payload: Value handed to ``scan`` and ``ai_analyse``.
            path: Path used in the log line and as the webhook subject.
        """
        r = scan(payload)
        if r[0]:
            Log.s(f"Flagged {path}")
            analyse = ai_analyse(payload)
            webhook(path, s(r[0]), f"Total Flagged Pattern: {str(c(r[0]))}\n\n{analyse}")

    def dispatch(self, event):
        """Drop ignored events before they reach the ``on_*`` handlers.

        The return value of ``on_any_event`` is not used by watchdog to
        filter events, so the filtering has to happen here.
        """
        if self._is_ignored(event):
            return
        super().dispatch(event)

    def on_any_event(self, event):
        """Return True for ignored events, otherwise None.

        Kept for backward compatibility; the actual filtering is done in
        :meth:`dispatch`.
        """
        if self._is_ignored(event):
            return True

    def on_modified(self, event: FileSystemEvent):
        """Read the modified file, scan its contents and report any findings."""
        try:
            if not self._cooled_down():
                return
            # Directories and "." cannot be read as text; skip them up front
            # instead of relying on open() failing.
            if event.is_directory or event.src_path == ".":
                return
            with open(event.src_path, "r") as fh:
                src = fh.read()
            Log.v(f"FILE MODF | {event.src_path}")
            # NOTE(review): this handler passes the file *contents* to scan(),
            # while on_moved/on_created pass the *path* - confirm which one
            # scan() and ai_analyse() expect.
            self._scan_and_report(src, event.src_path)
            self.triggered_time = datetime.now()
        except Exception as exc:
            # Best effort: a failing scan must not kill the observer thread,
            # but it should not vanish silently either.
            Log.v(f"FILE MODF | {event.src_path} | handler failed: {exc!r}")

    def on_moved(self, event: FileSystemEvent):
        """Log the move, scan the source path and report any findings."""
        try:
            if not self._cooled_down():
                return
            Log.v(f"FILE MOV | {event.src_path} > {event.dest_path}")
            self._scan_and_report(event.src_path, event.src_path)
            self.triggered_time = datetime.now()
        except Exception as exc:
            Log.v(f"FILE MOV | {event.src_path} | handler failed: {exc!r}")

    def on_deleted(self, event: FileSystemEvent):
        """Log the deletion and restart the cooldown."""
        try:
            if not self._cooled_down():
                return
            Log.v(f"FILE DEL | {event.src_path}")
            self.triggered_time = datetime.now()
        except Exception as exc:
            Log.v(f"FILE DEL | {event.src_path} | handler failed: {exc!r}")

    def on_created(self, event: FileSystemEvent):
        """Log a newly created file, scan its path and report any findings.

        Directory creations are ignored and do not restart the cooldown.
        """
        try:
            if not self._cooled_down():
                return
            if event.is_directory:
                return None
            Log.v(f"file created: {event.src_path}")
            self._scan_and_report(event.src_path, event.src_path)
            self.triggered_time = datetime.now()
        except Exception as exc:
            Log.v(f"file created: {event.src_path} | handler failed: {exc!r}")