diff --git a/utils/ai.py b/utils/ai.py new file mode 100644 index 0000000..d77d80a --- /dev/null +++ b/utils/ai.py @@ -0,0 +1,99 @@ +#region Imports +import tomllib +import requests +from utils.logger import Log +#endregion + +#region Variables +# Load configuration from the config.toml file +with open("./config.toml", "rb") as f: + data = tomllib.load(f) + +enabled = data["INTEGRATION"]["AI"]["enabled"] +generate_endpoint = data["INTEGRATION"]["AI"]["generate_endpoint"] +generate_model = data["INTEGRATION"]["AI"]["generate_model"] +use_groq = data["INTEGRATION"]["AI"]["use_groq"] +groq_api = data["INTEGRATION"]["AI"]["groq_api_token"] +prompt = data["INTEGRATION"]["AI"]["prompt"] + +# If Groq is enabled, update the generate endpoint +if use_groq: + generate_endpoint = "https://api.groq.com/openai/v1/chat/completions" +#endregion + +def generate_response(data): + """Generate a response using the Groq API.""" + try: + # Create headers + headers = { + "Content-Type": "application/json", + } + + # Add authorization header if using Groq + if use_groq: + headers["Authorization"] = f"Bearer {groq_api}" + + # Create payload + payload = { + "model": generate_model, + "temperature": 1, + "max_completion_tokens": 1024, + "top_p": 1, + "stream": False, + "stop": None, + } + + # Conditional message structure for Groq + if use_groq: + payload["messages"] = [ + { + "role": "user", + "content": f"Using this data: {data}. Respond to this prompt: {prompt}" + } + ] + else: + payload["prompt"] = f"Using this data: {data}. Respond to this prompt: {prompt}" + + response = requests.post(generate_endpoint, json=payload, headers=headers) + response.raise_for_status() + if use_groq: + return response.json()["choices"][0]["message"]["content"] + else: + return response.json() + + except requests.exceptions.RequestException as e: + Log.e(f"Failed to generate response: {e}") + return None + + +def ai_analyse(file_path): + """Analyze a file and generate a response based on the user's input.""" + if enabled: + try: + # Open and read file data + with open(file_path, "r", encoding="utf-8") as file: + file_data = file.read() + + # Generate response using the file data + response = generate_response(file_data) + if response: + #Log.s(f"Generated Response: {response}") + return response + else: + Log.e("AI did not respond.") + except FileNotFoundError: + Log.e(f"File not found: {file_path}") + except Exception as e: + Log.e(f"Unexpected error: {e}") + else: + return "AI integration is disabled in the configuration, enable AI integration for AI File Analyse." + return None + + +# Example usage +if __name__ == "__main__": + file_path = "example.txt" # Replace with your input file path + result = ai_analyse(file_path) + if result: + print("[INFO] Analysis Result:") + print(result) diff --git a/utils/discord.py b/utils/discord.py new file mode 100644 index 0000000..1e41a9c --- /dev/null +++ b/utils/discord.py @@ -0,0 +1,104 @@ +import tomllib +import time +import requests +from utils.logger import Log + +def load_config(file_path): + """Load configuration from a TOML file.""" + try: + with open(file_path, "rb") as f: + data = tomllib.load(f) + return data + except FileNotFoundError: + Log.e(f"Config file {file_path} not found.") + return None + except tomllib.TOMLDecodeError as e: + Log.e(f"Failed to parse TOML file: {e}") + return None + +def truncate_text(text, limit): + """Truncate text to the specified character limit with an ellipsis.""" + if len(text) > limit: + return text[:limit - 3] + "..." # Truncate and add ellipsis + return text + +def split_text(text, limit): + """Split text into chunks of a specified character limit.""" + return [text[i:i + limit] for i in range(0, len(text), limit)] + +def load_config_values(data): + """Extract relevant values from the loaded configuration.""" + try: + enabled = data["INTEGRATION"]["DISCORD"]["enabled"] + discord_webhook_url = data["INTEGRATION"]["DISCORD"]["webhook_url"] + ai_integration = data["INTEGRATION"]["AI"]["enabled"] + truncate_text_flag = data["INTEGRATION"]["DISCORD"].get("truncate_text", True) + return enabled, discord_webhook_url, ai_integration, truncate_text_flag + except KeyError as e: + Log.e(f"Missing key in config: {e}") + return None, None, None, True + +def webhook(file_path, yara_matches, ai=""): + """Send a webhook to Discord with the given parameters.""" + config_file_path = "./config.toml" + config_data = load_config(config_file_path) + if config_data is None: + Log.e("Failed to load configuration.") + return + + enabled, discord_webhook_url, ai_integration, truncate_text_flag = load_config_values(config_data) + if enabled: + description = ai #if ai_integration and ai else "No Description Provided for this action." + + # Handle truncation or splitting based on config + if truncate_text_flag: + description = truncate_text(description, 4092) + else: + description_chunks = split_text(description, 4092) + + # Create embeds + embeds = [] + if truncate_text_flag: + # Single embed if truncated + embeds.append({ + "title": "⚠️ WATCHDOG ALERT ⚠️", + "description": description, + "color": 65280, + "fields": yara_matches, + "author": { + "name": file_path + }, + "thumbnail": { + "url": "https://images-ext-1.discordapp.net/external/ZdQffnnucK3DWYPeokYDWnFPATtlvszVNozmNhOdXBg/https/upload.wikimedia.org/wikipedia/commons/5/59/Empty.png?format=webp&quality=lossless" + } + }) + else: + # Multiple embeds if split + for idx, chunk in enumerate(description_chunks): + embeds.append({ + "title": f"⚠️ WATCHDOG ALERT ⚠️ (Part {idx + 1})", + "description": chunk, + "color": 65280, + "fields": yara_matches if idx == 0 else [], # Fields only in the first embed + "author": { + "name": file_path if idx == 0 else None + }, + "thumbnail": { + "url": "https://images-ext-1.discordapp.net/external/ZdQffnnucK3DWYPeokYDWnFPATtlvszVNozmNhOdXBg/https/upload.wikimedia.org/wikipedia/commons/5/59/Empty.png?format=webp&quality=lossless" + } + }) + + # Construct the payload + payload = { + "content": "", + "embeds": embeds[:10], # Discord allows a maximum of 10 embeds per payload + "attachments": [] + } + + try: + # Send POST request to Discord webhook + response = requests.post(discord_webhook_url, json=payload) + response.raise_for_status() # Raise exception for HTTP errors + Log.v(f"Report sent to Discord webhook for {file_path}") + except requests.exceptions.RequestException as e: + Log.e(f"Report was not sent to Discord webhook, error: {e}") diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..e1e399c --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,18 @@ +from pystyle import Colors, Colorate +from datetime import datetime +import time + + +class Log: + @staticmethod + def s(text): # success + time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M') + print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.green_to_cyan, "SUCCESS", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset) + @staticmethod + def e(text): # error + time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M') + print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.red_to_purple, " ERROR ", 1) + Colors.gray + " > " + Colors.light_gray + text + Colors.reset) + @staticmethod + def v(data): # verbose + time_now = datetime.fromtimestamp(time.time()).strftime('%H:%M') + print(Colors.gray + time_now + " " + Colorate.Horizontal(Colors.blue_to_white, "VERBOSE", 1) + Colors.gray + " > " + Colors.light_gray + data + Colors.reset) diff --git a/utils/scanner.py b/utils/scanner.py new file mode 100644 index 0000000..c8b2d8d --- /dev/null +++ b/utils/scanner.py @@ -0,0 +1,45 @@ +#region Imports +import os +import yara +import tomllib +#endregion + +#region Variables +scanned_files_map = set() +ignored_files = {} +ignored_directories = {} + +with open("./config.toml", "rb") as f: + data = tomllib.load(f) +#endregion + +#region scanfile + +def scan(file_path): + """ + Scan a file with YARA rules and return the matches. + + Args: + file_path (str): The path to the file to be scanned. + + Returns: + matches[filename], error_message + """ + matches = {} + error_messages = {} + + for filename in os.listdir(data['DETECTION']['SignaturePath']): + if filename.endswith((".yara")): + rule_path = os.path.join(data['DETECTION']['SignaturePath'], filename) + try: + rules = yara.compile(filepath=rule_path) + file_matches = rules.match(file_path) + if file_matches: + matches[filename] = file_matches + # for match in file_matches: + # print(f" - Rule: {match.rule}") + except yara.Error as e: + error_messages[filename] = e + + return matches, error_messages +#endregion \ No newline at end of file