diff options
author | Raghuram Subramani <raghus2247@gmail.com> | 2022-06-19 19:47:51 +0530 |
---|---|---|
committer | Raghuram Subramani <raghus2247@gmail.com> | 2022-06-19 19:47:51 +0530 |
commit | 4fd287655a72b9aea14cdac715ad5b90ed082ed2 (patch) | |
tree | 65d393bc0e699dd12d05b29ba568e04cea666207 /circuitpython/tools/pyboard.py | |
parent | 0150f70ce9c39e9e6dd878766c0620c85e47bed0 (diff) |
add circuitpython code
Diffstat (limited to '')
-rwxr-xr-x | circuitpython/tools/pyboard.py | 766 |
1 files changed, 766 insertions, 0 deletions
diff --git a/circuitpython/tools/pyboard.py b/circuitpython/tools/pyboard.py new file mode 100755 index 0000000..a43a4d0 --- /dev/null +++ b/circuitpython/tools/pyboard.py @@ -0,0 +1,766 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2014-2021 Damien P. George +# SPDX-FileCopyrightText: Copyright (c) 2017 Paul Sokolovsky +# SPDX-FileCopyrightText: 2014 MicroPython & CircuitPython contributors (https://github.com/adafruit/circuitpython/graphs/contributors) +# +# SPDX-License-Identifier: MIT + +""" +pyboard interface + +This module provides the Pyboard class, used to communicate with and +control a MicroPython device over a communication channel. Both real +boards and emulated devices (e.g. running in QEMU) are supported. +Various communication channels are supported, including a serial +connection, telnet-style network connection, external process +connection. + +Example usage: + + import pyboard + pyb = pyboard.Pyboard('/dev/ttyACM0') + +Or: + + pyb = pyboard.Pyboard('192.168.1.1') + +Then: + + pyb.enter_raw_repl() + pyb.exec('import pyb') + pyb.exec('pyb.LED(1).on()') + pyb.exit_raw_repl() + +Note: if using Python2 then pyb.exec must be written as pyb.exec_. +To run a script from the local machine on the board and print out the results: + + import pyboard + pyboard.execfile('test.py', device='/dev/ttyACM0') + +This script can also be run directly. To execute a local script, use: + + ./pyboard.py test.py + +Or: + + python pyboard.py test.py + +""" + +import sys +import time +import os +import ast + +try: + stdout = sys.stdout.buffer +except AttributeError: + # Python2 doesn't have buffer attr + stdout = sys.stdout + + +def stdout_write_bytes(b): + b = b.replace(b"\x04", b"") + stdout.write(b) + stdout.flush() + + +class PyboardError(Exception): + pass + + +class TelnetToSerial: + def __init__(self, ip, user, password, read_timeout=None): + self.tn = None + import telnetlib + + self.tn = telnetlib.Telnet(ip, timeout=15) + self.read_timeout = read_timeout + if b"Login as:" in self.tn.read_until(b"Login as:", timeout=read_timeout): + self.tn.write(bytes(user, "ascii") + b"\r\n") + + if b"Password:" in self.tn.read_until(b"Password:", timeout=read_timeout): + # needed because of internal implementation details of the telnet server + time.sleep(0.2) + self.tn.write(bytes(password, "ascii") + b"\r\n") + + if b"for more information." in self.tn.read_until( + b'Type "help()" for more information.', timeout=read_timeout + ): + # login successful + from collections import deque + + self.fifo = deque() + return + + raise PyboardError("Failed to establish a telnet connection with the board") + + def __del__(self): + self.close() + + def close(self): + if self.tn: + self.tn.close() + + def read(self, size=1): + while len(self.fifo) < size: + timeout_count = 0 + data = self.tn.read_eager() + if len(data): + self.fifo.extend(data) + timeout_count = 0 + else: + time.sleep(0.25) + if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: + break + timeout_count += 1 + + data = b"" + while len(data) < size and len(self.fifo) > 0: + data += bytes([self.fifo.popleft()]) + return data + + def write(self, data): + self.tn.write(data) + return len(data) + + def inWaiting(self): + n_waiting = len(self.fifo) + if not n_waiting: + data = self.tn.read_eager() + self.fifo.extend(data) + return len(data) + else: + return n_waiting + + +class ProcessToSerial: + "Execute a process and emulate serial connection using its stdin/stdout." + + def __init__(self, cmd): + import subprocess + + self.subp = subprocess.Popen( + cmd, + bufsize=0, + shell=True, + preexec_fn=os.setsid, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + # Initially was implemented with selectors, but that adds Python3 + # dependency. However, there can be race conditions communicating + # with a particular child process (like QEMU), and selectors may + # still work better in that case, so left inplace for now. + # + # import selectors + # self.sel = selectors.DefaultSelector() + # self.sel.register(self.subp.stdout, selectors.EVENT_READ) + + import select + + self.poll = select.poll() + self.poll.register(self.subp.stdout.fileno()) + + def close(self): + import signal + + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) + + def read(self, size=1): + data = b"" + while len(data) < size: + data += self.subp.stdout.read(size - len(data)) + return data + + def write(self, data): + self.subp.stdin.write(data) + return len(data) + + def inWaiting(self): + # res = self.sel.select(0) + res = self.poll.poll(0) + if res: + return 1 + return 0 + + +class ProcessPtyToTerminal: + """Execute a process which creates a PTY and prints secondary PTY as + first line of its output, and emulate serial connection using + this PTY.""" + + def __init__(self, cmd): + import subprocess + import re + import serial + + self.subp = subprocess.Popen( + cmd.split(), + bufsize=0, + shell=False, + preexec_fn=os.setsid, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + pty_line = self.subp.stderr.readline().decode("utf-8") + m = re.search(r"/dev/pts/[0-9]+", pty_line) + if not m: + print("Error: unable to find PTY device in startup line:", pty_line) + self.close() + sys.exit(1) + pty = m.group() + # rtscts, dsrdtr params are to workaround pyserial bug: + # http://stackoverflow.com/questions/34831131/pyserial-does-not-play-well-with-virtual-port + self.ser = serial.Serial(pty, interCharTimeout=1, rtscts=True, dsrdtr=True) + + def close(self): + import signal + + os.killpg(os.getpgid(self.subp.pid), signal.SIGTERM) + + def read(self, size=1): + return self.ser.read(size) + + def write(self, data): + return self.ser.write(data) + + def inWaiting(self): + return self.ser.inWaiting() + + +class Pyboard: + def __init__( + self, device, baudrate=115200, user="micro", password="python", wait=0, exclusive=True + ): + self.in_raw_repl = False + self.use_raw_paste = True + if device.startswith("exec:"): + self.serial = ProcessToSerial(device[len("exec:") :]) + elif device.startswith("execpty:"): + self.serial = ProcessPtyToTerminal(device[len("qemupty:") :]) + elif device and device[0].isdigit() and device[-1].isdigit() and device.count(".") == 3: + # device looks like an IP address + self.serial = TelnetToSerial(device, user, password, read_timeout=10) + else: + import serial + + # Set options, and exclusive if pyserial supports it + serial_kwargs = {"baudrate": baudrate, "interCharTimeout": 1} + if serial.__version__ >= "3.3": + serial_kwargs["exclusive"] = exclusive + + delayed = False + for attempt in range(wait + 1): + try: + self.serial = serial.Serial(device, **serial_kwargs) + break + except (OSError, IOError): # Py2 and Py3 have different errors + if wait == 0: + continue + if attempt == 0: + sys.stdout.write("Waiting {} seconds for pyboard ".format(wait)) + delayed = True + time.sleep(1) + sys.stdout.write(".") + sys.stdout.flush() + else: + if delayed: + print("") + raise PyboardError("failed to access " + device) + if delayed: + print("") + + def close(self): + self.serial.close() + + def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): + # if data_consumer is used then data is not accumulated and the ending must be 1 byte long + assert data_consumer is None or len(ending) == 1 + + data = self.serial.read(min_num_bytes) + if data_consumer: + data_consumer(data) + timeout_count = 0 + while True: + if data.endswith(ending): + break + elif self.serial.inWaiting() > 0: + new_data = self.serial.read(1) + if data_consumer: + data_consumer(new_data) + data = new_data + else: + data = data + new_data + timeout_count = 0 + else: + timeout_count += 1 + if timeout is not None and timeout_count >= 100 * timeout: + break + time.sleep(0.01) + return data + + def enter_raw_repl(self, soft_reset=True): + self.serial.write(b"\r\x03\x03") # ctrl-C twice: interrupt any running program + + # flush input (without relying on serial.flushInput()) + n = self.serial.inWaiting() + while n > 0: + self.serial.read(n) + n = self.serial.inWaiting() + + self.serial.write(b"\r\x01") # ctrl-A: enter raw REPL + + if soft_reset: + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n>"): + print(data) + raise PyboardError("could not enter raw repl") + + self.serial.write(b"\x04") # ctrl-D: soft reset + + # Waiting for "soft reboot" independently to "raw REPL" (done below) + # allows boot.py to print, which will show up after "soft reboot" + # and before "raw REPL". + data = self.read_until(1, b"soft reboot\r\n") + if not data.endswith(b"soft reboot\r\n"): + print(data) + raise PyboardError("could not enter raw repl") + + data = self.read_until(1, b"raw REPL; CTRL-B to exit\r\n") + if not data.endswith(b"raw REPL; CTRL-B to exit\r\n"): + print(data) + raise PyboardError("could not enter raw repl") + + self.in_raw_repl = True + + def exit_raw_repl(self): + self.serial.write(b"\r\x02") # ctrl-B: enter friendly REPL + self.in_raw_repl = False + + def follow(self, timeout, data_consumer=None): + # wait for normal output + data = self.read_until(1, b"\x04", timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b"\x04"): + raise PyboardError("timeout waiting for first EOF reception") + data = data[:-1] + + # wait for error output + data_err = self.read_until(1, b"\x04", timeout=timeout) + if not data_err.endswith(b"\x04"): + raise PyboardError("timeout waiting for second EOF reception") + data_err = data_err[:-1] + + # return normal and error output + return data, data_err + + def raw_paste_write(self, command_bytes): + # Read initial header, with window size. + data = self.serial.read(2) + window_size = data[0] | data[1] << 8 + window_remain = window_size + + # Write out the command_bytes data. + i = 0 + while i < len(command_bytes): + while window_remain == 0 or self.serial.inWaiting(): + data = self.serial.read(1) + if data == b"\x01": + # Device indicated that a new window of data can be sent. + window_remain += window_size + elif data == b"\x04": + # Device indicated abrupt end. Acknowledge it and finish. + self.serial.write(b"\x04") + return + else: + # Unexpected data from device. + raise PyboardError("unexpected read during raw paste: {}".format(data)) + # Send out as much data as possible that fits within the allowed window. + b = command_bytes[i : min(i + window_remain, len(command_bytes))] + self.serial.write(b) + window_remain -= len(b) + i += len(b) + + # Indicate end of data. + self.serial.write(b"\x04") + + # Wait for device to acknowledge end of data. + data = self.read_until(1, b"\x04") + if not data.endswith(b"\x04"): + raise PyboardError("could not complete raw paste: {}".format(data)) + + def exec_raw_no_follow(self, command): + if isinstance(command, bytes): + command_bytes = command + else: + command_bytes = bytes(command, encoding="utf8") + + # check we have a prompt + data = self.read_until(1, b">") + if not data.endswith(b">"): + raise PyboardError("could not enter raw repl") + + if self.use_raw_paste: + # Try to enter raw-paste mode. + self.serial.write(b"\x05A\x01") + data = self.serial.read(2) + if data == b"R\x00": + # Device understood raw-paste command but doesn't support it. + pass + elif data == b"R\x01": + # Device supports raw-paste mode, write out the command using this mode. + return self.raw_paste_write(command_bytes) + else: + # Device doesn't support raw-paste, fall back to normal raw REPL. + data = self.read_until(1, b"w REPL; CTRL-B to exit\r\n>") + if not data.endswith(b"w REPL; CTRL-B to exit\r\n>"): + print(data) + raise PyboardError("could not enter raw repl") + # Don't try to use raw-paste mode again for this connection. + self.use_raw_paste = False + + # Write command using standard raw REPL, 256 bytes every 10ms. + for i in range(0, len(command_bytes), 256): + self.serial.write(command_bytes[i : min(i + 256, len(command_bytes))]) + time.sleep(0.01) + self.serial.write(b"\x04") + + # check if we could exec command + data = self.serial.read(2) + if data != b"OK": + raise PyboardError("could not exec command (response: %r)" % data) + + def exec_raw(self, command, timeout=10, data_consumer=None): + self.exec_raw_no_follow(command) + return self.follow(timeout, data_consumer) + + def eval(self, expression): + ret = self.exec_("print({})".format(expression)) + ret = ret.strip() + return ret + + def exec_(self, command, data_consumer=None): + ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) + if ret_err: + raise PyboardError("exception", ret, ret_err) + return ret + + def execfile(self, filename): + with open(filename, "rb") as f: + pyfile = f.read() + return self.exec_(pyfile) + + def get_time(self): + t = str(self.eval("pyb.RTC().datetime()"), encoding="utf8")[1:-1].split(", ") + return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) + + def fs_ls(self, src): + cmd = ( + "import uos\nfor f in uos.ilistdir(%s):\n" + " print('{:12} {}{}'.format(f[3]if len(f)>3 else 0,f[0],'/'if f[1]&0x4000 else ''))" + % (("'%s'" % src) if src else "") + ) + self.exec_(cmd, data_consumer=stdout_write_bytes) + + def fs_cat(self, src, chunk_size=256): + cmd = ( + "with open('%s') as f:\n while 1:\n" + " b=f.read(%u)\n if not b:break\n print(b,end='')" % (src, chunk_size) + ) + self.exec_(cmd, data_consumer=stdout_write_bytes) + + def fs_get(self, src, dest, chunk_size=256): + self.exec_("f=open('%s','rb')\nr=f.read" % src) + with open(dest, "wb") as f: + while True: + data = bytearray() + self.exec_("print(r(%u))" % chunk_size, data_consumer=lambda d: data.extend(d)) + assert data.endswith(b"\r\n\x04") + try: + data = ast.literal_eval(str(data[:-3], "ascii")) + if not isinstance(data, bytes): + raise ValueError("Not bytes") + except (UnicodeError, ValueError) as e: + raise PyboardError("fs_get: Could not interpret received data: %s" % str(e)) + if not data: + break + f.write(data) + self.exec_("f.close()") + + def fs_put(self, src, dest, chunk_size=256): + self.exec_("f=open('%s','wb')\nw=f.write" % dest) + with open(src, "rb") as f: + while True: + data = f.read(chunk_size) + if not data: + break + if sys.version_info < (3,): + self.exec_("w(b" + repr(data) + ")") + else: + self.exec_("w(" + repr(data) + ")") + self.exec_("f.close()") + + def fs_mkdir(self, dir): + self.exec_("import uos\nuos.mkdir('%s')" % dir) + + def fs_rmdir(self, dir): + self.exec_("import uos\nuos.rmdir('%s')" % dir) + + def fs_rm(self, src): + self.exec_("import uos\nuos.remove('%s')" % src) + + +# in Python2 exec is a keyword so one must use "exec_" +# but for Python3 we want to provide the nicer version "exec" +setattr(Pyboard, "exec", Pyboard.exec_) + + +def execfile(filename, device="/dev/ttyACM0", baudrate=115200, user="micro", password="python"): + pyb = Pyboard(device, baudrate, user, password) + pyb.enter_raw_repl() + output = pyb.execfile(filename) + stdout_write_bytes(output) + pyb.exit_raw_repl() + pyb.close() + + +def filesystem_command(pyb, args): + def fname_remote(src): + if src.startswith(":"): + src = src[1:] + return src + + def fname_cp_dest(src, dest): + src = src.rsplit("/", 1)[-1] + if dest is None or dest == "": + dest = src + elif dest == ".": + dest = "./" + src + elif dest.endswith("/"): + dest += src + return dest + + cmd = args[0] + args = args[1:] + try: + if cmd == "cp": + srcs = args[:-1] + dest = args[-1] + if srcs[0].startswith("./") or dest.startswith(":"): + op = pyb.fs_put + fmt = "cp %s :%s" + dest = fname_remote(dest) + else: + op = pyb.fs_get + fmt = "cp :%s %s" + for src in srcs: + src = fname_remote(src) + dest2 = fname_cp_dest(src, dest) + print(fmt % (src, dest2)) + op(src, dest2) + else: + op = { + "ls": pyb.fs_ls, + "cat": pyb.fs_cat, + "mkdir": pyb.fs_mkdir, + "rmdir": pyb.fs_rmdir, + "rm": pyb.fs_rm, + }[cmd] + if cmd == "ls" and not args: + args = [""] + for src in args: + src = fname_remote(src) + print("%s :%s" % (cmd, src)) + op(src) + except PyboardError as er: + print(str(er.args[2], "ascii")) + pyb.exit_raw_repl() + pyb.close() + sys.exit(1) + + +_injected_import_hook_code = """\ +import uos, uio +class _FS: + class File(uio.IOBase): + def __init__(self): + self.off = 0 + def ioctl(self, request, arg): + return 0 + def readinto(self, buf): + buf[:] = memoryview(_injected_buf)[self.off:self.off + len(buf)] + self.off += len(buf) + return len(buf) + mount = umount = chdir = lambda *args: None + def stat(self, path): + if path == '_injected.mpy': + return tuple(0 for _ in range(10)) + else: + raise OSError(-2) # ENOENT + def open(self, path, mode): + return self.File() +uos.mount(_FS(), '/_') +uos.chdir('/_') +from _injected import * +uos.umount('/_') +del _injected_buf, _FS +""" + + +def main(): + import argparse + + cmd_parser = argparse.ArgumentParser(description="Run scripts on the pyboard.") + cmd_parser.add_argument( + "-d", + "--device", + default=os.environ.get("PYBOARD_DEVICE", "/dev/ttyACM0"), + help="the serial device or the IP address of the pyboard", + ) + cmd_parser.add_argument( + "-b", + "--baudrate", + default=os.environ.get("PYBOARD_BAUDRATE", "115200"), + help="the baud rate of the serial device", + ) + cmd_parser.add_argument("-u", "--user", default="micro", help="the telnet login username") + cmd_parser.add_argument("-p", "--password", default="python", help="the telnet login password") + cmd_parser.add_argument("-c", "--command", help="program passed in as string") + cmd_parser.add_argument( + "-w", + "--wait", + default=0, + type=int, + help="seconds to wait for USB connected board to become available", + ) + group = cmd_parser.add_mutually_exclusive_group() + group.add_argument( + "--soft-reset", + default=True, + action="store_true", + help="Whether to perform a soft reset when connecting to the board [default]", + ) + group.add_argument( + "--no-soft-reset", + action="store_false", + dest="soft_reset", + ) + group = cmd_parser.add_mutually_exclusive_group() + group.add_argument( + "--follow", + action="store_true", + default=None, + help="follow the output after running the scripts [default if no scripts given]", + ) + group.add_argument( + "--no-follow", + action="store_false", + dest="follow", + ) + group = cmd_parser.add_mutually_exclusive_group() + group.add_argument( + "--exclusive", + action="store_true", + default=True, + help="Open the serial device for exclusive access [default]", + ) + group.add_argument( + "--no-exclusive", + action="store_false", + dest="exclusive", + ) + cmd_parser.add_argument( + "-f", + "--filesystem", + action="store_true", + help="perform a filesystem action: " + "cp local :device | cp :device local | cat path | ls [path] | rm path | mkdir path | rmdir path", + ) + cmd_parser.add_argument("files", nargs="*", help="input files") + args = cmd_parser.parse_args() + + # open the connection to the pyboard + try: + pyb = Pyboard( + args.device, args.baudrate, args.user, args.password, args.wait, args.exclusive + ) + except PyboardError as er: + print(er) + sys.exit(1) + + # run any command or file(s) + if args.command is not None or args.filesystem or len(args.files): + # we must enter raw-REPL mode to execute commands + # this will do a soft-reset of the board + try: + pyb.enter_raw_repl(args.soft_reset) + except PyboardError as er: + print(er) + pyb.close() + sys.exit(1) + + def execbuffer(buf): + try: + if args.follow is None or args.follow: + ret, ret_err = pyb.exec_raw( + buf, timeout=None, data_consumer=stdout_write_bytes + ) + else: + pyb.exec_raw_no_follow(buf) + ret_err = None + except PyboardError as er: + print(er) + pyb.close() + sys.exit(1) + except KeyboardInterrupt: + sys.exit(1) + if ret_err: + pyb.exit_raw_repl() + pyb.close() + stdout_write_bytes(ret_err) + sys.exit(1) + + # do filesystem commands, if given + if args.filesystem: + filesystem_command(pyb, args.files) + del args.files[:] + + # run the command, if given + if args.command is not None: + execbuffer(args.command.encode("utf-8")) + + # run any files + for filename in args.files: + with open(filename, "rb") as f: + pyfile = f.read() + if filename.endswith(".mpy") and pyfile[0] == ord("M"): + pyb.exec_("_injected_buf=" + repr(pyfile)) + pyfile = _injected_import_hook_code + execbuffer(pyfile) + + # exiting raw-REPL just drops to friendly-REPL mode + pyb.exit_raw_repl() + + # if asked explicitly, or no files given, then follow the output + if args.follow or (args.command is None and not args.filesystem and len(args.files) == 0): + try: + ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) + except PyboardError as er: + print(er) + sys.exit(1) + except KeyboardInterrupt: + sys.exit(1) + if ret_err: + pyb.close() + stdout_write_bytes(ret_err) + sys.exit(1) + + # close the connection to the pyboard + pyb.close() + + +if __name__ == "__main__": + main() |