link.py

import functools
import itertools
import logging
import os
import posixpath
import re
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.filetypes import WHEEL_EXTENSION
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.misc import (
    pairwise,
    redact_auth_from_url,
    split_auth_from_netloc,
    splitext,
)
from pip._internal.utils.models import KeyBasedCompareMixin
from pip._internal.utils.urls import path_to_url, url_to_path

if TYPE_CHECKING:
    from pip._internal.index.collector import IndexContent

logger = logging.getLogger(__name__)


# Order matters: earlier hashes take precedence over later ones when
# picking which hash to use.
_SUPPORTED_HASHES = ("sha512", "sha384", "sha256", "sha224", "sha1", "md5")


@dataclass(frozen=True)
class LinkHash:
    """Links to content may have embedded hash values. This class parses those.

    `name` must be any member of `_SUPPORTED_HASHES`.

    This class can be converted to and from `ArchiveInfo`. While ArchiveInfo
    intends to be JSON-serializable to conform to PEP 610, this class contains
    the logic for parsing a hash name and value for correctness, and then
    checking whether that hash conforms to a schema with `.is_hash_allowed()`."""

    name: str
    value: str

    _hash_url_fragment_re = re.compile(
        # NB: we do not validate that the second group (.*) is a valid hex
        # digest. Instead, we simply keep that string in this class, and then check it
        # against Hashes when hash-checking is needed. This is easier to debug than
        # proactively discarding an invalid hex digest, as we handle incorrect hashes
        # and malformed hashes in the same place.
        r"[#&]({choices})=([^&]*)".format(
            choices="|".join(re.escape(hash_name) for hash_name in _SUPPORTED_HASHES)
        ),
    )

    def __post_init__(self) -> None:
        assert self.name in _SUPPORTED_HASHES

    @classmethod
    @functools.lru_cache(maxsize=None)
    def find_hash_url_fragment(cls, url: str) -> Optional["LinkHash"]:
        """Search a string for a checksum algorithm name and encoded output value."""
        match = cls._hash_url_fragment_re.search(url)
        if match is None:
            return None
        name, value = match.groups()
        return cls(name=name, value=value)

    def as_dict(self) -> Dict[str, str]:
        return {self.name: self.value}

    def as_hashes(self) -> Hashes:
        """Return a Hashes instance which checks only for the current hash."""
        return Hashes({self.name: [self.value]})

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the current hash is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return hashes.is_hash_allowed(self.name, hex_digest=self.value)
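

# Illustrative sketch (not part of pip's source): how LinkHash handles a URL
# carrying a "#sha256=..." fragment. The URL and digest below are made up.
#
#   >>> lh = LinkHash.find_hash_url_fragment(
#   ...     "https://example.com/pkg-1.0.tar.gz#sha256=0a1b2c3d")
#   >>> lh.as_dict()
#   {'sha256': '0a1b2c3d'}
#   >>> lh.is_hash_allowed(lh.as_hashes())
#   True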


@dataclass(frozen=True)
class MetadataFile:
    """Information about a core metadata file associated with a distribution."""

    hashes: Optional[Dict[str, str]]

    def __post_init__(self) -> None:
        if self.hashes is not None:
            assert all(name in _SUPPORTED_HASHES for name in self.hashes)


def supported_hashes(hashes: Optional[Dict[str, str]]) -> Optional[Dict[str, str]]:
    # Remove any unsupported hash types from the mapping. If this leaves no
    # supported hashes, return None.
    if hashes is None:
        return None
    hashes = {n: v for n, v in hashes.items() if n in _SUPPORTED_HASHES}
    if not hashes:
        return None
    return hashes
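

# Illustrative sketch (not part of pip's source): filtering a hash mapping.
# The digests are made-up placeholders.
#
#   >>> supported_hashes({"sha256": "0a1b", "whirlpool": "2c3d"})
#   {'sha256': '0a1b'}
#   >>> supported_hashes({"whirlpool": "2c3d"}) is None
#   True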


def _clean_url_path_part(part: str) -> str:
    """
    Clean a "part" of a URL path (i.e. after splitting on "@" characters).
    """
    # We unquote prior to quoting to make sure nothing is double quoted.
    return urllib.parse.quote(urllib.parse.unquote(part))
127
128def _clean_file_url_path(part: str) -> str:
129 """
130 Clean the first part of a URL path that corresponds to a local
131 filesystem path (i.e. the first part after splitting on "@" characters).
132 """
133 # We unquote prior to quoting to make sure nothing is double quoted.
134 # Also, on Windows the path part might contain a drive letter which
135 # should not be quoted. On Linux where drive letters do not
136 # exist, the colon should be quoted. We rely on urllib.request
137 # to do the right thing here.
139
140
141# percent-encoded: /
142_reserved_chars_re = re.compile("(@|%2F)", re.IGNORECASE)
143
144


def _clean_url_path(path: str, is_local_path: bool) -> str:
    """
    Clean the path portion of a URL.
    """
    if is_local_path:
        clean_func = _clean_file_url_path
    else:
        clean_func = _clean_url_path_part

    # Split on the reserved characters prior to cleaning so that
    # revision strings in VCS URLs are properly preserved.
    parts = _reserved_chars_re.split(path)

    cleaned_parts = []
    for to_clean, reserved in pairwise(itertools.chain(parts, [""])):
        cleaned_parts.append(clean_func(to_clean))
        # Normalize %xx escapes (e.g. %2f -> %2F)
        cleaned_parts.append(reserved.upper())

    return "".join(cleaned_parts)
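

# Illustrative sketch (not part of pip's source): the non-overlapping pairwise
# split keeps "@" and "%2F" intact while re-quoting everything around them.
# The path is made up.
#
#   >>> _clean_url_path("/a b/repo@v1.0", is_local_path=False)
#   '/a%20b/repo@v1.0'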


def _ensure_quoted_url(url: str) -> str:
    """
    Make sure a link is fully quoted.
    For example, if ' ' occurs in the URL, it will be replaced with "%20",
    and without double-quoting other characters.
    """
    # Split the URL into parts according to the general structure
    # `scheme://netloc/path;parameters?query#fragment`.
    result = urllib.parse.urlparse(url)
    # If the netloc is empty, then the URL refers to a local filesystem path.
    is_local_path = not result.netloc
    path = _clean_url_path(result.path, is_local_path=is_local_path)
    return urllib.parse.urlunparse(result._replace(path=path))
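

# Illustrative sketch (not part of pip's source): a space is quoted to "%20"
# while the already-encoded "%20" is not double-quoted. The URL is made up.
#
#   >>> _ensure_quoted_url("https://example.com/some dir/pkg%201.0.tar.gz")
#   'https://example.com/some%20dir/pkg%201.0.tar.gz'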


class Link(KeyBasedCompareMixin):
    """Represents a parsed link from a Package Index's simple URL"""

    __slots__ = [
        "_parsed_url",
        "_url",
        "_hashes",
        "comes_from",
        "requires_python",
        "yanked_reason",
        "metadata_file_data",
        "cache_link_parsing",
        "egg_fragment",
    ]

    def __init__(
        self,
        url: str,
        comes_from: Optional[Union[str, "IndexContent"]] = None,
        requires_python: Optional[str] = None,
        yanked_reason: Optional[str] = None,
        metadata_file_data: Optional[MetadataFile] = None,
        cache_link_parsing: bool = True,
        hashes: Optional[Mapping[str, str]] = None,
    ) -> None:
207 """
208 :param url: url of the resource pointed to (href of the link)
209 :param comes_from: instance of IndexContent where the link was found,
210 or string.
211 :param requires_python: String containing the `Requires-Python`
212 metadata field, specified in PEP 345. This may be specified by
213 a data-requires-python attribute in the HTML link tag, as
214 described in PEP 503.
215 :param yanked_reason: the reason the file has been yanked, if the
216 file has been yanked, or None if the file hasn't been yanked.
217 This is the value of the "data-yanked" attribute, if present, in
218 a simple repository HTML link. If the file has been yanked but
219 no reason was provided, this should be the empty string. See
220 PEP 592 for more information and the specification.
221 :param metadata_file_data: the metadata attached to the file, or None if
222 no such metadata is provided. This argument, if not None, indicates
223 that a separate metadata file exists, and also optionally supplies
224 hashes for that file.
225 :param cache_link_parsing: A flag that is used elsewhere to determine
226 whether resources retrieved from this link should be cached. PyPI
227 URLs should generally have this set to False, for example.
228 :param hashes: A mapping of hash names to digests to allow us to
229 determine the validity of a download.
230 """

        # The comes_from, requires_python, and metadata_file_data arguments are
        # only used by classmethods of this class, and are not used in client
        # code directly.

        # url can be a UNC Windows share.
        if url.startswith("\\\\"):
            url = path_to_url(url)

        self._parsed_url = urllib.parse.urlsplit(url)
        # Store the url as a private attribute to prevent accidentally
        # trying to set a new value.
        self._url = url

        link_hash = LinkHash.find_hash_url_fragment(url)
        hashes_from_link = {} if link_hash is None else link_hash.as_dict()
        if hashes is None:
            self._hashes = hashes_from_link
        else:
            self._hashes = {**hashes, **hashes_from_link}

        self.comes_from = comes_from
        self.requires_python = requires_python if requires_python else None
        self.yanked_reason = yanked_reason
        self.metadata_file_data = metadata_file_data

        super().__init__(key=url, defining_class=Link)

        self.cache_link_parsing = cache_link_parsing
        self.egg_fragment = self._egg_fragment()

    @classmethod
    def from_json(
        cls,
        file_data: Dict[str, Any],
        page_url: str,
    ) -> Optional["Link"]:
268 """
269 Convert an pypi json document from a simple repository page into a Link.
270 """
        file_url = file_data.get("url")
        if file_url is None:
            return None

        url = _ensure_quoted_url(urllib.parse.urljoin(page_url, file_url))
        pyrequire = file_data.get("requires-python")
        yanked_reason = file_data.get("yanked")
        hashes = file_data.get("hashes", {})

        # PEP 714: Indexes must use the name core-metadata, but
        # clients should support the old name as a fallback for compatibility.
        metadata_info = file_data.get("core-metadata")
        if metadata_info is None:
            metadata_info = file_data.get("dist-info-metadata")

        # The metadata info value may be a boolean, or a dict of hashes.
        if isinstance(metadata_info, dict):
            # The file exists, and hashes have been supplied
            metadata_file_data = MetadataFile(supported_hashes(metadata_info))
        elif metadata_info:
            # The file exists, but there are no hashes
            metadata_file_data = MetadataFile(None)
        else:
            # False or not present: the file does not exist
            metadata_file_data = None

        # The Link.yanked_reason expects an empty string instead of a boolean.
        if yanked_reason and not isinstance(yanked_reason, str):
            yanked_reason = ""
        # The Link.yanked_reason expects None instead of False.
        elif not yanked_reason:
            yanked_reason = None

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            hashes=hashes,
            metadata_file_data=metadata_file_data,
        )
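
    # Illustrative sketch (not part of pip's source): a PEP 691 style file
    # entry as from_json might receive it. All values are made up.
    #
    #   >>> entry = {
    #   ...     "url": "pkg-1.0-py3-none-any.whl",
    #   ...     "hashes": {"sha256": "0a1b2c3d"},
    #   ...     "requires-python": ">=3.8",
    #   ...     "core-metadata": {"sha256": "4e5f6a7b"},
    #   ... }
    #   >>> link = Link.from_json(entry, "https://example.com/simple/pkg/")
    #   >>> link.is_wheel, link.requires_python
    #   (True, '>=3.8')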

    @classmethod
    def from_element(
        cls,
        anchor_attribs: Dict[str, Optional[str]],
        page_url: str,
        base_url: str,
    ) -> Optional["Link"]:
320 """
321 Convert an anchor element's attributes in a simple repository page to a Link.
322 """
323 href = anchor_attribs.get("href")
324 if not href:
325 return None
326
327 url = _ensure_quoted_url(urllib.parse.urljoin(base_url, href))
328 pyrequire = anchor_attribs.get("data-requires-python")
329 yanked_reason = anchor_attribs.get("data-yanked")
330
331 # PEP 714: Indexes must use the name data-core-metadata, but
332 # clients should support the old name as a fallback for compatibility.
333 metadata_info = anchor_attribs.get("data-core-metadata")
334 if metadata_info is None:
335 metadata_info = anchor_attribs.get("data-dist-info-metadata")
336 # The metadata info value may be the string "true", or a string of
337 # the form "hashname=hashval"
338 if metadata_info == "true":
339 # The file exists, but there are no hashes
340 metadata_file_data = MetadataFile(None)
341 elif metadata_info is None:
342 # The file does not exist
343 metadata_file_data = None
344 else:
345 # The file exists, and hashes have been supplied
346 hashname, sep, hashval = metadata_info.partition("=")
347 if sep == "=":
348 metadata_file_data = MetadataFile(supported_hashes({hashname: hashval}))
349 else:
                # Error - data is wrong. Treat as no hashes supplied.
                logger.debug(
                    "Index returned invalid data-dist-info-metadata value: %s",
                    metadata_info,
                )
                metadata_file_data = MetadataFile(None)

        return cls(
            url,
            comes_from=page_url,
            requires_python=pyrequire,
            yanked_reason=yanked_reason,
            metadata_file_data=metadata_file_data,
        )
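
    # Illustrative sketch (not part of pip's source): anchor attributes as
    # parsed from a PEP 503 HTML page. All values are made up.
    #
    #   >>> attribs = {
    #   ...     "href": "pkg-1.0.tar.gz#sha256=0a1b2c3d",
    #   ...     "data-requires-python": ">=3.8",
    #   ...     "data-core-metadata": "sha256=4e5f6a7b",
    #   ... }
    #   >>> link = Link.from_element(
    #   ...     attribs,
    #   ...     page_url="https://example.com/simple/pkg/",
    #   ...     base_url="https://example.com/simple/pkg/",
    #   ... )
    #   >>> link.filename, link.hash_name
    #   ('pkg-1.0.tar.gz', 'sha256')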

    def __str__(self) -> str:
        if self.requires_python:
            rp = f" (requires-python:{self.requires_python})"
        else:
            rp = ""
        if self.comes_from:
            return "{} (from {}){}".format(
                redact_auth_from_url(self._url), self.comes_from, rp
            )
        else:
            return redact_auth_from_url(str(self._url))

    def __repr__(self) -> str:
        return f"<Link {self}>"

    @property
    def url(self) -> str:
        return self._url

    @property
    def filename(self) -> str:
        path = self.path.rstrip("/")
        name = posixpath.basename(path)
        if not name:
            # Make sure we don't leak auth information if the netloc
            # includes a username and password.
            netloc, user_pass = split_auth_from_netloc(self.netloc)
            return netloc

        name = urllib.parse.unquote(name)
        assert name, f"URL {self._url!r} produced no filename"
        return name

    @property
    def file_path(self) -> str:
        return url_to_path(self.url)

    @property
    def scheme(self) -> str:
        return self._parsed_url.scheme

    @property
    def netloc(self) -> str:
        """
        This can contain auth information.
        """
        return self._parsed_url.netloc

    @property
    def path(self) -> str:
        return urllib.parse.unquote(self._parsed_url.path)

    def splitext(self) -> Tuple[str, str]:
        return splitext(posixpath.basename(self.path.rstrip("/")))

    @property
    def ext(self) -> str:
        return self.splitext()[1]

    @property
    def url_without_fragment(self) -> str:
        scheme, netloc, path, query, fragment = self._parsed_url
        return urllib.parse.urlunsplit((scheme, netloc, path, query, ""))

    _egg_fragment_re = re.compile(r"[#&]egg=([^&]*)")

    # Per PEP 508.
    _project_name_re = re.compile(
        r"^([A-Z0-9]|[A-Z0-9][A-Z0-9._-]*[A-Z0-9])$", re.IGNORECASE
    )

    def _egg_fragment(self) -> Optional[str]:
        match = self._egg_fragment_re.search(self._url)
        if not match:
            return None

        # An egg fragment looks like a PEP 508 project name, along with
        # an optional extras specifier. Anything else is invalid.
        project_name = match.group(1)
        if not self._project_name_re.match(project_name):
            deprecated(
                reason=f"{self} contains an egg fragment with a non-PEP 508 name",
                replacement="to use the req @ url syntax, and remove the egg fragment",
                gone_in="25.0",
                issue=11617,
            )

        return project_name

    _subdirectory_fragment_re = re.compile(r"[#&]subdirectory=([^&]*)")

    @property
    def subdirectory_fragment(self) -> Optional[str]:
        match = self._subdirectory_fragment_re.search(self._url)
        if not match:
            return None
        return match.group(1)
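
    # Illustrative sketch (not part of pip's source): fragment extraction on a
    # made-up VCS URL.
    #
    #   >>> link = Link("git+https://example.com/repo.git#egg=pkg&subdirectory=src")
    #   >>> link.egg_fragment, link.subdirectory_fragment
    #   ('pkg', 'src')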

    def metadata_link(self) -> Optional["Link"]:
        """Return a link to the associated core metadata file (if any)."""
        if self.metadata_file_data is None:
            return None
        metadata_url = f"{self.url_without_fragment}.metadata"
        if self.metadata_file_data.hashes is None:
            return Link(metadata_url)
        return Link(metadata_url, hashes=self.metadata_file_data.hashes)
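
    # Illustrative sketch (not part of pip's source): PEP 658 metadata links
    # append ".metadata" to the fragment-less URL. All values are made up.
    #
    #   >>> link = Link(
    #   ...     "https://example.com/pkg-1.0-py3-none-any.whl",
    #   ...     metadata_file_data=MetadataFile({"sha256": "4e5f6a7b"}),
    #   ... )
    #   >>> link.metadata_link().url
    #   'https://example.com/pkg-1.0-py3-none-any.whl.metadata'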

    def as_hashes(self) -> Hashes:
        return Hashes({k: [v] for k, v in self._hashes.items()})

    @property
    def hash(self) -> Optional[str]:
        return next(iter(self._hashes.values()), None)

    @property
    def hash_name(self) -> Optional[str]:
        return next(iter(self._hashes), None)

    @property
    def show_url(self) -> str:
        return posixpath.basename(self._url.split("#", 1)[0].split("?", 1)[0])

    @property
    def is_file(self) -> bool:
        return self.scheme == "file"

    def is_existing_dir(self) -> bool:
        return self.is_file and os.path.isdir(self.file_path)

    @property
    def is_wheel(self) -> bool:
        return self.ext == WHEEL_EXTENSION

    @property
    def is_vcs(self) -> bool:
        from pip._internal.vcs import vcs

        return self.scheme in vcs.all_schemes

    @property
    def is_yanked(self) -> bool:
        return self.yanked_reason is not None

    @property
    def has_hash(self) -> bool:
        return bool(self._hashes)

    def is_hash_allowed(self, hashes: Optional[Hashes]) -> bool:
        """
        Return True if the link has a hash and it is allowed by `hashes`.
        """
        if hashes is None:
            return False
        return any(hashes.is_hash_allowed(k, v) for k, v in self._hashes.items())
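

# Illustrative sketch (not part of pip's source): hash checking against a
# Hashes allow-list. The digests are made-up placeholders.
#
#   >>> link = Link("https://example.com/pkg-1.0.tar.gz#sha256=0a1b2c3d")
#   >>> link.is_hash_allowed(Hashes({"sha256": ["0a1b2c3d"]}))
#   True
#   >>> link.is_hash_allowed(Hashes({"sha256": ["ffffffff"]}))
#   False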


class _CleanResult(NamedTuple):
    """Convert link for equivalency check.

    This is used in the resolver to check whether two URL-specified requirements
    likely point to the same distribution and can be considered equivalent. This
    equivalency logic avoids comparing URLs literally, which can be too strict
    (e.g. "a=1&b=2" vs "b=2&a=1") and produce conflicts unexpected by users.

    Currently this does three things:

    1. Drop the basic auth part. This is technically wrong since a server can
       serve different content based on auth, but if it does that, it is even
       impossible to guarantee two URLs without auth are equivalent, since
       the user can input different auth information when prompted. So the
       practical solution is to assume the auth doesn't affect the response.
    2. Parse the query to avoid the ordering issue. Note that ordering under the
       same key in the query is NOT cleaned; i.e. "a=1&a=2" and "a=2&a=1" are
       still considered different.
    3. Explicitly drop most of the fragment part, except ``subdirectory=`` and
       hash values, since it should have no impact on the downloaded content.
       Note that this drops the "egg=" part historically used to denote the
       requested project (and extras), which is wrong in the strictest sense,
       but too many people supply it inconsistently, causing superfluous
       resolution conflicts, so we choose to ignore it as well.
    """

    parsed: urllib.parse.SplitResult
    query: Dict[str, List[str]]
    subdirectory: str
    hashes: Dict[str, str]


def _clean_link(link: Link) -> _CleanResult:
    parsed = link._parsed_url
    netloc = parsed.netloc.rsplit("@", 1)[-1]
    # According to RFC 8089, an empty host in file: means localhost.
    if parsed.scheme == "file" and not netloc:
        netloc = "localhost"
    fragment = urllib.parse.parse_qs(parsed.fragment)
    if "egg" in fragment:
        logger.debug("Ignoring egg= fragment in %s", link)
    try:
        # If there are multiple subdirectory values, use the first one.
        # This matches the behavior of Link.subdirectory_fragment.
        subdirectory = fragment["subdirectory"][0]
    except (IndexError, KeyError):
        subdirectory = ""
    # If there are multiple hash values under the same algorithm, use the
    # first one. This matches the behavior of Link.hash.
    hashes = {k: fragment[k][0] for k in _SUPPORTED_HASHES if k in fragment}
    return _CleanResult(
        parsed=parsed._replace(netloc=netloc, query="", fragment=""),
        query=urllib.parse.parse_qs(parsed.query),
        subdirectory=subdirectory,
        hashes=hashes,
    )


@functools.lru_cache(maxsize=None)
def links_equivalent(link1: Link, link2: Link) -> bool:
    return _clean_link(link1) == _clean_link(link2)
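

# Illustrative sketch (not part of pip's source): two URLs that differ only in
# basic auth and query ordering compare as equivalent. The URLs are made up.
#
#   >>> links_equivalent(
#   ...     Link("https://user:pass@example.com/pkg.tar.gz?a=1&b=2"),
#   ...     Link("https://example.com/pkg.tar.gz?b=2&a=1"),
#   ... )
#   True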