|
import contextlib |
|
import os |
|
import pathlib |
|
import shutil |
|
import stat |
|
import sys |
|
import zipfile |
|
|
|
__all__ = ['ZipAppError', 'create_archive', 'get_interpreter'] |
|
|
|
|
|
# The __main__.py used if the users specifies "-m module:fn". |
|
# Note that this will always be written as UTF-8 (module and |
|
# function names can be non-ASCII in Python 3). |
|
# We add a coding cookie even though UTF-8 is the default in Python 3 |
|
# because the resulting archive may be intended to be run under Python 2. |
|
MAIN_TEMPLATE = """\ |
|
# -*- coding: utf-8 -*- |
|
import {module} |
|
{module}.{fn}() |
|
""" |
|
|
|
|
|
# The Windows launcher defaults to UTF-8 when parsing shebang lines if the |
|
# file has no BOM. So use UTF-8 on Windows. |
|
# On Unix, use the filesystem encoding. |
|
if sys.platform.startswith('win'): |
|
shebang_encoding = 'utf-8' |
|
else: |
|
shebang_encoding = sys.getfilesystemencoding() |
|
|
|
|
|
class ZipAppError(ValueError): |
|
pass |
|
|
|
|
|
@contextlib.contextmanager |
|
def _maybe_open(archive, mode): |
|
if isinstance(archive, (str, os.PathLike)): |
|
with open(archive, mode) as f: |
|
yield f |
|
else: |
|
yield archive |
|
|
|
|
|
def _write_file_prefix(f, interpreter): |
|
"""Write a shebang line.""" |
|
if interpreter: |
|
shebang = b'#!' + interpreter.encode(shebang_encoding) + b'\n' |
|
f.write(shebang) |
|
|
|
|
|
def _copy_archive(archive, new_archive, interpreter=None): |
|
"""Copy an application archive, modifying the shebang line.""" |
|
with _maybe_open(archive, 'rb') as src: |
|
# Skip the shebang line from the source. |
|
# Read 2 bytes of the source and check if they are #!. |
|
first_2 = src.read(2) |
|
if first_2 == b'#!': |
|
# Discard the initial 2 bytes and the rest of the shebang line. |
|
first_2 = b'' |
|
src.readline() |
|
|
|
with _maybe_open(new_archive, 'wb') as dst: |
|
_write_file_prefix(dst, interpreter) |
|
# If there was no shebang, "first_2" contains the first 2 bytes |
|
# of the source file, so write them before copying the rest |
|
# of the file. |
|
dst.write(first_2) |
|
shutil.copyfileobj(src, dst) |
|
|
|
if interpreter and isinstance(new_archive, str): |
|
os.chmod(new_archive, os.stat(new_archive).st_mode | stat.S_IEXEC) |
|
|
|
|
|
def create_archive(source, target=None, interpreter=None, main=None, |
|
filter=None, compressed=False): |
|
"""Create an application archive from SOURCE. |
|
|
|
The SOURCE can be the name of a directory, or a filename or a file-like |
|
object referring to an existing archive. |
|
|
|
The content of SOURCE is packed into an application archive in TARGET, |
|
which can be a filename or a file-like object. If SOURCE is a directory, |
|
TARGET can be omitted and will default to the name of SOURCE with .pyz |
|
appended. |
|
|
|
The created application archive will have a shebang line specifying |
|
that it should run with INTERPRETER (there will be no shebang line if |
|
INTERPRETER is None), and a __main__.py which runs MAIN (if MAIN is |
|
not specified, an existing __main__.py will be used). It is an error |
|
to specify MAIN for anything other than a directory source with no |
|
__main__.py, and it is an error to omit MAIN if the directory has no |
|
__main__.py. |
|
""" |
|
# Are we copying an existing archive? |
|
source_is_file = False |
|
if hasattr(source, 'read') and hasattr(source, 'readline'): |
|
source_is_file = True |
|
else: |
|
source = pathlib.Path(source) |
|
if source.is_file(): |
|
source_is_file = True |
|
|
|
if source_is_file: |
|
_copy_archive(source, target, interpreter) |
|
return |
|
|
|
# We are creating a new archive from a directory. |
|
if not source.exists(): |
|
raise ZipAppError("Source does not exist") |
|
has_main = (source / '__main__.py').is_file() |
|
if main and has_main: |
|
raise ZipAppError( |
|
"Cannot specify entry point if the source has __main__.py") |
|
if not (main or has_main): |
|
raise ZipAppError("Archive has no entry point") |
|
|
|
main_py = None |
|
if main: |
|
# Check that main has the right format. |
|
mod, sep, fn = main.partition(':') |
|
mod_ok = all(part.isidentifier() for part in mod.split('.')) |
|
fn_ok = all(part.isidentifier() for part in fn.split('.')) |
|
if not (sep == ':' and mod_ok and fn_ok): |
|
raise ZipAppError("Invalid entry point: " + main) |
|
main_py = MAIN_TEMPLATE.format(module=mod, fn=fn) |
|
|
|
if target is None: |
|
target = source.with_suffix('.pyz') |
|
elif not hasattr(target, 'write'): |
|
target = pathlib.Path(target) |
|
|
|
with _maybe_open(target, 'wb') as fd: |
|
_write_file_prefix(fd, interpreter) |
|
compression = (zipfile.ZIP_DEFLATED if compressed else |
|
zipfile.ZIP_STORED) |
|
with zipfile.ZipFile(fd, 'w', compression=compression) as z: |
|
for child in source.rglob('*'): |
|
arcname = child.relative_to(source) |
|
if filter is None or filter(arcname): |
|
z.write(child, arcname.as_posix()) |
|
if main_py: |
|
z.writestr('__main__.py', main_py.encode('utf-8')) |
|
|
|
if interpreter and not hasattr(target, 'write'): |
|
target.chmod(target.stat().st_mode | stat.S_IEXEC) |
|
|
|
|
|
def get_interpreter(archive): |
|
with _maybe_open(archive, 'rb') as f: |
|
if f.read(2) == b'#!': |
|
return f.readline().strip().decode(shebang_encoding) |
|
|
|
|
|
def main(args=None): |
|
"""Run the zipapp command line interface. |
|
|
|
The ARGS parameter lets you specify the argument list directly. |
|
Omitting ARGS (or setting it to None) works as for argparse, using |
|
sys.argv[1:] as the argument list. |
|
""" |
|
import argparse |
|
|
|
parser = argparse.ArgumentParser() |
|
parser.add_argument('--output', '-o', default=None, |
|
help="The name of the output archive. " |
|
"Required if SOURCE is an archive.") |
|
parser.add_argument('--python', '-p', default=None, |
|
help="The name of the Python interpreter to use " |
|
"(default: no shebang line).") |
|
parser.add_argument('--main', '-m', default=None, |
|
help="The main function of the application " |
|
"(default: use an existing __main__.py).") |
|
parser.add_argument('--compress', '-c', action='store_true', |
|
help="Compress files with the deflate method. " |
|
"Files are stored uncompressed by default.") |
|
parser.add_argument('--info', default=False, action='store_true', |
|
help="Display the interpreter from the archive.") |
|
parser.add_argument('source', |
|
help="Source directory (or existing archive).") |
|
|
|
args = parser.parse_args(args) |
|
|
|
# Handle `python -m zipapp archive.pyz --info`. |
|
if args.info: |
|
if not os.path.isfile(args.source): |
|
raise SystemExit("Can only get info for an archive file") |
|
interpreter = get_interpreter(args.source) |
|
print("Interpreter: {}".format(interpreter or "<none>")) |
|
sys.exit(0) |
|
|
|
if os.path.isfile(args.source): |
|
if args.output is None or (os.path.exists(args.output) and |
|
os.path.samefile(args.source, args.output)): |
|
raise SystemExit("In-place editing of archives is not supported") |
|
if args.main: |
|
raise SystemExit("Cannot change the main function when copying") |
|
|
|
create_archive(args.source, args.output, |
|
interpreter=args.python, main=args.main, |
|
compressed=args.compress) |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |