Compare commits

...

No commits in common. "main" and "20250325d" have entirely different histories.

12 changed files with 343 additions and 465 deletions

View File

@ -1,5 +1,3 @@
[BETA WARNING: SOME FEATURES MIGHT BE NOT RELEASED OR ARE NOT WORKING AS EXPECTED!]
![Novel](/Frame_12x.webp) ![Novel](/Frame_12x.webp)
# Novel, Anti-Abuse # Novel, Anti-Abuse
@ -13,7 +11,7 @@ Anti-Abuse is an ✨ FREE, Open-Sourced radar based on yara rules built for pter
2. Easily customizable by [Yara Rule](https://yara.readthedocs.io/en/stable/writingrules.html). 2. Easily customizable by [Yara Rule](https://yara.readthedocs.io/en/stable/writingrules.html).
3. Various Integrations(discord webhook, etc). 3. Various Integrations(discord webhook, etc).
4. Easy re-check action through AI-Based Analysis. 4. Easy re-check action through AI-Based Analysis.
5. Plugin feature to implement feature on your needs
## Installation ## Installation
Requirements: python, keyboard, brain Requirements: python, keyboard, brain

View File

@ -1,5 +1,5 @@
ver = "250326b-dev/plugin" ver = "250325d"
machineID = "node1" machineID = "node1"
#*************************************************# #*************************************************#
@ -10,7 +10,8 @@ machineID = "node1"
[LANGUGAE.english] [LANGUGAE.english]
novelStarted = "Novel's Anti-Abuse Started within - {}s." novelStarted = "Novel(Anti Abuse) Started within - {}s."
novelLoaded = "Novel(Anti Abuse) Loaded within - {}s."
#**************************************************# #**************************************************#
@ -21,12 +22,7 @@ novelStarted = "Novel's Anti-Abuse Started within - {}s."
[LOGS] [LOGS]
processStartMsg = true processStartMsg = true
flaggedNoti = true
fileModified = true
fileDeleted = true
fileMoved = true
fileCreated = true
#**************************************************# #**************************************************#
# # # #
@ -55,6 +51,7 @@ generate_models = ["llama-3.2-90b-vision-preview","llama-3.3-70b-versatile","lla
generate_endpoint = "http://IP:PORT/api/generate" # Can be empty if using groq generate_endpoint = "http://IP:PORT/api/generate" # Can be empty if using groq
use_groq = true use_groq = true
groq_api_token = "" # Get one at https://console.groq.com/keys groq_api_token = "" # Get one at https://console.groq.com/keys
# Example API key
prompt = "Analyze the given code and return an abuse score (0-10) with a brief reason. Example abuses: Crypto Mining, Shell Access, Nezha Proxy (VPN/Proxy usage), Disk Filling, Tor, DDoS, Abusive Resource Usage. Response format: '**5/10** <your reason>'. No extra messages." prompt = "Analyze the given code and return an abuse score (0-10) with a brief reason. Example abuses: Crypto Mining, Shell Access, Nezha Proxy (VPN/Proxy usage), Disk Filling, Tor, DDoS, Abusive Resource Usage. Response format: '**5/10** <your reason>'. No extra messages."
@ -63,4 +60,5 @@ prompt = "Analyze the given code and return an abuse score (0-10) with a brief r
enabled = true enabled = true
webhook_url = "" webhook_url = ""
# Example webhook
truncate_text = true # Used only if AI INTEGRATION is enabled, trunclates text if true to maxium allowed characters or when false splits in few webhook messages. truncate_text = true # Used only if AI INTEGRATION is enabled, trunclates text if true to maxium allowed characters or when false splits in few webhook messages.

45
core.py
View File

@ -1,45 +0,0 @@
import time, importlib, tomllib, os
from utils.Logger import Log
from utils.WatchdogHandler import DirWatcher
class PluginHandler:
def __init__(self):
self.t = time.time()
plugin_dir = './plugins'
if not os.path.isdir(plugin_dir):
Log.e(f"{plugin_dir} not found. Exiting...")
raise FileNotFoundError(f"Make sure '{plugin_dir}' is existing as plugin dir.")
self._plugins = []
for file in os.listdir(plugin_dir):
if file.endswith('.py'):
path = os.path.join(plugin_dir, file)
try:
spec = importlib.util.spec_from_file_location("plugin", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
self._plugins.append(module.Plugin())
except (ImportError, AttributeError, SyntaxError) as e:
Log.e(f"[PLUGIN] \"{file}\": {e}")
with open("config.toml", "rb") as f:
self.data = tomllib.load(f)
self.path = self.data['DETECTION']['watchdogPath']
def app_run(self):
for plugin in self._plugins:
try:
Log.v(f"[PLUGIN] Loading \"{plugin.name}\" v{plugin.version}\"")
plugin.on_start()
except Exception as e:
Log.e(f"[PLUGIN] \"{plugin.name}\": {str(e)}")
with DirWatcher(self.path, interval=1, plugins=self._plugins) as watcher:
watcher.run()
Log.s(self.data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - self.t, 1))))
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
exit()

