Let us walk on the 3-isogeny graph
Loading...
Searching...
No Matches
versioncontrol.py
Go to the documentation of this file.
1"""Handles all VCS (version control) support"""
2
3import logging
4import os
5import shutil
6import sys
7import urllib.parse
8from typing import (
9 TYPE_CHECKING,
10 Any,
11 Dict,
12 Iterable,
13 Iterator,
14 List,
15 Mapping,
16 Optional,
17 Tuple,
18 Type,
19 Union,
20)
21
22from pip._internal.cli.spinners import SpinnerInterface
23from pip._internal.exceptions import BadCommand, InstallationError
24from pip._internal.utils.misc import (
25 HiddenText,
26 ask_path_exists,
27 backup_dir,
28 display_path,
29 hide_url,
30 hide_value,
31 is_installable_dir,
32 rmtree,
33)
35 CommandArgs,
36 call_subprocess,
37 format_command_args,
38 make_command,
39)
40from pip._internal.utils.urls import get_url_scheme
41
42if TYPE_CHECKING:
43 # Literal was introduced in Python 3.8.
44 #
45 # TODO: Remove `if TYPE_CHECKING` when dropping support for Python 3.7.
46 from typing import Literal
47
48
49__all__ = ["vcs"]
50
51
52logger = logging.getLogger(__name__)
53
54AuthInfo = Tuple[Optional[str], Optional[str]]
55
56
def is_url(name: str) -> bool:
    """
    Tell whether ``name`` starts with a URL scheme that is recognised here.

    Recognised schemes are http, https, file and ftp, plus every scheme
    listed by ``vcs.all_schemes``. A name without any scheme is not a URL.
    """
    detected = get_url_scheme(name)
    if detected is not None:
        recognised = ["http", "https", "file", "ftp"] + vcs.all_schemes
        return detected in recognised
    return False
65
66
def make_vcs_requirement_url(
    repo_url: str, rev: str, project_name: str, subdir: Optional[str] = None
) -> str:
    """
    Return the URL for a VCS requirement.

    The result has the form ``{repo_url}@{rev}#egg={name}``, optionally
    followed by ``&subdirectory={subdir}``.

    Args:
      repo_url: the remote VCS url, with any needed VCS prefix (e.g. "git+").
      rev: the revision, placed after the "@" separator.
      project_name: the (unescaped) project name; dashes are replaced by
        underscores in the ``egg=`` fragment.
      subdir: optional sub-directory; omitted from the URL when None or
        empty.
    """
    egg_project_name = project_name.replace("-", "_")
    req = f"{repo_url}@{rev}#egg={egg_project_name}"
    if subdir:
        req += f"&subdirectory={subdir}"

    return req
83
84
def find_path_to_project_root_from_repo_root(
    location: str, repo_root: str
) -> Optional[str]:
    """
    Find the Python project's root by searching up the filesystem from
    `location`. Return the path to project root relative to `repo_root`.
    Return None if the project root is `repo_root`, or cannot be found.
    """
    # find project root.
    orig_location = location
    while not is_installable_dir(location):
        last_location = location
        location = os.path.dirname(location)
        if location == last_location:
            # We've traversed up to the root of the filesystem without
            # finding a Python project.
            # NOTE(review): the opening line of this logging call was
            # missing from the listing; restored as a warning -- confirm
            # the level against upstream.
            logger.warning(
                "Could not find a Python project for directory %s (tried all "
                "parent directories)",
                orig_location,
            )
            return None

    # The project sits directly at the repository root: no sub-path needed.
    if os.path.samefile(repo_root, location):
        return None

    return os.path.relpath(location, repo_root)
112
113
class RemoteNotFoundError(Exception):
    """Error type for a repository that has no remote url configured."""
116
117
class RemoteNotValidError(Exception):
    """Exception that keeps the offending remote URL in ``url``."""

    def __init__(self, url: str):
        self.url = url
        super().__init__(url)
