Let us walk on the 3-isogeny graph
Loading...
Searching...
No Matches
req_uninstall.py
Go to the documentation of this file.
1import functools
2import os
3import sys
4import sysconfig
5from importlib.util import cache_from_source
6from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Set, Tuple
7
8from pip._internal.exceptions import UninstallationError
9from pip._internal.locations import get_bin_prefix, get_bin_user
10from pip._internal.metadata import BaseDistribution
11from pip._internal.utils.compat import WINDOWS
12from pip._internal.utils.egg_link import egg_link_path_from_location
13from pip._internal.utils.logging import getLogger, indent_log
14from pip._internal.utils.misc import ask, normalize_path, renames, rmtree
15from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory
16from pip._internal.utils.virtualenv import running_under_virtualenv
17
18logger = getLogger(__name__)
19
20
22 bin_dir: str, script_name: str, is_gui: bool
23) -> Generator[str, None, None]:
24 """Create the fully qualified name of the files created by
25 {console,gui}_scripts for the given ``dist``.
26 Returns the list of file names
27 """
28 exe_name = os.path.join(bin_dir, script_name)
29 yield exe_name
30 if not WINDOWS:
31 return
32 yield f"{exe_name}.exe"
33 yield f"{exe_name}.exe.manifest"
34 if is_gui:
35 yield f"{exe_name}-script.pyw"
36 else:
37 yield f"{exe_name}-script.py"
38
39
41 fn: Callable[..., Generator[Any, None, None]]
42) -> Callable[..., Generator[Any, None, None]]:
43 @functools.wraps(fn)
44 def unique(*args: Any, **kw: Any) -> Generator[Any, None, None]:
45 seen: Set[Any] = set()
46 for item in fn(*args, **kw):
47 if item not in seen:
48 seen.add(item)
49 yield item
50
51 return unique
52
53
@_unique
def uninstallation_paths(dist: BaseDistribution) -> Generator[str, None, None]:
    """
    Yield all the uninstallation paths for dist based on RECORD-without-.py[co]

    Yield paths to all the files in RECORD. For each .py file in RECORD, add
    the .pyc and .pyo in the same directory.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].

    If RECORD is not found, raises UninstallationError,
    with possible information from the INSTALLER file.

    https://packaging.python.org/specifications/recording-installed-packages/
    """
    location = dist.location
    assert location is not None, "not installed"

    entries = dist.iter_declared_entries()
    if entries is None:
        msg = f"Cannot uninstall {dist}, RECORD file not found."
        installer = dist.installer
        if not installer or installer == "pip":
            # Installed by pip (or unknown): a forced reinstall may recreate
            # the missing record.
            dep = f"{dist.raw_name}=={dist.version}"
            msg += (
                " You might be able to recover from this via: "
                f"'pip install --force-reinstall --no-deps {dep}'."
            )
        else:
            msg += f" Hint: The package was installed by {installer}."
        raise UninstallationError(msg)

    for entry in entries:
        path = os.path.join(location, entry)
        yield path
        if path.endswith(".py"):
            # Also cover legacy byte-code files written next to the source.
            dn, fn = os.path.split(path)
            base = fn[:-3]
            yield os.path.join(dn, base + ".pyc")
            yield os.path.join(dn, base + ".pyo")
96
97
def compact(paths: Iterable[str]) -> Set[str]:
    """Compact a path set to contain the minimal number of paths
    necessary to contain all paths in the set. If /a/path/ and
    /a/path/to/a/file.txt are both in the set, leave only the
    shorter path.

    A trailing ``*`` on a kept path is ignored when testing whether it
    covers a longer one.
    """

    sep = os.path.sep

    def is_covered(path: str, shortpath: str) -> bool:
        prefix = shortpath.rstrip("*")
        if not path.startswith(prefix):
            return False
        # The character right after the directory part must be a separator,
        # so "/a/b" does not swallow "/a/bc". Slicing (rather than indexing)
        # keeps an exact duplicate of ``shortpath`` from raising IndexError.
        boundary = len(prefix.rstrip(sep))
        return path[boundary : boundary + 1] == sep

    short_paths: Set[str] = set()
    # Shortest first, so a containing directory is seen before its contents.
    for path in sorted(paths, key=len):
        if not any(is_covered(path, shortpath) for shortpath in short_paths):
            short_paths.add(path)
    return short_paths
