Eliminado venv y www del repositorio, agrege un requirements igual

This commit is contained in:
2020-11-22 21:14:46 -03:00
parent 18cf2d335a
commit 199a1e2a61
820 changed files with 15495 additions and 22017 deletions

View File

@@ -1,15 +1,17 @@
from __future__ import absolute_import
import collections
import logging
from .req_install import InstallRequirement
from .req_set import RequirementSet
from .req_file import parse_requirements
from pip._internal.utils.logging import indent_log
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from .req_file import parse_requirements
from .req_install import InstallRequirement
from .req_set import RequirementSet
if MYPY_CHECK_RUNNING:
from typing import Any, List, Sequence
from typing import Iterator, List, Optional, Sequence, Tuple
__all__ = [
"RequirementSet", "InstallRequirement",
@@ -19,60 +21,83 @@ __all__ = [
logger = logging.getLogger(__name__)
def install_given_reqs(
to_install, # type: List[InstallRequirement]
install_options, # type: List[str]
global_options=(), # type: Sequence[str]
*args, # type: Any
**kwargs # type: Any
class InstallationResult(object):
def __init__(self, name):
# type: (str) -> None
self.name = name
def __repr__(self):
# type: () -> str
return "InstallationResult(name={!r})".format(self.name)
def _validate_requirements(
requirements, # type: List[InstallRequirement]
):
# type: (...) -> List[InstallRequirement]
# type: (...) -> Iterator[Tuple[str, InstallRequirement]]
for req in requirements:
assert req.name, "invalid to-be-installed requirement: {}".format(req)
yield req.name, req
def install_given_reqs(
requirements, # type: List[InstallRequirement]
install_options, # type: List[str]
global_options, # type: Sequence[str]
root, # type: Optional[str]
home, # type: Optional[str]
prefix, # type: Optional[str]
warn_script_location, # type: bool
use_user_site, # type: bool
pycompile, # type: bool
):
# type: (...) -> List[InstallationResult]
"""
Install everything in the given list.
(to be called after having downloaded and unpacked the packages)
"""
to_install = collections.OrderedDict(_validate_requirements(requirements))
if to_install:
logger.info(
'Installing collected packages: %s',
', '.join([req.name for req in to_install]),
', '.join(to_install.keys()),
)
installed = []
with indent_log():
for requirement in to_install:
if requirement.conflicts_with:
logger.info(
'Found existing installation: %s',
requirement.conflicts_with,
)
for req_name, requirement in to_install.items():
if requirement.should_reinstall:
logger.info('Attempting uninstall: %s', req_name)
with indent_log():
uninstalled_pathset = requirement.uninstall(
auto_confirm=True
)
else:
uninstalled_pathset = None
try:
requirement.install(
install_options,
global_options,
*args,
**kwargs
root=root,
home=home,
prefix=prefix,
warn_script_location=warn_script_location,
use_user_site=use_user_site,
pycompile=pycompile,
)
except Exception:
should_rollback = (
requirement.conflicts_with and
not requirement.install_succeeded
)
# if install did not succeed, rollback previous uninstall
if should_rollback:
if uninstalled_pathset and not requirement.install_succeeded:
uninstalled_pathset.rollback()
raise
else:
should_commit = (
requirement.conflicts_with and
requirement.install_succeeded
)
if should_commit:
if uninstalled_pathset and requirement.install_succeeded:
uninstalled_pathset.commit()
requirement.remove_temporary_source()
return to_install
installed.append(InstallationResult(req_name))
return installed

View File

@@ -17,22 +17,24 @@ from pip._vendor.packaging.requirements import InvalidRequirement, Requirement
from pip._vendor.packaging.specifiers import Specifier
from pip._vendor.pkg_resources import RequirementParseError, parse_requirements
from pip._internal.download import is_archive_file, is_url, url_to_path
from pip._internal.exceptions import InstallationError
from pip._internal.models.index import PyPI, TestPyPI
from pip._internal.models.link import Link
from pip._internal.models.wheel import Wheel
from pip._internal.pyproject import make_pyproject_path
from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.misc import is_installable_dir, path_to_url
from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import ARCHIVE_EXTENSIONS
from pip._internal.utils.misc import is_installable_dir, splitext
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs import vcs
from pip._internal.wheel import Wheel
from pip._internal.utils.urls import path_to_url
from pip._internal.vcs import is_url, vcs
if MYPY_CHECK_RUNNING:
from typing import (
Any, Dict, Optional, Set, Tuple, Union,
)
from pip._internal.cache import WheelCache
from pip._internal.req.req_file import ParsedRequirement
__all__ = [
@@ -44,6 +46,15 @@ logger = logging.getLogger(__name__)
operators = Specifier._operators.keys()
def is_archive_file(name):
# type: (str) -> bool
"""Return True if `name` is a considered as an archive file."""
ext = splitext(name)[1].lower()
if ext in ARCHIVE_EXTENSIONS:
return True
return False
def _strip_extras(path):
# type: (str) -> Tuple[str, Optional[str]]
m = re.match(r'^(.+)(\[[^\]]+\])$', path)
@@ -57,8 +68,15 @@ def _strip_extras(path):
return path_no_extras, extras
def convert_extras(extras):
# type: (Optional[str]) -> Set[str]
if not extras:
return set()
return Requirement("placeholder" + extras.lower()).extras
def parse_editable(editable_req):
# type: (str) -> Tuple[Optional[str], str, Optional[Set[str]]]
# type: (str) -> Tuple[Optional[str], str, Set[str]]
"""Parses an editable requirement into:
- a requirement name
- an URL
@@ -100,11 +118,11 @@ def parse_editable(editable_req):
Requirement("placeholder" + extras.lower()).extras,
)
else:
return package_name, url_no_extras, None
return package_name, url_no_extras, set()
for version_control in vcs:
if url.lower().startswith('%s:' % version_control):
url = '%s+%s' % (version_control, url)
if url.lower().startswith('{}:'.format(version_control)):
url = '{}+{}'.format(version_control, url)
break
if '+' not in url:
@@ -117,18 +135,19 @@ def parse_editable(editable_req):
vc_type = url.split('+', 1)[0].lower()
if not vcs.get_backend(vc_type):
error_message = 'For --editable=%s only ' % editable_req + \
', '.join([backend.name + '+URL' for backend in vcs.backends]) + \
' is currently supported'
backends = ", ".join([bends.name + '+URL' for bends in vcs.backends])
error_message = "For --editable={}, " \
"only {} are currently supported".format(
editable_req, backends)
raise InstallationError(error_message)
package_name = Link(url).egg_fragment
if not package_name:
raise InstallationError(
"Could not detect requirement name for '%s', please specify one "
"with #egg=your_package_name" % editable_req
"Could not detect requirement name for '{}', please specify one "
"with #egg=your_package_name".format(editable_req)
)
return package_name, url, None
return package_name, url, set()
def deduce_helpful_msg(req):
@@ -146,75 +165,141 @@ def deduce_helpful_msg(req):
with open(req, 'r') as fp:
# parse first line only
next(parse_requirements(fp.read()))
msg += " The argument you provided " + \
"(%s) appears to be a" % (req) + \
" requirements file. If that is the" + \
" case, use the '-r' flag to install" + \
msg += (
"The argument you provided "
"({}) appears to be a"
" requirements file. If that is the"
" case, use the '-r' flag to install"
" the packages specified within it."
).format(req)
except RequirementParseError:
logger.debug("Cannot parse '%s' as requirements \
file" % (req), exc_info=True)
logger.debug(
"Cannot parse '%s' as requirements file", req, exc_info=True
)
else:
msg += " File '%s' does not exist." % (req)
msg += " File '{}' does not exist.".format(req)
return msg
class RequirementParts(object):
def __init__(
self,
requirement, # type: Optional[Requirement]
link, # type: Optional[Link]
markers, # type: Optional[Marker]
extras, # type: Set[str]
):
self.requirement = requirement
self.link = link
self.markers = markers
self.extras = extras
def parse_req_from_editable(editable_req):
# type: (str) -> RequirementParts
name, url, extras_override = parse_editable(editable_req)
if name is not None:
try:
req = Requirement(name)
except InvalidRequirement:
raise InstallationError("Invalid requirement: '{}'".format(name))
else:
req = None
link = Link(url)
return RequirementParts(req, link, None, extras_override)
# ---- The actual constructors follow ----
def install_req_from_editable(
editable_req, # type: str
comes_from=None, # type: Optional[str]
comes_from=None, # type: Optional[Union[InstallRequirement, str]]
use_pep517=None, # type: Optional[bool]
isolated=False, # type: bool
options=None, # type: Optional[Dict[str, Any]]
wheel_cache=None, # type: Optional[WheelCache]
constraint=False # type: bool
constraint=False, # type: bool
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
name, url, extras_override = parse_editable(editable_req)
if url.startswith('file:'):
source_dir = url_to_path(url)
else:
source_dir = None
if name is not None:
try:
req = Requirement(name)
except InvalidRequirement:
raise InstallationError("Invalid requirement: '%s'" % name)
else:
req = None
parts = parse_req_from_editable(editable_req)
return InstallRequirement(
req, comes_from, source_dir=source_dir,
parts.requirement,
comes_from=comes_from,
user_supplied=user_supplied,
editable=True,
link=Link(url),
link=parts.link,
constraint=constraint,
use_pep517=use_pep517,
isolated=isolated,
options=options if options else {},
wheel_cache=wheel_cache,
extras=extras_override or (),
install_options=options.get("install_options", []) if options else [],
global_options=options.get("global_options", []) if options else [],
hash_options=options.get("hashes", {}) if options else {},
extras=parts.extras,
)
def install_req_from_line(
name, # type: str
comes_from=None, # type: Optional[Union[str, InstallRequirement]]
use_pep517=None, # type: Optional[bool]
isolated=False, # type: bool
options=None, # type: Optional[Dict[str, Any]]
wheel_cache=None, # type: Optional[WheelCache]
constraint=False, # type: bool
line_source=None, # type: Optional[str]
):
# type: (...) -> InstallRequirement
"""Creates an InstallRequirement from a name, which might be a
requirement, directory containing 'setup.py', filename, or URL.
def _looks_like_path(name):
# type: (str) -> bool
"""Checks whether the string "looks like" a path on the filesystem.
:param line_source: An optional string describing where the line is from,
for logging purposes in case of an error.
This does not check whether the target actually exists, only judge from the
appearance.
Returns true if any of the following conditions is true:
* a path separator is found (either os.path.sep or os.path.altsep);
* a dot is found (which represents the current directory).
"""
if os.path.sep in name:
return True
if os.path.altsep is not None and os.path.altsep in name:
return True
if name.startswith("."):
return True
return False
def _get_url_from_path(path, name):
# type: (str, str) -> Optional[str]
"""
First, it checks whether a provided path is an installable directory
(e.g. it has a setup.py). If it is, returns the path.
If false, check if the path is an archive file (such as a .whl).
The function checks if the path is a file. If false, if the path has
an @, it will treat it as a PEP 440 URL requirement and return the path.
"""
if _looks_like_path(name) and os.path.isdir(path):
if is_installable_dir(path):
return path_to_url(path)
raise InstallationError(
"Directory {name!r} is not installable. Neither 'setup.py' "
"nor 'pyproject.toml' found.".format(**locals())
)
if not is_archive_file(path):
return None
if os.path.isfile(path):
return path_to_url(path)
urlreq_parts = name.split('@', 1)
if len(urlreq_parts) >= 2 and not _looks_like_path(urlreq_parts[0]):
# If the path contains '@' and the part before it does not look
# like a path, try to treat it as a PEP 440 URL req instead.
return None
logger.warning(
'Requirement %r looks like a filename, but the '
'file does not exist',
name
)
return path_to_url(path)
def parse_req_from_line(name, line_source):
# type: (str, Optional[str]) -> RequirementParts
if is_url(name):
marker_sep = '; '
else:
@@ -238,26 +323,9 @@ def install_req_from_line(
link = Link(name)
else:
p, extras_as_string = _strip_extras(path)
looks_like_dir = os.path.isdir(p) and (
os.path.sep in name or
(os.path.altsep is not None and os.path.altsep in name) or
name.startswith('.')
)
if looks_like_dir:
if not is_installable_dir(p):
raise InstallationError(
"Directory %r is not installable. Neither 'setup.py' "
"nor 'pyproject.toml' found." % name
)
link = Link(path_to_url(p))
elif is_archive_file(p):
if not os.path.isfile(p):
logger.warning(
'Requirement %r looks like a filename, but the '
'file does not exist',
name
)
link = Link(path_to_url(p))
url = _get_url_from_path(p, name)
if url is not None:
link = Link(url)
# it's a local file, dir, or url
if link:
@@ -268,7 +336,7 @@ def install_req_from_line(
# wheel file
if link.is_wheel:
wheel = Wheel(link.filename) # can raise InvalidWheelFilename
req_as_string = "%s==%s" % (wheel.name, wheel.version)
req_as_string = "{wheel.name}=={wheel.version}".format(**locals())
else:
# set the req to the egg fragment. when it's not there, this
# will become an 'unnamed' requirement
@@ -278,10 +346,14 @@ def install_req_from_line(
else:
req_as_string = name
if extras_as_string:
extras = Requirement("placeholder" + extras_as_string.lower()).extras
else:
extras = ()
extras = convert_extras(extras_as_string)
def with_source(text):
# type: (str) -> str
if not line_source:
return text
return '{} (from {})'.format(text, line_source)
if req_as_string is not None:
try:
req = Requirement(req_as_string)
@@ -294,26 +366,57 @@ def install_req_from_line(
add_msg = "= is not a valid operator. Did you mean == ?"
else:
add_msg = ''
if line_source is None:
source = ''
else:
source = ' (from {})'.format(line_source)
msg = (
'Invalid requirement: {!r}{}'.format(req_as_string, source)
msg = with_source(
'Invalid requirement: {!r}'.format(req_as_string)
)
if add_msg:
msg += '\nHint: {}'.format(add_msg)
raise InstallationError(msg)
else:
# Deprecate extras after specifiers: "name>=1.0[extras]"
# This currently works by accident because _strip_extras() parses
# any extras in the end of the string and those are saved in
# RequirementParts
for spec in req.specifier:
spec_str = str(spec)
if spec_str.endswith(']'):
msg = "Extras after version '{}'.".format(spec_str)
replace = "moving the extras before version specifiers"
deprecated(msg, replacement=replace, gone_in="21.0")
else:
req = None
return RequirementParts(req, link, markers, extras)
def install_req_from_line(
name, # type: str
comes_from=None, # type: Optional[Union[str, InstallRequirement]]
use_pep517=None, # type: Optional[bool]
isolated=False, # type: bool
options=None, # type: Optional[Dict[str, Any]]
constraint=False, # type: bool
line_source=None, # type: Optional[str]
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
"""Creates an InstallRequirement from a name, which might be a
requirement, directory containing 'setup.py', filename, or URL.
:param line_source: An optional string describing where the line is from,
for logging purposes in case of an error.
"""
parts = parse_req_from_line(name, line_source)
return InstallRequirement(
req, comes_from, link=link, markers=markers,
parts.requirement, comes_from, link=parts.link, markers=parts.markers,
use_pep517=use_pep517, isolated=isolated,
options=options if options else {},
wheel_cache=wheel_cache,
install_options=options.get("install_options", []) if options else [],
global_options=options.get("global_options", []) if options else [],
hash_options=options.get("hashes", {}) if options else {},
constraint=constraint,
extras=extras,
extras=parts.extras,
user_supplied=user_supplied,
)
@@ -321,14 +424,14 @@ def install_req_from_req_string(
req_string, # type: str
comes_from=None, # type: Optional[InstallRequirement]
isolated=False, # type: bool
wheel_cache=None, # type: Optional[WheelCache]
use_pep517=None # type: Optional[bool]
use_pep517=None, # type: Optional[bool]
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
try:
req = Requirement(req_string)
except InvalidRequirement:
raise InstallationError("Invalid requirement: '%s'" % req_string)
raise InstallationError("Invalid requirement: '{}'".format(req_string))
domains_not_allowed = [
PyPI.file_storage_domain,
@@ -340,10 +443,44 @@ def install_req_from_req_string(
raise InstallationError(
"Packages installed from PyPI cannot depend on packages "
"which are not also hosted on PyPI.\n"
"%s depends on %s " % (comes_from.name, req)
"{} depends on {} ".format(comes_from.name, req)
)
return InstallRequirement(
req, comes_from, isolated=isolated, wheel_cache=wheel_cache,
use_pep517=use_pep517
req,
comes_from,
isolated=isolated,
use_pep517=use_pep517,
user_supplied=user_supplied,
)
def install_req_from_parsed_requirement(
parsed_req, # type: ParsedRequirement
isolated=False, # type: bool
use_pep517=None, # type: Optional[bool]
user_supplied=False, # type: bool
):
# type: (...) -> InstallRequirement
if parsed_req.is_editable:
req = install_req_from_editable(
parsed_req.requirement,
comes_from=parsed_req.comes_from,
use_pep517=use_pep517,
constraint=parsed_req.constraint,
isolated=isolated,
user_supplied=user_supplied,
)
else:
req = install_req_from_line(
parsed_req.requirement,
comes_from=parsed_req.comes_from,
use_pep517=use_pep517,
isolated=isolated,
options=parsed_req.options,
constraint=parsed_req.constraint,
line_source=parsed_req.line_source,
user_supplied=user_supplied,
)
return req

View File

@@ -10,29 +10,33 @@ import re
import shlex
import sys
from pip._vendor.six.moves import filterfalse
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.cli import cmdoptions
from pip._internal.download import get_file_content
from pip._internal.exceptions import RequirementsFileParseError
from pip._internal.models.search_scope import SearchScope
from pip._internal.req.constructors import (
install_req_from_editable, install_req_from_line,
from pip._internal.exceptions import (
InstallationError,
RequirementsFileParseError,
)
from pip._internal.models.search_scope import SearchScope
from pip._internal.network.utils import raise_for_status
from pip._internal.utils.encoding import auto_decode
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.urls import get_url_scheme
if MYPY_CHECK_RUNNING:
from optparse import Values
from typing import (
Any, Callable, Iterator, List, NoReturn, Optional, Text, Tuple,
Any, Callable, Dict, Iterator, List, NoReturn, Optional, Text, Tuple,
)
from pip._internal.req import InstallRequirement
from pip._internal.cache import WheelCache
from pip._internal.index import PackageFinder
from pip._internal.download import PipSession
from pip._internal.index.package_finder import PackageFinder
from pip._internal.network.session import PipSession
ReqFileLines = Iterator[Tuple[int, Text]]
LineParser = Callable[[Text], Tuple[str, Values]]
__all__ = ['parse_requirements']
SCHEME_RE = re.compile(r'^(http|https|file):', re.I)
@@ -45,19 +49,20 @@ COMMENT_RE = re.compile(r'(^|\s+)#.*$')
ENV_VAR_RE = re.compile(r'(?P<var>\$\{(?P<name>[A-Z0-9_]+)\})')
SUPPORTED_OPTIONS = [
cmdoptions.constraints,
cmdoptions.editable,
cmdoptions.requirements,
cmdoptions.no_index,
cmdoptions.index_url,
cmdoptions.find_links,
cmdoptions.extra_index_url,
cmdoptions.always_unzip,
cmdoptions.no_index,
cmdoptions.constraints,
cmdoptions.requirements,
cmdoptions.editable,
cmdoptions.find_links,
cmdoptions.no_binary,
cmdoptions.only_binary,
cmdoptions.prefer_binary,
cmdoptions.require_hashes,
cmdoptions.pre,
cmdoptions.trusted_host,
cmdoptions.require_hashes,
cmdoptions.use_new_feature,
] # type: List[Callable[..., optparse.Option]]
# options to be passed to requirements
@@ -71,174 +76,167 @@ SUPPORTED_OPTIONS_REQ = [
SUPPORTED_OPTIONS_REQ_DEST = [str(o().dest) for o in SUPPORTED_OPTIONS_REQ]
class ParsedRequirement(object):
def __init__(
self,
requirement, # type:str
is_editable, # type: bool
comes_from, # type: str
constraint, # type: bool
options=None, # type: Optional[Dict[str, Any]]
line_source=None, # type: Optional[str]
):
# type: (...) -> None
self.requirement = requirement
self.is_editable = is_editable
self.comes_from = comes_from
self.options = options
self.constraint = constraint
self.line_source = line_source
class ParsedLine(object):
def __init__(
self,
filename, # type: str
lineno, # type: int
comes_from, # type: Optional[str]
args, # type: str
opts, # type: Values
constraint, # type: bool
):
# type: (...) -> None
self.filename = filename
self.lineno = lineno
self.comes_from = comes_from
self.opts = opts
self.constraint = constraint
if args:
self.is_requirement = True
self.is_editable = False
self.requirement = args
elif opts.editables:
self.is_requirement = True
self.is_editable = True
# We don't support multiple -e on one line
self.requirement = opts.editables[0]
else:
self.is_requirement = False
def parse_requirements(
filename, # type: str
session, # type: PipSession
finder=None, # type: Optional[PackageFinder]
comes_from=None, # type: Optional[str]
options=None, # type: Optional[optparse.Values]
session=None, # type: Optional[PipSession]
constraint=False, # type: bool
wheel_cache=None, # type: Optional[WheelCache]
use_pep517=None # type: Optional[bool]
):
# type: (...) -> Iterator[InstallRequirement]
"""Parse a requirements file and yield InstallRequirement instances.
# type: (...) -> Iterator[ParsedRequirement]
"""Parse a requirements file and yield ParsedRequirement instances.
:param filename: Path or url of requirements file.
:param session: PipSession instance.
:param finder: Instance of pip.index.PackageFinder.
:param comes_from: Origin description of requirements.
:param options: cli options.
:param session: Instance of pip.download.PipSession.
:param constraint: If true, parsing a constraint file rather than
requirements file.
:param wheel_cache: Instance of pip.wheel.WheelCache
:param use_pep517: Value of the --use-pep517 option.
"""
if session is None:
raise TypeError(
"parse_requirements() missing 1 required keyword argument: "
"'session'"
line_parser = get_line_parser(finder)
parser = RequirementsFileParser(session, line_parser, comes_from)
for parsed_line in parser.parse(filename, constraint):
parsed_req = handle_line(
parsed_line,
options=options,
finder=finder,
session=session
)
_, content = get_file_content(
filename, comes_from=comes_from, session=session
)
lines_enum = preprocess(content, options)
for line_number, line in lines_enum:
req_iter = process_line(line, filename, line_number, finder,
comes_from, options, session, wheel_cache,
use_pep517=use_pep517, constraint=constraint)
for req in req_iter:
yield req
if parsed_req is not None:
yield parsed_req
def preprocess(content, options):
# type: (Text, Optional[optparse.Values]) -> ReqFileLines
def preprocess(content):
# type: (Text) -> ReqFileLines
"""Split, filter, and join lines, and return a line iterator
:param content: the content of the requirements file
:param options: cli options
"""
lines_enum = enumerate(content.splitlines(), start=1) # type: ReqFileLines
lines_enum = join_lines(lines_enum)
lines_enum = ignore_comments(lines_enum)
lines_enum = skip_regex(lines_enum, options)
lines_enum = expand_env_variables(lines_enum)
return lines_enum
def process_line(
line, # type: Text
filename, # type: str
line_number, # type: int
finder=None, # type: Optional[PackageFinder]
comes_from=None, # type: Optional[str]
def handle_requirement_line(
line, # type: ParsedLine
options=None, # type: Optional[optparse.Values]
session=None, # type: Optional[PipSession]
wheel_cache=None, # type: Optional[WheelCache]
use_pep517=None, # type: Optional[bool]
constraint=False, # type: bool
):
# type: (...) -> Iterator[InstallRequirement]
"""Process a single requirements line; This can result in creating/yielding
requirements, or updating the finder.
For lines that contain requirements, the only options that have an effect
are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
requirement. Other options from SUPPORTED_OPTIONS may be present, but are
ignored.
For lines that do not contain requirements, the only options that have an
effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
be present, but are ignored. These lines may contain multiple options
(although our docs imply only one is supported), and all our parsed and
affect the finder.
:param constraint: If True, parsing a constraints file.
:param options: OptionParser options that we may update
"""
parser = build_parser(line)
defaults = parser.get_default_values()
defaults.index_url = None
if finder:
defaults.format_control = finder.format_control
args_str, options_str = break_args_options(line)
# Prior to 2.7.3, shlex cannot deal with unicode entries
if sys.version_info < (2, 7, 3):
# https://github.com/python/mypy/issues/1174
options_str = options_str.encode('utf8') # type: ignore
# https://github.com/python/mypy/issues/1174
opts, _ = parser.parse_args(
shlex.split(options_str), defaults) # type: ignore
# type: (...) -> ParsedRequirement
# preserve for the nested code path
line_comes_from = '%s %s (line %s)' % (
'-c' if constraint else '-r', filename, line_number,
line_comes_from = '{} {} (line {})'.format(
'-c' if line.constraint else '-r', line.filename, line.lineno,
)
# yield a line requirement
if args_str:
isolated = options.isolated_mode if options else False
assert line.is_requirement
if line.is_editable:
# For editable requirements, we don't support per-requirement
# options, so just return the parsed requirement.
return ParsedRequirement(
requirement=line.requirement,
is_editable=line.is_editable,
comes_from=line_comes_from,
constraint=line.constraint,
)
else:
if options:
cmdoptions.check_install_build_global(options, opts)
# Disable wheels if the user has specified build options
cmdoptions.check_install_build_global(options, line.opts)
# get the options that apply to requirements
req_options = {}
for dest in SUPPORTED_OPTIONS_REQ_DEST:
if dest in opts.__dict__ and opts.__dict__[dest]:
req_options[dest] = opts.__dict__[dest]
line_source = 'line {} of {}'.format(line_number, filename)
yield install_req_from_line(
args_str,
if dest in line.opts.__dict__ and line.opts.__dict__[dest]:
req_options[dest] = line.opts.__dict__[dest]
line_source = 'line {} of {}'.format(line.lineno, line.filename)
return ParsedRequirement(
requirement=line.requirement,
is_editable=line.is_editable,
comes_from=line_comes_from,
use_pep517=use_pep517,
isolated=isolated,
constraint=line.constraint,
options=req_options,
wheel_cache=wheel_cache,
constraint=constraint,
line_source=line_source,
)
# yield an editable requirement
elif opts.editables:
isolated = options.isolated_mode if options else False
yield install_req_from_editable(
opts.editables[0], comes_from=line_comes_from,
use_pep517=use_pep517,
constraint=constraint, isolated=isolated, wheel_cache=wheel_cache
)
# parse a nested requirements file
elif opts.requirements or opts.constraints:
if opts.requirements:
req_path = opts.requirements[0]
nested_constraint = False
else:
req_path = opts.constraints[0]
nested_constraint = True
# original file is over http
if SCHEME_RE.search(filename):
# do a url join so relative paths work
req_path = urllib_parse.urljoin(filename, req_path)
# original file and nested file are paths
elif not SCHEME_RE.search(req_path):
# do a join so relative paths work
req_path = os.path.join(os.path.dirname(filename), req_path)
# TODO: Why not use `comes_from='-r {} (line {})'` here as well?
parsed_reqs = parse_requirements(
req_path, finder, comes_from, options, session,
constraint=nested_constraint, wheel_cache=wheel_cache
)
for req in parsed_reqs:
yield req
def handle_option_line(
opts, # type: Values
filename, # type: str
lineno, # type: int
finder=None, # type: Optional[PackageFinder]
options=None, # type: Optional[optparse.Values]
session=None, # type: Optional[PipSession]
):
# type: (...) -> None
# percolate hash-checking option upward
elif opts.require_hashes:
options.require_hashes = opts.require_hashes
if options:
# percolate options upward
if opts.require_hashes:
options.require_hashes = opts.require_hashes
if opts.features_enabled:
options.features_enabled.extend(
f for f in opts.features_enabled
if f not in options.features_enabled
)
# set finder options
elif finder:
if finder:
find_links = finder.find_links
index_urls = finder.index_urls
if opts.index_url:
@@ -266,9 +264,164 @@ def process_line(
if opts.pre:
finder.set_allow_all_prereleases()
for host in opts.trusted_hosts or []:
source = 'line {} of {}'.format(line_number, filename)
finder.add_trusted_host(host, source=source)
if opts.prefer_binary:
finder.set_prefer_binary()
if session:
for host in opts.trusted_hosts or []:
source = 'line {} of {}'.format(lineno, filename)
session.add_trusted_host(host, source=source)
def handle_line(
line, # type: ParsedLine
options=None, # type: Optional[optparse.Values]
finder=None, # type: Optional[PackageFinder]
session=None, # type: Optional[PipSession]
):
# type: (...) -> Optional[ParsedRequirement]
"""Handle a single parsed requirements line; This can result in
creating/yielding requirements, or updating the finder.
:param line: The parsed line to be processed.
:param options: CLI options.
:param finder: The finder - updated by non-requirement lines.
:param session: The session - updated by non-requirement lines.
Returns a ParsedRequirement object if the line is a requirement line,
otherwise returns None.
For lines that contain requirements, the only options that have an effect
are from SUPPORTED_OPTIONS_REQ, and they are scoped to the
requirement. Other options from SUPPORTED_OPTIONS may be present, but are
ignored.
For lines that do not contain requirements, the only options that have an
effect are from SUPPORTED_OPTIONS. Options from SUPPORTED_OPTIONS_REQ may
be present, but are ignored. These lines may contain multiple options
(although our docs imply only one is supported), and all our parsed and
affect the finder.
"""
if line.is_requirement:
parsed_req = handle_requirement_line(line, options)
return parsed_req
else:
handle_option_line(
line.opts,
line.filename,
line.lineno,
finder,
options,
session,
)
return None
class RequirementsFileParser(object):
def __init__(
self,
session, # type: PipSession
line_parser, # type: LineParser
comes_from, # type: Optional[str]
):
# type: (...) -> None
self._session = session
self._line_parser = line_parser
self._comes_from = comes_from
def parse(self, filename, constraint):
# type: (str, bool) -> Iterator[ParsedLine]
"""Parse a given file, yielding parsed lines.
"""
for line in self._parse_and_recurse(filename, constraint):
yield line
def _parse_and_recurse(self, filename, constraint):
# type: (str, bool) -> Iterator[ParsedLine]
for line in self._parse_file(filename, constraint):
if (
not line.is_requirement and
(line.opts.requirements or line.opts.constraints)
):
# parse a nested requirements file
if line.opts.requirements:
req_path = line.opts.requirements[0]
nested_constraint = False
else:
req_path = line.opts.constraints[0]
nested_constraint = True
# original file is over http
if SCHEME_RE.search(filename):
# do a url join so relative paths work
req_path = urllib_parse.urljoin(filename, req_path)
# original file and nested file are paths
elif not SCHEME_RE.search(req_path):
# do a join so relative paths work
req_path = os.path.join(
os.path.dirname(filename), req_path,
)
for inner_line in self._parse_and_recurse(
req_path, nested_constraint,
):
yield inner_line
else:
yield line
def _parse_file(self, filename, constraint):
# type: (str, bool) -> Iterator[ParsedLine]
_, content = get_file_content(
filename, self._session, comes_from=self._comes_from
)
lines_enum = preprocess(content)
for line_number, line in lines_enum:
try:
args_str, opts = self._line_parser(line)
except OptionParsingError as e:
# add offending line
msg = 'Invalid requirement: {}\n{}'.format(line, e.msg)
raise RequirementsFileParseError(msg)
yield ParsedLine(
filename,
line_number,
self._comes_from,
args_str,
opts,
constraint,
)
def get_line_parser(finder):
# type: (Optional[PackageFinder]) -> LineParser
def parse_line(line):
# type: (Text) -> Tuple[str, Values]
# Build new parser for each line since it accumulates appendable
# options.
parser = build_parser()
defaults = parser.get_default_values()
defaults.index_url = None
if finder:
defaults.format_control = finder.format_control
args_str, options_str = break_args_options(line)
# Prior to 2.7.3, shlex cannot deal with unicode entries
if sys.version_info < (2, 7, 3):
# https://github.com/python/mypy/issues/1174
options_str = options_str.encode('utf8') # type: ignore
# https://github.com/python/mypy/issues/1174
opts, _ = parser.parse_args(
shlex.split(options_str), defaults) # type: ignore
return args_str, opts
return parse_line
def break_args_options(line):
@@ -289,8 +442,14 @@ def break_args_options(line):
return ' '.join(args), ' '.join(options) # type: ignore
def build_parser(line):
# type: (Text) -> optparse.OptionParser
class OptionParsingError(Exception):
def __init__(self, msg):
# type: (str) -> None
self.msg = msg
def build_parser():
# type: () -> optparse.OptionParser
"""
Return a parser for parsing requirement lines
"""
@@ -305,9 +464,7 @@ def build_parser(line):
# that in our own exception.
def parser_exit(self, msg):
# type: (Any, str) -> NoReturn
# add offending line
msg = 'Invalid requirement: %s\n%s' % (line, msg)
raise RequirementsFileParseError(msg)
raise OptionParsingError(msg)
# NOTE: mypy disallows assigning to a method
# https://github.com/python/mypy/issues/2427
parser.exit = parser_exit # type: ignore
@@ -329,6 +486,7 @@ def join_lines(lines_enum):
line = ' ' + line
if new_line:
new_line.append(line)
assert primary_line_number is not None
yield primary_line_number, ''.join(new_line)
new_line = []
else:
@@ -340,6 +498,7 @@ def join_lines(lines_enum):
# last line contains \
if new_line:
assert primary_line_number is not None
yield primary_line_number, ''.join(new_line)
# TODO: handle space after '\'.
@@ -357,20 +516,6 @@ def ignore_comments(lines_enum):
yield line_number, line
def skip_regex(lines_enum, options):
# type: (ReqFileLines, Optional[optparse.Values]) -> ReqFileLines
"""
Skip lines that match '--skip-requirements-regex' pattern
Note: the regex pattern is only built once
"""
skip_regex = options.skip_requirements_regex if options else None
if skip_regex:
pattern = re.compile(skip_regex)
lines_enum = filterfalse(lambda e: pattern.search(e[1]), lines_enum)
return lines_enum
def expand_env_variables(lines_enum):
# type: (ReqFileLines) -> ReqFileLines
"""Replace all environment variables that can be retrieved via `os.getenv`.
@@ -397,3 +542,51 @@ def expand_env_variables(lines_enum):
line = line.replace(env_var, value)
yield line_number, line
def get_file_content(url, session, comes_from=None):
# type: (str, PipSession, Optional[str]) -> Tuple[str, Text]
"""Gets the content of a file; it may be a filename, file: URL, or
http: URL. Returns (location, content). Content is unicode.
Respects # -*- coding: declarations on the retrieved files.
:param url: File path or url.
:param session: PipSession instance.
:param comes_from: Origin description of requirements.
"""
scheme = get_url_scheme(url)
if scheme in ['http', 'https']:
# FIXME: catch some errors
resp = session.get(url)
raise_for_status(resp)
return resp.url, resp.text
elif scheme == 'file':
if comes_from and comes_from.startswith('http'):
raise InstallationError(
'Requirements file {} references URL {}, '
'which is local'.format(comes_from, url)
)
path = url.split(':', 1)[1]
path = path.replace('\\', '/')
match = _url_slash_drive_re.match(path)
if match:
path = match.group(1) + ':' + path.split('|', 1)[1]
path = urllib_parse.unquote(path)
if path.startswith('/'):
path = '/' + path.lstrip('/')
url = path
try:
with open(url, 'rb') as f:
content = auto_decode(f.read())
except IOError as exc:
raise InstallationError(
'Could not open requirements file: {}'.format(exc)
)
return url, content
_url_slash_drive_re = re.compile(r'/*([a-z])\|', re.I)

View File

@@ -3,10 +3,12 @@ from __future__ import absolute_import
import logging
from collections import OrderedDict
from pip._vendor.packaging.utils import canonicalize_name
from pip._internal.exceptions import InstallationError
from pip._internal.utils.logging import indent_log
from pip._internal.models.wheel import Wheel
from pip._internal.utils import compatibility_tags
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.wheel import Wheel
if MYPY_CHECK_RUNNING:
from typing import Dict, Iterable, List, Optional, Tuple
@@ -18,35 +20,49 @@ logger = logging.getLogger(__name__)
class RequirementSet(object):
def __init__(self, require_hashes=False, check_supported_wheels=True):
# type: (bool, bool) -> None
def __init__(self, check_supported_wheels=True):
# type: (bool) -> None
"""Create a RequirementSet.
"""
self.requirements = OrderedDict() # type: Dict[str, InstallRequirement] # noqa: E501
self.require_hashes = require_hashes
self.check_supported_wheels = check_supported_wheels
# Mapping of alias: real_name
self.requirement_aliases = {} # type: Dict[str, str]
self.unnamed_requirements = [] # type: List[InstallRequirement]
self.successfully_downloaded = [] # type: List[InstallRequirement]
self.reqs_to_cleanup = [] # type: List[InstallRequirement]
def __str__(self):
# type: () -> str
reqs = [req for req in self.requirements.values()
if not req.comes_from]
reqs.sort(key=lambda req: req.name.lower())
return ' '.join([str(req.req) for req in reqs])
requirements = sorted(
(req for req in self.requirements.values() if not req.comes_from),
key=lambda req: canonicalize_name(req.name),
)
return ' '.join(str(req.req) for req in requirements)
def __repr__(self):
# type: () -> str
reqs = [req for req in self.requirements.values()]
reqs.sort(key=lambda req: req.name.lower())
reqs_str = ', '.join([str(req.req) for req in reqs])
return ('<%s object; %d requirement(s): %s>'
% (self.__class__.__name__, len(reqs), reqs_str))
requirements = sorted(
self.requirements.values(),
key=lambda req: canonicalize_name(req.name),
)
format_string = '<{classname} object; {count} requirement(s): {reqs}>'
return format_string.format(
classname=self.__class__.__name__,
count=len(requirements),
reqs=', '.join(str(req.req) for req in requirements),
)
def add_unnamed_requirement(self, install_req):
# type: (InstallRequirement) -> None
assert not install_req.name
self.unnamed_requirements.append(install_req)
def add_named_requirement(self, install_req):
# type: (InstallRequirement) -> None
assert install_req.name
project_name = canonicalize_name(install_req.name)
self.requirements[project_name] = install_req
def add_requirement(
self,
@@ -69,13 +85,11 @@ class RequirementSet(object):
the requirement is not applicable, or [install_req] if the
requirement is applicable and has just been added.
"""
name = install_req.name
# If the markers do not match, ignore this requirement.
if not install_req.match_markers(extras_requested):
logger.info(
"Ignoring %s: markers '%s' don't match your environment",
name, install_req.markers,
install_req.name, install_req.markers,
)
return [], None
@@ -85,27 +99,27 @@ class RequirementSet(object):
# single requirements file.
if install_req.link and install_req.link.is_wheel:
wheel = Wheel(install_req.link.filename)
if self.check_supported_wheels and not wheel.supported():
tags = compatibility_tags.get_supported()
if (self.check_supported_wheels and not wheel.supported(tags)):
raise InstallationError(
"%s is not a supported wheel on this platform." %
wheel.filename
"{} is not a supported wheel on this platform.".format(
wheel.filename)
)
# This next bit is really a sanity check.
assert install_req.is_direct == (parent_req_name is None), (
"a direct req shouldn't have a parent and also, "
"a non direct req should have a parent"
assert not install_req.user_supplied or parent_req_name is None, (
"a user supplied req shouldn't have a parent"
)
# Unnamed requirements are scanned again and the requirement won't be
# added as a dependency until after scanning.
if not name:
# url or path requirement w/o an egg fragment
self.unnamed_requirements.append(install_req)
if not install_req.name:
self.add_unnamed_requirement(install_req)
return [install_req], None
try:
existing_req = self.get_requirement(name)
existing_req = self.get_requirement(
install_req.name) # type: Optional[InstallRequirement]
except KeyError:
existing_req = None
@@ -118,18 +132,15 @@ class RequirementSet(object):
)
if has_conflicting_requirement:
raise InstallationError(
"Double requirement given: %s (already in %s, name=%r)"
% (install_req, existing_req, name)
"Double requirement given: {} (already in {}, name={!r})"
.format(install_req, existing_req, install_req.name)
)
# When no existing requirement exists, add the requirement as a
# dependency and it will be scanned again after.
if not existing_req:
self.requirements[name] = install_req
# FIXME: what about other normalizations? E.g., _ vs. -?
if name.lower() != name:
self.requirement_aliases[name.lower()] = name
# We'd want to rescan this requirements later
self.add_named_requirement(install_req)
# We'd want to rescan this requirement later
return [install_req], install_req
# Assume there's no need to scan, and that we've already
@@ -145,15 +156,18 @@ class RequirementSet(object):
)
)
if does_not_satisfy_constraint:
self.reqs_to_cleanup.append(install_req)
raise InstallationError(
"Could not satisfy constraints for '%s': "
"Could not satisfy constraints for '{}': "
"installation from path or url cannot be "
"constrained to a version" % name,
"constrained to a version".format(install_req.name)
)
# If we're now installing a constraint, mark the existing
# object for real installation.
existing_req.constraint = False
# If we're now installing a user supplied requirement,
# mark the existing object as such.
if install_req.user_supplied:
existing_req.user_supplied = True
existing_req.extras = tuple(sorted(
set(existing_req.extras) | set(install_req.extras)
))
@@ -165,29 +179,25 @@ class RequirementSet(object):
# scanning again.
return [existing_req], existing_req
def has_requirement(self, project_name):
def has_requirement(self, name):
# type: (str) -> bool
name = project_name.lower()
if (name in self.requirements and
not self.requirements[name].constraint or
name in self.requirement_aliases and
not self.requirements[self.requirement_aliases[name]].constraint):
return True
return False
project_name = canonicalize_name(name)
def get_requirement(self, project_name):
return (
project_name in self.requirements and
not self.requirements[project_name].constraint
)
def get_requirement(self, name):
# type: (str) -> InstallRequirement
for name in project_name, project_name.lower():
if name in self.requirements:
return self.requirements[name]
if name in self.requirement_aliases:
return self.requirements[self.requirement_aliases[name]]
raise KeyError("No project with the name %r" % project_name)
project_name = canonicalize_name(name)
def cleanup_files(self):
# type: () -> None
"""Clean up files, remove builds."""
logger.debug('Cleaning up...')
with indent_log():
for req in self.reqs_to_cleanup:
req.remove_temporary_source()
if project_name in self.requirements:
return self.requirements[project_name]
raise KeyError("No project with the name {name!r}".format(**locals()))
@property
def all_requirements(self):
# type: () -> List[InstallRequirement]
return self.unnamed_requirements + list(self.requirements.values())

View File

@@ -6,35 +6,74 @@ import hashlib
import logging
import os
from pip._vendor import contextlib2
from pip._internal.utils.temp_dir import TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from types import TracebackType
from typing import Iterator, Optional, Set, Type
from typing import Dict, Iterator, Optional, Set, Type, Union
from pip._internal.req.req_install import InstallRequirement
from pip._internal.models.link import Link
logger = logging.getLogger(__name__)
@contextlib.contextmanager
def update_env_context_manager(**changes):
# type: (str) -> Iterator[None]
target = os.environ
# Save values from the target and change them.
non_existent_marker = object()
saved_values = {} # type: Dict[str, Union[object, str]]
for name, new_value in changes.items():
try:
saved_values[name] = target[name]
except KeyError:
saved_values[name] = non_existent_marker
target[name] = new_value
try:
yield
finally:
# Restore original values in the target.
for name, original_value in saved_values.items():
if original_value is non_existent_marker:
del target[name]
else:
assert isinstance(original_value, str) # for mypy
target[name] = original_value
@contextlib.contextmanager
def get_requirement_tracker():
# type: () -> Iterator[RequirementTracker]
root = os.environ.get('PIP_REQ_TRACKER')
with contextlib2.ExitStack() as ctx:
if root is None:
root = ctx.enter_context(
TempDirectory(kind='req-tracker')
).path
ctx.enter_context(update_env_context_manager(PIP_REQ_TRACKER=root))
logger.debug("Initialized build tracking at %s", root)
with RequirementTracker(root) as tracker:
yield tracker
class RequirementTracker(object):
def __init__(self):
# type: () -> None
self._root = os.environ.get('PIP_REQ_TRACKER')
if self._root is None:
self._temp_dir = TempDirectory(delete=False, kind='req-tracker')
self._temp_dir.create()
self._root = os.environ['PIP_REQ_TRACKER'] = self._temp_dir.path
logger.debug('Created requirements tracker %r', self._root)
else:
self._temp_dir = None
logger.debug('Re-using requirements tracker %r', self._root)
def __init__(self, root):
# type: (str) -> None
self._root = root
self._entries = set() # type: Set[InstallRequirement]
logger.debug("Created build tracker: %s", self._root)
def __enter__(self):
# type: () -> RequirementTracker
logger.debug("Entered build tracker: %s", self._root)
return self
def __exit__(
@@ -53,40 +92,55 @@ class RequirementTracker(object):
def add(self, req):
# type: (InstallRequirement) -> None
link = req.link
info = str(req)
entry_path = self._entry_path(link)
"""Add an InstallRequirement to build tracking.
"""
assert req.link
# Get the file to write information about this requirement.
entry_path = self._entry_path(req.link)
# Try reading from the file. If it exists and can be read from, a build
# is already in progress, so a LookupError is raised.
try:
with open(entry_path) as fp:
# Error, these's already a build in progress.
raise LookupError('%s is already being built: %s'
% (link, fp.read()))
contents = fp.read()
except IOError as e:
# if the error is anything other than "file does not exist", raise.
if e.errno != errno.ENOENT:
raise
assert req not in self._entries
with open(entry_path, 'w') as fp:
fp.write(info)
self._entries.add(req)
logger.debug('Added %s to build tracker %r', req, self._root)
else:
message = '{} is already being built: {}'.format(
req.link, contents)
raise LookupError(message)
# If we're here, req should really not be building already.
assert req not in self._entries
# Start tracking this requirement.
with open(entry_path, 'w') as fp:
fp.write(str(req))
self._entries.add(req)
logger.debug('Added %s to build tracker %r', req, self._root)
def remove(self, req):
# type: (InstallRequirement) -> None
link = req.link
"""Remove an InstallRequirement from build tracking.
"""
assert req.link
# Delete the created file and the corresponding entries.
os.unlink(self._entry_path(req.link))
self._entries.remove(req)
os.unlink(self._entry_path(link))
logger.debug('Removed %s from build tracker %r', req, self._root)
def cleanup(self):
# type: () -> None
for req in set(self._entries):
self.remove(req)
remove = self._temp_dir is not None
if remove:
self._temp_dir.cleanup()
logger.debug('%s build tracker %r',
'Removed' if remove else 'Cleaned',
self._root)
logger.debug("Removed build tracker: %r", self._root)
@contextlib.contextmanager
def track(self, req):

View File

@@ -14,8 +14,15 @@ from pip._internal.locations import bin_py, bin_user
from pip._internal.utils.compat import WINDOWS, cache_from_source, uses_pycache
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import (
FakeFile, ask, dist_in_usersite, dist_is_local, egg_link_path, is_local,
normalize_path, renames, rmtree,
FakeFile,
ask,
dist_in_usersite,
dist_is_local,
egg_link_path,
is_local,
normalize_path,
renames,
rmtree,
)
from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
@@ -52,7 +59,7 @@ def _script_names(dist, script_name, is_gui):
def _unique(fn):
# type: (Callable) -> Callable[..., Iterator[Any]]
# type: (Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]]
@functools.wraps(fn)
def unique(*args, **kw):
# type: (Any, Any) -> Iterator[Any]
@@ -220,10 +227,8 @@ class StashedUninstallPathSet(object):
try:
save_dir = AdjacentTempDirectory(path) # type: TempDirectory
save_dir.create()
except OSError:
save_dir = TempDirectory(kind="uninstall")
save_dir.create()
self._save_dirs[os.path.normcase(path)] = save_dir
return save_dir.path
@@ -249,7 +254,6 @@ class StashedUninstallPathSet(object):
# Did not find any suitable root
head = os.path.dirname(path)
save_dir = TempDirectory(kind='uninstall')
save_dir.create()
self._save_dirs[head] = save_dir
relpath = os.path.relpath(path, head)
@@ -260,14 +264,16 @@ class StashedUninstallPathSet(object):
def stash(self, path):
# type: (str) -> str
"""Stashes the directory or file and returns its new location.
Handle symlinks as files to avoid modifying the symlink targets.
"""
if os.path.isdir(path):
path_is_dir = os.path.isdir(path) and not os.path.islink(path)
if path_is_dir:
new_path = self._get_directory_stash(path)
else:
new_path = self._get_file_stash(path)
self._moves.append((path, new_path))
if os.path.isdir(path) and os.path.isdir(new_path):
if (path_is_dir and os.path.isdir(new_path)):
# If we're moving a directory, we need to
# remove the destination first or else it will be
# moved to inside the existing directory.
@@ -289,12 +295,12 @@ class StashedUninstallPathSet(object):
# type: () -> None
"""Undoes the uninstall by moving stashed files back."""
for p in self._moves:
logging.info("Moving to %s\n from %s", *p)
logger.info("Moving to %s\n from %s", *p)
for new_path, path in self._moves:
try:
logger.debug('Replacing %s from %s', new_path, path)
if os.path.isfile(new_path):
if os.path.isfile(new_path) or os.path.islink(new_path):
os.unlink(new_path)
elif os.path.isdir(new_path):
rmtree(new_path)
@@ -534,8 +540,9 @@ class UninstallPathSet(object):
with open(develop_egg_link, 'r') as fh:
link_pointer = os.path.normcase(fh.readline().strip())
assert (link_pointer == dist.location), (
'Egg-link %s does not match installed location of %s '
'(at %s)' % (link_pointer, dist.project_name, dist.location)
'Egg-link {} does not match installed location of {} '
'(at {})'.format(
link_pointer, dist.project_name, dist.location)
)
paths_to_remove.add(develop_egg_link)
easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
@@ -578,10 +585,6 @@ class UninstallPathSet(object):
class UninstallPthEntries(object):
def __init__(self, pth_file):
# type: (str) -> None
if not os.path.isfile(pth_file):
raise UninstallationError(
"Cannot remove entries from nonexistent file %s" % pth_file
)
self.file = pth_file
self.entries = set() # type: Set[str]
self._saved_lines = None # type: Optional[List[bytes]]
@@ -593,6 +596,11 @@ class UninstallPthEntries(object):
# backslashes. This is correct for entries that describe absolute
# paths outside of site-packages, but all the others use forward
# slashes.
# os.path.splitdrive is used instead of os.path.isabs because isabs
# treats non-absolute paths with drive letter markings like c:foo\bar
# as absolute paths. It also does not recognize UNC paths if they don't
# have more than "\\sever\share". Valid examples: "\\server\share\" or
# "\\server\share\folder". Python 2.7.8+ support UNC in splitdrive.
if WINDOWS and not os.path.splitdrive(entry)[0]:
entry = entry.replace('\\', '/')
self.entries.add(entry)
@@ -600,6 +608,13 @@ class UninstallPthEntries(object):
def remove(self):
# type: () -> None
logger.debug('Removing pth entries from %s:', self.file)
# If the file doesn't exist, log a warning and return
if not os.path.isfile(self.file):
logger.warning(
"Cannot remove entries from nonexistent file %s", self.file
)
return
with open(self.file, 'rb') as fh:
# windows uses '\r\n' with py3k, but uses '\n' with py2.x
lines = fh.readlines()