Initial commit

This commit is contained in:
2026-01-17 21:18:44 +00:00
commit 458e35943a
4 changed files with 602 additions and 0 deletions

582
meshroller.py Normal file
View File

@@ -0,0 +1,582 @@
import meshtastic.serial_interface
from pubsub import pub
from datetime import datetime, timedelta
import textwrap
import meshtastic
import time
import random
from queue import Queue, Empty
import threading
from collections import defaultdict, deque
import string
import subprocess
import json
import os
import requests
import shlex
#### TODO ####
## Move logging into folder
## Telegram link
## Improve untrusted, and auth code gen
## Move dependencies into repository
## Use encrypted channels
## Bit Escrow
## Nostrmesh
## Log
## Qwen wake word on public channel, or 20 messages
## Curl
## post
class MeshtasticController:
def __init__(self):
self.logs = {}
self.load_logs()
self.config = {
"debug": False,
"mute_public_channel": True,
"mute_public_commands": True,
"print_all_packets": False,
}
self.ollama_host = "http://localhost:11434/api/chat"
self.ollama_url = ''
self.ollama_model = 'qwen2.5-coder:0.5b'
"""Initializes the Meshtastic interface and sets up the message listener."""
self.scheduled_messages = defaultdict(Queue)
self.sent_messages = defaultdict(deque)
self.message_ids = defaultdict(lambda: 1)
self.scheduler_thread = threading.Thread(target=self.message_sender, daemon=True)
self.scheduler_thread.start()
self.max_message_length = 210 # 233 Adjust this to the maximum payload length supported by your Meshtastic device
self.auth_bypass = False
self.interface = meshtastic.serial_interface.SerialInterface()
pub.subscribe(self.on_receive, 'meshtastic.receive')
self.trusted_nodes = {}
self.pending_auth_requests = {}
self.aliases = {"!about": "!help"}
self.public_commands = {"!echo", "!time", "!help", "!auth", "!packet", "!who", "!about", "!clear", "!ping", "hey", "!llm"}
self.commands = { # Dictionary for mapping commands to methods.
"!echo": self.handle_echo,
"!time": self.handle_time,
"!help": self.handle_help,
"!packet": self.handle_packet,
"!who": self.handle_who,
"!auth": self.handle_auth,
"!info": self.handle_system_info,
"!trust": self.handle_trust,
#"!config": self.handle_config,
"!clear": self.handle_clear_messages,
"!spam": self.handle_spam,
"!resend": self.handle_resend,
"!receipt": self.handle_receipt,
"!source": self.handle_source,
"!ping": self.handle_ping,
"!llm": self.handle_llm,
"hey": self.handle_hey,
"\N{LIZARD}": self.handle_lizard,
}
self.private_commands = set(self.commands.keys()) - self.public_commands
self.config_file = self.get_config_file_path()
if os.path.exists(self.config_file):
self.ollama_url = self.load_config(self.config_file, 'url')
def load_logs(self):
# Load existing logs from files into memory
for toId in range(1, 26): # Assuming we have up to 25 different To IDs (for example)
log_file_path = f"log_{toId}.json"
if os.path.exists(log_file_path):
with open(log_file_path, 'r') as log_file:
logs = json.load(log_file)
for log_entry in logs:
time = int(datetime.strptime(log_entry['timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ').timestamp())
if toId not in self.logs:
self.logs[toId] = []
self.logs[toId].append((time, (log_entry['timestamp'], f"{log_entry['from']}, {log_entry['to']}, Message: {log_entry['message']}, ID: {log_entry['id']}, Time: {log_entry['time']}")))
def log(self, from_node, to, message, message_id, time, toId, inbound=True):
# Ensure the log file exists and is accessible
log_file_path = ""
if inbound:
log_file_path = f"log_{from_node}.json"
else:
log_file_path = f"log_{toId}.json"
# Prepare the log entry as a dictionary
log_entry = {
"timestamp": datetime.now().isoformat(),
"from": from_node,
"to": to,
"message": message,
"id": message_id,
"time": time,
"toId": toId
}
# Append the log entry to the file or create a new one if it doesn't exist
try:
with open(log_file_path, 'r') as log_file:
logs = json.load(log_file)
logs.append(log_entry)
except (FileNotFoundError, json.JSONDecodeError):
logs = [log_entry]
# Write the updated logs back to the file
with open(log_file_path, 'w') as log_file:
json.dump(logs, log_file, indent=4)
# Ensure we only keep the most recent 25 messages in memory
if toId not in self.logs:
self.logs[toId] = []
self.logs[toId].append((time, (log_entry['timestamp'], f"{log_entry['from']}, {log_entry['to']}, Message: {log_entry['message']}, ID: {log_entry['id']}, Time: {log_entry['time']}")))
while len(self.logs[toId]) > 25:
oldest_time, _ = self.logs[toId].pop(0)
with open(log_file_path, 'r') as log_file:
logs = json.load(log_file)
with open(log_file_path, 'w') as log_file:
updated_logs = [log for log in logs if log['timestamp'] != oldest_time]
json.dump(updated_logs, log_file, indent=4)
def get_chat_logs(self, chat_id):
# Load chat logs from file
log_file_path = f"log_{chat_id}.json"
if os.path.exists(log_file_path):
with open(log_file_path, 'r') as log_file:
try:
logs = json.load(log_file)
except json.JSONDecodeError:
logs = []
clean_logs = []
for log_entry in logs:
#print(f'Log entry: {log_entry}')
message_parts = {
"role": "assistant" if log_entry['from'] == "Qwen" else "user",
"content": log_entry['message']
}
print(f'Message parts: {message_parts}')
clean_logs.append(message_parts)
return clean_logs
else:
return []
def handle_llm(self, message, from_node, packet):
"""Runs the message through ollama."""
try:
chat_logs = self.get_chat_logs(from_node)
# Prepare the message history and parameters for the Alibaba Cloud API
messages = [{"role": "user", "content": msg['content']} for msg in chat_logs]
#print(f'Messages {messages}')
payload = {
"model": self.ollama_model,
"messages": messages,
"parameters": {
"temperature": 0.7,
"max_tokens": 500
},
"stream": False
}
#print(payload)
# Make the POST request to the Alibaba Cloud API
response = requests.post(self.ollama_host, json=payload)
if response.status_code == 200:
next_message = response.json().get('message', {})
#print(f"Assistant: {next_message.get('content')}")
self.schedule_command_message(next_message['content'], from_node)
else:
error_message = f"Error: {response.status_code} - {response.text}"
self.schedule_command_message(error_message, from_node)
except subprocess.CalledProcessError as e:
# Handle any errors that occur during the subprocess execution
error_message = f"Error running LLM: {e.stderr}"
self.schedule_command_message(error_message, from_node)
def get_config_file_path(self):
current_dir = os.getcwd()
config_file = "config.json"
return os.path.join(current_dir, config_file)
def load_config(self, file, key):
if os.path.exists(file):
with open(file, 'r') as self.config_file:
config = json.load(self.config_file)
value = config[key]
if self.config["debug"]: print(value)
return value
def handle_lizard(self, message, from_node, packet):
"""\N{LIZARD}"""
self.schedule_command_message("\N{LIZARD}\N{LIZARD}\N{LIZARD}\N{LIZARD}\N{LIZARD}", from_node)
def handle_hey(self, message, from_node, packet):
"""Introduces itself"""
self.schedule_command_message("I'm a control server written in python, send me !help for a list of commands.", from_node)
def schedule_command_message(self, message, destination=None):
if destination is None and self.config["mute_public_commands"] is True:
return # don't send public messages from commands
self.schedule_message(message, destination=destination)
def handle_ping(self, message, from_node, packet):
self.schedule_command_message("pong", from_node)
def handle_source(self, message, from_node, packet):
"""Sends the entire source code in chunks."""
try:
with open(__file__, 'r') as f:
source_code = f.read()
self.schedule_command_message(source_code, from_node)
except Exception as e:
error_message = f"Failed to send source code: {e}"
self.schedule_command_message(error_message, from_node)
def handle_receipt(self, message, from_node, packet):
"""Confirm receipt of a message chunk and clean up sent messages."""
parts = message.split()
if len(parts) != 2:
response = "Usage: !receipt <message_id>"
self.schedule_command_message(response, from_node)
return
message_id = parts[1]
# Clean up sent messages up to and including the received message ID
self.sent_messages[from_node] = [
msg for msg in self.sent_messages[from_node] if not msg.startswith(f"{message_id}|")
]
response = f"Receipt confirmed for message ID {message_id}."
self.schedule_command_message(response, from_node)
def handle_resend(self, message, from_node, packet):
"""Request for resending missing message chunks."""
parts = message.split()
if len(parts) != 2:
response = "Usage: !resend <message_id>"
self.schedule_command_message(response, from_node)
return
message_id = parts[1]
if from_node in self.sent_messages:
for sent_message in self.sent_messages[from_node]:
if sent_message.startswith(f"{message_id}-"):
self.schedule_command_message(sent_message, from_node)
response = f"Resend request for message ID {message_id} processed."
self.schedule_command_message(response, from_node)
def handle_spam(self, message, from_node, packet):
"""Sends a series of messages with a specified number of random characters."""
parts = message.split()
if len(parts) == 2 and not parts[1].isdigit():
response = "Usage: !spam [num_messages] [num_characters]"
self.schedule_command_message(response, from_node)
return
num_messages = int(parts[1]) if len(parts) > 1 and parts[1] else 2
num_characters = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
for i in range(num_messages):
current_message_id = self.message_ids[from_node] + i # Calculate current message ID
if num_characters is None: # Generate a random string that fits within a single chunk
temp_label_length = self.calculate_label_length(current_message_id)
adjusted_num_characters = self.max_message_length - temp_label_length # Adjust the content length
else:
adjusted_num_characters = num_characters
spam_content = ''.join(random.choices(string.ascii_letters + string.digits, k=adjusted_num_characters))
self.schedule_command_message(spam_content, from_node)
def calculate_label_length(self, message_id, chunk_num = 1, total_chunks = 1):
"""Calculate the label length based on the message ID, chunk number, and total chunks."""
return (
len(str(message_id)) + # Length of the message ID
len(str(chunk_num)) + # Length of the current chunk number
len(str(total_chunks)) + # Length of the total number of chunks
4 # For the "-/: " part of the label
)
def schedule_message(self, message, destination=None):
"""Adds messages to the sending queue with unique IDs, accounting for message label lengths."""
try:
trimmed_message = message.strip()
message_id = self.message_ids[destination]
message_chunks = []
current_pos = 0
while current_pos < len(trimmed_message):
chunk_num = len(message_chunks) + 1
#print(message_id, chunk_num, len(message_chunks))
label_length = self.calculate_label_length(message_id, chunk_num, len(message_chunks)) # is this right?
available_length = self.max_message_length - label_length
if available_length > 0:
chunk = trimmed_message[current_pos:current_pos + available_length]
message_chunks.append(chunk)
current_pos += len(chunk)
else:
break # Stop if the calculated label is longer than allowed message length
total_chunks = len(message_chunks)
for i, chunk in enumerate(message_chunks):
labeled_chunk = f"{message_id}-{i + 1}/{total_chunks}: {chunk}"
self.scheduled_messages[destination].put(labeled_chunk)
self.sent_messages[destination].append(labeled_chunk) # Track sent messages for potential resending
message_id += 1 # Increment the message ID for the next chunk
self.message_ids[destination] = message_id # Update the message_id tracker after all chunks are labeled
except Exception as e:
print(f"Failed to schedule message: {e}")
def handle_clear_messages(self, message, from_node, packet):
"""Clears the scheduled messages for the specified node."""
if from_node in self.scheduled_messages:
del self.scheduled_messages[from_node]
response = f"Cleared all scheduled messages for node {from_node}."
else:
response = f"No scheduled messages found for node {from_node}."
self.schedule_command_message(response, from_node)
def message_sender(self):
"""Scheduler that sends out messages."""
while True:
destinations = list(self.scheduled_messages.keys())
if destinations:
for destination in destinations:
try:
while not self.scheduled_messages[destination].empty():
print("Sleeping for 2 seconds before sending a message.")
time.sleep(2)
message = self.scheduled_messages[destination].get_nowait()
if self.config["debug"]:
print(f"Debug: Message to {destination or 'public channel'}: {message}")
elif self.config["mute_public_channel"] and not destination:
print(f"Public channel muted: {message}")
else:
#print(f"Sending to {destination or 'public channel'}: {message}")
self.interface.sendText(message, destinationId=destination)
self.log(
from_node="Qwen",
to=destination,
message=message.replace("\n", " "),
message_id=None, # Assuming we don't have an ID for outgoing messages
time=datetime.now().isoformat(),
toId=destination or "public_channel",
inbound=False
)
self.sent_messages[destination].append(message) # Store the sent message
except Exception as e:
print(f"Failed to send message: {e}")
time.sleep(0.1)
def is_trusted(self, message, from_node, packet):
"""Checks if a node is in the trusted nodes list based on the packet data."""
if self.auth_bypass:
print(f"Auth bypass: {self.auth_bypass}")
return True
node_id = packet.get('fromId')
if node_id in self.trusted_nodes:
trusted_node_info = self.trusted_nodes[node_id]
if trusted_node_info.get('config', {}) == self.extract_static_node_info(packet):
if trusted_node_info.get('auth', False):
return True
return False
def handle_who(self, message, from_node, packet):
"""Replies with sender name."""
self.schedule_command_message(f"Message received from: {from_node}", from_node)
def handle_packet(self, message, from_node, packet):
"""Replies with info about the recieved packet."""
self.schedule_command_message(repr(packet), from_node)
def dict_to_string(self, dictionary, separator=", "):
"""Converts a dictionary into a flattened string with key-value pairs."""
flattened_items = []
for key, value in dictionary.items():
if isinstance(value, dict): # Handle the parent key once, followed by the nested dictionary values
flattened_items.append(f"{key}: {self.dict_to_string(value, separator)}")
else:
flattened_items.append(f"{key}: {value}")
return separator.join(flattened_items)
def handle_config(self, message, from_node, packet):
"""Command for modifying or viewing configuration settings."""
parts = message.split()
response = ""
if len(parts) == 2 and parts[1].lower() == "info":
response = "Current Configuration: "
response += self.dict_to_string(self.config)
else:
if len(parts) != 3:
response = "Usage: !config <setting> <value> or !config info"
else:
setting = parts[1]
value = parts[2].lower()
if setting in self.config:
if isinstance(self.config[setting], bool):
if value in ["true", "on", "yes", "1"]:
self.config[setting] = True
elif value in ["false", "off", "no", "0"]:
self.config[setting] = False
else:
response = f"Invalid value for {setting}. Must be 'true' or 'false'."
else:
response = f"Unsupported setting type for {setting}."
if not response:
response = f"Configuration '{setting}' has been set to {self.config[setting]}."
else:
response = f"Configuration setting '{setting}' does not exist."
self.schedule_command_message(response, from_node)
def untrusted(self, message, from_node, packet):
"""Action for untrusted requests.""" # Todo: Improve logging, and security around untrusted spam
#if self.config["debug"]:
print(f"Untrusted Node: {from_node}, Requested: {message}\nPacket: \n{packet}")
def handle_system_info(self, message, from_node, packet):
"""Sends the system information of the local node."""
response = ""
try:
if self.interface.nodes:
for node in self.interface.nodes.values():
if node["num"] == self.interface.myInfo.my_node_num:
response = "System Information: "
response += repr(node)
self.schedule_command_message(response, from_node)
return
response = "Failed to retrieve system info: Node not found."
except Exception as e:
response = f"Failed to retrieve system info: {e}"
self.schedule_command_message(response, from_node)
def extract_static_node_info(self, packet):
"""Extracts static node information that is unlikely to change."""
static_info = {
'node_num': packet.get('from'),
'node_id': packet.get('fromId'),
}
return static_info
def handle_auth(self, message, from_node, packet):
"""Handles the authentication process for adding trusted nodes.""" # Improve auth
try: # Extract the auth code from the message, if any
parts = message.split()
if len(parts) > 1: # Auth code provided, verify it
auth_code = parts[1]
if from_node in self.pending_auth_requests:
request_info = self.pending_auth_requests[from_node]
if request_info['auth_code'] == auth_code: # Correct auth code
self.trusted_nodes[from_node] = {
'config': request_info['config'],
'auth': True,
'timestamp': time.time()
}
response = "Authentication successful. Node trusted."
del self.pending_auth_requests[from_node]
else: # Incorrect auth code
response = "Authentication failed. Incorrect code."
else:
response = "No pending authentication request found."
else: # No auth code provided, generate one
auth_code = self.generate_auth_code()
node_info = self.extract_static_node_info(packet)
# Store in pending auth requests
self.pending_auth_requests[from_node] = {
'auth_code': auth_code,
'config': node_info,
'timestamp': time.time()
}
print(f"2FA Code for node {from_node}: {auth_code}")
response = "Please resend !auth followed by the 2FA code displayed in terminal. EX !auth 123456"
except Exception as e:
response = f"Authentication process failed: {e}"
self.schedule_command_message(response, from_node)
def generate_auth_code(self):
"""Generates a random 2FA authentication code."""
auth_length=8
if self.config["debug"]:
auth_length=6
return ''.join(random.choices('0123456789', k=auth_length))
def handle_echo(self, message, from_node, packet):
"""Echos the message back to the sender."""
response = message[5:].strip()
self.schedule_command_message(response, from_node)
def handle_time(self, message, from_node, packet):
"""Returns the current server time."""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
response = f"The current time is: {current_time}"
self.schedule_command_message(response, from_node)
def handle_help(self, message, from_node, packet):
"""Lists all available commands and their descriptions, including aliases."""
response = "Available commands: "
def get_command_description(func):
"""Helper to get the docstring or a default value if none is present."""
return func.__doc__.strip() if func.__doc__ else "No description available"
if self.is_trusted(message, from_node, packet):
commands_dict = {cmd: get_command_description(func) for cmd, func in self.commands.items()}
for cmd, alias in self.aliases.items():
if cmd in commands_dict:
commands_dict[alias] = commands_dict.pop(cmd)
response += self.dict_to_string(commands_dict, separator=" ")
else:
public_commands_dict = {cmd: get_command_description(func) for cmd, func in self.commands.items() if cmd in self.public_commands}
response += self.dict_to_string(public_commands_dict, separator=" ")
self.schedule_command_message(response, from_node)
def on_receive(self, packet, interface):
"""Callback function for handling received packets and processing commands."""
try:
if self.config["print_all_packets"]: print(f"\n{packet}")
if 'decoded' in packet and packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
message_bytes = packet['decoded']['payload']
message_string = message_bytes.decode('utf-8')
from_node = packet['fromId'] # Get the node ID of the sender
print(f"\nReceived from {from_node}: {message_string}\n> ", end="", flush=True)
# Log from_node, to, message_id, time, toId
self.log(
from_node,
packet['to'],
message_string.replace("\n", " "),
packet.get('id'),
packet.get('rx_time'),
packet['toId'])
# Extract the command from the message
command = message_string.split()[0].lower()
# Check if the command is an alias and resolve it
command = self.aliases.get(command, command)
# Default handling to LLM
handler = self.commands.get('!llm')
# Determine if the command requires trust
if command in self.private_commands and not self.is_trusted(message_string, from_node, packet):
self.untrusted(message_string, from_node, packet)
elif command in self.commands:
handler = self.commands.get(command)
# Execute the command
handler(message_string, from_node, packet)
except KeyError as e:
print(f"Error processing packet: {e}")
def handle_trust(self, message, from_node, packet):
"""Returns list of trusted nodes."""
response = "These nodes are currently trusted: "
response += repr(self.trusted_nodes)
self.schedule_command_message(response, from_node)
def start(self):
"""Starts the command and control loop to send messages."""
print("Meshtastic Command and Control App")
print("Type a message and press Enter to send. Type 'exit' to quit.")
while True:
try:
text = input(">>> ")
if text.lower() == "exit":
print("Exiting the app.")
break
self.schedule_message(text)
except KeyboardInterrupt:
print("\nExiting the app.")
break
except Exception as e:
print(f"An error occurred: {e}")
self.interface.close()
print("Interface closed.")
if __name__ == "__main__":
controller = MeshtasticController()
controller.start()