115
116
def compress_for_rename(paths: Iterable[str]) -> Set[str]:
    """Returns a set containing the paths that need to be renamed.

    This set may include directories when the original sequence of paths
    included every file on disk.

    Directories are returned with a trailing ``os.sep``; individual files
    are returned with their original (non-normcased) spelling.
    """
    # normcased path -> original spelling, so the caller's casing is returned.
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    # Parent directories, shortest first, so outer directories are tried
    # before the directories nested inside them.
    unchecked = sorted({os.path.split(p)[0] for p in case_map.values()}, key=len)
    wildcards: Set[str] = set()

    def norm_join(*a: str) -> str:
        return os.path.normcase(os.path.join(*a))

    for root in unchecked:
        if any(os.path.normcase(root).startswith(w) for w in wildcards):
            # This directory has already been handled.
            continue

        all_files: Set[str] = set()
        for dirname, _subdirs, files in os.walk(root):
            all_files.update(norm_join(root, dirname, f) for f in files)
        # If all the files we found are in our remaining set of files to
        # remove, then remove them from the latter set and add a wildcard
        # for the directory.
        if not (all_files - remaining):
            remaining.difference_update(all_files)
            wildcards.add(root + os.sep)

    return set(map(case_map.__getitem__, remaining)) | wildcards
149
150
def compress_for_output_listing(paths: Iterable[str]) -> Tuple[Set[str], Set[str]]:
    """Returns a tuple of 2 sets of which paths to display to user

    The first set contains every non-``.pyc`` path that would be deleted,
    plus a ``<folder>/*`` entry for each package or ``.dist-info`` folder -
    to signify that all its contents are removed.

    The second set contains files that would have been skipped in the above
    folders.
    """

    will_remove = set(paths)
    will_skip = set()

    # Determine folders and files
    folders = set()
    files = set()
    for path in will_remove:
        if path.endswith(".pyc"):
            continue
        if path.endswith("__init__.py") or ".dist-info" in path:
            # The containing directory is a package / metadata folder.
            folders.add(os.path.dirname(path))
        files.add(path)

    _normcased_files = {os.path.normcase(f) for f in files}

    folders = compact(folders)

    # This walks the tree using os.walk to not miss extra folders
    # that might get added.
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            for fname in dirfiles:
                if fname.endswith(".pyc"):
                    continue

                file_ = os.path.join(dirpath, fname)
                if (
                    os.path.isfile(file_)
                    and os.path.normcase(file_) not in _normcased_files
                ):
                    # We are skipping this file. Add it to the set.
                    will_skip.add(file_)

    will_remove = files | {os.path.join(folder, "*") for folder in folders}

    return will_remove, will_skip
199
200
202 """A set of file rename operations to stash files while
203 tentatively uninstalling them."""
204
205 def __init__(self) -> None:
206 # Mapping from source file root to [Adjacent]TempDirectory
207 # for files under that directory.
208 self._save_dirs: Dict[str, TempDirectory] = {}
209 # (old path, new path) tuples for each move that may need
210 # to be undone.
211 self._moves: List[Tuple[str, str]] = []
212
def _get_directory_stash(self, path: str) -> str:
    """Stashes a directory.

    Directories are stashed adjacent to their original location if
    possible, or else moved/copied into the user's temp dir.

    The chosen temp directory is registered in ``self._save_dirs`` under
    the normcased ``path``; its filesystem path is returned.
    """
    try:
        save_dir: TempDirectory = AdjacentTempDirectory(path)
    except OSError:
        # Could not create an adjacent directory; fall back to a regular one.
        save_dir = TempDirectory(kind="uninstall")
    self._save_dirs[os.path.normcase(path)] = save_dir

    return save_dir.path
