Upload files to "utils"
This commit is contained in:
parent
3226450e2d
commit
dc894a0cab
99
utils/ai.py
Normal file
99
utils/ai.py
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
#region Imports
|
||||||
|
import tomllib
|
||||||
|
import requests
|
||||||
|
from utils.logger import Log
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
#region Variables

# Read the application configuration once at import time.
with open("./config.toml", "rb") as f:
    data = tomllib.load(f)

# Shortcut into the AI-integration section of the configuration.
_ai_config = data["INTEGRATION"]["AI"]

enabled = _ai_config["enabled"]
generate_endpoint = _ai_config["generate_endpoint"]
generate_model = _ai_config["generate_model"]
use_groq = _ai_config["use_groq"]
groq_api = _ai_config["groq_api_token"]
prompt = _ai_config["prompt"]

# When Groq is selected, route all requests to Groq's OpenAI-compatible endpoint
# regardless of the configured generate_endpoint.
if use_groq:
    generate_endpoint = "https://api.groq.com/openai/v1/chat/completions"
#endregion
|
||||||
|
|
||||||
|
def generate_response(data):
    """Generate a text response for *data* via the configured LLM endpoint.

    Args:
        data: Content (typically file text) interpolated into the configured
            prompt.

    Returns:
        The generated message content (str) when Groq is used, the raw JSON
        response (dict) otherwise, or None when the request fails.
    """
    try:
        headers = {
            "Content-Type": "application/json",
        }

        # Groq requires a bearer token; other endpoints are assumed not to.
        if use_groq:
            headers["Authorization"] = f"Bearer {groq_api}"

        payload = {
            "model": generate_model,
            "temperature": 1,
            "max_completion_tokens": 1024,
            "top_p": 1,
            "stream": False,
            "stop": None,
        }

        # Same request text for both APIs; only the payload shape differs.
        request_text = f"Using this data: {data}. Respond to this prompt: {prompt}"
        if use_groq:
            # Groq's OpenAI-compatible chat API expects a "messages" list.
            payload["messages"] = [
                {
                    "role": "user",
                    "content": request_text,
                }
            ]
        else:
            # Completion-style endpoints take a single prompt string.
            payload["prompt"] = request_text

        # Timeout prevents the caller from hanging forever on a dead endpoint.
        response = requests.post(generate_endpoint, json=payload, headers=headers, timeout=60)
        response.raise_for_status()

        if use_groq:
            return response.json()["choices"][0]["message"]["content"]
        return response.json()

    except requests.exceptions.RequestException as e:
        Log.e(f"Failed to generate response: {e}")
        return None
|
||||||
|
|
||||||
|
|
||||||
|
def ai_analyse(file_path):
    """Analyze a file and generate a response based on the user's input."""
    # Guard clause: bail out early when the integration is switched off.
    if not enabled:
        return "AI integration is disabled in the configuration, enable AI integration for AI File Analyse."

    try:
        # Read the whole file as UTF-8 text.
        with open(file_path, "r", encoding="utf-8") as handle:
            contents = handle.read()

        # Ask the model to analyse the file contents.
        result = generate_response(contents)
        if result:
            return result
        Log.e("AI did not respond.")
    except FileNotFoundError:
        Log.e(f"File not found: {file_path}")
    except Exception as exc:
        Log.e(f"Unexpected error: {exc}")
    return None
|
||||||
|
|
||||||
|
|
||||||
|
# Example usage: analyse a sample file and print the outcome.
if __name__ == "__main__":
    sample_path = "example.txt"  # Replace with your input file path
    analysis = ai_analyse(sample_path)
    if analysis:
        print("[INFO] Analysis Result:")
        print(analysis)
|
104
utils/discord.py
Normal file
104
utils/discord.py
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
import tomllib
|
||||||
|
import time
|
||||||
|
import requests
|
||||||
|
from utils.logger import Log
|
||||||
|
|
||||||
|
def load_config(file_path):
|
||||||
|
"""Load configuration from a TOML file."""
|
||||||
|
try:
|
||||||
|
with open(file_path, "rb") as f:
|
||||||
|
data = tomllib.load(f)
|
||||||
|
return data
|
||||||
|
except FileNotFoundError:
|
||||||
|
Log.e(f"Config file {file_path} not found.")
|
||||||
|
return None
|
||||||
|
except tomllib.TOMLDecodeError as e:
|
||||||
|
Log.e(f"Failed to parse TOML file: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def truncate_text(text, limit):
    """Truncate *text* to at most *limit* characters, appending an ellipsis.

    Args:
        text (str): The text to shorten.
        limit (int): Maximum number of characters allowed.

    Returns:
        str: *text* unchanged when it fits, otherwise the first ``limit - 3``
        characters followed by ``"..."``. For ``limit < 3`` only the ellipsis
        is returned, since the limit cannot accommodate any content.
    """
    if len(text) > limit:
        # max() guards against a negative slice for very small limits, which
        # would otherwise keep almost the whole string and exceed the limit.
        return text[:max(limit - 3, 0)] + "..."
    return text
|
||||||
|
|
||||||
|
def split_text(text, limit):
    """Split text into chunks of a specified character limit."""
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + limit])
        start += limit
    return chunks