91
main.py
View File

@ -1,28 +1,79 @@
#region Imports #region Imports
import tomllib import time, os, tomllib, sys
from plugin_base import PrivilegedPluginBase, SelfContainedPluginBase
from utils.Logger import Log from utils.Logger import Log
from core import PluginHandler from utils.WatchdogHandler import DirWatcher
from utils.Scanner import scan
#endregion #endregion
#region Initialize
t = time.time()
with open("config.toml", "rb") as f: with open("config.toml", "rb") as f:
data = tomllib.load(f) data = tomllib.load(f)
if __name__ == "__main__": Log.v(str(data))
Log.v(""" path = data['DETECTION']['watchdogPath']
o o 8
8b 8 8
8`b 8 .oPYo. o o .oPYo. 8
8 `b 8 8 8 Y. .P 8oooo8 8
8 `b8 8 8 `b..d' 8. 8
8 `8 `YooP' `YP' `Yooo' 8
..:::..:.....:::...:::.....:..
::::::::::::::::::::::::::::::
Product - ANTI-ABUSE
Release - {} Log.v("""
License - GNU GENERAL PUBLIC LICENSE, Version 3
o o 8
""".format(data['ver'])) 8b 8 8
PluginHandler().app_run() 8`b 8 .oPYo. o o .oPYo. 8
8 `b 8 8 8 Y. .P 8oooo8 8
8 `b8 8 8 `b..d' 8. 8
8 `8 `YooP' `YP' `Yooo' 8
..:::..:.....:::...:::.....:..
::::::::::::::::::::::::::::::
Product - ANTI-ABUSE
Release - {}
License - GNU GENERAL PUBLIC LICENSE, Version 3
""".format(data['ver']))
#endregion
def load_plugins(plugin_dir, scanner, logger, watchdog, config):
plugins = []
sys.path.insert(0, plugin_dir) # Add plugin directory to sys.path
for filename in os.listdir(plugin_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3]
try:
module = __import__(module_name)
for attr in dir(module):
plugin_class = getattr(module, attr)
if isinstance(plugin_class, type):
if issubclass(plugin_class, PrivilegedPluginBase) and plugin_class is not PrivilegedPluginBase:
logger.s(f"Loaded privileged plugin {module_name}")
plugin_instance = plugin_class(module_name, scanner, logger, watchdog, config)
plugins.append(plugin_instance)
elif issubclass(plugin_class, SelfContainedPluginBase) and plugin_class is not SelfContainedPluginBase:
logger.s(f"Loaded self-contained plugin {module_name}")
restricted_scanner = lambda src: logger.e(f"Access denied to plugin {module_name}")
plugin_instance = plugin_class(module_name, restricted_scanner, logger, watchdog, config)
plugins.append(plugin_instance)
except Exception as e:
logger.e(f"Failed to load plugin {module_name}: {e}")
return plugins
if __name__ == "__main__":
logger = Log()
scanner = scan
watchdog = DirWatcher(path, interval=1)
logger.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 1))))
try:
plugin_dir = "plugins"
plugins = load_plugins(plugin_dir, scanner, logger, watchdog, data)
for plugin in plugins:
plugin.execute()
with watchdog as watcher:
watcher.run()
except KeyboardInterrupt:
exit()

21
plugin_base.py Normal file
View File

@ -0,0 +1,21 @@
class PrivilegedPluginBase:
def __init__(self, name, scanner, logger, watchdog, config):
self.name = name
self.scanner = scanner
self.logger = logger
self.watchdog = watchdog
self.config = config
def execute(self, *args, **kwargs):
raise NotImplementedError("Privileged plugins must implement the execute method.")
class SelfContainedPluginBase:
def __init__(self, name, scanner, logger, watchdog, config):
self.name = name
self.scanner = scanner
self.logger = logger
self.watchdog = watchdog
self.config = config
def execute(self, *args, **kwargs):
raise NotImplementedError("Self-contained plugins must implement the execute method.")