226
227 def _get_file_stash(self, path: str) -> str:
228 """Stashes a file.
229
230 If no root has been provided, one will be created for the directory
231 in the user's temp directory."""
232 path = os.path.normcase(path)
233 head, old_head = os.path.dirname(path), None
234 save_dir = None
235
236 while head != old_head:
237 try:
238 save_dir = self._save_dirs[head]
239 break
240 except KeyError:
241 pass
242 head, old_head = os.path.dirname(head), head
243 else:
244 # Did not find any suitable root
245 head = os.path.dirname(path)
246 save_dir = TempDirectory(kind="uninstall")
247 self._save_dirs[head] = save_dir
248
249 relpath = os.path.relpath(path, head)
250 if relpath and relpath != os.path.curdir:
251 return os.path.join(save_dir.path, relpath)
252 return save_dir.path
253
def stash(self, path: str) -> str:
    """Stashes the directory or file and returns its new location.
    Handle symlinks as files to avoid modifying the symlink targets.

    The ``(path, new_path)`` pair is appended to ``self._moves`` before the
    rename is attempted.
    """
    path_is_dir = os.path.isdir(path) and not os.path.islink(path)
    if path_is_dir:
        new_path = self._get_directory_stash(path)
    else:
        new_path = self._get_file_stash(path)

    self._moves.append((path, new_path))
    if path_is_dir and os.path.isdir(new_path):
        # If we're moving a directory, we need to
        # remove the destination first or else it will be
        # moved to inside the existing directory.
        # We just created new_path ourselves, so it will
        # be removable.
        os.rmdir(new_path)
    renames(path, new_path)
    return new_path
274
def commit(self) -> None:
    """Commits the uninstall by removing stashed files.

    Every registered stash directory is cleaned up, then the recorded
    moves and stash directories are reset.
    """
    for save_dir in self._save_dirs.values():
        save_dir.cleanup()
    self._moves = []
    self._save_dirs = {}
281
def rollback(self) -> None:
    """Undoes the uninstall by moving stashed files back.

    ``OSError`` raised while restoring a path is logged and does not stop
    the remaining paths from being processed. ``commit()`` is called at the
    end.
    """
    for p in self._moves:
        logger.info("Moving to %s\n from %s", *p)

    # Each recorded move is (original location, stash location).
    for original, stashed in self._moves:
        try:
            logger.debug("Replacing %s from %s", original, stashed)
            # Clear whatever now occupies the original location first.
            if os.path.isfile(original) or os.path.islink(original):
                os.unlink(original)
            elif os.path.isdir(original):
                rmtree(original)
            renames(stashed, original)
        except OSError as ex:
            logger.error("Failed to restore %s", original)
            logger.debug("Exception: %s", ex)

    self.commit()
300
@property
def can_rollback(self) -> bool:
    """True when at least one move has been recorded in ``self._moves``."""
    return bool(self._moves)
304
305
307 """A set of file paths to be removed in the uninstallation of a
308 requirement."""
309
310 def __init__(self, dist: BaseDistribution) -> None:
311 self._paths: Set[str] = set()
312 self._refuse: Set[str] = set()
313 self._pth: Dict[str, UninstallPthEntries] = {}
314 self._dist = dist
316 # Create local cache of normalize_path results. Creating an UninstallPathSet
317 # can result in hundreds/thousands of redundant calls to normalize_path with
318 # the same args, which hurts performance.
320
321 def _permitted(self, path: str) -> bool:
322 """
323 Return True if the given path is one we are permitted to
324 remove/modify, False otherwise.
325
326 """
327 # aka is_local, but caching normalized sys.prefix
328 if not running_under_virtualenv():
329 return True
331
332 def add(self, path: str) -> None:
333 head, tail = os.path.split(path)
334
335 # we normalize the head to resolve parent directory symlinks, but not
336 # the tail, since we only want to uninstall symlinks, not their targets
338
339 if not os.path.exists(path):
340 return
341 if self._permitted(path):
342 self._paths.add(path)
343 else:
344 self._refuse.add(path)
345
346 # __pycache__ files can show up after 'installed-files.txt' is created,
347 # due to imports
348 if os.path.splitext(path)[1] == ".py":
349 self.add(cache_from_source(path))
350
def add_pth(self, pth_file: str, entry: str) -> None:
    """Schedule ``entry`` for removal from the ``.pth`` file ``pth_file``.

    ``pth_file`` is normalized first. If it is not permitted to be
    modified it is recorded in ``self._refuse`` instead.
    """
    pth_file = self._normalize_path_cached(pth_file)
    if not self._permitted(pth_file):
        self._refuse.add(pth_file)
        return
    # One UninstallPthEntries per .pth file, created on first use.
    if pth_file not in self._pth:
        self._pth[pth_file] = UninstallPthEntries(pth_file)
    self._pth[pth_file].add(entry)