|
||||||
|
|
||||||
|
def load_config_values(data):
    """Extract relevant values from the loaded configuration.

    Returns a 4-tuple: (enabled, webhook_url, ai_enabled, truncate_flag).
    On a missing key the error is logged and (None, None, None, True) is
    returned so the caller can still unpack safely.
    """
    try:
        integration = data["INTEGRATION"]
        discord_cfg = integration["DISCORD"]
        return (
            discord_cfg["enabled"],
            discord_cfg["webhook_url"],
            integration["AI"]["enabled"],
            discord_cfg.get("truncate_text", True),
        )
    except KeyError as e:
        Log.e(f"Missing key in config: {e}")
        return None, None, None, True
|
||||||
|
|
||||||
|
def _build_embed(title, description, fields, author_name):
    """Build a single Discord embed dict for a watchdog alert."""
    return {
        "title": title,
        "description": description,
        "color": 65280,
        "fields": fields,
        "author": {
            "name": author_name
        },
        "thumbnail": {
            "url": "https://images-ext-1.discordapp.net/external/ZdQffnnucK3DWYPeokYDWnFPATtlvszVNozmNhOdXBg/https/upload.wikimedia.org/wikipedia/commons/5/59/Empty.png?format=webp&quality=lossless"
        }
    }


def webhook(file_path, yara_matches, ai=""):
    """Send a webhook to Discord with the given parameters.

    Args:
        file_path (str): Path of the file that triggered the alert; shown as
            the embed author.
        yara_matches: Embed "fields" entries describing the YARA matches.
        ai (str): Optional AI-generated description of the detection.
    """
    config_data = load_config("./config.toml")
    if config_data is None:
        Log.e("Failed to load configuration.")
        return

    enabled, discord_webhook_url, ai_integration, truncate_text_flag = load_config_values(config_data)
    if not enabled:
        return

    description = ai  # if ai_integration and ai else "No Description Provided for this action."

    # Discord caps an embed description at 4096 chars; 4092 leaves headroom.
    if truncate_text_flag:
        # Single embed with a truncated description.
        embeds = [_build_embed("⚠️ WATCHDOG ALERT ⚠️", truncate_text(description, 4092), yara_matches, file_path)]
    else:
        # Multiple embeds, one per chunk of the split description.
        embeds = [
            _build_embed(
                f"⚠️ WATCHDOG ALERT ⚠️ (Part {idx + 1})",
                chunk,
                yara_matches if idx == 0 else [],  # Fields only in the first embed
                file_path if idx == 0 else None,
            )
            for idx, chunk in enumerate(split_text(description, 4092))
        ]

    # Construct the payload
    payload = {
        "content": "",
        "embeds": embeds[:10],  # Discord allows a maximum of 10 embeds per payload
        "attachments": []
    }

    try:
        # Timeout keeps the watcher from blocking forever on a slow webhook.
        response = requests.post(discord_webhook_url, json=payload, timeout=30)
        response.raise_for_status()  # Raise exception for HTTP errors
        Log.v(f"Report sent to Discord webhook for {file_path}")
    except requests.exceptions.RequestException as e:
        Log.e(f"Report was not sent to Discord webhook, error: {e}")
|
18
utils/logger.py
Normal file
18
utils/logger.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
from pystyle import Colors, Colorate
|
||||||
|
from datetime import datetime
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class Log:
    """Console logger printing colourised, HH:MM-timestamped lines."""

    @staticmethod
    def _emit(label, gradient, text):
        """Print *text* with the current HH:MM time and a coloured *label*."""
        time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M')
        print(Colors.gray + time_now + " " + Colorate.Horizontal(gradient, label, 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset)

    @staticmethod
    def s(text):  # success
        Log._emit("SUCCESS", Colors.green_to_cyan, text)

    @staticmethod
    def e(text):  # error
        Log._emit(" ERROR ", Colors.red_to_purple, text)

    @staticmethod
    def v(data):  # verbose
        Log._emit("VERBOSE", Colors.blue_to_white, data)
|
45
utils/scanner.py
Normal file
45
utils/scanner.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
#region Imports
|
||||||
|
import os
|
||||||
|
import yara
|
||||||
|
import tomllib
|
||||||
|
#endregion
|
||||||
|
|
||||||
|
#region Variables
# Files already seen by the scanner — presumably used by callers to avoid
# re-scanning; not referenced in this module, verify against caller.
scanned_files_map = set()
# NOTE(review): these two are dicts while scanned_files_map is a set — confirm
# whether {} was intended to be set() here; neither is used in this module.
ignored_files = {}
ignored_directories = {}

# Load the application configuration once at import time.
with open("./config.toml", "rb") as f:
    data = tomllib.load(f)
#endregion
|
||||||
|
|
||||||
|
#region scanfile
|
||||||
|
|
||||||
|
def scan(file_path):
    """
    Scan a file with YARA rules and return the matches.

    Args:
        file_path (str): The path to the file to be scanned.

    Returns:
        matches[filename], error_message
    """
    matches = {}
    error_messages = {}

    # Hoist the signature directory lookup out of the loop.
    signature_dir = data['DETECTION']['SignaturePath']
    for rule_file in os.listdir(signature_dir):
        # Only compile files that carry the .yara extension.
        if not rule_file.endswith(".yara"):
            continue
        rule_path = os.path.join(signature_dir, rule_file)
        try:
            compiled = yara.compile(filepath=rule_path)
            hits = compiled.match(file_path)
            if hits:
                matches[rule_file] = hits
            # for match in hits:
            #     print(f" - Rule: {match.rule}")
        except yara.Error as err:
            error_messages[rule_file] = err

    return matches, error_messages
|
||||||
|
#endregion
|
Loading…
x
Reference in New Issue
Block a user