import datetime
import hashlib
import json
import random
import threading
import time

import cherrypy
import meshtastic
import meshtastic.tcp_interface
from pubsub import pub


class MeshAPI(object):
    """CherryPy application that exposes a Meshtastic device over HTTP.

    The object connects to the device over TCP in a background thread,
    caches telemetry, text messages and seen nodes as packets arrive, and
    serves those caches (plus a send-message endpoint) as JSON.
    """

    # Upper bound on the number of entries kept in ``message_cache``.
    MAX_CACHED_MESSAGES = 100

    # Telemetry fields copied into the respective caches.
    _DEVICE_METRIC_KEYS = ("batteryLevel", "voltage", "channelUtilization", "airUtilTx")
    _ENVIRONMENT_METRIC_KEYS = ("temperature", "relativeHumidity", "barometricPressure")

    def __init__(self, hostname='meshtastic.local'):
        """
        Initialize the MeshAPI.

        Args:
            hostname (str): The hostname or IP address of the Meshtastic device.
        """
        self.hostname = hostname
        self.iface = None
        self.whoami = None
        self.device_telemetry_cache = {}  # Cache to store device telemetry data
        self.environment_telemetry_cache = {}  # Cache to store environment telemetry data
        self.message_cache = {}  # Cache to store messages
        self.seen_nodes = {}  # Dictionary to store information about seen nodes
        self.connection_attempts = 0  # Incremented each time on_connection fires
        self.last_connection_time = None  # Time of the last successful connection
        self.connection_thread = threading.Thread(target=self.connect_to_mesh, daemon=True)

    def on_receive(self, packet, interface):
        """
        Callback function called when a packet arrives.

        Updates the telemetry caches, the message cache and the seen-nodes
        dictionary from the contents of ``packet``.

        Args:
            packet (dict): The received packet.
            interface: The interface the packet arrived on (unused).
        """
        # A packet may carry no 'decoded' payload; treat that as an empty
        # payload so the debug line and the checks below never raise KeyError.
        decoded = packet.get('decoded') or {}

        # Debug
        print(f"Received: from={packet.get('from')} to={packet.get('to')} portnum={decoded.get('portnum', 'UNKNOWN')}")

        # Check if the received packet contains telemetry data
        if 'telemetry' in decoded:
            node_id = packet['from']
            telemetry_data = decoded['telemetry']

            # Check if telemetry data contains deviceMetrics
            if 'deviceMetrics' in telemetry_data:
                device_metrics = telemetry_data["deviceMetrics"]
                self.device_telemetry_cache[node_id] = {
                    # .get(): a missing timestamp must not drop the whole sample
                    "time": telemetry_data.get("time"),
                    "deviceMetrics": {
                        key: device_metrics.get(key) for key in self._DEVICE_METRIC_KEYS
                    },
                }

            # Check if telemetry data contains environmentMetrics
            if 'environmentMetrics' in telemetry_data:
                environment_metrics = telemetry_data["environmentMetrics"]
                self.environment_telemetry_cache[node_id] = {
                    "time": telemetry_data.get("time"),
                    "environmentMetrics": {
                        key: environment_metrics.get(key) for key in self._ENVIRONMENT_METRIC_KEYS
                    },
                }

        # Check if the received packet is a message
        if 'text' in decoded:
            node_id = packet['from']
            message_data = decoded['text']
            internal_message_id = self.generate_internal_message_id(node_id, message_data)
            self.message_cache[internal_message_id] = {"node_id": node_id, "message": message_data}
            # Dicts keep insertion order, so the first key is the oldest
            # message; drop entries from the front until back within the limit.
            while len(self.message_cache) > self.MAX_CACHED_MESSAGES:
                del self.message_cache[next(iter(self.message_cache))]

        # Update seen nodes dictionary
        if 'fromId' in packet:
            node_id = packet['from']
            if node_id not in self.seen_nodes:
                self.seen_nodes[node_id] = {"long_id": packet['fromId']}

    def on_connection(self, interface, topic=pub.AUTO_TOPIC):
        """
        Callback function called when we (re)connect to the radio.

        Records the connection time, bumps the connection counter and stores
        the local node number.

        Args:
            interface: The interface that just connected.
            topic: The pubsub topic the event was published on.
        """
        self.last_connection_time = time.time()
        self.connection_attempts += 1
        # Read from the interface handed to the callback: self.iface is only
        # assigned once the TCPInterface constructor has returned, so it may
        # still be None when this fires.
        my_info = getattr(interface, "myInfo", None)
        self.whoami = getattr(my_info, "my_node_num", None)
        print(f"Connected to Meshtastic device at {self.hostname}")

    @cherrypy.expose
    def index(self):
        """
        Exposed endpoint for the index page.
        """
        # NOTE(review): this page contains no HTML markup and lists nothing
        # under "Endpoints:" -- confirm whether markup was lost from the file.
        html = """ Meshttpd Poor Man's Swagger

Meshttpd Poor Man's Swagger

Endpoints:

Made by luhf for Monocul.us Mesh

"""
        return html

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def send_message(self, message=None, node_id=None):
        """
        Exposed endpoint for sending a message.

        Args:
            message (str): The message to be sent.
            node_id (optional, str): The ID of the node to which the message should be sent.

        Returns:
            dict: A JSON object containing the status of the operation.

        Raises:
            404 (Missing parameters): If the `message` parameter is missing.
            400 (Invalid node ID): If the specified node ID is invalid.
        """
        if message is None:
            cherrypy.response.status = 404
            return {"error": "Missing parameters: message"}

        if not self.iface:
            return {"status": "error", "message": "Mesh interface not initialized"}

        try:
            if node_id:
                try:
                    self.iface.sendText(message, node_id)
                # NOTE(review): confirm that meshtastic.node_manager.NodeIdNotFound
                # exists in the installed meshtastic version.
                except meshtastic.node_manager.NodeIdNotFound as ex:
                    raise cherrypy.HTTPError(400, "Invalid node ID") from ex
            else:
                self.iface.sendText(message)
        except cherrypy.HTTPError:
            # HTTPError is an Exception subclass; re-raise it so the documented
            # 400 reaches the client instead of being turned into a JSON body.
            raise
        except Exception as ex:
            return {"status": "error", "message": str(ex)}

        return {"status": "success", "message": "Message sent successfully"}

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_device_telemetry(self):
        """
        Exposed endpoint for retrieving device telemetry data from connected nodes.
        """
        return self.device_telemetry_cache

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_environment_telemetry(self):
        """
        Exposed endpoint for retrieving environment telemetry data from connected nodes.
        """
        return self.environment_telemetry_cache

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def get_last_messages(self):
        """
        Exposed endpoint for retrieving the cached messages.

        At most ``MAX_CACHED_MESSAGES`` messages are kept.
        """
        return self.message_cache

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def delete_message(self, message_id=None):
        """
        Exposed endpoint for deleting a message from the cache.

        Args:
            message_id (str): The ID of the message to be deleted.

        Returns:
            dict: A JSON object containing the status of the operation.

        Raises:
            404 (Missing parameters): If the `message_id` parameter is missing.
            400 (Invalid message ID): If the specified message ID is invalid.
        """
        if message_id is None:
            cherrypy.response.status = 404
            return {"error": "Missing parameters: message_id"}

        if message_id not in self.message_cache:
            raise cherrypy.HTTPError(400, "Invalid message ID")

        del self.message_cache[message_id]
        return {"status": "success", "message": "Message deleted successfully"}

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def nodes(self):
        """
        Exposed endpoint for listing all seen nodes.
        """
        return self.seen_nodes

    @cherrypy.expose
    @cherrypy.tools.json_out()
    def status(self):
        """
        Exposed endpoint for checking connection status.
        """
        return {
            "connected": bool(self.iface),
            "nodeid": str(self.whoami),
            "last_connection_time": self.last_connection_time,
            "total_connection_attempts": self.connection_attempts
        }

    def connect_to_mesh(self):
        """
        Function to connect to the Meshtastic device.

        Retries once per second until a TCP interface is created, then returns.
        """
        while True:
            try:
                self.iface = meshtastic.tcp_interface.TCPInterface(hostname=self.hostname)
                return
            except Exception as ex:
                print(f"Error: Could not connect to {self.hostname} {ex}")
                time.sleep(1)  # Wait for 1 second before attempting to reconnect

    def generate_internal_message_id(self, node_id, message):
        """
        Function to generate an internal message ID using MD5 hashing of random data, node ID, and message.

        Args:
            node_id (str): The ID of the node.
            message (str): The message.

        Returns:
            str: The generated internal message ID (first 10 hex digits of the digest).
        """
        random_data = str(random.random()).encode()
        hash_input = random_data + str(node_id).encode() + message.encode()
        hash_object = hashlib.md5(hash_input)
        return hash_object.hexdigest()[:10]

    def start(self):
        """
        Function to start the connection thread.
        """
        self.connection_thread.start()


def main():
    """Create the API object, wire up the mesh callbacks and serve over HTTP."""
    # Change the IP address of the meshtastic device here.
    # With some networks the local domain will not work, if that is your case use the IP address of the device.
    mesh_api = MeshAPI(hostname='meshtastic.local')

    # Subscribe to receive events *before* starting the connection thread so
    # that an early "connection established" event or packet is not missed.
    pub.subscribe(mesh_api.on_receive, "meshtastic.receive")
    pub.subscribe(mesh_api.on_connection, "meshtastic.connection.established")

    # Start the connection thread
    mesh_api.start()

    cherrypy.tree.mount(mesh_api, '/api/mesh')

    # Please use a reverse proxy if you are planning to expose this to the internet!
    # To accept local connections only, set server.socket_host to 127.0.0.1.
    cherrypy.config.update({'server.socket_host': '0.0.0.0'})
    cherrypy.engine.start()
    cherrypy.engine.block()


if __name__ == '__main__':
    main()