Let us walk on the 3-isogeny graph
Loading...
Searching...
No Matches
self_outdated_check.py
Go to the documentation of this file.
1import datetime
2import functools
3import hashlib
4import json
5import logging
6import optparse
7import os.path
8import sys
9from dataclasses import dataclass
10from typing import Any, Callable, Dict, Optional
11
12from pip._vendor.packaging.version import parse as parse_version
13from pip._vendor.rich.console import Group
14from pip._vendor.rich.markup import escape
15from pip._vendor.rich.text import Text
16
17from pip._internal.index.collector import LinkCollector
18from pip._internal.index.package_finder import PackageFinder
19from pip._internal.metadata import get_default_environment
20from pip._internal.metadata.base import DistributionVersion
21from pip._internal.models.selection_prefs import SelectionPreferences
22from pip._internal.network.session import PipSession
23from pip._internal.utils.compat import WINDOWS
25 get_best_invocation_for_this_pip,
26 get_best_invocation_for_this_python,
27)
28from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
29from pip._internal.utils.misc import ensure_dir
30
31_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
32
33
34logger = logging.getLogger(__name__)
35
36
37def _get_statefile_name(key: str) -> str:
38 key_bytes = key.encode()
39 name = hashlib.sha224(key_bytes).hexdigest()
40 return name
41
42
44 def __init__(self, cache_dir: str) -> None:
45 self._state: Dict[str, Any] = {}
46 self._statefile_path = None
47
48 # Try to load the existing state
49 if cache_dir:
51 cache_dir, "selfcheck", _get_statefile_name(self.keykey)
52 )
53 try:
54 with open(self._statefile_path, encoding="utf-8") as statefile:
55 self._state = json.load(statefile)
56 except (OSError, ValueError, KeyError):
57 # Explicitly suppressing exceptions, since we don't want to
58 # error out if the cache file is invalid.
59 pass
60
61 @property
62 def key(self) -> str:
63 return sys.prefix
64
65 def get(self, current_time: datetime.datetime) -> Optional[str]:
66 """Check if we have a not-outdated version loaded already."""
67 if not self._state:
68 return None
69
70 if "last_check" not in self._state:
71 return None
72
73 if "pypi_version" not in self._state:
74 return None
75
76 seven_days_in_seconds = 7 * 24 * 60 * 60
77
78 # Determine if we need to refresh the state
79 last_check = datetime.datetime.strptime(self._state["last_check"], _DATE_FMT)
80 seconds_since_last_check = (current_time - last_check).total_seconds()
81 if seconds_since_last_check > seven_days_in_seconds:
82 return None
83
84 return self._state["pypi_version"]
85
86 def set(self, pypi_version: str, current_time: datetime.datetime) -> None:
87 # If we do not have a path to cache in, don't bother saving.
88 if not self._statefile_path:
89 return
90
91 # Check to make sure that we own the directory
92 if not check_path_owner(os.path.dirname(self._statefile_path)):
93 return
94
95 # Now that we've ensured the directory is owned by this user, we'll go
96 # ahead and make sure that all our directories are created.
97 ensure_dir(os.path.dirname(self._statefile_path))
98
99 state = {
100 # Include the key so it's easy to tell which pip wrote the
101 # file.
102 "key": self.keykey,
103 "last_check": current_time.strftime(_DATE_FMT),
104 "pypi_version": pypi_version,
105 }
106
107 text = json.dumps(state, sort_keys=True, separators=(",", ":"))
108
109 with adjacent_tmp_file(self._statefile_path) as f:
111
112 try:
113 # Since we have a prefix-specific state file, we can just
114 # overwrite whatever is there, no need to check.
115 replace(f.name, self._statefile_path)
116 except OSError:
117 # Best effort.
118 pass
119
120
121@dataclass
123 old: str
124 new: str
125
126 def __rich__(self) -> Group:
127 if WINDOWS:
128 pip_cmd = f"{get_best_invocation_for_this_python()} -m pip"
129 else:
130 pip_cmd = get_best_invocation_for_this_pip()
131
132 notice = "[bold][[reset][blue]notice[reset][bold]][reset]"
133 return Group(
134 Text(),
136 f"{notice} A new release of pip is available: "
137 f"[red]{self.old}[reset] -> [green]{self.new}[reset]"
138 ),
140 f"{notice} To update, run: "
141 f"[green]{escape(pip_cmd)} install --upgrade pip"
142 ),
143 )
144
145
146def was_installed_by_pip(pkg: str) -> bool:
147 """Checks whether pkg was installed by pip
148
149 This is used not to display the upgrade message when pip is in fact
150 installed by system package manager, such as dnf on Fedora.
151 """
152 dist = get_default_environment().get_distribution(pkg)
153 return dist is not None and "pip" == dist.installer
154
155
157 session: PipSession, options: optparse.Values
158) -> Optional[str]:
159 # Lets use PackageFinder to see what the latest pip version is
160 link_collector = LinkCollector.create(
161 session,
162 options=options,
163 suppress_no_index=True,
164 )
165
166 # Pass allow_yanked=False so we don't suggest upgrading to a
167 # yanked version.
168 selection_prefs = SelectionPreferences(
169 allow_yanked=False,
170 allow_all_prereleases=False, # Explicitly set to False
171 )
172
173 finder = PackageFinder.create(
174 link_collector=link_collector,
175 selection_prefs=selection_prefs,
176 )
177 best_candidate = finder.find_best_candidate("pip").best_candidate
178 if best_candidate is None:
179 return None
180
181 return str(best_candidate.version)
182
183
185 *,
186 state: SelfCheckState,
187 current_time: datetime.datetime,
188 local_version: DistributionVersion,
189 get_remote_version: Callable[[], Optional[str]],
190) -> Optional[UpgradePrompt]:
191 remote_version_str = state.get(current_time)
192 if remote_version_str is None:
193 remote_version_str = get_remote_version()
194 if remote_version_str is None:
195 logger.debug("No remote pip version found")
196 return None
197 state.set(remote_version_str, current_time)
198
199 remote_version = parse_version(remote_version_str)
200 logger.debug("Remote version of pip: %s", remote_version)
201 logger.debug("Local version of pip: %s", local_version)
202
203 pip_installed_by_pip = was_installed_by_pip("pip")
204 logger.debug("Was pip installed by pip? %s", pip_installed_by_pip)
205 if not pip_installed_by_pip:
206 return None # Only suggest upgrade if pip is installed by pip.
207
208 local_version_is_older = (
209 local_version < remote_version
211 )
212 if local_version_is_older:
213 return UpgradePrompt(old=str(local_version), new=remote_version_str)
214
215 return None
216
217
218def pip_self_version_check(session: PipSession, options: optparse.Values) -> None:
219 """Check for an update for pip.
220
221 Limit the frequency of checks to once per week. State is stored either in
222 the active virtualenv or in the user's USER_CACHE_DIR keyed off the prefix
223 of the pip script path.
224 """
225 installed_dist = get_default_environment().get_distribution("pip")
226 if not installed_dist:
227 return
228
229 try:
230 upgrade_prompt = _self_version_check_logic(
231 state=SelfCheckState(cache_dir=options.cache_dir),
232 current_time=datetime.datetime.utcnow(),
233 local_version=installed_dist.version,
234 get_remote_version=functools.partial(
235 _get_current_remote_pip_version, session, options
236 ),
237 )
238 if upgrade_prompt is not None:
239 logger.warning("[present-rich] %s", upgrade_prompt)
240 except Exception:
241 logger.warning("There was an error checking the latest version of pip.")
242 logger.debug("See below for error", exc_info=True)
None set(self, str pypi_version, datetime.datetime current_time)
Optional[UpgradePrompt] _self_version_check_logic(*SelfCheckState state, datetime.datetime current_time, DistributionVersion local_version, Callable[[], Optional[str]] get_remote_version)
Optional[str] _get_current_remote_pip_version(PipSession session, optparse.Values options)
for i