From 0798ee31372ac8c7484916bf7831fe225ff54dbc Mon Sep 17 00:00:00 2001 From: root Date: Tue, 25 Mar 2025 14:06:27 +0000 Subject: [PATCH] Added plugins --- config.toml | 3 ++ main.py | 49 +++++++++++++++++++++++++++----- plugin_base.py | 21 ++++++++++++++ plugins/privileged_plugin.py | 22 ++++++++++++++ plugins/self_contained_plugin.py | 22 ++++++++++++++ utils/WatchdogHandler.py | 2 +- 6 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 plugin_base.py create mode 100644 plugins/privileged_plugin.py create mode 100644 plugins/self_contained_plugin.py diff --git a/config.toml b/config.toml index 38f41d2..8d0a7ae 100644 --- a/config.toml +++ b/config.toml @@ -11,6 +11,7 @@ machineID = "node1" [LANGUGAE.english] novelStarted = "Novel(Anti Abuse) Started within - {}s." +novelLoaded = "Novel(Anti Abuse) Loaded within - {}s." #**************************************************# @@ -50,6 +51,7 @@ generate_models = ["llama-3.2-90b-vision-preview","llama-3.3-70b-versatile","lla generate_endpoint = "http://IP:PORT/api/generate" # Can be empty if using groq use_groq = true groq_api_token = "" # Get one at https://console.groq.com/keys +# Example API key prompt = "Analyze the given code and return an abuse score (0-10) with a brief reason. Example abuses: Crypto Mining, Shell Access, Nezha Proxy (VPN/Proxy usage), Disk Filling, Tor, DDoS, Abusive Resource Usage. Response format: '**5/10** '. No extra messages." @@ -58,4 +60,5 @@ prompt = "Analyze the given code and return an abuse score (0-10) with a brief r enabled = true webhook_url = "" +# Example webhook truncate_text = true # Used only if AI INTEGRATION is enabled, trunclates text if true to maxium allowed characters or when false splits in few webhook messages. diff --git a/main.py b/main.py index ebd09ca..92e8a0e 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,10 @@ #region Imports -import time, os, tomllib +import time, os, tomllib, sys +from plugin_base import PrivilegedPluginBase, SelfContainedPluginBase from utils.Logger import Log from utils.WatchdogHandler import DirWatcher +from utils.Scanner import scan #endregion #region Initialize @@ -32,13 +34,46 @@ Log.v(""" """.format(data['ver'])) #endregion -if __name__ == "__main__": - with DirWatcher(path, interval=1) as watcher: - watcher.run() +def load_plugins(plugin_dir, scanner, logger, watchdog, config): + plugins = [] + sys.path.insert(0, plugin_dir) # Add plugin directory to sys.path - Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 1)))) + for filename in os.listdir(plugin_dir): + if filename.endswith(".py") and filename != "__init__.py": + module_name = filename[:-3] + try: + module = __import__(module_name) + for attr in dir(module): + plugin_class = getattr(module, attr) + if isinstance(plugin_class, type): + if issubclass(plugin_class, PrivilegedPluginBase) and plugin_class is not PrivilegedPluginBase: + logger.s(f"Loaded privileged plugin {module_name}") + plugin_instance = plugin_class(module_name, scanner, logger, watchdog, config) + plugins.append(plugin_instance) + elif issubclass(plugin_class, SelfContainedPluginBase) and plugin_class is not SelfContainedPluginBase: + logger.s(f"Loaded self-contained plugin {module_name}") + restricted_scanner = lambda src: logger.e(f"Access denied to plugin {module_name}") + plugin_instance = plugin_class(module_name, restricted_scanner, logger, watchdog, config) + plugins.append(plugin_instance) + except Exception as e: + logger.e(f"Failed to load plugin {module_name}: {e}") + + return plugins + +if __name__ == "__main__": + logger = Log() + scanner = scan + watchdog = DirWatcher(path, interval=1) + + logger.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 1)))) try: - while True: - time.sleep(1) + plugin_dir = "plugins" + plugins = load_plugins(plugin_dir, scanner, logger, watchdog, data) + for plugin in plugins: + plugin.execute() + with watchdog as watcher: + watcher.run() except KeyboardInterrupt: exit() + + diff --git a/plugin_base.py b/plugin_base.py new file mode 100644 index 0000000..ea54630 --- /dev/null +++ b/plugin_base.py @@ -0,0 +1,21 @@ +class PrivilegedPluginBase: + def __init__(self, name, scanner, logger, watchdog, config): + self.name = name + self.scanner = scanner + self.logger = logger + self.watchdog = watchdog + self.config = config + + def execute(self, *args, **kwargs): + raise NotImplementedError("Privileged plugins must implement the execute method.") + +class SelfContainedPluginBase: + def __init__(self, name, scanner, logger, watchdog, config): + self.name = name + self.scanner = scanner + self.logger = logger + self.watchdog = watchdog + self.config = config + + def execute(self, *args, **kwargs): + raise NotImplementedError("Self-contained plugins must implement the execute method.") \ No newline at end of file diff --git a/plugins/privileged_plugin.py b/plugins/privileged_plugin.py new file mode 100644 index 0000000..520956a --- /dev/null +++ b/plugins/privileged_plugin.py @@ -0,0 +1,22 @@ +from plugin_base import PrivilegedPluginBase + +class PrivilegedPlugin(PrivilegedPluginBase): + def execute(self): + self.logger.s(f"Executing {self.name} privileged plugin") + + # Replace the module-level scan function + def new_scan(src): + self.logger.s("This is the new scan function from the privileged plugin.") + # Custom scan logic here + return {}, {} + + # Replace the original scan function globally + global scan + scan = new_scan + + # Now, when scan is called anywhere in the module, it will use the new_scan function + matches, errors = scan("some file content") + if matches: + self.logger.s(f"Matches found: {matches}") + if errors: + self.logger.e(f"Errors: {errors}") \ No newline at end of file diff --git a/plugins/self_contained_plugin.py b/plugins/self_contained_plugin.py new file mode 100644 index 0000000..520956a --- /dev/null +++ b/plugins/self_contained_plugin.py @@ -0,0 +1,22 @@ +from plugin_base import PrivilegedPluginBase + +class PrivilegedPlugin(PrivilegedPluginBase): + def execute(self): + self.logger.s(f"Executing {self.name} privileged plugin") + + # Replace the module-level scan function + def new_scan(src): + self.logger.s("This is the new scan function from the privileged plugin.") + # Custom scan logic here + return {}, {} + + # Replace the original scan function globally + global scan + scan = new_scan + + # Now, when scan is called anywhere in the module, it will use the new_scan function + matches, errors = scan("some file content") + if matches: + self.logger.s(f"Matches found: {matches}") + if errors: + self.logger.e(f"Errors: {errors}") \ No newline at end of file diff --git a/utils/WatchdogHandler.py b/utils/WatchdogHandler.py index 0f82442..c6cf1c5 100644 --- a/utils/WatchdogHandler.py +++ b/utils/WatchdogHandler.py @@ -94,7 +94,7 @@ class DirWatcher: ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True ) - Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 5)))) + Log.s(data['LANGUGAE']['english']['novelLoaded'].format(str(round(time.time() - t, 5)))) self.observer.start() return self