This repository has been archived on 2025-03-24. You can view files and clone it, but cannot push or open issues or pull requests.
Novel/utils/WatchdogHandler.py

199 lines
6.5 KiB
Python
Raw Normal View History

2025-03-24 20:07:14 +00:00
"""
CREDIT
2025-03-24 18:03:56 +00:00
2025-03-24 20:07:14 +00:00
Context manager for basic directory watching.
- <https://github.com/gorakhargosh/watchdog/issues/346>.
2025-03-24 18:03:56 +00:00
"""
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
2025-03-24 20:07:14 +00:00
import threading
import time
2025-03-24 18:03:56 +00:00
from typing import Callable, Self
from utils.Logger import Log
2025-03-24 20:07:14 +00:00
import tomllib
2025-03-24 18:03:56 +00:00
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
from utils.Scanner import scan
from utils.integration.Discord import webhook
from utils.integration.AI import ai_analyse
t = time.time()
with open("config.toml", "rb") as f:
data = tomllib.load(f)
paths = data['DETECTION']['watchdogPath']
if not isinstance(paths, list):
paths = [paths]
ignore_paths = data['DETECTION'].get('watchdogIgnorePath', [])
ignore_files = data['DETECTION'].get('watchdogIgnoreFile', [])
2025-03-24 20:07:14 +00:00
2025-03-24 18:03:56 +00:00
def s(input_dict):
return [
{"name": key, "value": '\n'.join(' - ' + str(item) for item in items)}
for key, items in input_dict.items()
]
2025-03-24 20:07:14 +00:00
2025-03-24 18:03:56 +00:00
def c(d):
2025-03-24 20:07:14 +00:00
count = 0
2025-03-24 18:03:56 +00:00
for key in d:
if isinstance(d[key], list):
2025-03-24 20:07:14 +00:00
count += len(d[key])
return count
def analysis(event_path: str, file_content: str, flag_type: str):
"""
Process file events in a separate thread.
This function scans the file content, and if flagged,
performs AI analysis and sends a webhook notification.
"""
results = scan(file_content)
if results[0]:
Log.s(f"Flagged {event_path}")
analysis = ai_analyse(file_content)
msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis}"
webhook(event_path, s(results[0]), msg)
2025-03-24 18:03:56 +00:00
class DirWatcher:
"""Run a function when a directory changes."""
min_cooldown = 0.1
def __init__(
self,
watch_dir: Path,
2025-03-24 20:07:14 +00:00
interval: float = 0.2,
cooldown: float = 0.1,
2025-03-24 18:03:56 +00:00
):
if interval < self.min_cooldown:
raise ValueError(
2025-03-24 20:07:14 +00:00
f"Interval of {interval} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds."
2025-03-24 18:03:56 +00:00
)
if cooldown < self.min_cooldown:
raise ValueError(
2025-03-24 20:07:14 +00:00
f"Cooldown of {cooldown} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds."
2025-03-24 18:03:56 +00:00
)
self.watch_dir = watch_dir
self.interval = interval
self.cooldown = cooldown
def __enter__(self) -> Self:
self.observer = Observer()
self.observer.schedule(
ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True
)
2025-03-24 20:07:14 +00:00
Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 5))))
2025-03-24 18:03:56 +00:00
self.observer.start()
return self
def __exit__(self, exc_type: Exception | None, *_) -> bool:
if exc_type and exc_type is KeyboardInterrupt:
self.observer.stop()
handled_exception = True
elif exc_type:
handled_exception = False
else:
handled_exception = True
self.observer.join()
return handled_exception
def run(self):
"""Check for changes on an interval."""
try:
while True:
sleep(self.interval)
except KeyboardInterrupt:
self.observer.stop()
exit()
exit()
class ModifiedFileHandler(FileSystemEventHandler):
2025-03-24 20:07:14 +00:00
"""Handle modified files using threading for processing."""
2025-03-24 18:03:56 +00:00
2025-03-24 20:07:14 +00:00
def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float):
2025-03-24 18:03:56 +00:00
self.cooldown = timedelta(seconds=cooldown)
self.triggered_time = datetime.min
2025-03-24 20:07:14 +00:00
def ignore_event(self, event: FileSystemEvent) -> bool:
2025-03-24 18:03:56 +00:00
for ignore_path in ignore_paths:
if event.src_path.startswith(ignore_path):
return True
for ignore_file in ignore_files:
if event.src_path.endswith(ignore_file):
return True
2025-03-24 20:07:14 +00:00
if event.src_path == ".":
2025-03-24 18:03:56 +00:00
return True
2025-03-24 20:07:14 +00:00
return False
2025-03-24 18:03:56 +00:00
2025-03-24 20:07:14 +00:00
def on_any_event(self, event: FileSystemEvent):
if self.ignore_event(event):
return True
2025-03-24 18:03:56 +00:00
def on_modified(self, event: FileSystemEvent):
2025-03-24 20:07:14 +00:00
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
with open(event.src_path, "r") as f:
src = f.read()
2025-03-24 18:03:56 +00:00
Log.v(f"FILE MODF | {event.src_path}")
2025-03-24 20:07:14 +00:00
# Process in a separate thread
threading.Thread(target=analysis, args=(event.src_path, src, "modification")).start()
2025-03-24 18:03:56 +00:00
self.triggered_time = datetime.now()
2025-03-24 20:07:14 +00:00
except Exception:
pass
2025-03-24 18:03:56 +00:00
def on_moved(self, event: FileSystemEvent):
2025-03-24 20:07:14 +00:00
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
2025-03-24 18:03:56 +00:00
Log.v(f"FILE MOV | {event.src_path} > {event.dest_path}")
2025-03-24 20:07:14 +00:00
# For moved events, you might choose to scan the original or destination file.
# Here, we'll scan the source path.
with open(event.src_path, "r") as f:
src = f.read()
threading.Thread(target=analysis, args=(event.src_path, src, "moved")).start()
2025-03-24 18:03:56 +00:00
self.triggered_time = datetime.now()
2025-03-24 20:07:14 +00:00
except Exception:
pass
2025-03-24 18:03:56 +00:00
def on_deleted(self, event: FileSystemEvent):
2025-03-24 20:07:14 +00:00
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
2025-03-24 18:03:56 +00:00
Log.v(f"FILE DEL | {event.src_path}")
self.triggered_time = datetime.now()
2025-03-24 20:07:14 +00:00
except Exception:
pass
2025-03-24 18:03:56 +00:00
def on_created(self, event: FileSystemEvent):
2025-03-24 20:07:14 +00:00
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
2025-03-24 18:03:56 +00:00
if event.is_directory:
2025-03-24 20:07:14 +00:00
return
else:
2025-03-24 18:03:56 +00:00
Log.v(f"file created: {event.src_path}")
2025-03-24 20:07:14 +00:00
with open(event.src_path, "r") as f:
content = f.read()
threading.Thread(target=analysis, args=(event.src_path, content, "creation")).start()
self.triggered_time = datetime.now()
except Exception:
pass