122
123
class RevOptions:

    """
    Encapsulates a VCS-specific revision to install, along with any VCS
    install options.

    Instances of this class should be treated as if immutable.
    """

    def __init__(
        self,
        vc_class: Type["VersionControl"],
        rev: Optional[str] = None,
        extra_args: Optional[CommandArgs] = None,
    ) -> None:
        """
        Args:
          vc_class: a VersionControl subclass.
          rev: the name of the revision to install.
          extra_args: a list of extra options.
        """
        if extra_args is None:
            extra_args = []

        self.extra_args = extra_args
        self.rev = rev
        self.vc_class = vc_class
        self.branch_name: Optional[str] = None

    def __repr__(self) -> str:
        return f"<RevOptions {self.vc_class.name}: rev={self.rev!r}>"

    @property
    def arg_rev(self) -> Optional[str]:
        """
        Return the revision to pass to the VCS: ``rev`` when it is set,
        otherwise the backend class's ``default_arg_rev``.
        """
        if self.rev is None:
            return self.vc_class.default_arg_rev

        return self.rev

    def to_args(self) -> CommandArgs:
        """
        Return the VCS-specific command arguments.

        The backend's base revision arguments come first (only when a
        revision is available), followed by ``extra_args``.
        """
        args: CommandArgs = []
        rev = self.arg_rev
        if rev is not None:
            args += self.vc_class.get_base_rev_args(rev)
        args += self.extra_args

        return args

    def to_display(self) -> str:
        """
        Return a log-message suffix naming the revision, or an empty
        string when no revision is set.
        """
        if not self.rev:
            return ""

        return f" (to revision {self.rev})"

    def make_new(self, rev: str) -> "RevOptions":
        """
        Make a copy of the current instance, but with a new rev.

        Args:
          rev: the name of the revision for the new object.
        """
        return self.vc_class.make_rev_options(rev, extra_args=self.extra_args)
189
190
# Registered backends: each backend class's ``name`` mapped to an instance.
_registry: Dict[str, "VersionControl"] = {}
# Extra URL schemes used by the supported version control systems.
schemes = ["ssh", "git", "hg", "bzr", "sftp", "svn"]
194
def __init__(self) -> None:
    """Make urllib.parse aware of the VCS schemes, then initialise the base."""
    # Register more schemes with urlparse for various version control
    # systems
    # NOTE(review): the statement that performs this registration was
    # missing from the listing; restored from the comment above -- confirm
    # against upstream.
    urllib.parse.uses_netloc.extend(self.schemes)
    super().__init__()
200
201 def __iter__(self) -> Iterator[str]:
202 return self._registry.__iter__()
203
204 @property
def backends(self) -> List["VersionControl"]:
    """Return a fresh list holding every registered backend instance."""
    return [*self._registry.values()]
207
208 @property
def dirnames(self) -> List[str]:
    """Return the ``dirname`` of every registered backend, in order."""
    collected: List[str] = []
    for vc in self.backends:
        collected.append(vc.dirname)
    return collected
211
212 @property
def all_schemes(self) -> List[str]:
    """
    Return the schemes of all registered backends as one flat list,
    in backend order.
    """
    schemes: List[str] = []
    for backend in self.backends:
        # NOTE(review): the loop body was missing from the listing;
        # restored so that each backend contributes its schemes.
        schemes.extend(backend.schemes)
    return schemes
218
def register(self, cls: Type["VersionControl"]) -> None:
    """
    Add an instance of ``cls`` to the registry under ``cls.name``.

    A class without a ``name`` attribute is rejected with a warning; a
    name that is already registered is left untouched.
    """
    if not hasattr(cls, "name"):
        logger.warning("Cannot register VCS %s", cls.__name__)
        return
    if cls.name in self._registry:
        return
    self._registry[cls.name] = cls()
    logger.debug("Registered VCS backend: %s", cls.name)
226
def unregister(self, name: str) -> None:
    """Drop the backend registered as ``name``; unknown names are ignored."""
    self._registry.pop(name, None)
230
def get_backend_for_dir(self, location: str) -> Optional["VersionControl"]:
    """
    Return the backend whose repository contains ``location``, or None
    when no registered backend reports a repository root for it.
    """
    found = {}
    for candidate in self._registry.values():
        root = candidate.get_repository_root(location)
        if root:
            logger.debug("Determine that %s uses VCS: %s", location, candidate.name)
            found[root] = candidate

    if found:
        # Every root found is `location` itself or one of its parents, so
        # the longest path has the most components: it is the inner-most
        # repository, and its backend wins.
        return found[max(found, key=len)]
    return None
253
def get_backend_for_scheme(self, scheme: str) -> Optional["VersionControl"]:
    """
    Return the first registered backend that lists ``scheme`` among its
    schemes, or None when there is none.
    """
    matching = (vc for vc in self._registry.values() if scheme in vc.schemes)
    return next(matching, None)
