aboutsummaryrefslogtreecommitdiff
path: root/circuitpython/tools/upip.py
diff options
context:
space:
mode:
authorRaghuram Subramani <raghus2247@gmail.com>2022-06-19 19:47:51 +0530
committerRaghuram Subramani <raghus2247@gmail.com>2022-06-19 19:47:51 +0530
commit4fd287655a72b9aea14cdac715ad5b90ed082ed2 (patch)
tree65d393bc0e699dd12d05b29ba568e04cea666207 /circuitpython/tools/upip.py
parent0150f70ce9c39e9e6dd878766c0620c85e47bed0 (diff)
add circuitpython code
Diffstat (limited to 'circuitpython/tools/upip.py')
-rw-r--r--circuitpython/tools/upip.py356
1 files changed, 356 insertions, 0 deletions
diff --git a/circuitpython/tools/upip.py b/circuitpython/tools/upip.py
new file mode 100644
index 0000000..70afe36
--- /dev/null
+++ b/circuitpython/tools/upip.py
@@ -0,0 +1,356 @@
+# upip - Package manager for MicroPython
+
+# SPDX-FileCopyrightText: Copyright (c) 2015-2018 Paul Sokolovsky
+# SPDX-FileCopyrightText: 2014 MicroPython & CircuitPython contributors (https://github.com/adafruit/circuitpython/graphs/contributors)
+#
+# SPDX-License-Identifier: MIT
+
+import sys
+import gc
+import uos as os
+import uerrno as errno
+import ujson as json
+import uzlib
+import upip_utarfile as tarfile
+
+gc.collect()
+
+
+debug = False
+index_urls = ["https://micropython.org/pi", "https://pypi.org/pypi"]
+install_path = None
+cleanup_files = []
+gzdict_sz = 16 + 15
+
+file_buf = bytearray(512)
+
+
+class NotFoundError(Exception):
+ pass
+
+
+def op_split(path):
+ if path == "":
+ return ("", "")
+ r = path.rsplit("/", 1)
+ if len(r) == 1:
+ return ("", path)
+ head = r[0]
+ if not head:
+ head = "/"
+ return (head, r[1])
+
+
+def op_basename(path):
+ return op_split(path)[1]
+
+
+# Expects *file* name
+def _makedirs(name, mode=0o777):
+ ret = False
+ s = ""
+ comps = name.rstrip("/").split("/")[:-1]
+ if comps[0] == "":
+ s = "/"
+ for c in comps:
+ if s and s[-1] != "/":
+ s += "/"
+ s += c
+ try:
+ os.mkdir(s)
+ ret = True
+ except OSError as e:
+ if e.errno != errno.EEXIST and e.errno != errno.EISDIR:
+ raise e
+ ret = False
+ return ret
+
+
+def save_file(fname, subf):
+ global file_buf
+ with open(fname, "wb") as outf:
+ while True:
+ sz = subf.readinto(file_buf)
+ if not sz:
+ break
+ outf.write(file_buf, sz)
+
+
+def install_tar(f, prefix):
+ meta = {}
+ for info in f:
+ # print(info)
+ fname = info.name
+ try:
+ fname = fname[fname.index("/") + 1 :]
+ except ValueError:
+ fname = ""
+
+ save = True
+ for p in ("setup.", "PKG-INFO", "README"):
+ # print(fname, p)
+ if fname.startswith(p) or ".egg-info" in fname:
+ if fname.endswith("/requires.txt"):
+ meta["deps"] = f.extractfile(info).read()
+ save = False
+ if debug:
+ print("Skipping", fname)
+ break
+
+ if save:
+ outfname = prefix + fname
+ if info.type != tarfile.DIRTYPE:
+ if debug:
+ print("Extracting " + outfname)
+ _makedirs(outfname)
+ subf = f.extractfile(info)
+ save_file(outfname, subf)
+ return meta
+
+
+def expandhome(s):
+ if "~/" in s:
+ h = os.getenv("HOME")
+ s = s.replace("~/", h + "/")
+ return s
+
+
+import ussl
+import usocket
+
+warn_ussl = True
+
+
+def url_open(url):
+ global warn_ussl
+
+ if debug:
+ print(url)
+
+ proto, _, host, urlpath = url.split("/", 3)
+ try:
+ port = 443
+ if ":" in host:
+ host, port = host.split(":")
+ port = int(port)
+ ai = usocket.getaddrinfo(host, port, 0, usocket.SOCK_STREAM)
+ except OSError as e:
+ fatal("Unable to resolve %s (no Internet?)" % host, e)
+ # print("Address infos:", ai)
+ ai = ai[0]
+
+ s = usocket.socket(ai[0], ai[1], ai[2])
+ try:
+ # print("Connect address:", addr)
+ s.connect(ai[-1])
+
+ if proto == "https:":
+ s = ussl.wrap_socket(s, server_hostname=host)
+ if warn_ussl:
+ print("Warning: %s SSL certificate is not validated" % host)
+ warn_ussl = False
+
+ # MicroPython rawsocket module supports file interface directly
+ s.write("GET /%s HTTP/1.0\r\nHost: %s:%s\r\n\r\n" % (urlpath, host, port))
+ l = s.readline()
+ protover, status, msg = l.split(None, 2)
+ if status != b"200":
+ if status == b"404" or status == b"301":
+ raise NotFoundError("Package not found")
+ raise ValueError(status)
+ while 1:
+ l = s.readline()
+ if not l:
+ raise ValueError("Unexpected EOF in HTTP headers")
+ if l == b"\r\n":
+ break
+ except Exception as e:
+ s.close()
+ raise e
+
+ return s
+
+
+def get_pkg_metadata(name):
+ for url in index_urls:
+ try:
+ f = url_open("%s/%s/json" % (url, name))
+ except NotFoundError:
+ continue
+ try:
+ return json.load(f)
+ finally:
+ f.close()
+ raise NotFoundError("Package not found")
+
+
+def fatal(msg, exc=None):
+ print("Error:", msg)
+ if exc and debug:
+ raise exc
+ sys.exit(1)
+
+
+def install_pkg(pkg_spec, install_path):
+ package = pkg_spec.split("==")
+ data = get_pkg_metadata(package[0])
+
+ if len(package) == 1:
+ latest_ver = data["info"]["version"]
+ else:
+ latest_ver = package[1]
+ packages = data["releases"][latest_ver]
+ del data
+ gc.collect()
+ assert len(packages) == 1
+ package_url = packages[0]["url"]
+ print("Installing %s %s from %s" % (pkg_spec, latest_ver, package_url))
+ package_fname = op_basename(package_url)
+ f1 = url_open(package_url)
+ try:
+ f2 = uzlib.DecompIO(f1, gzdict_sz)
+ f3 = tarfile.TarFile(fileobj=f2)
+ meta = install_tar(f3, install_path)
+ finally:
+ f1.close()
+ del f3
+ del f2
+ gc.collect()
+ return meta
+
+
+def install(to_install, install_path=None):
+ # Calculate gzip dictionary size to use
+ global gzdict_sz
+ sz = gc.mem_free() + gc.mem_alloc()
+ if sz <= 65536:
+ gzdict_sz = 16 + 12
+
+ if install_path is None:
+ install_path = get_install_path()
+ if install_path[-1] != "/":
+ install_path += "/"
+ if not isinstance(to_install, list):
+ to_install = [to_install]
+ print("Installing to: " + install_path)
+ # sets would be perfect here, but don't depend on them
+ installed = []
+ try:
+ while to_install:
+ if debug:
+ print("Queue:", to_install)
+ pkg_spec = to_install.pop(0)
+ if pkg_spec in installed:
+ continue
+ meta = install_pkg(pkg_spec, install_path)
+ installed.append(pkg_spec)
+ if debug:
+ print(meta)
+ deps = meta.get("deps", "").rstrip()
+ if deps:
+ deps = deps.decode("utf-8").split("\n")
+ to_install.extend(deps)
+ except Exception as e:
+ print(
+ "Error installing '{}': {}, packages may be partially installed".format(pkg_spec, e),
+ file=sys.stderr,
+ )
+
+
+def get_install_path():
+ global install_path
+ if install_path is None:
+ # sys.path[0] is current module's path
+ install_path = sys.path[1]
+ if install_path == ".frozen":
+ install_path = sys.path[2]
+ install_path = expandhome(install_path)
+ return install_path
+
+
+def cleanup():
+ for fname in cleanup_files:
+ try:
+ os.unlink(fname)
+ except OSError:
+ print("Warning: Cannot delete " + fname)
+
+
+def help():
+ print(
+ """\
+upip - Simple PyPI package manager for MicroPython
+Usage: micropython -m upip install [-p <path>] <package>... | -r <requirements.txt>
+import upip; upip.install(package_or_list, [<path>])
+
+If <path> isn't given, packages will be installed to sys.path[1], or
+sys.path[2] if the former is .frozen (path can be set from MICROPYPATH
+environment variable if supported)."""
+ )
+ print("Default install path:", get_install_path())
+ print(
+ """\
+
+Note: only MicroPython packages (usually, named micropython-*) are supported
+for installation, upip does not support arbitrary code in setup.py.
+"""
+ )
+
+
+def main():
+ global debug
+ global index_urls
+ global install_path
+ install_path = None
+
+ if len(sys.argv) < 2 or sys.argv[1] == "-h" or sys.argv[1] == "--help":
+ help()
+ return
+
+ if sys.argv[1] != "install":
+ fatal("Only 'install' command supported")
+
+ to_install = []
+
+ i = 2
+ while i < len(sys.argv) and sys.argv[i][0] == "-":
+ opt = sys.argv[i]
+ i += 1
+ if opt == "-h" or opt == "--help":
+ help()
+ return
+ elif opt == "-p":
+ install_path = sys.argv[i]
+ i += 1
+ elif opt == "-r":
+ list_file = sys.argv[i]
+ i += 1
+ with open(list_file) as f:
+ while True:
+ l = f.readline()
+ if not l:
+ break
+ if l[0] == "#":
+ continue
+ to_install.append(l.rstrip())
+ elif opt == "-i":
+ index_urls = [sys.argv[i]]
+ i += 1
+ elif opt == "--debug":
+ debug = True
+ else:
+ fatal("Unknown/unsupported option: " + opt)
+
+ to_install.extend(sys.argv[i:])
+ if not to_install:
+ help()
+ return
+
+ install(to_install)
+
+ if not debug:
+ cleanup()
+
+
+if __name__ == "__main__":
+ main()