Compare commits

...

No commits in common. "main" and "20250325d" have entirely different histories.

12 changed files with 343 additions and 465 deletions

View File

@ -1,5 +1,3 @@
[BETA WARNING: SOME FEATURES MIGHT BE NOT RELEASED OR ARE NOT WORKING AS EXPECTED!]
![Novel](/Frame_12x.webp)
# Novel, Anti-Abuse
@ -13,7 +11,7 @@ Anti-Abuse is an ✨ FREE, Open-Sourced radar based on yara rules built for pter
2. Easily customizable by [Yara Rule](https://yara.readthedocs.io/en/stable/writingrules.html).
3. Various Integrations(discord webhook, etc).
4. Easy re-check action through AI-Based Analysis.
5. Plugin feature to implement feature on your needs
## Installation
Requirements: python, keyboard, brain

View File

@ -1,5 +1,5 @@
ver = "250326b-dev/plugin"
ver = "250325d"
machineID = "node1"
#*************************************************#
@ -10,7 +10,8 @@ machineID = "node1"
[LANGUGAE.english]
novelStarted = "Novel's Anti-Abuse Started within - {}s."
novelStarted = "Novel(Anti Abuse) Started within - {}s."
novelLoaded = "Novel(Anti Abuse) Loaded within - {}s."
#**************************************************#
@ -22,11 +23,6 @@ novelStarted = "Novel's Anti-Abuse Started within - {}s."
[LOGS]
processStartMsg = true
flaggedNoti = true
fileModified = true
fileDeleted = true
fileMoved = true
fileCreated = true
#**************************************************#
# #
@ -55,6 +51,7 @@ generate_models = ["llama-3.2-90b-vision-preview","llama-3.3-70b-versatile","lla
generate_endpoint = "http://IP:PORT/api/generate" # Can be empty if using groq
use_groq = true
groq_api_token = "" # Get one at https://console.groq.com/keys
# Example API key
prompt = "Analyze the given code and return an abuse score (0-10) with a brief reason. Example abuses: Crypto Mining, Shell Access, Nezha Proxy (VPN/Proxy usage), Disk Filling, Tor, DDoS, Abusive Resource Usage. Response format: '**5/10** <your reason>'. No extra messages."
@ -63,4 +60,5 @@ prompt = "Analyze the given code and return an abuse score (0-10) with a brief r
enabled = true
webhook_url = ""
# Example webhook
truncate_text = true # Used only if AI INTEGRATION is enabled, trunclates text if true to maxium allowed characters or when false splits in few webhook messages.

45
core.py
View File

@ -1,45 +0,0 @@
import time, importlib, tomllib, os
from utils.Logger import Log
from utils.WatchdogHandler import DirWatcher
class PluginHandler:
def __init__(self):
self.t = time.time()
plugin_dir = './plugins'
if not os.path.isdir(plugin_dir):
Log.e(f"{plugin_dir} not found. Exiting...")
raise FileNotFoundError(f"Make sure '{plugin_dir}' is existing as plugin dir.")
self._plugins = []
for file in os.listdir(plugin_dir):
if file.endswith('.py'):
path = os.path.join(plugin_dir, file)
try:
spec = importlib.util.spec_from_file_location("plugin", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
self._plugins.append(module.Plugin())
except (ImportError, AttributeError, SyntaxError) as e:
Log.e(f"[PLUGIN] \"{file}\": {e}")
with open("config.toml", "rb") as f:
self.data = tomllib.load(f)
self.path = self.data['DETECTION']['watchdogPath']
def app_run(self):
for plugin in self._plugins:
try:
Log.v(f"[PLUGIN] Loading \"{plugin.name}\" v{plugin.version}\"")
plugin.on_start()
except Exception as e:
Log.e(f"[PLUGIN] \"{plugin.name}\": {str(e)}")
with DirWatcher(self.path, interval=1, plugins=self._plugins) as watcher:
watcher.run()
Log.s(self.data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - self.t, 1))))
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
exit()

61
main.py
View File

