1"""PipSession and supporting code, containing all pip-specific
2network request configuration and behavior.
40from pip
import __version__
53 from ssl
import SSLContext
60SecureOrigin = Tuple[str, str, Optional[Union[int, str]]]
67SECURE_ORIGINS: List[SecureOrigin] = [
71 (
"*",
"localhost",
"*"),
72 (
"*",
"127.0.0.0/8",
"*"),
73 (
"*",
"::1/128",
"*"),
87CI_ENVIRONMENT_VARIABLES = (
101 Return whether it looks like pip is running under CI.
106 return any(name
in os.environ for name
in CI_ENVIRONMENT_VARIABLES)
111 Return a string representing the user agent.
113 data: Dict[str, Any] = {
114 "installer": {
"name":
"pip",
"version": __version__},
121 if data[
"implementation"][
"name"] ==
"CPython":
123 elif data[
"implementation"][
"name"] ==
"PyPy":
126 pypy_version_info = pypy_version_info[:3]
127 data[
"implementation"][
"version"] =
".".join(
128 [str(x)
for x
in pypy_version_info]
130 elif data[
"implementation"][
"name"] ==
"Jython":
133 elif data[
"implementation"][
"name"] ==
"IronPython":
141 distro_infos: Dict[str, Any] = dict(
144 zip([
"name",
"version",
"id"], linux_distribution),
150 zip([
"lib",
"version"], libc_ver()),
154 distro_infos[
"libc"] = libc
156 data[
"distro"] = distro_infos
175 setuptools_dist = get_default_environment().get_distribution(
"setuptools")
176 if setuptools_dist
is not None:
201 if user_data
is not None:
202 data[
"user_data"] = user_data
204 return "{data[installer][name]}/{data[installer][version]} {json}".format(
206 json=
json.dumps(data, separators=(
",",
":"), sort_keys=
True),
213 request: PreparedRequest,
214 stream: bool =
False,
215 timeout: Optional[Union[float, Tuple[float, float]]] =
None,
216 verify: Union[bool, str] =
True,
217 cert: Optional[Union[str, Tuple[str, str]]] =
None,
218 proxies: Optional[Mapping[str, str]] =
None,
228 except OSError
as exc:
239 "Content-Type": content_type,
241 "Last-Modified": modified,
250 def close(self) -> None:
255 """Mixin to add the ``ssl_context`` constructor argument to HTTP adapters.
257 The additional argument is forwarded directly to the pool manager. This allows us
258 to dynamically decide what SSL store to use at runtime, which is used to implement
259 the optional ``truststore`` backend.
265 ssl_context: Optional[
"SSLContext"] =
None,
268 self._ssl_context = ssl_context
269 super().__init__(**kwargs)
271 def init_poolmanager(
275 block: bool = DEFAULT_POOLBLOCK,
278 if self._ssl_context
is not None:
280 return super().init_poolmanager(
281 connections=connections,
299 conn: ConnectionPool,
301 verify: Union[bool, str],
302 cert: Optional[Union[str, Tuple[str, str]]],
304 super().cert_verify(conn=conn, url=url, verify=
False, cert=cert)
310 conn: ConnectionPool,
312 verify: Union[bool, str],
313 cert: Optional[Union[str, Tuple[str, str]]],
315 super().cert_verify(conn=conn, url=url, verify=
False, cert=cert)
319 timeout: Optional[int] =
None
325 cache: Optional[str] =
None,
326 trusted_hosts: Sequence[str] = (),
327 index_urls: Optional[List[str]] =
None,
328 ssl_context: Optional[
"SSLContext"] =
None,
332 :param trusted_hosts: Domains not to emit warnings for when not using
335 super().__init__(*args, **kwargs)
339 self.pip_trusted_origins: List[Tuple[str, Optional[int]]] = []
345 self.auth = MultiDomainBasicAuth(index_urls=index_urls)
359 status_forcelist=[500, 503, 520, 527],
378 secure_adapter = CacheControlAdapter(
379 cache=SafeFileCache(cache),
381 ssl_context=ssl_context,
384 cache=SafeFileCache(cache),
388 secure_adapter = HTTPAdapter(max_retries=retries, ssl_context=ssl_context)
389 self._trusted_host_adapter = insecure_adapter
391 self.mount(
"https://", secure_adapter)
392 self.mount(
"http://", insecure_adapter)
397 for host
in trusted_hosts:
398 self.add_trusted_host(host, suppress_logging=
True)
402 :param new_index_urls: New index urls to update the authentication
405 self.auth.index_urls = new_index_urls
408 self, host: str, source: Optional[str] =
None, suppress_logging: bool =
False
411 :param host: It is okay to provide a host that has previously been
413 :param source: An optional source string, for logging where the host
416 if not suppress_logging:
417 msg = f
"adding trusted host: {host!r}"
418 if source
is not None:
419 msg += f
" (from {source})"
422 parsed_host, parsed_port = parse_netloc(host)
423 if parsed_host
is None:
424 raise ValueError(f
"Trusted host URL must include a host part: {host!r}")
425 if (parsed_host, parsed_port)
not in self.pip_trusted_origins:
426 self.pip_trusted_origins.append((parsed_host, parsed_port))
429 build_url_from_netloc(host, scheme=
"http") +
"/", self._trusted_host_adapter
431 self.mount(build_url_from_netloc(host) +
"/", self._trusted_host_adapter)
434 build_url_from_netloc(host, scheme=
"http") +
":",
435 self._trusted_host_adapter,
438 self.mount(build_url_from_netloc(host) +
":", self._trusted_host_adapter)
441 yield from SECURE_ORIGINS
442 for host, port
in self.pip_trusted_origins:
443 yield (
"*", host,
"*" if port
is None else port)
448 origin_protocol, origin_host, origin_port = (
463 for secure_origin
in self.iter_secure_origins():
464 secure_protocol, secure_host, secure_port = secure_origin
465 if origin_protocol != secure_protocol
and secure_protocol !=
"*":
477 and secure_host !=
"*"
483 if addr
not in network:
488 origin_port != secure_port
489 and secure_port !=
"*"
490 and secure_port
is not None
502 "The repository located at %s is not a trusted or secure host and "
503 "is being ignored. If this repository is available via HTTPS we "
504 "recommend you use HTTPS instead, otherwise you may silence "
505 "this warning and allow it anyway with '--trusted-host %s'.",
512 def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:
519 return super().request(method, url, *args, **kwargs)