"""Meshtastic command-and-control server.

Listens for direct text messages on a serially attached Meshtastic radio,
dispatches "!command" messages to handler methods and forwards everything
else to a local Ollama chat model. Replies are split into labelled chunks
("<id>-<n>/<total>: ...") and sent by a background scheduler thread.
"""

import glob
import hmac
import json
import os
import random
import secrets
import shlex
import string
import subprocess
import textwrap
import threading
import time
from collections import defaultdict, deque
from datetime import datetime, timedelta
from queue import Queue, Empty

import meshtastic
import meshtastic.serial_interface
import requests
from pubsub import pub

#### TODO ####
## Move logging into folder
## Telegram link
## Improve untrusted, and auth code gen
## Move dependencies into repository
## Use encrypted channels
## Bit Escrow
## Nostrmesh
## Log
## Qwen wake word on public channel, or 20 messages
## Curl
## post


class MeshtasticController:
    """Command dispatcher and outgoing-message scheduler for one Meshtastic radio."""

    # Destination ID the Meshtastic library uses for the public channel.
    BROADCAST_ID = "^all"
    # Maximum number of log entries kept per chat, in memory and on disk.
    MAX_LOG_ENTRIES = 25
    # Pause before every transmission so the mesh is not flooded.
    SEND_INTERVAL_SECONDS = 2
    # HTTP timeouts; without them a dead endpoint blocks the receive callback forever.
    WEBHOOK_TIMEOUT_SECONDS = 10
    LLM_TIMEOUT_SECONDS = 120

    def __init__(self):
        """Initializes the Meshtastic interface and sets up the message listener."""
        self.config = {
            # These settings can be changed by authorized users while running
            "debug": False,
            "mute_public_channel": True,
            "mute_public_commands": True,
            "print_all_packets": False,
        }

        # log() is called from both the receive callback and the sender thread.
        self._log_lock = threading.Lock()
        self.logs = {}
        self.load_logs()

        # Defaults first, so every attribute exists even without config.json.
        self.ollama_host = "http://localhost:11434/api/chat"
        self.ollama_model = 'qwen2.5-coder:0.5b'
        self.ollama_url = ''
        self.light_url = None
        self.config_file = self.get_config_file_path()
        if os.path.exists(self.config_file):
            self.ollama_url = self.load_config(self.config_file, 'url') or ''
            self.light_url = self.load_config(self.config_file, 'lightwebhook')

        self.scheduled_messages = defaultdict(Queue)
        self.sent_messages = defaultdict(deque)
        self.message_ids = defaultdict(lambda: 1)
        # 233 Adjust this to the maximum payload length supported by your Meshtastic device
        self.max_message_length = 210
        self.auth_bypass = False
        self.trusted_nodes = {}
        self.pending_auth_requests = {}

        # Maps an alias to the command it stands for.
        self.aliases = {"!about": "!help"}
        self.public_commands = {
            "!echo", "!time", "!help", "!auth", "!packet", "!who",
            "!about", "!clear", "!ping", "hey", "!llm",
        }
        # Dictionary for mapping commands to methods.
        self.commands = {
            "!echo": self.handle_echo,
            "!time": self.handle_time,
            "!help": self.handle_help,
            "!packet": self.handle_packet,
            "!who": self.handle_who,
            "!auth": self.handle_auth,
            "!info": self.handle_system_info,
            "!trust": self.handle_trust,
            # "!config": self.handle_config,
            "!clear": self.handle_clear_messages,
            "!spam": self.handle_spam,
            "!resend": self.handle_resend,
            "!receipt": self.handle_receipt,
            "!source": self.handle_source,
            "!ping": self.handle_ping,
            "!llm": self.handle_llm,
            "hey": self.handle_hey,
            "\N{LIZARD}": self.handle_lizard,
            "!light": self.handle_light,
        }
        self.private_commands = set(self.commands) - self.public_commands

        # Open the radio, then start the listener and the sender last, so that
        # neither can run against a half-initialized controller.
        self.interface = meshtastic.serial_interface.SerialInterface()
        pub.subscribe(self.on_receive, 'meshtastic.receive')
        self.scheduler_thread = threading.Thread(target=self.message_sender, daemon=True)
        self.scheduler_thread.start()

    def handle_light(self, message, from_node, packet):
        """Toggles the light through the configured webhook."""
        if not self.light_url:
            self.schedule_command_message("Light webhook is not configured.", from_node)
            return
        try:
            response = requests.get(self.light_url, timeout=self.WEBHOOK_TIMEOUT_SECONDS)
        except requests.RequestException as e:
            self.schedule_command_message(f"Error toggling light: {e}", from_node)
            return
        if response.status_code == 200:
            self.schedule_command_message("Toggled light", from_node)
        else:
            error_message = f"Error: {response.status_code} - {response.text}"
            self.schedule_command_message(error_message, from_node)

    @staticmethod
    def _describe_log_entry(entry):
        """Returns the one-line summary stored next to each in-memory log entry."""
        return (
            f"{entry['from']}, {entry['to']}, Message: {entry['message']}, "
            f"ID: {entry['id']}, Time: {entry['time']}"
        )

    def load_logs(self):
        """Loads the most recent entries of every log_<chat>.json file into self.logs.

        self.logs maps the chat ID taken from the file name to a list of
        (unix_time, (timestamp, summary)) tuples. Unreadable files and
        malformed entries are skipped.
        """
        prefix, suffix = "log_", ".json"
        for log_file_path in sorted(glob.glob(f"{prefix}*{suffix}")):
            chat_id = log_file_path[len(prefix):-len(suffix)]
            try:
                with open(log_file_path, 'r', encoding='utf-8') as log_file:
                    entries = json.load(log_file)
            except (OSError, json.JSONDecodeError) as e:
                print(f"Skipping unreadable log file {log_file_path}: {e}")
                continue
            if not isinstance(entries, list):
                continue

            history = []
            for entry in entries[-self.MAX_LOG_ENTRIES:]:
                try:
                    stamp = entry['timestamp']
                    # Timestamps are written with datetime.isoformat(); tolerate a
                    # trailing "Z" that fromisoformat rejects on older Pythons.
                    logged_at = int(datetime.fromisoformat(stamp.rstrip('Z')).timestamp())
                    history.append((logged_at, (stamp, self._describe_log_entry(entry))))
                except (KeyError, TypeError, ValueError, AttributeError):
                    continue
            if history:
                self.logs[chat_id] = history

    def log(self, from_node, to, message, message_id, time, toId, inbound=True):
        """Appends one message to the chat's JSON log and to the in-memory history.

        Args:
            from_node: Sender ID ("Qwen" for messages sent by this server).
            to: Numeric or string destination as found in the packet.
            message: Message text.
            message_id: Packet ID, or None for outgoing messages.
            time: Receive/send time; stored as given.
            toId: Destination ID string.
            inbound: True for received messages (logged under the sender's
                chat), False for sent ones (logged under the destination's).

        Both stores are trimmed to the newest MAX_LOG_ENTRIES entries.
        """
        chat_id = from_node if inbound else toId
        log_file_path = f"log_{chat_id}.json"
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "from": from_node,
            "to": to,
            "message": message,
            "id": message_id,
            "time": time,
            "toId": toId,
        }

        with self._log_lock:
            try:
                with open(log_file_path, 'r', encoding='utf-8') as log_file:
                    logs = json.load(log_file)
            except (FileNotFoundError, json.JSONDecodeError):
                logs = []
            if not isinstance(logs, list):
                logs = []
            logs.append(log_entry)
            del logs[:-self.MAX_LOG_ENTRIES]
            with open(log_file_path, 'w', encoding='utf-8') as log_file:
                json.dump(logs, log_file, indent=4)

            history = self.logs.setdefault(chat_id, [])
            history.append((time, (log_entry['timestamp'], self._describe_log_entry(log_entry))))
            del history[:-self.MAX_LOG_ENTRIES]

    def get_chat_logs(self, chat_id):
        """Returns the chat's log as a list of {"role", "content"} chat messages.

        Entries sent by "Qwen" get the role "assistant", all others "user".
        Returns an empty list when the log is missing or unreadable.
        """
        log_file_path = f"log_{chat_id}.json"
        with self._log_lock:
            try:
                with open(log_file_path, 'r', encoding='utf-8') as log_file:
                    logs = json.load(log_file)
            except (FileNotFoundError, json.JSONDecodeError):
                return []
        if not isinstance(logs, list):
            return []
        return [
            {
                "role": "assistant" if entry.get('from') == "Qwen" else "user",
                "content": entry['message'],
            }
            for entry in logs
            if isinstance(entry, dict) and 'message' in entry
        ]

    def handle_llm(self, message, from_node, packet):
        """Runs the message through ollama."""
        # The incoming message was already logged by on_receive, so the chat
        # log normally ends with it; fall back to the bare message otherwise.
        messages = self.get_chat_logs(from_node) or [{"role": "user", "content": message}]
        payload = {
            "model": self.ollama_model,
            "messages": messages,
            # Ollama takes sampling settings under "options".
            "options": {"temperature": 0.7, "num_predict": 500},
            "stream": False,
        }
        try:
            response = requests.post(
                self.ollama_host, json=payload, timeout=self.LLM_TIMEOUT_SECONDS
            )
        except requests.RequestException as e:
            self.schedule_command_message(f"Error running LLM: {e}", from_node)
            return

        if response.status_code != 200:
            error_message = f"Error: {response.status_code} - {response.text}"
            self.schedule_command_message(error_message, from_node)
            return

        try:
            content = response.json().get('message', {}).get('content')
        except (ValueError, AttributeError):
            content = None
        if not content:
            self.schedule_command_message("Error running LLM: empty response", from_node)
            return
        self.schedule_command_message(content, from_node)

    def get_config_file_path(self):
        """Returns the path of config.json in the current working directory."""
        return os.path.join(os.getcwd(), "config.json")

    def load_config(self, file_path, key):
        """Returns the value stored under key in the JSON file, or None.

        None is returned when the file is missing, unreadable, not a JSON
        object, or does not contain the key.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as config_file:
                config = json.load(config_file)
        except (OSError, json.JSONDecodeError):
            return None
        if not isinstance(config, dict):
            return None
        value = config.get(key)
        if self.config.get("debug", False):
            print(value)
        return value

    def handle_lizard(self, message, from_node, packet):
        """\N{LIZARD}"""
        self.schedule_command_message(
            "\N{LIZARD}\N{LIZARD}\N{LIZARD}\N{LIZARD}\N{LIZARD}", from_node
        )

    def handle_hey(self, message, from_node, packet):
        """Introduces itself"""
        self.schedule_command_message(
            "I'm a control server written in python, send me !help for a list of commands.",
            from_node,
        )

    def schedule_command_message(self, message, destination=None):
        """Schedules a command reply, unless it would go to a muted public channel."""
        if destination is None and self.config["mute_public_commands"]:
            return  # don't send public messages from commands
        self.schedule_message(message, destination=destination)

    def handle_ping(self, message, from_node, packet):
        """Replies with pong."""
        self.schedule_command_message("pong", from_node)

    def handle_source(self, message, from_node, packet):
        """Sends the entire source code in chunks."""
        try:
            with open(__file__, 'r', encoding='utf-8') as f:
                source_code = f.read()
        except (OSError, UnicodeError) as e:
            self.schedule_command_message(f"Failed to send source code: {e}", from_node)
            return
        self.schedule_command_message(source_code, from_node)

    def handle_receipt(self, message, from_node, packet):
        """Confirm receipt of a message chunk and clean up sent messages."""
        parts = message.split()
        if len(parts) != 2:
            self.schedule_command_message("Usage: !receipt MESSAGE_ID", from_node)
            return

        message_id = parts[1]
        # Chunks are labelled "<id>-<n>/<total>: ...", so "<id>-" selects
        # every chunk of the acknowledged message.
        prefix = f"{message_id}-"
        self.sent_messages[from_node] = deque(
            msg for msg in self.sent_messages[from_node] if not msg.startswith(prefix)
        )
        response = f"Receipt confirmed for message ID {message_id}."
        self.schedule_command_message(response, from_node)

    def handle_resend(self, message, from_node, packet):
        """Request for resending missing message chunks."""
        parts = message.split()
        if len(parts) != 2:
            self.schedule_command_message("Usage: !resend MESSAGE_ID", from_node)
            return

        message_id = parts[1]
        prefix = f"{message_id}-"
        # Snapshot first: scheduling a reply appends to this same deque.
        matches = [
            chunk for chunk in list(self.sent_messages.get(from_node, ()))
            if chunk.startswith(prefix)
        ]
        for chunk in matches:
            # Re-queue the chunk exactly as it was sent, keeping its label.
            self.scheduled_messages[from_node].put(chunk)
        response = f"Resend request for message ID {message_id} processed."
        self.schedule_command_message(response, from_node)

    def handle_spam(self, message, from_node, packet):
        """Sends a series of messages with a specified number of random characters."""
        usage = "Usage: !spam [num_messages] [num_characters]"
        parts = message.split()
        try:
            num_messages = int(parts[1]) if len(parts) > 1 else 2
            num_characters = int(parts[2]) if len(parts) > 2 else None
        except ValueError:
            self.schedule_command_message(usage, from_node)
            return
        if num_messages < 0 or (num_characters is not None and num_characters < 0):
            self.schedule_command_message(usage, from_node)
            return

        alphabet = string.ascii_letters + string.digits
        for _ in range(num_messages):
            if num_characters is None:
                # Fill exactly one chunk. schedule_message advances the ID
                # after every message, so read the current value each time.
                label_length = self.calculate_label_length(self.message_ids[from_node])
                content_length = max(self.max_message_length - label_length, 0)
            else:
                content_length = num_characters
            spam_content = ''.join(random.choices(alphabet, k=content_length))
            self.schedule_command_message(spam_content, from_node)

    def calculate_label_length(self, message_id, chunk_num=1, total_chunks=1):
        """Calculate the label length based on the message ID, chunk number, and total chunks."""
        return (
            len(str(message_id))      # Length of the message ID
            + len(str(chunk_num))     # Length of the current chunk number
            + len(str(total_chunks))  # Length of the total number of chunks
            + 4                       # For the "-/: " part of the label
        )

    @staticmethod
    def _prefix_end(text, start, max_bytes):
        """Returns the end index of the longest slice from start that fits in max_bytes of UTF-8."""
        used = 0
        end = start
        while end < len(text):
            size = len(text[end].encode('utf-8'))
            if used + size > max_bytes:
                break
            used += size
            end += 1
        return end

    def _split_into_chunks(self, text, message_id):
        """Splits text into chunks that fit max_message_length once labelled.

        The label contains the total chunk count, which is only known after
        splitting. The split is therefore done assuming a digit width for the
        total and repeated with a wider one if the result needs more digits.
        """
        total_digits = 1
        while True:
            assumed_total = 10 ** total_digits - 1  # widest total with this many digits
            chunks = []
            position = 0
            while position < len(text):
                label_length = self.calculate_label_length(
                    message_id, len(chunks) + 1, assumed_total
                )
                budget = self.max_message_length - label_length
                end = self._prefix_end(text, position, budget)
                if end == position:
                    # The label leaves no room for content; send what fits.
                    return chunks
                chunks.append(text[position:end])
                position = end
            if len(str(len(chunks))) <= total_digits:
                return chunks
            total_digits += 1

    def schedule_message(self, message, destination=None):
        """Adds messages to the sending queue with unique IDs, accounting for message label lengths.

        Args:
            message: Text to send; surrounding whitespace is stripped.
            destination: Node ID to send to, or None for the public channel.

        All chunks of one message share one ID and are labelled
        "<id>-<n>/<total>: ". Chunks are also kept in sent_messages so that
        they can be resent on request.
        """
        try:
            trimmed_message = message.strip()
            message_id = self.message_ids[destination]
            message_chunks = self._split_into_chunks(trimmed_message, message_id)
            total_chunks = len(message_chunks)
            for chunk_num, chunk in enumerate(message_chunks, start=1):
                labeled_chunk = f"{message_id}-{chunk_num}/{total_chunks}: {chunk}"
                self.scheduled_messages[destination].put(labeled_chunk)
                # Track sent messages for potential resending
                self.sent_messages[destination].append(labeled_chunk)
            if message_chunks:
                self.message_ids[destination] = message_id + 1
        except Exception as e:
            # Best effort: a message that cannot be queued must not crash the caller.
            print(f"Failed to schedule message: {e}")

    def handle_clear_messages(self, message, from_node, packet):
        """Clears the scheduled messages for the specified node."""
        if self.scheduled_messages.pop(from_node, None) is not None:
            response = f"Cleared all scheduled messages for node {from_node}."
        else:
            response = f"No scheduled messages found for node {from_node}."
        self.schedule_command_message(response, from_node)

    def _transmit(self, message, destination):
        """Sends (or, in debug/muted mode, only prints) one labelled chunk."""
        target = destination or 'public channel'
        if self.config["debug"]:
            print(f"Debug: Message to {target}: {message}")
        elif self.config["mute_public_channel"] and not destination:
            print(f"Public channel muted: {message}")
        else:
            print(f"Sending to {target}: {message}")
            self.interface.sendText(
                message, destinationId=destination or self.BROADCAST_ID
            )
            self.log(
                from_node="Qwen",
                to=destination,
                message=message.replace("\n", " "),
                message_id=None,  # Outgoing messages have no packet ID here
                time=datetime.now().isoformat(),
                toId=destination or "public_channel",
                inbound=False,
            )

    def message_sender(self):
        """Scheduler that sends out messages."""
        while True:
            for destination in list(self.scheduled_messages.keys()):
                try:
                    while True:
                        pending = self.scheduled_messages.get(destination)
                        if pending is None or pending.empty():
                            break
                        print(
                            f"Sleeping for {self.SEND_INTERVAL_SECONDS} seconds "
                            "before sending a message."
                        )
                        time.sleep(self.SEND_INTERVAL_SECONDS)
                        # Look the queue up again: !clear may have removed it
                        # while this thread was sleeping.
                        pending = self.scheduled_messages.get(destination)
                        if pending is None:
                            break
                        try:
                            message = pending.get_nowait()
                        except Empty:
                            break
                        self._transmit(message, destination)
                except Exception as e:
                    # Keep the scheduler alive whatever a single send does.
                    print(f"Failed to send message: {e}")
            time.sleep(0.1)

    def is_trusted(self, message, from_node, packet):
        """Checks if a node is in the trusted nodes list based on the packet data."""
        if self.auth_bypass:
            print(f"Auth bypass: {self.auth_bypass}")
            return True

        trusted_node_info = self.trusted_nodes.get(packet.get('fromId'))
        if trusted_node_info is None:
            return False
        # The node must still present the identity it authenticated with.
        if trusted_node_info.get('config', {}) != self.extract_static_node_info(packet):
            return False
        return bool(trusted_node_info.get('auth', False))

    def handle_who(self, message, from_node, packet):
        """Replies with sender name."""
        self.schedule_command_message(f"Message received from: {from_node}", from_node)

    def handle_packet(self, message, from_node, packet):
        """Replies with info about the received packet."""
        self.schedule_command_message(repr(packet), from_node)

    def dict_to_string(self, dictionary, separator=", "):
        """Converts a dictionary into a flattened string with key-value pairs."""
        flattened_items = []
        for key, value in dictionary.items():
            if isinstance(value, dict):
                # Handle the parent key once, followed by the nested dictionary values
                value = self.dict_to_string(value, separator)
            flattened_items.append(f"{key}: {value}")
        return separator.join(flattened_items)

    def handle_config(self, message, from_node, packet):
        """Command for modifying or viewing configuration settings."""
        parts = message.split()
        if len(parts) == 2 and parts[1].lower() == "info":
            response = "Current Configuration: " + self.dict_to_string(self.config)
        elif len(parts) != 3:
            response = "Usage: !config SETTING VALUE or !config info"
        else:
            setting = parts[1]
            value = parts[2].lower()
            if setting not in self.config:
                response = f"Configuration setting '{setting}' does not exist."
            elif not isinstance(self.config[setting], bool):
                response = f"Unsupported setting type for {setting}."
            elif value in ("true", "on", "yes", "1"):
                self.config[setting] = True
                response = f"Configuration '{setting}' has been set to {self.config[setting]}."
            elif value in ("false", "off", "no", "0"):
                self.config[setting] = False
                response = f"Configuration '{setting}' has been set to {self.config[setting]}."
            else:
                response = f"Invalid value for {setting}. Must be 'true' or 'false'."
        self.schedule_command_message(response, from_node)

    def untrusted(self, message, from_node, packet):
        """Action for untrusted requests."""
        # Todo: Improve logging, and security around untrusted spam
        print(f"Untrusted Node: {from_node}, Requested: {message}\nPacket: \n{packet}")

    def handle_system_info(self, message, from_node, packet):
        """Sends the system information of the local node."""
        response = "Failed to retrieve system info: Node not found."
        try:
            my_node_num = self.interface.myInfo.my_node_num
            for node in (self.interface.nodes or {}).values():
                if node["num"] == my_node_num:
                    response = "System Information: " + repr(node)
                    break
        except Exception as e:
            # The radio library may fail in several ways; report instead of raising.
            response = f"Failed to retrieve system info: {e}"
        self.schedule_command_message(response, from_node)

    def extract_static_node_info(self, packet):
        """Extracts static node information that is unlikely to change."""
        return {
            'node_num': packet.get('from'),
            'node_id': packet.get('fromId'),
        }

    def handle_auth(self, message, from_node, packet):
        """Handles the authentication process for adding trusted nodes."""
        try:
            parts = message.split()
            if len(parts) > 1:
                # Auth code provided, verify it
                request_info = self.pending_auth_requests.get(from_node)
                if request_info is None:
                    response = "No pending authentication request found."
                elif hmac.compare_digest(
                    request_info['auth_code'].encode('utf-8'), parts[1].encode('utf-8')
                ):
                    self.trusted_nodes[from_node] = {
                        'config': request_info['config'],
                        'auth': True,
                        'timestamp': time.time(),
                    }
                    del self.pending_auth_requests[from_node]
                    response = "Authentication successful. Node trusted."
                else:
                    response = "Authentication failed. Incorrect code."
            else:
                # No auth code provided, generate one
                auth_code = self.generate_auth_code()
                self.pending_auth_requests[from_node] = {
                    'auth_code': auth_code,
                    'config': self.extract_static_node_info(packet),
                    'timestamp': time.time(),
                }
                print(f"2FA Code for node {from_node}: {auth_code}")
                response = (
                    "Please resend !auth followed by the 2FA code displayed in terminal. "
                    "EX !auth 123456"
                )
        except Exception as e:
            response = f"Authentication process failed: {e}"
        self.schedule_command_message(response, from_node)

    def generate_auth_code(self):
        """Generates a random 2FA authentication code."""
        auth_length = 6 if self.config["debug"] else 8
        # secrets, not random: the code gates access to private commands.
        return ''.join(secrets.choice('0123456789') for _ in range(auth_length))

    def handle_echo(self, message, from_node, packet):
        """Echos the message back to the sender."""
        response = message[5:].strip()  # drop the leading "!echo"
        self.schedule_command_message(response, from_node)

    def handle_time(self, message, from_node, packet):
        """Returns the current server time."""
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"The current time is: {current_time}"
        self.schedule_command_message(response, from_node)

    @staticmethod
    def _command_summary(func):
        """Returns the first docstring line of a handler, or a default text."""
        doc = (func.__doc__ or "").strip()
        return doc.splitlines()[0] if doc else "No description available"

    def handle_help(self, message, from_node, packet):
        """Lists all available commands and their descriptions, including aliases."""
        trusted = self.is_trusted(message, from_node, packet)
        visible = {
            cmd: self._command_summary(func)
            for cmd, func in self.commands.items()
            if trusted or cmd in self.public_commands
        }
        for alias, target in self.aliases.items():
            alias_allowed = trusted or alias in self.public_commands
            if alias_allowed and target in visible and alias not in visible:
                visible[alias] = f"Alias for {target}"
        response = "Available commands: " + self.dict_to_string(visible, separator=" ")
        self.schedule_command_message(response, from_node)

    def on_receive(self, packet, interface):
        """Callback function for handling received packets and processing commands.

        Only direct text messages are processed. Private commands from
        untrusted nodes are rejected; text that is not a known command is
        handed to the LLM handler.
        """
        try:
            if self.config["print_all_packets"]:
                print(f"\n{packet}")

            decoded = packet.get('decoded')
            if not decoded or decoded.get('portnum') != 'TEXT_MESSAGE_APP':
                return
            if packet['toId'] == self.BROADCAST_ID:
                return

            message_string = decoded['payload'].decode('utf-8', errors='replace')
            from_node = packet['fromId']  # Get the node ID of the sender
            print(f"\nReceived from {from_node}: {message_string}\n> ", end="", flush=True)

            words = message_string.split()
            if not words:
                return  # nothing to dispatch in an empty message

            self.log(
                from_node,
                packet['to'],
                message_string.replace("\n", " "),
                packet.get('id'),
                packet.get('rxTime', packet.get('rx_time')),
                packet['toId'],
            )

            # Resolve aliases, then pick the handler (LLM by default).
            command = words[0].lower()
            command = self.aliases.get(command, command)
            if command in self.private_commands and not self.is_trusted(
                message_string, from_node, packet
            ):
                self.untrusted(message_string, from_node, packet)
                return
            handler = self.commands.get(command, self.commands['!llm'])
            handler(message_string, from_node, packet)
        except KeyError as e:
            print(f"Error processing packet: {e}")
        except Exception as e:
            # Callback boundary: report and keep the listener alive.
            print(f"Error processing packet: {e!r}")

    def handle_trust(self, message, from_node, packet):
        """Returns list of trusted nodes."""
        response = "These nodes are currently trusted: " + repr(self.trusted_nodes)
        self.schedule_command_message(response, from_node)

    def start(self):
        """Starts the command and control loop to send messages."""
        print("Meshtastic Command and Control App")
        print("Type a message and press Enter to send. Type 'exit' to quit.")
        try:
            while True:
                try:
                    text = input(">>> ")
                except (KeyboardInterrupt, EOFError):
                    # EOF means stdin is gone; retrying would loop forever.
                    print("\nExiting the app.")
                    break
                if text.lower() == "exit":
                    print("Exiting the app.")
                    break
                try:
                    self.schedule_message(text)
                except Exception as e:
                    print(f"An error occurred: {e}")
        finally:
            self.interface.close()
            print("Interface closed.")


if __name__ == "__main__":
    controller = MeshtasticController()
    controller.start()