@ -1,14 +1,21 @@
#region Imports
import tomllib
import time, os, tomllib, sys
from plugin_base import PrivilegedPluginBase, SelfContainedPluginBase
from utils.Logger import Log
from core import PluginHandler
from utils.WatchdogHandler import DirWatcher
from utils.Scanner import scan
#endregion
#region Initialize
t = time.time()
with open("config.toml", "rb") as f:
data = tomllib.load(f)
if __name__ == "__main__":
Log.v(str(data))
path = data['DETECTION']['watchdogPath']
Log.v("""
o o 8
@ -25,4 +32,48 @@ if __name__ == "__main__":
License - GNU GENERAL PUBLIC LICENSE, Version 3
""".format(data['ver']))
PluginHandler().app_run()
#endregion
def load_plugins(plugin_dir, scanner, logger, watchdog, config):
plugins = []
sys.path.insert(0, plugin_dir) # Add plugin directory to sys.path
for filename in os.listdir(plugin_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3]
try:
module = __import__(module_name)
for attr in dir(module):
plugin_class = getattr(module, attr)
if isinstance(plugin_class, type):
if issubclass(plugin_class, PrivilegedPluginBase) and plugin_class is not PrivilegedPluginBase:
logger.s(f"Loaded privileged plugin {module_name}")
plugin_instance = plugin_class(module_name, scanner, logger, watchdog, config)
plugins.append(plugin_instance)
elif issubclass(plugin_class, SelfContainedPluginBase) and plugin_class is not SelfContainedPluginBase:
logger.s(f"Loaded self-contained plugin {module_name}")
restricted_scanner = lambda src: logger.e(f"Access denied to plugin {module_name}")
plugin_instance = plugin_class(module_name, restricted_scanner, logger, watchdog, config)
plugins.append(plugin_instance)
except Exception as e:
logger.e(f"Failed to load plugin {module_name}: {e}")
return plugins
if __name__ == "__main__":
logger = Log()
scanner = scan
watchdog = DirWatcher(path, interval=1)
logger.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 1))))
try:
plugin_dir = "plugins"
plugins = load_plugins(plugin_dir, scanner, logger, watchdog, data)
for plugin in plugins:
plugin.execute()
with watchdog as watcher:
watcher.run()
except KeyboardInterrupt:
exit()

21
plugin_base.py Normal file
View File

@ -0,0 +1,21 @@
class PrivilegedPluginBase:
def __init__(self, name, scanner, logger, watchdog, config):
self.name = name
self.scanner = scanner
self.logger = logger
self.watchdog = watchdog
self.config = config
def execute(self, *args, **kwargs):
raise NotImplementedError("Privileged plugins must implement the execute method.")
class SelfContainedPluginBase:
def __init__(self, name, scanner, logger, watchdog, config):
self.name = name
self.scanner = scanner
self.logger = logger
self.watchdog = watchdog
self.config = config
def execute(self, *args, **kwargs):
raise NotImplementedError("Self-contained plugins must implement the execute method.")

View File

@ -1,80 +0,0 @@
from utils.Logger import Log
import inspect
def _get_plugin_name():
try:
# First try to get the caller's plugin name
for frame_record in inspect.stack():
frame = frame_record[0]
if 'self' in frame.f_locals:
instance = frame.f_locals['self']
# Check if this is a plugin instance with a name attribute
if hasattr(instance, 'name') and 'plugins' in frame.f_globals.get('__file__', ''):
return instance.name
# If we couldn't find a plugin name in the call stack, check if we're being called by another plugin
for frame_record in inspect.stack():
module = inspect.getmodule(frame_record[0])
if module and hasattr(module, '__file__') and 'plugins' in module.__file__:
# Extract the plugin name from the filename
return "Called from " + module.__name__.split('.')[-1]
except Exception as e:
return f"Error: {str(e)}"
return "Unknown"
class Plugin:
def __init__(self):
self.version = "1.0.0"
self.name = "Example - Main"
def demo(self, *args, **kwargs):
Log.v(f"This is \"demo\" function. I am called by \"{_get_plugin_name()}\" with the argument received: args={args}, kwargs={kwargs}")
def on_start(self, *args, **kwargs):
Log.v("="*10)
Log.v("")
Log.v("Hello, this is an example plugin!")
Log.v("You can find this plugin, and disable by removing this plugin at plugins/example.py")
Log.v("With plugins, you can implement your own integrations, or features.")
Log.v("For more informations, please check our documentation!")
Log.v("")
Log.v("="*10)
"""
These codeblocks below are available events.
You can implement your own events by using the examples below.
on_created() : Called when a file is created.
on_deleted() : Called when a file is deleted.
on_modified() : Called when a file is modified.
on_moved() : Called when a file is moved.
on_scan() : Called when a scan is started.
on_scan_completed() : Called when a scan is completed.
on_ai_analysis_completed() : Called when an AI analysis is completed.
Good luck, have fun!
"""
# def on_created(self, *args, **kwargs):
# Log.v(f"File created args={args}, kwargs={kwargs}")
# def on_deleted(self, *args, **kwargs):
# Log.v(f"File deleted args={args}, kwargs={kwargs}")
# def on_modified(self, *args, **kwargs):
# Log.v(f"File modified args={args}, kwargs={kwargs}")
# def on_moved(self, *args, **kwargs):
# Log.v(f"File moved args={args}, kwargs={kwargs}")
# def on_scan(self, *args, **kwargs):
# Log.v(f"Scan started args={args}, kwargs={kwargs}")
# def on_scan_completed(self, *args, **kwargs):
# Log.v(f"Scan completed args={args}, kwargs={kwargs}")
# def on_ai_analysis_completed(self, *args, **kwargs):
# Log.v(f"AI analysis completed args={args}, kwargs={kwargs}")