262
def get_backend(self, name: str) -> Optional["VersionControl"]:
    """
    Look up a backend by name, ignoring case; return None if unknown.
    """
    return self._registry.get(name.lower())
269
270
272
273
# Identifying strings of the backend; all empty at this level.
name = ""
dirname = ""
repo_name = ""
# Schemes supported by this Version Control.
schemes: Tuple[str, ...] = ()
# Environment variable names handed to call_subprocess() as `unset_environ`.
unset_environ: Tuple[str, ...] = ()
# Revision used by RevOptions.arg_rev when no explicit revision is given.
default_arg_rev: Optional[str] = None
283
284 @classmethod
def should_add_vcs_url_prefix(cls, remote_url: str) -> bool:
    """
    Decide whether the vcs prefix (e.g. "git+") has to be put in front of
    a repository's remote url when it is used in a requirement.

    The prefix is skipped only when the url already starts with
    "<name>:" (compared case-insensitively).
    """
    own_scheme = f"{cls.name}:"
    already_prefixed = remote_url.lower().startswith(own_scheme)
    return not already_prefixed
291
292 @classmethod
def get_subdirectory(cls, location: str) -> Optional[str]:
    """
    Give the Python project root's path relative to the repo root, or
    None when the project root is in the repo root.

    This base implementation always answers None.
    """
    return None
299
300 @classmethod
def get_requirement_revision(cls, repo_dir: str) -> str:
    """
    Give the revision string to put into a requirement; here this is
    simply whatever ``get_revision`` reports for ``repo_dir``.
    """
    revision = cls.get_revision(repo_dir)
    return revision
306
307 @classmethod
def get_src_requirement(cls, repo_dir: str, project_name: str) -> str:
    """
    Build the requirement string that re-downloads the files currently
    found in the given repository directory.

    Args:
      project_name: the (unescaped) project name.

    The result looks similar to:

        {repository_url}@{revision}#egg={project_name}
    """
    remote = cls.get_remote_url(repo_dir)
    # Prepend "<name>+" unless the backend says the url needs no prefix.
    prefixed = f"{cls.name}+{remote}" if cls.should_add_vcs_url_prefix(remote) else remote

    return make_vcs_requirement_url(
        prefixed,
        cls.get_requirement_revision(repo_dir),
        project_name,
        subdir=cls.get_subdirectory(repo_dir),
    )
330
331 @staticmethod
def get_base_rev_args(rev: str) -> List[str]:
    """
    Produce the base revision arguments for a vcs command.

    Not implemented at this level: always raises NotImplementedError.

    Args:
      rev: the name of a revision to install.  Cannot be None.
    """
    raise NotImplementedError
340
def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
    """
    Tell whether the commit hash checked out at ``dest`` matches the
    revision named in ``url``.

    A VCS without immutable commit hashes must always answer False, which
    is what this base implementation does.

    Local uncommitted changes in ``dest`` after checkout are not examined,
    as pip currently has no use case for that.
    """
    return False
353
354 @classmethod
def make_rev_options(
    cls, rev: Optional[str] = None, extra_args: Optional[CommandArgs] = None
) -> RevOptions:
    """
    Return a RevOptions object bound to this backend class.

    Args:
      rev: the name of a revision to install.
      extra_args: a list of extra options.
    """
    return RevOptions(cls, rev, extra_args=extra_args)
366
367 @classmethod
368 def _is_local_repository(cls, repo: str) -> bool:
369 """
370 posix absolute paths start with os.path.sep,
371 win32 ones start with drive (like c:\\folder)
372 """
373 drive, tail = os.path.splitdrive(repo)
374 return repo.startswith(os.path.sep) or bool(drive)
375
376 @classmethod
def get_netloc_and_auth(
    cls, netloc: str, scheme: str
) -> Tuple[str, Tuple[Optional[str], Optional[str]]]:
    """
    Parse the repository URL's netloc, and return the new netloc to use
    along with auth information.

    This base implementation returns ``netloc`` unchanged and no auth.

    Args:
      netloc: the original repository URL netloc.
      scheme: the repository URL's scheme without the vcs prefix.

    This is mainly for the Subversion class to override, so that auth
    information can be provided via the --username and --password options
    instead of through the URL.  For other subclasses like Git without
    such an option, auth information must stay in the URL.

    Returns: (netloc, (username, password)).
    """
    return netloc, (None, None)