View File

@ -1,80 +0,0 @@
from utils.Logger import Log
import inspect
def _get_plugin_name():
try:
# First try to get the caller's plugin name
for frame_record in inspect.stack():
frame = frame_record[0]
if 'self' in frame.f_locals:
instance = frame.f_locals['self']
# Check if this is a plugin instance with a name attribute
if hasattr(instance, 'name') and 'plugins' in frame.f_globals.get('__file__', ''):
return instance.name
# If we couldn't find a plugin name in the call stack, check if we're being called by another plugin
for frame_record in inspect.stack():
module = inspect.getmodule(frame_record[0])
if module and hasattr(module, '__file__') and 'plugins' in module.__file__:
# Extract the plugin name from the filename
return "Called from " + module.__name__.split('.')[-1]
except Exception as e:
return f"Error: {str(e)}"
return "Unknown"
class Plugin:
def __init__(self):
self.version = "1.0.0"
self.name = "Example - Main"
def demo(self, *args, **kwargs):
Log.v(f"This is \"demo\" function. I am called by \"{_get_plugin_name()}\" with the argument received: args={args}, kwargs={kwargs}")
def on_start(self, *args, **kwargs):
Log.v("="*10)
Log.v("")
Log.v("Hello, this is an example plugin!")
Log.v("You can find this plugin, and disable by removing this plugin at plugins/example.py")
Log.v("With plugins, you can implement your own integrations, or features.")
Log.v("For more informations, please check our documentation!")
Log.v("")
Log.v("="*10)
"""
These codeblocks below are available events.
You can implement your own events by using the examples below.
on_created() : Called when a file is created.
on_deleted() : Called when a file is deleted.
on_modified() : Called when a file is modified.
on_moved() : Called when a file is moved.
on_scan() : Called when a scan is started.
on_scan_completed() : Called when a scan is completed.
on_ai_analysis_completed() : Called when an AI analysis is completed.
Good luck, have fun!
"""
# def on_created(self, *args, **kwargs):
# Log.v(f"File created args={args}, kwargs={kwargs}")
# def on_deleted(self, *args, **kwargs):
# Log.v(f"File deleted args={args}, kwargs={kwargs}")
# def on_modified(self, *args, **kwargs):
# Log.v(f"File modified args={args}, kwargs={kwargs}")
# def on_moved(self, *args, **kwargs):
# Log.v(f"File moved args={args}, kwargs={kwargs}")
# def on_scan(self, *args, **kwargs):
# Log.v(f"Scan started args={args}, kwargs={kwargs}")
# def on_scan_completed(self, *args, **kwargs):
# Log.v(f"Scan completed args={args}, kwargs={kwargs}")
# def on_ai_analysis_completed(self, *args, **kwargs):
# Log.v(f"AI analysis completed args={args}, kwargs={kwargs}")

View File

@ -1,17 +0,0 @@
from utils.Logger import Log
from plugins.example_1 import Plugin as example1
class Plugin:
def __init__(self):
self.version = "1.0.0"
self.name = "Example - Call Other Plugin"
def on_start(self, *args, **kwargs):
Log.v("="*10)
Log.v("")
Log.v("Hello, this is \"Example - Call Other Plugin!\"")
Log.v("This plugin will call example_1's \"demo\" function with arguments.")
Log.v("")
Log.v("="*10)
example1().demo(1,2,3, a=2)

View File

@ -0,0 +1,22 @@
from plugin_base import PrivilegedPluginBase
class PrivilegedPlugin(PrivilegedPluginBase):
def execute(self):
self.logger.s(f"Executing {self.name} privileged plugin")
# Replace the module-level scan function
def new_scan(src):
self.logger.s("This is the new scan function from the privileged plugin.")
# Custom scan logic here
return {}, {}
# Replace the original scan function globally
global scan
scan = new_scan
# Now, when scan is called anywhere in the module, it will use the new_scan function
matches, errors = scan("some file content")
if matches:
self.logger.s(f"Matches found: {matches}")
if errors:
self.logger.e(f"Errors: {errors}")

View File

@ -0,0 +1,22 @@
from plugin_base import PrivilegedPluginBase
class PrivilegedPlugin(PrivilegedPluginBase):
def execute(self):
self.logger.s(f"Executing {self.name} privileged plugin")
# Replace the module-level scan function
def new_scan(src):
self.logger.s("This is the new scan function from the privileged plugin.")
# Custom scan logic here
return {}, {}
# Replace the original scan function globally
global scan
scan = new_scan
# Now, when scan is called anywhere in the module, it will use the new_scan function
matches, errors = scan("some file content")
if matches:
self.logger.s(f"Matches found: {matches}")
if errors:
self.logger.e(f"Errors: {errors}")

