14from io
import StringIO
15from itertools
import filterfalse, tee, zip_longest
16from types
import TracebackType
39from pip
import __version__
58 "remove_auth_from_url",
59 "check_externally_managed",
60 "ConfiguredBuildBackendHookCaller",
66ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
67VersionInfo = Tuple[int, int, int]
68NetlocTuple = Tuple[str, Tuple[Optional[str], Optional[str]]]
75 return "pip {} from {} (python {})".format(
78 get_major_minor_version(),
82def normalize_version_info(py_version_info: Tuple[int, ...]) -> Tuple[int, int, int]:
84 Convert a tuple of ints representing a Python version to one of length
87 :param py_version_info: a tuple of ints representing a Python version,
88 or None to specify no version. The tuple can have any length.
90 :return: a tuple of length three if `py_version_info` is non-None.
91 Otherwise, return `py_version_info` unchanged (i.e. None).
93 if len(py_version_info) < 3:
94 py_version_info += (3 -
len(py_version_info)) * (0,)
95 elif len(py_version_info) > 3:
96 py_version_info = py_version_info[:3]
98 return cast(
"VersionInfo", py_version_info)
101def ensure_dir(path: str) ->
None:
102 """os.path.makedirs without EEXIST."""
114 if prog
in (
"__main__.py",
"-c"):
115 return f
"{sys.executable} -m pip"
118 except (AttributeError, TypeError, IndexError):
125@retry(reraise=True, stop=stop_after_delay(3), wait=
wait_fixed(0.5))
126def rmtree(dir: str, ignore_errors: bool =
False) ->
None:
128 shutil.rmtree(dir, ignore_errors=ignore_errors, onexc=rmtree_errorhandler)
130 shutil.rmtree(dir, ignore_errors=ignore_errors, onerror=rmtree_errorhandler)
134 func: Callable[..., Any], path: str, exc_info: Union[ExcInfo, BaseException]
136 """On Windows, the files in .svn are read-only, so when rmtree() tries to
137 remove them, an exception is thrown. We catch that here, remove the
138 read-only attribute, and hopefully continue without problems."""
145 if has_attr_readonly:
155def display_path(path: str) -> str:
156 """Gives the display value for a given path, making it relative to cwd
164def backup_dir(dir: str, ext: str =
".bak") -> str:
165 """Figure out the name of a directory to back up the given dir to
166 (adding .bak, .bak2, etc)"""
171 extension = ext + str(n)
172 return dir + extension
175def ask_path_exists(message: str, options: Iterable[str]) -> str:
177 if action
in options:
179 return ask(message, options)
183 """Raise an error if no input is allowed."""
186 f
"No input was expected ($PIP_NO_INPUT set); question: {message}"
190def ask(message: str, options: Iterable[str]) -> str:
191 """Ask the message interactively, with the given possible responses"""
194 response = input(message)
196 if response
not in options:
198 "Your response ({!r}) was not one of the expected responses: "
199 "{}".format(response,
", ".join(options))
205def ask_input(message: str) -> str:
206 """Ask for input interactively."""
208 return input(message)
211def ask_password(message: str) -> str:
212 """Ask for a password interactively."""
217def strtobool(val: str) -> int:
218 """Convert a string representation of truth to true (1) or false (0).
220 True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
221 are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
222 'val' is anything else.
225 if val
in (
"y",
"yes",
"t",
"true",
"on",
"1"):
227 elif val
in (
"n",
"no",
"f",
"false",
"off",
"0"):
230 raise ValueError(f
"invalid truth value {val!r}")
233def format_size(bytes: float) -> str:
234 if bytes > 1000 * 1000:
235 return "{:.1f} MB".format(bytes / 1000.0 / 1000)
236 elif bytes > 10 * 1000:
237 return "{} kB".format(int(bytes / 1000))
239 return "{:.1f} kB".format(bytes / 1000.0)
241 return "{} bytes".format(int(bytes))
244def tabulate(rows: Iterable[Iterable[Any]]) -> Tuple[List[str], List[int]]:
245 """Return a list of formatted rows and a list of column sizes.
249 >>> tabulate([['foobar', 2000], [0xdeadbeef]])
250 (['foobar 2000', '3735928559'], [10, 4])
252 rows = [tuple(map(str, row))
for row
in rows]
253 sizes = [max(map(len, col))
for col
in zip_longest(*rows, fillvalue=
"")]
254 table = [
" ".join(map(
str.ljust, row, sizes)).rstrip()
for row
in rows]
258def is_installable_dir(path: str) -> bool:
259 """Is path is a directory containing pyproject.toml or setup.py?
261 If pyproject.toml exists, this is a PEP 517 project. Otherwise we look for
262 a legacy setuptools layout by identifying setup.py. We don't check for the
263 setup.cfg because using it without setup.py is only available for PEP 517
264 projects, which are already covered by the pyproject.toml check.
277) -> Generator[bytes,
None,
None]:
278 """Yield pieces of data from a file-like object until EOF."""
286def normalize_path(path: str, resolve_symlinks: bool =
True) -> str:
288 Convert a path to its canonical, case-normalized, absolute version.
299def splitext(path: str) -> Tuple[str, str]:
300 """Like os.path.splitext, but take off .tar too"""
303 ext = base[-4:] + ext
308def renames(old: str, new: str) ->
None:
309 """Like os.renames(), but handles renaming across devices."""
325def is_local(path: str) -> bool:
327 Return True if path is within sys.prefix, if we're running in a virtualenv.
329 If we're not in a virtualenv, all paths are considered "local."
331 Caution: this function assumes the head of path has been normalized
334 if not running_under_virtualenv():
339def write_output(msg: Any, *args: Any) ->
None:
355 def encoding(self) -> str:
359@contextlib.contextmanager
361 """Return a context manager used by captured_stdout/stdin/stderr
362 that temporarily replaces the sys stream *stream_name* with a StringIO.
364 Taken from Lib/support/__init__.py in the CPython repo.
366 orig_stdout =
getattr(sys, stream_name)
369 yield getattr(sys, stream_name)
371 setattr(sys, stream_name, orig_stdout)
375 """Capture the output of sys.stdout:
377 with captured_stdout() as stdout:
379 self.assertEqual(stdout.getvalue(), 'hello\n')
381 Taken from Lib/support/__init__.py in the CPython repo.
388 See captured_stdout().
394def enum(*sequential: Any, **named: Any) -> Type[Any]:
395 enums = dict(
zip(sequential,
range(
len(sequential))), **named)
396 reverse = {value: key
for key, value
in enums.items()}
397 enums[
"reverse_mapping"] = reverse
398 return type(
"Enum", (), enums)
401def build_netloc(host: str, port: Optional[int]) -> str:
403 Build a netloc from a host-port pair
410 return f
"{host}:{port}"
413def build_url_from_netloc(netloc: str, scheme: str =
"https") -> str:
415 Build a full URL from a netloc.
417 if netloc.count(
":") >= 2
and "@" not in netloc
and "[" not in netloc:
419 netloc = f
"[{netloc}]"
420 return f
"{scheme}://{netloc}"
423def parse_netloc(netloc: str) -> Tuple[Optional[str], Optional[int]]:
425 Return the host-port pair from a netloc.
427 url = build_url_from_netloc(netloc)
432def split_auth_from_netloc(netloc: str) -> NetlocTuple:
434 Parse out and remove the auth information from a netloc.
436 Returns: (netloc, (username, password)).
438 if "@" not in netloc:
439 return netloc, (
None,
None)
445 pw: Optional[str] =
None
452 user, pw = auth,
None
458 return netloc, (user, pw)
463 Replace the sensitive data in a netloc with "****", if it exists.
466 - "user:pass@example.com" returns "user:****@example.com"
467 - "accesstoken@example.com" returns "****@example.com"
469 netloc, (user, password) = split_auth_from_netloc(netloc)
478 return "{user}{password}@{netloc}".format(
479 user=user, password=password, netloc=netloc
484 url: str, transform_netloc: Callable[[str], Tuple[Any, ...]]
485) -> Tuple[str, NetlocTuple]:
486 """Transform and replace netloc in a url.
488 transform_netloc is a function taking the netloc and returning a
489 tuple. The first element of this tuple is the new netloc. The
490 entire tuple is returned.
492 Returns a tuple containing the transformed url as item 0 and the
493 original tuple returned by transform_netloc as item 1.
500 return surl, cast(
"NetlocTuple", netloc_tuple)
504 return split_auth_from_netloc(netloc)
511def split_auth_netloc_from_url(
513) -> Tuple[str, str, Tuple[Optional[str], Optional[str]]]:
515 Parse a url into separate netloc, auth, and url with no auth.
517 Returns: (url_without_auth, netloc, (username, password))
519 url_without_auth, (netloc, auth) =
_transform_url(url, _get_netloc)
520 return url_without_auth, netloc, auth
523def remove_auth_from_url(url: str) -> str:
524 """Return a copy of url with 'username:password@' removed."""
530def redact_auth_from_url(url: str) -> str:
531 """Replace the password in a given url with ****."""
536 def __init__(self, secret: str, redacted: str) ->
None:
541 return "<HiddenText {!r}>".format(str(self))
548 if type(self) != type(other):
556def hide_value(value: str) -> HiddenText:
560def hide_url(url: str) -> HiddenText:
561 redacted = redact_auth_from_url(url)
565def protect_pip_from_modification_on_windows(modifying_pip: bool) ->
None:
566 """Protection of pip.exe from modification on Windows
568 On Windows, any operation modifying pip should be run as:
573 f
"pip{sys.version_info.major}",
574 f
"pip{sys.version_info.major}.{sys.version_info.minor}",
578 should_show_use_python_msg = (
582 if should_show_use_python_msg:
585 "To modify pip, please run the following command:\n{}".format(
586 " ".join(new_command)
592 """Check whether the current environment is externally managed.
594 If the ``EXTERNALLY-MANAGED`` config file is found, the current environment
595 is considered externally managed, and an ExternallyManagedEnvironment is
598 if running_under_virtualenv():
607 """Is this console interactive?"""
611def hash_file(path: str, blocksize: int = 1 << 20) -> Tuple[Any, int]:
612 """Return (hash, length) for path using hashlib.sha256()"""
616 with open(path,
"rb")
as f:
617 for block
in read_chunks(f, size=blocksize):
623def pairwise(iterable: Iterable[Any]) -> Iterator[Tuple[Any, Any]]:
625 Return paired elements.
628 s -> (s0, s1), (s2, s3), (s4, s5), ...
630 iterable = iter(iterable)
631 return zip_longest(iterable, iterable)
635 pred: Callable[[T], bool],
636 iterable: Iterable[T],
637) -> Tuple[Iterable[T], Iterable[T]]:
639 Use a predicate to partition entries into false entries and true entries,
642 partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
644 t1, t2 = tee(iterable)
645 return filterfalse(pred, t1), filter(pred, t2)
654 backend_path: Optional[str] =
None,
655 runner: Optional[Callable[...,
None]] =
None,
656 python_executable: Optional[str] =
None,
659 source_dir, build_backend, backend_path, runner, python_executable
665 wheel_directory: str,
666 config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None,
667 metadata_directory: Optional[str] =
None,
671 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
676 sdist_directory: str,
677 config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None,
684 wheel_directory: str,
685 config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None,
686 metadata_directory: Optional[str] =
None,
690 wheel_directory, config_settings=cs, metadata_directory=metadata_directory
694 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None
700 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None
706 self, config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None
713 metadata_directory: str,
714 config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None,
715 _allow_fallback: bool =
True,
719 metadata_directory=metadata_directory,
721 _allow_fallback=_allow_fallback,
726 metadata_directory: str,
727 config_settings: Optional[Dict[str, Union[str, List[str]]]] =
None,
728 _allow_fallback: bool =
True,
732 metadata_directory=metadata_directory,
734 _allow_fallback=_allow_fallback,
None __init__(self, str secret, str redacted)
bool __eq__(self, Any other)
"StreamWrapper" from_stream(cls, TextIO orig_stream)
Tuple[str] _redact_netloc(str netloc)
NetlocTuple _get_netloc(str netloc)
Tuple[str, NetlocTuple] _transform_url(str url, Callable[[str], Tuple[Any,...]] transform_netloc)
ContextManager[StreamWrapper] captured_stderr()
Generator[StreamWrapper, None, None] captured_output(str stream_name)
None check_externally_managed()
None _check_no_input(str message)
str redact_netloc(str netloc)
None rmtree_errorhandler(Callable[..., Any] func, str path, Union[ExcInfo, BaseException] exc_info)
ContextManager[StreamWrapper] captured_stdout()
bool is_console_interactive()