diff options
author | Raghuram Subramani <raghus2247@gmail.com> | 2022-06-19 19:47:51 +0530 |
---|---|---|
committer | Raghuram Subramani <raghus2247@gmail.com> | 2022-06-19 19:47:51 +0530 |
commit | 4fd287655a72b9aea14cdac715ad5b90ed082ed2 (patch) | |
tree | 65d393bc0e699dd12d05b29ba568e04cea666207 /circuitpython/tools/cpboard.py | |
parent | 0150f70ce9c39e9e6dd878766c0620c85e47bed0 (diff) |
add circuitpython code
Diffstat (limited to 'circuitpython/tools/cpboard.py')
-rw-r--r-- | circuitpython/tools/cpboard.py | 696 |
1 files changed, 696 insertions, 0 deletions
diff --git a/circuitpython/tools/cpboard.py b/circuitpython/tools/cpboard.py new file mode 100644 index 0000000..1f399d1 --- /dev/null +++ b/circuitpython/tools/cpboard.py @@ -0,0 +1,696 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2014-2016 Damien P. George +# SPDX-FileCopyrightText: Copyright (c) 2017 Paul Sokolovsky +# SPDX-FileCopyrightText: Copyright (c) 2017 Scott Shawcroft for Adafruit Industries +# SPDX-FileCopyrightText: Copyright (c) 2018 Noralf Trønnes +# SPDX-FileCopyrightText: 2014 MicroPython & CircuitPython contributors (https://github.com/adafruit/circuitpython/graphs/contributors) +# +# SPDX-License-Identifier: MIT + +import os +import re +import serial +import sys +import time + +import serial.tools.list_ports +import sh +import shutil + + +class CPboardError(BaseException): + pass + + +# supervisor/messages/default.h: +MSG_NEWLINE = b"\r\n" +MSG_SAFE_MODE_CRASH = b"Looks like our core CircuitPython code crashed hard. Whoops!" +MSG_SAFE_MODE_BROWN_OUT_LINE_1 = ( + b"The microcontroller's power dipped. Please make sure your power supply provides" +) +MSG_SAFE_MODE_BROWN_OUT_LINE_2 = ( + b"enough power for the whole circuit and press reset (after ejecting CIRCUITPY)." +) +MSG_WAIT_BEFORE_REPL = b"Press any key to enter the REPL. Use CTRL-D to reload." + + +class REPL: + CHAR_CTRL_A = b"\x01" + CHAR_CTRL_B = b"\x02" + CHAR_CTRL_C = b"\x03" + CHAR_CTRL_D = b"\x04" + + def __init__(self, board): + self.board = board + self.write_chunk_size = 32 + self.safe_mode = False + self.session = b"" + + def __enter__(self): + self.reset() + return self + + def __exit__(self, exception_type, exception_value, traceback): + pass + + @property + def serial(self): + return self.board.serial + + def read(self): + if self.serial.in_waiting: + data = self.serial.read(self.serial.in_waiting) + else: + data = b"" + self.session += data + return data + + def read_until(self, ending, timeout=10): + data = b"" + timeout_count = 0 + while True: + if data.endswith(ending): + break + elif self.serial.in_waiting > 0: + new_data = self.serial.read(1) + data += new_data + self.session += new_data + timeout_count = 0 + else: + timeout_count += 1 + if timeout is not None and timeout_count >= 100 * timeout: + raise TimeoutError(110, "timeout waiting for", ending) + time.sleep(0.01) + return data + + def write(self, data, chunk_size=None): + if chunk_size is None: + chunk_size = self.write_chunk_size + if not isinstance(data, bytes): + data = bytes(data, encoding="utf8") + + for i in range(0, len(data), chunk_size): + chunk = data[i : min(i + chunk_size, len(data))] + self.session += chunk + self.serial.write(chunk) + time.sleep(0.01) + + def reset(self): + # Use read() since serial.reset_input_buffer() fails with termios.error now and then + self.read() + self.session = b"" + self.write(b"\r" + REPL.CHAR_CTRL_C + REPL.CHAR_CTRL_C) # interrupt any running program + self.write(b"\r" + REPL.CHAR_CTRL_B) # enter or reset friendly repl + data = self.read_until(b">>> ") + + def execute(self, code, timeout=10, wait_for_response=True): + self.read() # Throw away + + self.write(REPL.CHAR_CTRL_A) + self.read_until(b"\r\n>") + + self.write(code) + + self.write(REPL.CHAR_CTRL_D) + if not wait_for_response: + return b"", b"" + self.read_until(b"OK") + + output = self.read_until(b"\x04", timeout=timeout) + output = output[:-1] + + error = self.read_until(b"\x04") + error = error[:-1] + + return output, error + + def run(self): + if self.safe_mode: + raise RuntimeError("Can't run in safe mode") + + self.reset() + + self.write(REPL.CHAR_CTRL_D) + data = self.read_until(b" output:\r\n") + if b"Running in safe mode" in data: + self.safe_mode = True + raise RuntimeError("Can't run in safe mode") + + # TODO: MSG_SAFE_MODE_CRASH + # TODO: BROWNOUT + + marker = MSG_NEWLINE + MSG_WAIT_BEFORE_REPL + MSG_NEWLINE + data = self.read_until(marker) + data = data.split(marker)[0] + + # Haven't found out why we have to strip off this... + if data.endswith(b"\r\n\r\n"): + data = data[:-4] + return data + + +class Disk: + def __init__(self, dev): + self.dev = os.path.realpath(dev) + self.mountpoint = None + with open("/etc/mtab", "r") as f: + mtab = f.read() + mount = [mount.split(" ") for mount in mtab.splitlines() if mount.startswith(self.dev)] + if mount: + self._path = mount[0][1] + else: + name = os.path.basename(dev) + sh.pmount("-tvfat", dev, name, _timeout=10) + self.mountpoint = "/media/" + name + self._path = self.mountpoint + + def __enter__(self): + return self + + def __exit__(self, exception_type, exception_value, traceback): + try: + self.close() + except: + pass + + @property + def path(self): + return self._path + + def close(self): + if not self.mountpoint: + return + mountpoint = self.mountpoint + self.mountpoint = None + + start_time = time.monotonic() + unmounted = False + while not unmounted and start_time - time.monotonic() < 30: + try: + sh.pumount(mountpoint) + unmounted = True + except sh.ErrorReturnCode_5: + time.sleep(0.1) + + def sync(self): + disk_device = os.path.basename(self.dev)[:-1] + os.sync() + # Monitor the block device so we know when the sync request is actually finished. + with open("/sys/block/" + disk_device + "/stat", "r") as f: + disk_inflight = 1 + last_wait_time = 0 + wait_time = 1 + while disk_inflight > 0 or wait_time > last_wait_time: + f.seek(0) + stats = f.read() + block_stats = stats.split() + disk_inflight = int(block_stats[8]) + last_wait_time = wait_time + wait_time = int(block_stats[9]) + + def copy(self, src, dst=None, sync=True): + if dst is None: + dst = os.path.basename(src) + shutil.copy(src, os.path.join(self.path, dst)) + if sync: + self.sync() + + +class Firmware: + def __init__(self, board): + self.board = board + + @property + def disk(self): + disks = self.board.get_disks() + if len(disks) != 1: + raise RuntimeError("Boot disk not found for: " + self.board.device) + return Disk(disks[0]) + + @property + def info(self): + with self.disk as disk: + fname = os.path.join(disk.path, "INFO_UF2.TXT") + with open(fname, "r") as f: + info = f.read() + lines = info.splitlines() + res = {} + res["header"] = lines[0] + for line in lines[1:]: + k, _, v = line.partition(":") + res[k.replace(":", "")] = v.strip() + return res + + def upload(self, fw): + with open(fw, "rb") as f: + header = f.read(32) + if header[0:4] != b"UF2\n": + raise ValueError("Only UF2 files are supported") + self.board.close() + with self.disk as disk: + disk.copy(fw, sync=False) + + +class CPboard: + @classmethod + def from_try_all(cls, name, **kwargs): + try: + return CPboard.from_build_name(name, **kwargs) + except ValueError: + pass + + vendor, _, product = name.partition(":") + if vendor and product: + return CPboard.from_usb(**kwargs, idVendor=int(vendor, 16), idProduct=int(product, 16)) + + return CPboard(name, **kwargs) + + @classmethod + def from_build_name(cls, name, **kwargs): + boards = { + #'arduino_zero' + "circuitplayground_express": (0x239A, 0x8019), + #'feather_m0_adalogger' : (0x239a, ), + #'feather_m0_basic' : (0x239a, ), + "feather_m0_express": (0x239A, 0x8023), + #'feather_m0_rfm69' : (0x239a, ), + #'feather_m0_rfm9x' : (0x239a, ), + #'feather_m0_supersized' : (0x239a, ), + #'feather_m4_express' : (0x239a, ), + #'gemma_m0' : (0x239a, ), + #'itsybitsy_m0_express' : (0x239a, ), + "itsybitsy_m4_express": (0x239A, 0x802C), + "metro_m0_express": (0x239A, 0x8014), + "metro_m4_express": (0x239A, 0x8021), + #'metro_m4_express_revb' : (0x239a, ), + #'pirkey_m0' : (0x239a, ), + #'trinket_m0' : (0x239a, ), + #'trinket_m0_haxpress' : (0x239a, ), + #'ugame10' + } + + try: + vendor, product = boards[name] + except KeyError: + raise ValueError("Unknown build name: " + name) + + return CPboard.from_usb(**kwargs, idVendor=vendor, idProduct=product) + + @classmethod + def from_build_name_bootloader(cls, name, **kwargs): + boards = { + #'arduino_zero' + #'circuitplayground_express' : (0x239a, ), + #'feather_m0_adalogger' : (0x239a, ), + #'feather_m0_basic' : (0x239a, ), + "feather_m0_express": (0x239A, 0x001B), + #'feather_m0_rfm69' : (0x239a, ), + #'feather_m0_rfm9x' : (0x239a, ), + #'feather_m0_supersized' : (0x239a, ), + #'feather_m4_express' : (0x239a, ), + #'gemma_m0' : (0x239a, ), + #'itsybitsy_m0_express' : (0x239a, ), + "itsybitsy_m4_express": (0x239A, 0x002B), + #'metro_m0_express' : (0x239a, 0x8014), + "metro_m4_express": (0x239A, 0x0021), + #'metro_m4_express_revb' : (0x239a, ), + #'pirkey_m0' : (0x239a, ), + #'trinket_m0' : (0x239a, ), + #'trinket_m0_haxpress' : (0x239a, ), + #'ugame10' + } + + try: + vendor, product = boards[name] + except KeyError: + raise ValueError("Unknown build name: " + name) + + board = CPboard.from_usb(**kwargs, idVendor=vendor, idProduct=product) + board.bootloader = True + return board + + @classmethod + def from_usb(cls, baudrate=115200, wait=0, timeout=10, **kwargs): + import usb.core + + dev = usb.core.find(**kwargs) + if not dev: + s = "Can't find USB device: " + args = [] + for x in kwargs.items(): + try: + args.append("%s=0x%x" % x) + except: + args.append("%s = %s" % x) + raise RuntimeError("Can't find USB device: " + ", ".join(args)) + return cls(dev, baudrate=baudrate, wait=wait, timeout=timeout) + + def __init__(self, device, baudrate=115200, wait=0, timeout=10): + self.device = device + self.usb_dev = None + try: + # Is it a usb.core.Device? + portstr = ":" + ".".join(map(str, device.port_numbers)) + ":" + except: + pass + else: + serials = [serial for serial in os.listdir("/dev/serial/by-path") if portstr in serial] + if len(serials) != 1: + raise RuntimeError("Can't find excatly one matching usb serial device") + self.device = os.path.realpath("/dev/serial/by-path/" + serials[0]) + self.usb_dev = device + + self.baudrate = baudrate + self.wait = wait + self.timeout = timeout + self.debug = True + self.mount = None + self.serial = None + self.bootloader = False + self.repl = REPL(self) + + def __enter__(self): + self.open() + return self + + def __exit__(self, exception_type, exception_value, traceback): + self.close() + + def open(self, baudrate=None, wait=None): + if self.serial: + return + if baudrate is None: + baudrate = self.baudrate + if wait is None: + wait = self.wait + + delayed = False + for attempt in range(wait + 1): + try: + self.serial = serial.Serial( + self.device, + baudrate=self.baudrate, + timeout=self.timeout, + inter_byte_timeout=10, + write_timeout=self.timeout, + ) + break + except (OSError, IOError): # Py2 and Py3 have different errors + if wait == 0: + continue + if attempt == 0: + sys.stdout.write("Waiting {} seconds for board ".format(wait)) + delayed = True + time.sleep(1) + sys.stdout.write(".") + sys.stdout.flush() + else: + if delayed: + print("") + raise CPboardError("failed to access " + self.device) + if delayed: + print("") + + def close(self): + if self.serial: + self.serial.close() + self.serial = None + + def exec(self, command, timeout=10, wait_for_response=True): + with self.repl as repl: + try: + output, error = repl.execute( + command, timeout=timeout, wait_for_response=wait_for_response + ) + except OSError as e: + if self.debug: + print("exec: session: ", self.repl.session) + raise CPboardError("timeout", e) + if error: + raise CPboardError("exception", output, error) + return output + + def eval(self, expression, timeout=10): + command = "print({})".format(expression) + with self.repl as repl: + output, error = repl.execute(command, timeout=timeout) + if error: + raise CPboardError("exception", output, error) + try: + res = eval(str(output, encoding="utf8")) + except: + raise CPboardError("failed to eval: %s" % output) + return res + + def _reset(self, mode="NORMAL"): + self.exec( + "import microcontroller;microcontroller.on_next_reset(microcontroller.RunMode.%s)" + % mode + ) + try: + self.exec("import microcontroller;microcontroller.reset()", wait_for_response=False) + except OSError: + pass + + def reset(self, safe_mode=False, delay=5, wait=10): + self._reset("SAFE_MODE" if safe_mode else "NORMAL") + self.close() + time.sleep(delay) + self.open(wait) + time.sleep(delay) + + def reset_to_bootloader(self, repl=False): + if repl: + self._reset("BOOTLOADER") + self.close() + else: + self.close() + s = serial.Serial(self.device, 1200, write_timeout=4, timeout=4) + s.close() + + def get_port_info(self): + portinfo = None + for port_iter in serial.tools.list_ports.comports(): + if port_iter.device == self.device: + portinfo = port_iter + break + return portinfo + + @property + def serial_number(self): + try: # Permissions are needed to read the value + return self.usb_dev.serial_number + except: + pass + p = self.get_port_info() + return p.serial_number if p else None + + def get_disks(self): + if self.usb_dev: + portstr = ":" + ".".join(map(str, self.usb_dev.port_numbers)) + ":" + return [ + "/dev/disk/by-path/" + disk + for disk in os.listdir("/dev/disk/by-path") + if portstr in disk + ] + serial = self.serial_number + if not serial: + raise RuntimeError("Serial number not found for: " + self.device) + return [ + "/dev/disk/by-id/" + disk for disk in os.listdir("/dev/disk/by-id") if serial in disk + ] + + @property + def disk(self): + disks = self.get_disks() + + part = [part for part in disks if "part1" in part] + if not part: + raise RuntimeError("Disk not found for: " + self.device) + + return Disk(part[0]) + + @property + def firmware(self): + return Firmware(self) + + def execfile_disk(self, filename): + with self.disk as disk: + disk.copy(filename, "code.py") + + with self.repl as repl: + try: + output = repl.run() + except OSError as e: + raise CPboardError("timeout", e) + except RuntimeError: + if self.repl.safe_mode: + raise PyboardError("Can't run in safe mode") + else: + raise + + return output + + def execfile(self, filename, timeout=10): + if os.environ.get("CPBOARD_EXEC_MODE") == "disk": + return self.execfile_disk(filename) + else: + with open(filename, "rb") as f: + pyfile = f.read() + return self.exec(pyfile, timeout=timeout) + + +# Implement just enough to make tests/run-tests work +PyboardError = CPboardError + + +class Pyboard: + def __init__(self, device, baudrate=115200, user="micro", password="python", wait=0): + self.board = CPboard.from_try_all(device, baudrate=baudrate, wait=wait) + with self.board.disk as disk: + disk.copy("skip_if.py") + + def close(self): + self.board.close() + + def enter_raw_repl(self): + self.board.open() + + def execfile(self, filename): + return self.board.execfile(filename) + + +def eval_namedtuple(board, command): + from collections import namedtuple + + s = board.exec("print(%s)" % command) + s = s.decode().strip() + items = [key.split("=") for key in s[1:-1].split(", ")] + keys = [item[0] for item in items] + vals = [item[1] for item in items] + nt = namedtuple("eval", keys) + res = nt(*[eval(val) for val in vals]) + return res + + +def os_uname(board): + return eval_namedtuple(board, "__import__('os').uname()") + + +def print_verbose(cargs, *args, **kwargs): + if cargs.verbose: + print(*args, flush=True, **kwargs) + + +def upload(args): + try: + board = CPboard.from_build_name_bootloader(args.board) + print_verbose(args, "Board is already in the bootloader") + except (ValueError, RuntimeError): + board = CPboard.from_try_all(args.board) + + print_verbose(args, "Serial number :", board.serial_number) + + if not (args.quiet or board.bootloader): + board.open() + print("Current version:", os_uname(board).version, flush=True) + + if not board.bootloader: + print_verbose(args, "Reset to bootloader...", end="") + board.reset_to_bootloader(repl=True) # Feather M0 Express doesn't respond to 1200 baud + time.sleep(5) + print_verbose(args, "done") + + print_verbose(args, "Bootloader:", board.firmware.info) + + print_verbose(args, "Upload firmware...", end="") + board.firmware.upload(args.firmware) + print_verbose(args, "done") + + print_verbose(args, "Wait for board...", end="") + time.sleep(5) + print_verbose(args, "done") + + if not args.quiet: + if board.bootloader: + board = CPboard.from_try_all(args.board) + board.open(wait=10) + print("New version:", os_uname(board).version, flush=True) + + +def print_error_exit(args, e): + if args.debug: + return False + if not args.quiet: + print(e, file=sys.stderr) + sys.exit(1) + + +def main(): + import argparse + + cmd_parser = argparse.ArgumentParser(description="CircuitPython Board Tool") + cmd_parser.add_argument("board", help="build_name, vid:pid or /dev/tty") + cmd_parser.add_argument("-f", "--firmware", help="upload UF2 firmware file") + cmd_parser.add_argument("-c", "--command", help="program passed in as string") + cmd_parser.add_argument("--tty", action="store_true", help="print tty") + cmd_parser.add_argument("--verbose", "-v", action="count", default=0, help="be verbose") + cmd_parser.add_argument("-q", "--quiet", action="store_true", help="be quiet") + cmd_parser.add_argument("--debug", action="store_true", help="raise exceptions") + args = cmd_parser.parse_args() + + if args.quiet: + args.verbose = 0 + args.debug = False + + if args.firmware: + try: + upload(args) + except BaseException as e: + if not print_error_exit(args, e): + raise + sys.exit(0) + + try: + board = CPboard.from_try_all(args.board) + except BaseException as e: + if not print_error_exit(args, e): + raise + + if args.verbose: + exec_mode = os.environ.get("CPBOARD_EXEC_MODE") + if exec_mode: + print("CPBOARD_EXEC_MODE =", exec_mode) + + # Make sure we can open serial + try: + with board: + pass + except BaseException as e: + if not print_error_exit(args, e): + raise + + if args.tty: + print(board.device) + elif args.command: + with board as b: + print(b.eval(args.command)) + else: + with board as b: + print("Device: ", end="") + if b.usb_dev: + print("%04x:%04x on " % (b.usb_dev.idVendor, b.usb_dev.idProduct), end="") + print(b.device) + print("Serial number:", b.serial_number) + uname = os_uname(b) + print("os.uname:") + print(" sysname:", uname.sysname) + print(" nodename:", uname.nodename) + print(" release:", uname.release) + print(" version:", uname.version) + print(" machine:", uname.machine) + + +if __name__ == "__main__": + main() |