View File

@ -1,17 +0,0 @@
from utils.Logger import Log
from plugins.example_1 import Plugin as example1
class Plugin:
def __init__(self):
self.version = "1.0.0"
self.name = "Example - Call Other Plugin"
def on_start(self, *args, **kwargs):
Log.v("="*10)
Log.v("")
Log.v("Hello, this is \"Example - Call Other Plugin!\"")
Log.v("This plugin will call example_1's \"demo\" function with arguments.")
Log.v("")
Log.v("="*10)
example1().demo(1,2,3, a=2)

View File

@ -0,0 +1,22 @@
from plugin_base import PrivilegedPluginBase
class PrivilegedPlugin(PrivilegedPluginBase):
def execute(self):
self.logger.s(f"Executing {self.name} privileged plugin")
# Replace the module-level scan function
def new_scan(src):
self.logger.s("This is the new scan function from the privileged plugin.")
# Custom scan logic here
return {}, {}
# Replace the original scan function globally
global scan
scan = new_scan
# Now, when scan is called anywhere in the module, it will use the new_scan function
matches, errors = scan("some file content")
if matches:
self.logger.s(f"Matches found: {matches}")
if errors:
self.logger.e(f"Errors: {errors}")

View File

@ -0,0 +1,22 @@
from plugin_base import PrivilegedPluginBase
class PrivilegedPlugin(PrivilegedPluginBase):
def execute(self):
self.logger.s(f"Executing {self.name} privileged plugin")
# Replace the module-level scan function
def new_scan(src):
self.logger.s("This is the new scan function from the privileged plugin.")
# Custom scan logic here
return {}, {}
# Replace the original scan function globally
global scan
scan = new_scan
# Now, when scan is called anywhere in the module, it will use the new_scan function
matches, errors = scan("some file content")
if matches:
self.logger.s(f"Matches found: {matches}")
if errors:
self.logger.e(f"Errors: {errors}")

View File

@ -15,9 +15,6 @@ rule CHINESE_NEZHA_ARGO {
$a13 = "Server\x20is\x20running\x20on\x20port\x20"
$a14 = "nysteria2"
$a15 = "openssl req"
$a16 = "hysteria2"
$a17 = "NEZHA" nocase
$a18 = "babama1001980"
condition:
2 of ($a*)
}

View File

@ -1,48 +1,18 @@
from pystyle import Colors, Colorate
from datetime import datetime
import time, os, sys, inspect
import time
class Log:
@staticmethod
def _get_plugin_name():
try:
for frame_record in inspect.stack():
frame = frame_record[0]
if 'self' in frame.f_locals:
instance = frame.f_locals['self']
# Check if this is a plugin instance with a name attribute
if hasattr(instance, 'name') and 'plugins' in frame.f_globals.get('__file__', ''):
return instance.name
except: return None
return None
@staticmethod
def s(text): # success
time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
plugin_name = Log._get_plugin_name()
if plugin_name:
text = f"[{plugin_name}] {text}"
print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.green_to_cyan, "SUCCESS", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)
@staticmethod
def e(text): # error
time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
plugin_name = Log._get_plugin_name()
if plugin_name:
text = f"[{plugin_name}] {text}"
print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.red_to_purple, " ERROR ", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)
@staticmethod
def v(data): # verbose
time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
plugin_name = Log._get_plugin_name()
if plugin_name:
data = f"[{plugin_name}] {data}"
print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.blue_to_white, "VERBOSE", 1) + Colors.gray + " > " + Colors.light_gray + data + Colors.reset)

View File