View File

@ -15,9 +15,6 @@ rule CHINESE_NEZHA_ARGO {
$a13 = "Server\x20is\x20running\x20on\x20port\x20" $a13 = "Server\x20is\x20running\x20on\x20port\x20"
$a14 = "nysteria2" $a14 = "nysteria2"
$a15 = "openssl req" $a15 = "openssl req"
$a16 = "hysteria2"
$a17 = "NEZHA" nocase
$a18 = "babama1001980"
condition: condition:
2 of ($a*) 2 of ($a*)
} }

View File

@ -1,48 +1,18 @@
from pystyle import Colors, Colorate from pystyle import Colors, Colorate
from datetime import datetime from datetime import datetime
import time, os, sys, inspect import time
class Log: class Log:
@staticmethod
def _get_plugin_name():
try:
for frame_record in inspect.stack():
frame = frame_record[0]
if 'self' in frame.f_locals:
instance = frame.f_locals['self']
# Check if this is a plugin instance with a name attribute
if hasattr(instance, 'name') and 'plugins' in frame.f_globals.get('__file__', ''):
return instance.name
except: return None
return None
@staticmethod @staticmethod
def s(text): # success def s(text): # success
time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M') time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
plugin_name = Log._get_plugin_name()
if plugin_name:
text = f"[{plugin_name}] {text}"
print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.green_to_cyan, "SUCCESS", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset) print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.green_to_cyan, "SUCCESS", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)
@staticmethod @staticmethod
def e(text): # error def e(text): # error
time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M') time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
plugin_name = Log._get_plugin_name()
if plugin_name:
text = f"[{plugin_name}] {text}"
print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.red_to_purple, " ERROR ", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset) print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.red_to_purple, " ERROR ", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)
@staticmethod @staticmethod
def v(data): # verbose def v(data): # verbose
time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M') time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
plugin_name = Log._get_plugin_name() print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.blue_to_white, "VERBOSE", 1) + Colors.gray + " > " + Colors.light_gray + data + Colors.reset)
if plugin_name:
data = f"[{plugin_name}] {data}"
print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.blue_to_white, "VERBOSE", 1) + Colors.gray + " > " + Colors.light_gray + data + Colors.reset)

View File

