import meshtastic.serial_interface
from pubsub import pub
from datetime import datetime, timedelta
import textwrap
import meshtastic
import time
import random
from queue import Queue, Empty
import threading
from collections import defaultdict, deque
import string
import subprocess
import json
import os
import requests
import shlex

#### TODO ####
## Move logging into folder
## Telegram link
## Improve untrusted, and auth code gen
## Move dependencies into repository
## Use encrypted channels
## Bit Escrow
## Nostrmesh
## Log
## Qwen wake word on public channel, or 20 messages
## Curl
## post


class MeshtasticController:
    """Command-and-control app for a serially attached Meshtastic node.

    Listens for incoming text packets, dispatches "!" commands (a public subset
    for anyone, the rest only for 2FA-trusted nodes), falls back to an Ollama
    chat model for any other message, and keeps a per-conversation JSON log
    on disk (``log_<chat_id>.json``) plus an in-memory tail of 25 entries.
    """

    def __init__(self):
        """Initializes the Meshtastic interface and sets up the message listener."""
        self.logs = {}  # in-memory tail (<= 25 entries per chat), keyed by chat id
        self.load_logs()
        self.config = {
            "debug": False,
            "mute_public_channel": True,
            "mute_public_commands": True,
            "print_all_packets": False,
        }
        self.ollama_host = "http://localhost:11434/api/chat"  # default local Ollama endpoint
        self.ollama_url = ''  # optional remote endpoint loaded from config.json (see readme)
        self.ollama_model = 'qwen2.5-coder:0.5b'
        self.scheduled_messages = defaultdict(Queue)  # outbound chunk queue per destination
        self.sent_messages = defaultdict(deque)       # labeled chunks kept for !resend / !receipt
        self.message_ids = defaultdict(lambda: 1)     # next message id per destination
        self.scheduler_thread = threading.Thread(target=self.message_sender, daemon=True)
        self.scheduler_thread.start()
        self.max_message_length = 210  # 233 Adjust this to the maximum payload length supported by your Meshtastic device
        self.auth_bypass = False
        self.interface = meshtastic.serial_interface.SerialInterface()
        pub.subscribe(self.on_receive, 'meshtastic.receive')
        self.trusted_nodes = {}
        self.pending_auth_requests = {}
        self.aliases = {"!about": "!help"}
        self.public_commands = {"!echo", "!time", "!help", "!auth", "!packet", "!who", "!about", "!clear", "!ping", "hey", "!llm"}
        self.commands = {  # Dictionary for mapping commands to methods.
            "!echo": self.handle_echo,
            "!time": self.handle_time,
            "!help": self.handle_help,
            "!packet": self.handle_packet,
            "!who": self.handle_who,
            "!auth": self.handle_auth,
            "!info": self.handle_system_info,
            "!trust": self.handle_trust,
            #"!config": self.handle_config,
            "!clear": self.handle_clear_messages,
            "!spam": self.handle_spam,
            "!resend": self.handle_resend,
            "!receipt": self.handle_receipt,
            "!source": self.handle_source,
            "!ping": self.handle_ping,
            "!llm": self.handle_llm,
            "hey": self.handle_hey,
            "\N{LIZARD}": self.handle_lizard,
        }
        self.private_commands = set(self.commands.keys()) - self.public_commands
        self.config_file = self.get_config_file_path()
        if os.path.exists(self.config_file):
            self.ollama_url = self.load_config(self.config_file, 'url')

    def load_logs(self):
        """Load existing chat logs from ``log_*.json`` files into memory.

        Scans the working directory instead of guessing numeric ids: chat ids
        are node-id strings (e.g. ``!abcd1234``) or ``public_channel``, so the
        id is recovered from the file name. Timestamps are parsed with
        ``fromisoformat`` to match what :meth:`log` writes
        (``datetime.now().isoformat()`` has no trailing ``Z``).
        """
        for file_name in os.listdir('.'):
            if not (file_name.startswith("log_") and file_name.endswith(".json")):
                continue
            toId = file_name[len("log_"):-len(".json")]
            try:
                with open(file_name, 'r') as log_file:
                    logs = json.load(log_file)
            except (OSError, json.JSONDecodeError):
                continue  # unreadable or corrupt log file: skip, don't crash startup
            for log_entry in logs:
                try:
                    entry_time = int(datetime.fromisoformat(log_entry['timestamp']).timestamp())
                except (KeyError, ValueError):
                    continue  # malformed entry: skip it
                if toId not in self.logs:
                    self.logs[toId] = []
                self.logs[toId].append((entry_time, (log_entry['timestamp'], f"{log_entry['from']}, {log_entry['to']}, Message: {log_entry['message']}, ID: {log_entry['id']}, Time: {log_entry['time']}")))

    def log(self, from_node, to, message, message_id, time, toId, inbound=True):
        """Append one message to the on-disk chat log and the in-memory tail.

        Inbound messages are filed under the sender (``log_<from_node>.json``),
        outbound ones under the destination (``log_<toId>.json``). The
        in-memory tail is trimmed to 25 entries and trimmed entries are also
        removed from the file.
        """
        # Choose the log file: per-sender for inbound, per-destination for outbound.
        if inbound:
            log_file_path = f"log_{from_node}.json"
        else:
            log_file_path = f"log_{toId}.json"
        # Prepare the log entry as a dictionary.
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "from": from_node,
            "to": to,
            "message": message,
            "id": message_id,
            "time": time,
            "toId": toId
        }
        # Append the log entry to the file or create a new one if it doesn't exist.
        try:
            with open(log_file_path, 'r') as log_file:
                logs = json.load(log_file)
            logs.append(log_entry)
        except (FileNotFoundError, json.JSONDecodeError):
            logs = [log_entry]
        with open(log_file_path, 'w') as log_file:
            json.dump(logs, log_file, indent=4)
        # Keep only the most recent 25 messages in memory; mirror the trim on disk.
        if toId not in self.logs:
            self.logs[toId] = []
        self.logs[toId].append((time, (log_entry['timestamp'], f"{log_entry['from']}, {log_entry['to']}, Message: {log_entry['message']}, ID: {log_entry['id']}, Time: {log_entry['time']}")))
        while len(self.logs[toId]) > 25:
            # Each tail entry is (rx_time, (iso_timestamp, text)); file entries are
            # keyed by their ISO timestamp, so that is what must be compared.
            _, (oldest_timestamp, _) = self.logs[toId].pop(0)
            try:
                with open(log_file_path, 'r') as log_file:
                    logs = json.load(log_file)
            except (FileNotFoundError, json.JSONDecodeError):
                break
            updated_logs = [log for log in logs if log['timestamp'] != oldest_timestamp]
            with open(log_file_path, 'w') as log_file:
                json.dump(updated_logs, log_file, indent=4)

    def get_chat_logs(self, chat_id):
        """Return the chat history for *chat_id* as Ollama-style role/content dicts."""
        log_file_path = f"log_{chat_id}.json"
        if os.path.exists(log_file_path):
            with open(log_file_path, 'r') as log_file:
                try:
                    logs = json.load(log_file)
                except json.JSONDecodeError:
                    logs = []
            clean_logs = []
            for log_entry in logs:
                # Messages this controller sent are logged with from == "Qwen".
                message_parts = {
                    "role": "assistant" if log_entry['from'] == "Qwen" else "user",
                    "content": log_entry['message']
                }
                print(f'Message parts: {message_parts}')
                clean_logs.append(message_parts)
            return clean_logs
        else:
            return []

    def handle_llm(self, message, from_node, packet):
        """Runs the message through ollama."""
        try:
            chat_logs = self.get_chat_logs(from_node)
            # Keep each entry's original role so the model can tell its own
            # earlier replies apart from the user's turns.
            messages = [{"role": msg['role'], "content": msg['content']} for msg in chat_logs]
            payload = {
                "model": self.ollama_model,
                "messages": messages,
                "parameters": {
                    "temperature": 0.7,
                    "max_tokens": 500
                },
                "stream": False
            }
            # Prefer the remote endpoint configured in config.json (see readme),
            # falling back to the local Ollama instance.
            url = self.ollama_url or self.ollama_host
            response = requests.post(url, json=payload, timeout=120)
            if response.status_code == 200:
                next_message = response.json().get('message', {})
                self.schedule_command_message(next_message['content'], from_node)
            else:
                error_message = f"Error: {response.status_code} - {response.text}"
                self.schedule_command_message(error_message, from_node)
        except requests.RequestException as e:
            # Network-level failures (connection refused, timeout, bad URL).
            error_message = f"Error running LLM: {e}"
            self.schedule_command_message(error_message, from_node)

    def get_config_file_path(self):
        """Return the path of config.json in the current working directory."""
        current_dir = os.getcwd()
        config_file = "config.json"
        return os.path.join(current_dir, config_file)

    def load_config(self, file, key):
        """Read *key* from the JSON config *file*; returns None if absent."""
        if os.path.exists(file):
            # Use a local handle: do NOT bind the file object to self.config_file,
            # which holds the config *path* used elsewhere.
            with open(file, 'r') as config_file:
                config = json.load(config_file)
            value = config.get(key)
            if self.config["debug"]: print(value)
            return value

    def handle_lizard(self, message, from_node, packet):
        """\N{LIZARD}"""
        self.schedule_command_message("\N{LIZARD}\N{LIZARD}\N{LIZARD}\N{LIZARD}\N{LIZARD}", from_node)

    def handle_hey(self, message, from_node, packet):
        """Introduces itself"""
        self.schedule_command_message("I'm a control server written in python, send me !help for a list of commands.", from_node)

    def schedule_command_message(self, message, destination=None):
        """Schedule a command reply, suppressing public-channel replies when muted."""
        if destination is None and self.config["mute_public_commands"] is True:
            return  # don't send public messages from commands
        self.schedule_message(message, destination=destination)

    def handle_ping(self, message, from_node, packet):
        """Replies with pong."""
        self.schedule_command_message("pong", from_node)

    def handle_source(self, message, from_node, packet):
        """Sends the entire source code in chunks."""
        try:
            with open(__file__, 'r') as f:
                source_code = f.read()
            self.schedule_command_message(source_code, from_node)
        except Exception as e:
            error_message = f"Failed to send source code: {e}"
            self.schedule_command_message(error_message, from_node)

    def handle_receipt(self, message, from_node, packet):
        """Confirm receipt of a message chunk and clean up sent messages."""
        parts = message.split()
        if len(parts) != 2:
            response = "Usage: !receipt <message_id>"
            self.schedule_command_message(response, from_node)
            return
        message_id = parts[1]
        # Chunks are labeled "<id>-<n>/<total>: ...", so match on the "<id>-"
        # prefix; rebuild as a deque to keep the container type consistent.
        self.sent_messages[from_node] = deque(
            msg for msg in self.sent_messages[from_node] if not msg.startswith(f"{message_id}-")
        )
        response = f"Receipt confirmed for message ID {message_id}."
        self.schedule_command_message(response, from_node)

    def handle_resend(self, message, from_node, packet):
        """Request for resending missing message chunks."""
        parts = message.split()
        if len(parts) != 2:
            response = "Usage: !resend <message_id>"
            self.schedule_command_message(response, from_node)
            return
        message_id = parts[1]
        if from_node in self.sent_messages:
            # Iterate a snapshot: scheduling a resend appends to this same
            # deque, and deques may not be mutated while being iterated.
            for sent_message in list(self.sent_messages[from_node]):
                if sent_message.startswith(f"{message_id}-"):
                    self.schedule_command_message(sent_message, from_node)
        response = f"Resend request for message ID {message_id} processed."
        self.schedule_command_message(response, from_node)

    def handle_spam(self, message, from_node, packet):
        """Sends a series of messages with a specified number of random characters."""
        parts = message.split()
        # Reject a non-numeric first argument instead of crashing in int().
        if len(parts) >= 2 and not parts[1].isdigit():
            response = "Usage: !spam [num_messages] [num_characters]"
            self.schedule_command_message(response, from_node)
            return
        num_messages = int(parts[1]) if len(parts) > 1 else 2
        num_characters = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
        for i in range(num_messages):
            current_message_id = self.message_ids[from_node] + i  # Calculate current message ID
            if num_characters is None:  # Generate a random string that fits within a single chunk
                temp_label_length = self.calculate_label_length(current_message_id)
                adjusted_num_characters = self.max_message_length - temp_label_length  # Adjust the content length
            else:
                adjusted_num_characters = num_characters
            spam_content = ''.join(random.choices(string.ascii_letters + string.digits, k=adjusted_num_characters))
            self.schedule_command_message(spam_content, from_node)

    def calculate_label_length(self, message_id, chunk_num=1, total_chunks=1):
        """Calculate the label length based on the message ID, chunk number, and total chunks."""
        return (
            len(str(message_id)) +      # Length of the message ID
            len(str(chunk_num)) +       # Length of the current chunk number
            len(str(total_chunks)) +    # Length of the total number of chunks
            4                           # For the "-/: " part of the label
        )

    def schedule_message(self, message, destination=None):
        """Adds messages to the sending queue with unique IDs, accounting for message label lengths."""
        try:
            trimmed_message = message.strip()
            message_id = self.message_ids[destination]
            message_chunks = []
            current_pos = 0
            while current_pos < len(trimmed_message):
                chunk_num = len(message_chunks) + 1
                # Reserve label space pessimistically: the id grows by one per
                # chunk, and chunk counters are assumed < 1000, so the real
                # label can never exceed this estimate and every labeled chunk
                # is guaranteed to fit in max_message_length. (The original
                # used the running chunk count as the total, which under-
                # reserved once the final total had more digits.)
                label_length = self.calculate_label_length(message_id + chunk_num, 999, 999)
                available_length = self.max_message_length - label_length
                if available_length > 0:
                    chunk = trimmed_message[current_pos:current_pos + available_length]
                    message_chunks.append(chunk)
                    current_pos += len(chunk)
                else:
                    break  # Stop if the calculated label is longer than allowed message length
            total_chunks = len(message_chunks)
            for i, chunk in enumerate(message_chunks):
                labeled_chunk = f"{message_id}-{i + 1}/{total_chunks}: {chunk}"
                self.scheduled_messages[destination].put(labeled_chunk)
                self.sent_messages[destination].append(labeled_chunk)  # Track sent messages for potential resending
                message_id += 1  # Each chunk consumes its own message ID
            self.message_ids[destination] = message_id  # Update the message_id tracker after all chunks are labeled
        except Exception as e:
            print(f"Failed to schedule message: {e}")

    def handle_clear_messages(self, message, from_node, packet):
        """Clears the scheduled messages for the specified node."""
        if from_node in self.scheduled_messages:
            del self.scheduled_messages[from_node]
            response = f"Cleared all scheduled messages for node {from_node}."
        else:
            response = f"No scheduled messages found for node {from_node}."
        self.schedule_command_message(response, from_node)

    def message_sender(self):
        """Scheduler loop (daemon thread) that drains the per-destination queues.

        Sleeps 2s between sends to avoid flooding the mesh. Honors the debug
        and mute-public-channel settings by printing instead of transmitting.
        """
        while True:
            destinations = list(self.scheduled_messages.keys())
            if destinations:
                for destination in destinations:
                    try:
                        while not self.scheduled_messages[destination].empty():
                            print("Sleeping for 2 seconds before sending a message.")
                            time.sleep(2)
                            message = self.scheduled_messages[destination].get_nowait()
                            if self.config["debug"]:
                                print(f"Debug: Message to {destination or 'public channel'}: {message}")
                            elif self.config["mute_public_channel"] and not destination:
                                print(f"Public channel muted: {message}")
                            else:
                                self.interface.sendText(message, destinationId=destination)
                                self.log(
                                    from_node="Qwen",
                                    to=destination,
                                    message=message.replace("\n", " "),
                                    message_id=None,  # Assuming we don't have an ID for outgoing messages
                                    time=datetime.now().isoformat(),
                                    toId=destination or "public_channel",
                                    inbound=False
                                )
                                # NOTE: not re-appended to sent_messages here —
                                # schedule_message already tracked this chunk;
                                # appending again produced duplicate resends.
                    except Exception as e:
                        print(f"Failed to send message: {e}")
            time.sleep(0.1)

    def is_trusted(self, message, from_node, packet):
        """Checks if a node is in the trusted nodes list based on the packet data."""
        if self.auth_bypass:
            print(f"Auth bypass: {self.auth_bypass}")
            return True
        node_id = packet.get('fromId')
        if node_id in self.trusted_nodes:
            trusted_node_info = self.trusted_nodes[node_id]
            # The static identity captured at auth time must still match the packet.
            if trusted_node_info.get('config', {}) == self.extract_static_node_info(packet):
                if trusted_node_info.get('auth', False):
                    return True
        return False

    def handle_who(self, message, from_node, packet):
        """Replies with sender name."""
        self.schedule_command_message(f"Message received from: {from_node}", from_node)

    def handle_packet(self, message, from_node, packet):
        """Replies with info about the recieved packet."""
        self.schedule_command_message(repr(packet), from_node)

    def dict_to_string(self, dictionary, separator=", "):
        """Converts a dictionary into a flattened string with key-value pairs."""
        flattened_items = []
        for key, value in dictionary.items():
            if isinstance(value, dict):  # Handle the parent key once, followed by the nested dictionary values
                flattened_items.append(f"{key}: {self.dict_to_string(value, separator)}")
            else:
                flattened_items.append(f"{key}: {value}")
        return separator.join(flattened_items)

    def handle_config(self, message, from_node, packet):
        """Command for modifying or viewing configuration settings."""
        parts = message.split()
        response = ""
        if len(parts) == 2 and parts[1].lower() == "info":
            response = "Current Configuration: "
            response += self.dict_to_string(self.config)
        else:
            if len(parts) != 3:
                response = "Usage: !config or !config info"
            else:
                setting = parts[1]
                value = parts[2].lower()
                if setting in self.config:
                    if isinstance(self.config[setting], bool):
                        if value in ["true", "on", "yes", "1"]:
                            self.config[setting] = True
                        elif value in ["false", "off", "no", "0"]:
                            self.config[setting] = False
                        else:
                            response = f"Invalid value for {setting}. Must be 'true' or 'false'."
                    else:
                        response = f"Unsupported setting type for {setting}."
                    if not response:
                        response = f"Configuration '{setting}' has been set to {self.config[setting]}."
                else:
                    response = f"Configuration setting '{setting}' does not exist."
        self.schedule_command_message(response, from_node)

    def untrusted(self, message, from_node, packet):
        """Action for untrusted requests."""  # Todo: Improve logging, and security around untrusted spam
        print(f"Untrusted Node: {from_node}, Requested: {message}\nPacket: \n{packet}")

    def handle_system_info(self, message, from_node, packet):
        """Sends the system information of the local node."""
        response = ""
        try:
            if self.interface.nodes:
                for node in self.interface.nodes.values():
                    if node["num"] == self.interface.myInfo.my_node_num:
                        response = "System Information: "
                        response += repr(node)
                        self.schedule_command_message(response, from_node)
                        return
            response = "Failed to retrieve system info: Node not found."
        except Exception as e:
            response = f"Failed to retrieve system info: {e}"
        self.schedule_command_message(response, from_node)

    def extract_static_node_info(self, packet):
        """Extracts static node information that is unlikely to change."""
        static_info = {
            'node_num': packet.get('from'),
            'node_id': packet.get('fromId'),
        }
        return static_info

    def handle_auth(self, message, from_node, packet):
        """Handles the authentication process for adding trusted nodes."""  # Improve auth
        try:  # Extract the auth code from the message, if any
            parts = message.split()
            if len(parts) > 1:  # Auth code provided, verify it
                auth_code = parts[1]
                if from_node in self.pending_auth_requests:
                    request_info = self.pending_auth_requests[from_node]
                    if request_info['auth_code'] == auth_code:  # Correct auth code
                        self.trusted_nodes[from_node] = {
                            'config': request_info['config'],
                            'auth': True,
                            'timestamp': time.time()
                        }
                        response = "Authentication successful. Node trusted."
                        del self.pending_auth_requests[from_node]
                    else:  # Incorrect auth code
                        response = "Authentication failed. Incorrect code."
                else:
                    response = "No pending authentication request found."
            else:  # No auth code provided, generate one
                auth_code = self.generate_auth_code()
                node_info = self.extract_static_node_info(packet)
                # Store in pending auth requests; the operator reads the code
                # off the terminal and relays it out-of-band (2FA).
                self.pending_auth_requests[from_node] = {
                    'auth_code': auth_code,
                    'config': node_info,
                    'timestamp': time.time()
                }
                print(f"2FA Code for node {from_node}: {auth_code}")
                response = "Please resend !auth followed by the 2FA code displayed in terminal. EX !auth 123456"
        except Exception as e:
            response = f"Authentication process failed: {e}"
        self.schedule_command_message(response, from_node)

    def generate_auth_code(self):
        """Generates a random 2FA authentication code."""
        auth_length = 8
        if self.config["debug"]:
            auth_length = 6  # shorter code for easier manual testing
        return ''.join(random.choices('0123456789', k=auth_length))

    def handle_echo(self, message, from_node, packet):
        """Echos the message back to the sender."""
        response = message[5:].strip()  # strip the leading "!echo"
        self.schedule_command_message(response, from_node)

    def handle_time(self, message, from_node, packet):
        """Returns the current server time."""
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"The current time is: {current_time}"
        self.schedule_command_message(response, from_node)

    def handle_help(self, message, from_node, packet):
        """Lists all available commands and their descriptions, including aliases."""
        response = "Available commands: "

        def get_command_description(func):
            """Helper to get the docstring or a default value if none is present."""
            return func.__doc__.strip() if func.__doc__ else "No description available"

        if self.is_trusted(message, from_node, packet):
            commands_dict = {cmd: get_command_description(func) for cmd, func in self.commands.items()}
            for cmd, alias in self.aliases.items():
                if cmd in commands_dict:
                    commands_dict[alias] = commands_dict.pop(cmd)
            response += self.dict_to_string(commands_dict, separator=" ")
        else:
            public_commands_dict = {cmd: get_command_description(func) for cmd, func in self.commands.items() if cmd in self.public_commands}
            response += self.dict_to_string(public_commands_dict, separator=" ")

        self.schedule_command_message(response, from_node)

    def on_receive(self, packet, interface):
        """Callback function for handling received packets and processing commands."""
        try:
            if self.config["print_all_packets"]: print(f"\n{packet}")
            if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
                message_bytes = packet['decoded']['payload']
                message_string = message_bytes.decode('utf-8')
                from_node = packet['fromId']  # Get the node ID of the sender
                print(f"\nReceived from {from_node}: {message_string}\n> ", end="", flush=True)
                # Log from_node, to, message_id, time, toId
                self.log(
                    from_node,
                    packet['to'],
                    message_string.replace("\n", " "),
                    packet.get('id'),
                    packet.get('rx_time'),
                    packet['toId'])
                words = message_string.split()
                if not words:
                    return  # empty / whitespace-only payload: nothing to dispatch
                # Extract the command from the message
                command = words[0].lower()
                # Check if the command is an alias and resolve it
                command = self.aliases.get(command, command)
                # Default handling to LLM
                handler = self.commands.get('!llm')
                # Determine if the command requires trust
                if command in self.private_commands and not self.is_trusted(message_string, from_node, packet):
                    self.untrusted(message_string, from_node, packet)
                    return  # do NOT fall through to the LLM with a rejected private command
                elif command in self.commands:
                    handler = self.commands.get(command)
                # Execute the command
                handler(message_string, from_node, packet)

        except KeyError as e:
            print(f"Error processing packet: {e}")

    def handle_trust(self, message, from_node, packet):
        """Returns list of trusted nodes."""
        response = "These nodes are currently trusted: "
        response += repr(self.trusted_nodes)
        self.schedule_command_message(response, from_node)

    def start(self):
        """Starts the command and control loop to send messages."""
        print("Meshtastic Command and Control App")
        print("Type a message and press Enter to send. Type 'exit' to quit.")
        while True:
            try:
                text = input(">>> ")
                if text.lower() == "exit":
                    print("Exiting the app.")
                    break
                self.schedule_message(text)
            except KeyboardInterrupt:
                print("\nExiting the app.")
                break
            except Exception as e:
                print(f"An error occurred: {e}")
        self.interface.close()
        print("Interface closed.")


if __name__ == "__main__":
    controller = MeshtasticController()
    controller.start()