359
360 def remove(self, auto_confirm: bool = False, verbose: bool = False) -> None:
361 """Remove paths in ``self._paths`` with confirmation (unless
362 ``auto_confirm`` is True)."""
363
364 if not self._paths:
366 "Can't uninstall '%s'. No files were found to uninstall.",
367 self._dist.raw_name,
368 )
369 return
370
371 dist_name_version = f"{self._dist.raw_name}-{self._dist.version}"
372 logger.info("Uninstalling %s:", dist_name_version)
373
374 with indent_log():
375 if auto_confirm or self._allowed_to_proceed(verbose):
376 moved = self._moved_paths
377
378 for_rename = compress_for_rename(self._paths)
379
380 for path in sorted(compact(for_rename)):
381 moved.stash(path)
382 logger.verbose("Removing file or directory %s", path)
383
384 for pth in self._pth.values():
385 pth.remove()
386
387 logger.info("Successfully uninstalled %s", dist_name_version)
388
def _allowed_to_proceed(self, verbose: bool) -> bool:
    """Display which files would be deleted and prompt for confirmation

    Returns True unless the answer to the prompt is ``"n"``.
    """

    def _display(msg: str, paths: Iterable[str]) -> None:
        # Print ``msg`` followed by the compacted, sorted paths; nothing at
        # all when ``paths`` is empty.
        if not paths:
            return

        logger.info(msg)
        with indent_log():
            for path in sorted(compact(paths)):
                logger.info(path)

    if not verbose:
        will_remove, will_skip = compress_for_output_listing(self._paths)
    else:
        # In verbose mode, display all the files that are going to be
        # deleted.
        will_remove = set(self._paths)
        will_skip = set()

    _display("Would remove:", will_remove)
    _display("Would not remove (might be manually added):", will_skip)
    _display("Would not remove (outside of prefix):", self._refuse)
    if verbose:
        _display("Will actually move:", compress_for_rename(self._paths))

    return ask("Proceed (Y/n)? ", ("y", "n", "")) != "n"
416
417 def rollback(self) -> None:
418 """Rollback the changes previously made by remove()."""
419 if not self._moved_paths.can_rollback:
421 "Can't roll back %s; was not uninstalled",
422 self._dist.raw_name,
423 )
424 return
425 logger.info("Rolling back uninstall of %s", self._dist.raw_name)
427 for pth in self._pth.values():
429
def commit(self) -> None:
    """Remove temporary save dir: rollback will no longer be possible.

    Delegates to ``self._moved_paths.commit()``.
    """
    self._moved_paths.commit()