@ -55,37 +55,11 @@ def analysis(event_path: str, file_content: str, flag_type: str):
This function scans the file content, and if flagged,
performs AI analysis and sends a webhook notification.
"""
# Notify plugins that scan is starting
for plugin in ModifiedFileHandler.active_plugins:
try:
if hasattr(plugin, 'on_scan') and callable(plugin.on_scan):
plugin.on_scan(event_path, file_content, flag_type)
except Exception as e:
Log.e(f"{plugin.name}: {str(e)}")
results = scan(file_content)
# Notify plugins that scan is completed
for plugin in ModifiedFileHandler.active_plugins:
try:
if hasattr(plugin, 'on_scan_completed') and callable(plugin.on_scan_completed):
plugin.on_scan_completed(event_path, file_content, flag_type, results)
except Exception as e:
Log.e(f"{plugin.name}: {str(e)}")
if results[0]:
Log.s(f"Flagged {event_path}")
analysis_result = ai_analyse(file_content)
# Notify plugins that AI analysis is completed
for plugin in ModifiedFileHandler.active_plugins:
try:
if hasattr(plugin, 'on_ai_analysis_completed') and callable(plugin.on_ai_analysis_completed):
plugin.on_ai_analysis_completed(event_path, file_content, flag_type, results, analysis_result)
except Exception as e:
Log.e(f"{plugin.name}: {str(e)}")
msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis_result}"
analysis = ai_analyse(file_content)
msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis}"
webhook(event_path, s(results[0]), msg)
@ -99,7 +73,6 @@ class DirWatcher:
watch_dir: Path,
interval: float = 0.2,
cooldown: float = 0.1,
plugins=None
):
if interval < self.min_cooldown:
raise ValueError(
@ -114,16 +87,14 @@ class DirWatcher:
self.watch_dir = watch_dir
self.interval = interval
self.cooldown = cooldown
# Store the plugins passed from PluginHandler
self.plugins = plugins or []
def __enter__(self) -> Self:
self.observer = Observer()
self.observer.schedule(
ModifiedFileHandler(scan, self.cooldown, self.plugins), self.watch_dir, recursive=True
ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True
)
Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 5))))
Log.s(data['LANGUGAE']['english']['novelLoaded'].format(str(round(time.time() - t, 5))))
self.observer.start()
return self
@ -152,26 +123,9 @@ class DirWatcher:
class ModifiedFileHandler(FileSystemEventHandler):
"""Handle modified files using threading for processing."""
# Class variable to store plugins for access from the analysis function
active_plugins = []
def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float, plugins=None):
def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float):
self.cooldown = timedelta(seconds=cooldown)
self.triggered_time = datetime.min
self.plugins = plugins or []
# Update the class variable with the current plugins
ModifiedFileHandler.active_plugins = self.plugins
def trigger(self, event_type, event):
"""Notify all plugins about the event"""
for plugin in self.plugins:
try:
method = getattr(plugin, f"on_{event_type}", None)
if method and callable(method):
# Call the plugin's event handler method
method(event.src_path)
except Exception as e:
Log.e(f"Error calling plugin {plugin.name} for {event_type} event: {str(e)}")
def ignore_event(self, event: FileSystemEvent) -> bool:
for ignore_path in ignore_paths:
@ -186,7 +140,6 @@ class ModifiedFileHandler(FileSystemEventHandler):
def on_any_event(self, event: FileSystemEvent):
if self.ignore_event(event):
self.trigger("any_event", event)
return True
def on_modified(self, event: FileSystemEvent):
@ -196,12 +149,9 @@ class ModifiedFileHandler(FileSystemEventHandler):
try:
with open(event.src_path, "r") as f:
src = f.read()
if data['LOGS']['fileModified']:
Log.v(f"FILE MODF | {event.src_path}")
# Process in a separate thread
threading.Thread(target=analysis, args=(event.src_path, src, "modification")).start()
self.trigger("modified", event)
self.triggered_time = datetime.now()
except Exception:
pass
@ -211,15 +161,12 @@ class ModifiedFileHandler(FileSystemEventHandler):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
if data['LOGS']['fileMoved']:
Log.v(f"FILE MOV | {event.src_path} > {event.dest_path}")
# For moved events, you might choose to scan the original or destination file.
# Here, we'll scan the source path.
with open(event.src_path, "r") as f:
src = f.read()
threading.Thread(target=analysis, args=(event.src_path, src, "moved")).start()
self.trigger("moved", event)
self.triggered_time = datetime.now()
except Exception:
pass
@ -229,10 +176,7 @@ class ModifiedFileHandler(FileSystemEventHandler):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
if data['LOGS']['fileDeleted']:
Log.v(f"FILE DEL | {event.src_path}")
self.trigger("deleted", event)
self.triggered_time = datetime.now()
except Exception:
pass
@ -245,13 +189,10 @@ class ModifiedFileHandler(FileSystemEventHandler):
if event.is_directory:
return
else:
if data['LOGS']['fileCreated']:
Log.v(f"file created: {event.src_path}")
with open(event.src_path, "r") as f:
content = f.read()
threading.Thread(target=analysis, args=(event.src_path, content, "creation")).start()
self.trigger("created", event)
self.triggered_time = datetime.now()
except Exception:
pass