396
397 @classmethod
def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
    """
    Split a prefixed repository URL into the URL to use, the revision,
    and the auth info.

    Raises ValueError when the scheme carries no "<vcs>+" prefix, and
    InstallationError when an "@" in the path is followed by nothing.

    Returns: (url, rev, (username, password)).
    """
    scheme, netloc, path, query, _fragment = urllib.parse.urlsplit(url)
    if "+" not in scheme:
        raise ValueError(
            "Sorry, {!r} is a malformed VCS url. "
            "The format is <vcs>+<protocol>://<url>, "
            "e.g. svn+http://myrepo/svn/MyApp#egg=MyApp".format(url)
        )
    # Keep only what follows the vcs prefix.
    _, scheme = scheme.split("+", 1)
    netloc, user_pass = cls.get_netloc_and_auth(netloc, scheme)
    if "@" not in path:
        rev = None
    else:
        # The revision is whatever follows the last "@" of the path.
        path, rev = path.rsplit("@", 1)
        if not rev:
            raise InstallationError(
                "The URL {!r} has an empty revision (after @) "
                "which is not supported. Include a revision after @ "
                "or remove @ from the URL.".format(url)
            )
    # The fragment is dropped from the rebuilt URL.
    rebuilt = urllib.parse.urlunsplit((scheme, netloc, path, query, ""))
    return rebuilt, rev, user_pass
426
427 @staticmethod
def make_rev_args(
    username: Optional[str], password: Optional[HiddenText]
) -> CommandArgs:
    """
    Return the RevOptions "extra arguments" to use in obtain().

    This base implementation ignores both credentials and returns an
    empty list.

    Args:
      username: user name parsed from the URL, if any.
      password: hidden password parsed from the URL, if any.
    """
    return []
435
def get_url_rev_options(self, url: HiddenText) -> Tuple[HiddenText, RevOptions]:
    """
    Work out the URL and the RevOptions object that obtain() should use,
    returned as the tuple (url, rev_options).
    """
    plain_url, rev, (username, raw_password) = self.get_url_rev_and_auth(url.secret)
    # Wrap the password so it is hidden; no password stays None.
    password: Optional[HiddenText] = (
        hide_value(raw_password) if raw_password is not None else None
    )
    rev_options = self.make_rev_options(
        rev, extra_args=self.make_rev_args(username, password)
    )

    return hide_url(plain_url), rev_options
450
451 @staticmethod
def normalize_url(url: str) -> str:
    """
    Bring a URL into a comparable form: percent-escapes are unquoted and
    all trailing slashes are stripped.
    """
    unquoted = urllib.parse.unquote(url)
    return unquoted.rstrip("/")
458
459 @classmethod
def compare_urls(cls, url1: str, url2: str) -> bool:
    """
    Tell whether two repo URLs are the same once incidental differences
    have been removed by ``normalize_url``.
    """
    left = cls.normalize_url(url1)
    right = cls.normalize_url(url2)
    return left == right
465
def fetch_new(
    self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
) -> None:
    """
    Fetch a revision from a repository, in the case that this is the
    first fetch from the repository.

    Not implemented at this level: always raises NotImplementedError.

    Args:
      dest: the directory to fetch the repository to.
      url: the repository URL to fetch from.
      rev_options: a RevOptions object.
      verbosity: verbosity level.
    """
    raise NotImplementedError
479
def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
    """
    Re-point the repo at ``dest`` to ``URL``.

    Not implemented at this level: always raises NotImplementedError.

    Args:
      rev_options: a RevOptions object.
    """
    raise NotImplementedError
488
def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
    """
    Bring an already-existing repo to the given ``rev_options``.

    Not implemented at this level: always raises NotImplementedError.

    Args:
      rev_options: a RevOptions object.
    """
    raise NotImplementedError
497
498 @classmethod
def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
    """
    Tell whether the id of the current commit equals the given name.

    Not implemented at this level: always raises NotImplementedError.

    Args:
      dest: the repository directory.
      name: a string name.
    """
    raise NotImplementedError