433
434 @classmethod
435 def from_dist(cls, dist: BaseDistribution) -> "UninstallPathSet":
436 dist_location = dist.location
437 info_location = dist.info_location
438 if dist_location is None:
440 "Not uninstalling %s since it is not installed",
442 )
443 return cls(dist)
444
445 normalized_dist_location = normalize_path(dist_location)
446 if not dist.local:
448 "Not uninstalling %s at %s, outside environment %s",
450 normalized_dist_location,
452 )
453 return cls(dist)
454
455 if normalized_dist_location in {
456 p
457 for p in {sysconfig.get_path("stdlib"), sysconfig.get_path("platstdlib")}
458 if p
459 }:
461 "Not uninstalling %s at %s, as it is in the standard library.",
463 normalized_dist_location,
464 )
465 return cls(dist)
466
467 paths_to_remove = cls(dist)
468 develop_egg_link = egg_link_path_from_location(dist.raw_name)
469
470 # Distribution is installed with metadata in a "flat" .egg-info
471 # directory. This means it is not a modern .dist-info installation, an
472 # egg, or legacy editable.
473 setuptools_flat_installation = (
475 and info_location is not None
476 and os.path.exists(info_location)
477 # If dist is editable and the location points to a ``.egg-info``,
478 # we are in fact in the legacy editable case.
479 and not info_location.endswith(f"{dist.setuptools_filename}.egg-info")
480 )
481
482 # Uninstall cases order do matter as in the case of 2 installs of the
483 # same package, pip needs to uninstall the currently detected version
484 if setuptools_flat_installation:
485 if info_location is not None:
486 paths_to_remove.add(info_location)
487 installed_files = dist.iter_declared_entries()
488 if installed_files is not None:
489 for installed_file in installed_files:
490 paths_to_remove.add(os.path.join(dist_location, installed_file))
491 # FIXME: need a test for this elif block
492 # occurs with --single-version-externally-managed/--record outside
493 # of pip
494 elif dist.is_file("top_level.txt"):
495 try:
496 namespace_packages = dist.read_text("namespace_packages.txt")
497 except FileNotFoundError:
498 namespaces = []
499 else:
500 namespaces = namespace_packages.splitlines(keepends=False)
501 for top_level_pkg in [
502 p
503 for p in dist.read_text("top_level.txt").splitlines()
504 if p and p not in namespaces
505 ]:
506 path = os.path.join(dist_location, top_level_pkg)
508 paths_to_remove.add(f"{path}.py")
509 paths_to_remove.add(f"{path}.pyc")
510 paths_to_remove.add(f"{path}.pyo")
511
514 "Cannot uninstall {!r}. It is a distutils installed project "
515 "and thus we cannot accurately determine which files belong "
516 "to it which would lead to only a partial uninstall.".format(
518 )
519 )
520
522 # package installed by easy_install
523 # We cannot match on dist.egg_name because it can slightly vary
524 # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
525 paths_to_remove.add(dist_location)
526 easy_install_egg = os.path.split(dist_location)[1]
527 easy_install_pth = os.path.join(
528 os.path.dirname(dist_location),
529 "easy-install.pth",
530 )
531 paths_to_remove.add_pth(easy_install_pth, "./" + easy_install_egg)
532
534 for path in uninstallation_paths(dist):
536
537 elif develop_egg_link:
538 # PEP 660 modern editable is handled in the ``.dist-info`` case
539 # above, so this only covers the setuptools-style editable.
540 with open(develop_egg_link) as fh:
541 link_pointer = os.path.normcase(fh.readline().strip())
542 normalized_link_pointer = paths_to_remove._normalize_path_cached(
543 link_pointer
544 )
545 assert os.path.samefile(
546 normalized_link_pointer, normalized_dist_location
547 ), (
548 f"Egg-link {develop_egg_link} (to {link_pointer}) does not match "
549 f"installed location of {dist.raw_name} (at {dist_location})"
550 )
551 paths_to_remove.add(develop_egg_link)
552 easy_install_pth = os.path.join(
553 os.path.dirname(develop_egg_link), "easy-install.pth"
554 )
555 paths_to_remove.add_pth(easy_install_pth, dist_location)
556
557 else:
559 "Not sure how to uninstall: %s - Check: %s",
560 dist,
561 dist_location,
562 )
563
565 bin_dir = get_bin_user()
566 else:
567 bin_dir = get_bin_prefix()
568
569 # find distutils scripts= scripts
570 try:
571 for script in dist.iter_distutils_script_names():
572 paths_to_remove.add(os.path.join(bin_dir, script))
573 if WINDOWS:
574 paths_to_remove.add(os.path.join(bin_dir, f"{script}.bat"))
575 except (FileNotFoundError, NotADirectoryError):
576 pass
577
578 # find console_scripts and gui_scripts
580 dist: BaseDistribution,
581 bin_dir: str,
582 ) -> Generator[str, None, None]:
583 for entry_point in dist.iter_entry_points():
584 if entry_point.group == "console_scripts":
585 yield from _script_names(bin_dir, entry_point.name, False)
586 elif entry_point.group == "gui_scripts":
587 yield from _script_names(bin_dir, entry_point.name, True)
588
589 for s in iter_scripts_to_remove(dist, bin_dir):
591
592 return paths_to_remove
593
594
596 def __init__(self, pth_file: str) -> None:
597 self.file = pth_file
598 self.entries: Set[str] = set()
599 self._saved_lines: Optional[List[bytes]] = None
600
def add(self, entry: str) -> None:
    """Record ``entry`` (after ``os.path.normcase``) for removal."""
    entry = os.path.normcase(entry)
    # On Windows, os.path.normcase converts the entry to use
    # backslashes. This is correct for entries that describe absolute
    # paths outside of site-packages, but all the others use forward
    # slashes.
    # os.path.splitdrive is used instead of os.path.isabs because isabs
    # treats non-absolute paths with drive letter markings like c:foo\bar
    # as absolute paths. It also does not recognize UNC paths if they don't
    # have more than "\\server\share". Valid examples: "\\server\share\" or
    # "\\server\share\folder".
    if WINDOWS and not os.path.splitdrive(entry)[0]:
        entry = entry.replace("\\", "/")
    self.entries.add(entry)