@ -1,257 +1,198 @@
""" """
CREDIT CREDIT
Context manager for basic directory watching. Context manager for basic directory watching.
- <https://github.com/gorakhargosh/watchdog/issues/346>. - <https://github.com/gorakhargosh/watchdog/issues/346>.
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
import threading import threading
import time import time
from typing import Callable, Self from typing import Callable, Self
from utils.Logger import Log from utils.Logger import Log
import tomllib import tomllib
from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer from watchdog.observers import Observer
from utils.Scanner import scan from utils.Scanner import scan
from utils.integration.Discord import webhook from utils.integration.Discord import webhook
from utils.integration.AI import ai_analyse from utils.integration.AI import ai_analyse
t = time.time() t = time.time()
with open("config.toml", "rb") as f: with open("config.toml", "rb") as f:
data = tomllib.load(f) data = tomllib.load(f)
paths = data['DETECTION']['watchdogPath'] paths = data['DETECTION']['watchdogPath']
if not isinstance(paths, list): if not isinstance(paths, list):
paths = [paths] paths = [paths]
ignore_paths = data['DETECTION'].get('watchdogIgnorePath', []) ignore_paths = data['DETECTION'].get('watchdogIgnorePath', [])
ignore_files = data['DETECTION'].get('watchdogIgnoreFile', []) ignore_files = data['DETECTION'].get('watchdogIgnoreFile', [])
def s(input_dict): def s(input_dict):
return [ return [
{"name": key, "value": '\n'.join(' - ' + str(item) for item in items)} {"name": key, "value": '\n'.join(' - ' + str(item) for item in items)}
for key, items in input_dict.items() for key, items in input_dict.items()
] ]
def c(d): def c(d):
count = 0 count = 0
for key in d: for key in d:
if isinstance(d[key], list): if isinstance(d[key], list):
count += len(d[key]) count += len(d[key])
return count return count
def analysis(event_path: str, file_content: str, flag_type: str): def analysis(event_path: str, file_content: str, flag_type: str):
""" """
Process file events in a separate thread. Process file events in a separate thread.
This function scans the file content, and if flagged, This function scans the file content, and if flagged,
performs AI analysis and sends a webhook notification. performs AI analysis and sends a webhook notification.
""" """
# Notify plugins that scan is starting results = scan(file_content)
for plugin in ModifiedFileHandler.active_plugins: if results[0]:
try: Log.s(f"Flagged {event_path}")
if hasattr(plugin, 'on_scan') and callable(plugin.on_scan): analysis = ai_analyse(file_content)
plugin.on_scan(event_path, file_content, flag_type) msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis}"
except Exception as e: webhook(event_path, s(results[0]), msg)
Log.e(f"{plugin.name}: {str(e)}")
results = scan(file_content) class DirWatcher:
"""Run a function when a directory changes."""
# Notify plugins that scan is completed
for plugin in ModifiedFileHandler.active_plugins: min_cooldown = 0.1
try:
if hasattr(plugin, 'on_scan_completed') and callable(plugin.on_scan_completed): def __init__(
plugin.on_scan_completed(event_path, file_content, flag_type, results) self,
except Exception as e: watch_dir: Path,
Log.e(f"{plugin.name}: {str(e)}") interval: float = 0.2,
cooldown: float = 0.1,
if results[0]: ):
Log.s(f"Flagged {event_path}") if interval < self.min_cooldown:
analysis_result = ai_analyse(file_content) raise ValueError(
f"Interval of {interval} seconds is less than the minimum cooldown of "
# Notify plugins that AI analysis is completed f"{self.min_cooldown} seconds."
for plugin in ModifiedFileHandler.active_plugins: )
try: if cooldown < self.min_cooldown:
if hasattr(plugin, 'on_ai_analysis_completed') and callable(plugin.on_ai_analysis_completed): raise ValueError(
plugin.on_ai_analysis_completed(event_path, file_content, flag_type, results, analysis_result) f"Cooldown of {cooldown} seconds is less than the minimum cooldown of "
except Exception as e: f"{self.min_cooldown} seconds."
Log.e(f"{plugin.name}: {str(e)}") )
self.watch_dir = watch_dir
msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis_result}" self.interval = interval
webhook(event_path, s(results[0]), msg) self.cooldown = cooldown
def __enter__(self) -> Self:
class DirWatcher: self.observer = Observer()
"""Run a function when a directory changes.""" self.observer.schedule(
ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True
min_cooldown = 0.1 )
def __init__( Log.s(data['LANGUGAE']['english']['novelLoaded'].format(str(round(time.time() - t, 5))))
self, self.observer.start()
watch_dir: Path, return self
interval: float = 0.2,
cooldown: float = 0.1, def __exit__(self, exc_type: Exception | None, *_) -> bool:
plugins=None if exc_type and exc_type is KeyboardInterrupt:
): self.observer.stop()
if interval < self.min_cooldown: handled_exception = True
raise ValueError( elif exc_type:
f"Interval of {interval} seconds is less than the minimum cooldown of " handled_exception = False
f"{self.min_cooldown} seconds." else:
) handled_exception = True
if cooldown < self.min_cooldown: self.observer.join()
raise ValueError( return handled_exception
f"Cooldown of {cooldown} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds." def run(self):
) """Check for changes on an interval."""
self.watch_dir = watch_dir try:
self.interval = interval while True:
self.cooldown = cooldown sleep(self.interval)
# Store the plugins passed from PluginHandler except KeyboardInterrupt:
self.plugins = plugins or [] self.observer.stop()
exit()
def __enter__(self) -> Self: exit()
self.observer = Observer()
self.observer.schedule(
ModifiedFileHandler(scan, self.cooldown, self.plugins), self.watch_dir, recursive=True class ModifiedFileHandler(FileSystemEventHandler):
) """Handle modified files using threading for processing."""
Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 5)))) def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float):
self.observer.start() self.cooldown = timedelta(seconds=cooldown)
return self self.triggered_time = datetime.min
def __exit__(self, exc_type: Exception | None, *_) -> bool: def ignore_event(self, event: FileSystemEvent) -> bool:
if exc_type and exc_type is KeyboardInterrupt: for ignore_path in ignore_paths:
self.observer.stop() if event.src_path.startswith(ignore_path):
handled_exception = True return True
elif exc_type: for ignore_file in ignore_files:
handled_exception = False if event.src_path.endswith(ignore_file):
else: return True
handled_exception = True if event.src_path == ".":
self.observer.join() return True
return handled_exception return False
def run(self): def on_any_event(self, event: FileSystemEvent):
"""Check for changes on an interval.""" if self.ignore_event(event):
try: return True
while True:
sleep(self.interval) def on_modified(self, event: FileSystemEvent):
except KeyboardInterrupt: if self.ignore_event(event):
self.observer.stop() return
exit() if (datetime.now() - self.triggered_time) > self.cooldown:
exit() try:
with open(event.src_path, "r") as f:
src = f.read()
class ModifiedFileHandler(FileSystemEventHandler): Log.v(f"FILE MODF | {event.src_path}")
"""Handle modified files using threading for processing.""" # Process in a separate thread
threading.Thread(target=analysis, args=(event.src_path, src, "modification")).start()
# Class variable to store plugins for access from the analysis function self.triggered_time = datetime.now()
active_plugins = [] except Exception:
pass
def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float, plugins=None):
self.cooldown = timedelta(seconds=cooldown) def on_moved(self, event: FileSystemEvent):
self.triggered_time = datetime.min if self.ignore_event(event):
self.plugins = plugins or [] return
# Update the class variable with the current plugins if (datetime.now() - self.triggered_time) > self.cooldown:
ModifiedFileHandler.active_plugins = self.plugins try:
Log.v(f"FILE MOV | {event.src_path} > {event.dest_path}")
def trigger(self, event_type, event): # For moved events, you might choose to scan the original or destination file.
"""Notify all plugins about the event""" # Here, we'll scan the source path.
for plugin in self.plugins: with open(event.src_path, "r") as f:
try: src = f.read()
method = getattr(plugin, f"on_{event_type}", None) threading.Thread(target=analysis, args=(event.src_path, src, "moved")).start()
if method and callable(method): self.triggered_time = datetime.now()
# Call the plugin's event handler method except Exception:
method(event.src_path) pass
except Exception as e:
Log.e(f"Error calling plugin {plugin.name} for {event_type} event: {str(e)}") def on_deleted(self, event: FileSystemEvent):
if self.ignore_event(event):
def ignore_event(self, event: FileSystemEvent) -> bool: return
for ignore_path in ignore_paths: if (datetime.now() - self.triggered_time) > self.cooldown:
if event.src_path.startswith(ignore_path): try:
return True Log.v(f"FILE DEL | {event.src_path}")
for ignore_file in ignore_files: self.triggered_time = datetime.now()
if event.src_path.endswith(ignore_file): except Exception:
return True pass
if event.src_path == ".":
return True def on_created(self, event: FileSystemEvent):
return False if self.ignore_event(event):
return
def on_any_event(self, event: FileSystemEvent): if (datetime.now() - self.triggered_time) > self.cooldown:
if self.ignore_event(event): try:
self.trigger("any_event", event) if event.is_directory:
return True return
else:
def on_modified(self, event: FileSystemEvent): Log.v(f"file created: {event.src_path}")
if self.ignore_event(event): with open(event.src_path, "r") as f:
return content = f.read()
if (datetime.now() - self.triggered_time) > self.cooldown: threading.Thread(target=analysis, args=(event.src_path, content, "creation")).start()
try: self.triggered_time = datetime.now()
with open(event.src_path, "r") as f: except Exception:
src = f.read() pass
if data['LOGS']['fileModified']:
Log.v(f"FILE MODF | {event.src_path}")
threading.Thread(target=analysis, args=(event.src_path, src, "modification")).start()
self.trigger("modified", event)
self.triggered_time = datetime.now()
except Exception:
pass
def on_moved(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
if data['LOGS']['fileMoved']:
Log.v(f"FILE MOV | {event.src_path} > {event.dest_path}")
with open(event.src_path, "r") as f:
src = f.read()
threading.Thread(target=analysis, args=(event.src_path, src, "moved")).start()
self.trigger("moved", event)
self.triggered_time = datetime.now()
except Exception:
pass
def on_deleted(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
if data['LOGS']['fileDeleted']:
Log.v(f"FILE DEL | {event.src_path}")
self.trigger("deleted", event)
self.triggered_time = datetime.now()
except Exception:
pass
def on_created(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
if event.is_directory:
return
else:
if data['LOGS']['fileCreated']:
Log.v(f"file created: {event.src_path}")
with open(event.src_path, "r") as f:
content = f.read()
threading.Thread(target=analysis, args=(event.src_path, content, "creation")).start()
self.trigger("created", event)
self.triggered_time = datetime.now()
except Exception:
pass