508
def obtain(self, dest: str, url: HiddenText, verbosity: int) -> None:
    """
    Install or update in editable mode the package represented by this
    VersionControl object.

    If ``dest`` does not exist the repository is fetched fresh. If it
    holds a repository with the same remote URL it is updated (or left
    alone when already at the requested revision). Otherwise the user is
    asked whether to switch, ignore, wipe or back up the existing path.

    :param dest: the repository directory in which to install or update.
    :param url: the repository URL starting with a vcs prefix.
    :param verbosity: verbosity level.
    """
    # NOTE(review): in the listing, the opening line of every multi-line
    # logging call and some of their arguments were missing, and several
    # attribute names were doubled. They are restored below so that each
    # format string gets as many arguments as it has placeholders; the
    # log levels and the `self.name` / `self.repo_name` choices should be
    # confirmed against upstream.
    url, rev_options = self.get_url_rev_options(url)

    if not os.path.exists(dest):
        self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        return

    rev_display = rev_options.to_display()
    if self.is_repository_directory(dest):
        existing_url = self.get_remote_url(dest)
        if self.compare_urls(existing_url, url.secret):
            logger.debug(
                "%s in %s exists, and has correct URL (%s)",
                self.repo_name.title(),
                display_path(dest),
                url,
            )
            if not self.is_commit_id_equal(dest, rev_options.rev):
                logger.info(
                    "Updating %s %s%s",
                    display_path(dest),
                    self.repo_name,
                    rev_display,
                )
                self.update(dest, url, rev_options)
            else:
                logger.info("Skipping because already up-to-date.")
            return

        # Same kind of repository, but it points at a different remote.
        logger.warning(
            "%s %s in %s exists with URL %s",
            self.name,
            self.repo_name,
            display_path(dest),
            existing_url,
        )
        prompt = ("(s)witch, (i)gnore, (w)ipe, (b)ackup ", ("s", "i", "w", "b"))
    else:
        logger.warning(
            "Directory %s already exists, and is not a %s %s.",
            dest,
            self.name,
            self.repo_name,
        )
        # https://github.com/python/mypy/issues/1174
        prompt = ("(i)gnore, (w)ipe, (b)ackup ", ("i", "w", "b"))  # type: ignore

    logger.warning(
        "The plan is to install the %s repository %s",
        self.name,
        url,
    )
    response = ask_path_exists("What to do? {}".format(prompt[0]), prompt[1])

    if response == "a":
        sys.exit(-1)

    if response == "w":
        logger.warning("Deleting %s", display_path(dest))
        rmtree(dest)
        self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        return

    if response == "b":
        dest_dir = backup_dir(dest)
        logger.warning("Backing up %s to %s", display_path(dest), dest_dir)
        shutil.move(dest, dest_dir)
        self.fetch_new(dest, url, rev_options, verbosity=verbosity)
        return

    # Do nothing if the response is "i".
    if response == "s":
        logger.info(
            "Switching %s %s to %s%s",
            self.repo_name,
            display_path(dest),
            url,
            rev_display,
        )
        self.switch(dest, url, rev_options)
597
def unpack(self, location: str, url: HiddenText, verbosity: int) -> None:
    """
    Remove whatever is at ``location``, then obtain the url repository
    (and vcs infos) into it.

    :param url: the repository URL starting with a vcs prefix.
    :param verbosity: verbosity level.
    """
    already_there = os.path.exists(location)
    if already_there:
        rmtree(location)
    self.obtain(location, url=url, verbosity=verbosity)
609
610 @classmethod
def get_remote_url(cls, location: str) -> str:
    """
    Report the url used at location.

    Not implemented at this level: always raises NotImplementedError.
    Implementations raise RemoteNotFoundError if the repository does not
    have a remote url configured.
    """
    raise NotImplementedError
619
620 @classmethod
def get_revision(cls, location: str) -> str:
    """
    Report the current commit id of the files at the given location.

    Not implemented at this level: always raises NotImplementedError.
    """
    raise NotImplementedError