615
def remove(self) -> None:
    """Rewrite ``self.file`` without the lines listed in ``self.entries``.

    The content read from disk is kept in ``self._saved_lines`` so that
    ``rollback()`` can write it back. If the file does not exist a warning
    is logged and nothing is changed.
    """
    logger.verbose("Removing pth entries from %s:", self.file)

    # If the file doesn't exist, log a warning and return
    if not os.path.isfile(self.file):
        logger.warning("Cannot remove entries from nonexistent file %s", self.file)
        return
    with open(self.file, "rb") as fh:
        # windows uses '\r\n' with py3k, but uses '\n' with py2.x
        lines = fh.readlines()
    # Keep an independent snapshot: ``lines`` is edited in place below, and
    # sharing the same list object would leave nothing to roll back to.
    self._saved_lines = list(lines)

    endline = b"\r\n" if any(b"\r\n" in line for line in lines) else b"\n"
    # handle missing trailing newline
    if lines and not lines[-1].endswith(endline):
        lines[-1] = lines[-1] + endline
    for entry in self.entries:
        try:
            logger.verbose("Removing entry: %s", entry)
            lines.remove(entry.encode("utf-8") + endline)
        except ValueError:
            # The entry is not present in the file; nothing to remove.
            pass
    with open(self.file, "wb") as fh:
        fh.writelines(lines)
642
def rollback(self) -> bool:
    """Write the lines saved by ``remove()`` back to ``self.file``.

    Returns False (after logging an error) when no lines were saved,
    True once the file has been rewritten.
    """
    if self._saved_lines is None:
        logger.error("Cannot roll back changes to %s, none were made", self.file)
        return False
    logger.debug("Rolling %s back to previous state", self.file)
    with open(self.file, "wb") as fh:
        fh.writelines(self._saved_lines)
    return True
None __init__(self, BaseDistribution dist)
None remove(self, bool auto_confirm=False, bool verbose=False)
"UninstallPathSet" from_dist(cls, BaseDistribution dist)
None add_pth(self, str pth_file, str entry)
Set[str] compress_for_rename(Iterable[str] paths)
Generator[str, None, None] uninstallation_paths(BaseDistribution dist)
Tuple[Set[str], Set[str]] compress_for_output_listing(Iterable[str] paths)
Set[str] compact(Iterable[str] paths)
Callable[..., Generator[Any, None, None]] _unique(Callable[..., Generator[Any, None, None]] fn)
Generator[str, None, None] _script_names(str bin_dir, str script_name, bool is_gui)
for i