626
627 @classmethod
def run_command(
    cls,
    cmd: Union[List[str], CommandArgs],
    show_stdout: bool = True,
    cwd: Optional[str] = None,
    on_returncode: 'Literal["raise", "warn", "ignore"]' = "raise",
    extra_ok_returncodes: Optional[Iterable[int]] = None,
    command_desc: Optional[str] = None,
    extra_environ: Optional[Mapping[str, Any]] = None,
    spinner: Optional[SpinnerInterface] = None,
    log_failed_cmd: bool = True,
    stdout_only: bool = False,
) -> str:
    """
    Run a VCS subcommand
    This is simply a wrapper around call_subprocess that adds the VCS
    command name, and checks that the VCS is available

    ``cls.name`` is prepended to ``cmd``; when ``command_desc`` is None it
    defaults to the formatted command. All other arguments are forwarded
    to call_subprocess, together with ``cls.unset_environ``.

    Raises BadCommand when the executable cannot be found or may not be
    executed by the current user.
    """
    cmd = make_command(cls.name, *cmd)
    if command_desc is None:
        command_desc = format_command_args(cmd)
    try:
        return call_subprocess(
            cmd,
            show_stdout,
            cwd,
            on_returncode=on_returncode,
            extra_ok_returncodes=extra_ok_returncodes,
            command_desc=command_desc,
            extra_environ=extra_environ,
            unset_environ=cls.unset_environ,
            spinner=spinner,
            log_failed_cmd=log_failed_cmd,
            stdout_only=stdout_only,
        )
    except FileNotFoundError:
        # errno.ENOENT = no such file or directory
        # In other words, the VCS executable isn't available
        raise BadCommand(
            f"Cannot find command {cls.name!r} - do you have "
            f"{cls.name!r} installed and in your PATH?"
        )
    except PermissionError:
        # errno.EACCES = Permission denied
        # This error occurs, for instance, when the command is installed
        # only for another user. So, the current user doesn't have
        # permission to call the other user's command.
        raise BadCommand(
            f"No permission to execute {cls.name!r} - install it "
            f"locally, globally (ask admin), or check your PATH. "
            f"See possible solutions at "
            f"https://pip.pypa.io/en/latest/reference/pip_freeze/"
            f"#fixing-permission-denied."
        )
682
683 @classmethod
def is_repository_directory(cls, path: str) -> bool:
    """
    Return whether a directory path is a repository directory.

    Args:
      path: the directory to inspect.
    """
    logger.debug("Checking in %s for %s (%s)...", path, cls.dirname, cls.name)
    # NOTE(review): the return statement was missing from the listing;
    # restored as a check for the backend's `dirname` entry inside `path`
    # -- confirm against upstream.
    return os.path.exists(os.path.join(path, cls.dirname))
690
691 @classmethod
def get_repository_root(cls, location: str) -> Optional[str]:
    """
    Give the "root" (top-level) directory controlled by the vcs, or
    `None` if the directory is not in any.

    This default answers ``location`` itself when it is a repository
    directory. It is meant to be overridden by specific vcs backends with
    smarter detection, which can do more than is_repository_directory()
    alone -- for example, the Git override checks that Git is actually
    available.
    """
    return location if cls.is_repository_directory(location) else None
None __init__(self, Type["VersionControl"] vc_class, Optional[str] rev=None, Optional[CommandArgs] extra_args=None)
Optional["VersionControl"] get_backend_for_dir(self, str location)
Optional["VersionControl"] get_backend_for_scheme(self, str scheme)
None register(self, Type["VersionControl"] cls)
Optional["VersionControl"] get_backend(self, str name)
Tuple[str, Tuple[Optional[str], Optional[str]]] get_netloc_and_auth(cls, str netloc, str scheme)
Optional[str] get_subdirectory(cls, str location)
str get_src_requirement(cls, str repo_dir, str project_name)
bool is_immutable_rev_checkout(self, str url, str dest)
Tuple[str, Optional[str], AuthInfo] get_url_rev_and_auth(cls, str url)
None obtain(self, str dest, HiddenText url, int verbosity)
None unpack(self, str location, HiddenText url, int verbosity)
Tuple[HiddenText, RevOptions] get_url_rev_options(self, HiddenText url)
RevOptions make_rev_options(cls, Optional[str] rev=None, Optional[CommandArgs] extra_args=None)
CommandArgs make_rev_args(Optional[str] username, Optional[HiddenText] password)
None fetch_new(self, str dest, HiddenText url, RevOptions rev_options, int verbosity)
bool is_commit_id_equal(cls, str dest, Optional[str] name)
Optional[str] get_repository_root(cls, str location)
None switch(self, str dest, HiddenText url, RevOptions rev_options)
None update(self, str dest, HiddenText url, RevOptions rev_options)
str run_command(cls, Union[List[str], CommandArgs] cmd, bool show_stdout=True, Optional[str] cwd=None, 'Literal["raise", "warn", "ignore"]' on_returncode="raise", Optional[Iterable[int]] extra_ok_returncodes=None, Optional[str] command_desc=None, Optional[Mapping[str, Any]] extra_environ=None, Optional[SpinnerInterface] spinner=None, bool log_failed_cmd=True, bool stdout_only=False)
str make_vcs_requirement_url(str repo_url, str rev, str project_name, Optional[str] subdir=None)
for i