Let us walk on the 3-isogeny graph
Loading...
Searching...
No Matches
stop.py
Go to the documentation of this file.
1# Copyright 2016–2021 Julien Danjou
2# Copyright 2016 Joshua Harlow
3# Copyright 2013-2014 Ray Holder
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import abc
17import typing
18
19from pip._vendor.tenacity import _utils
20
22 import threading
23
24 from pip._vendor.tenacity import RetryCallState
25
26
28 """Abstract base class for stop strategies."""
29
30 @abc.abstractmethod
31 def __call__(self, retry_state: "RetryCallState") -> bool:
32 pass
33
34 def __and__(self, other: "stop_base") -> "stop_all":
35 return stop_all(self, other)
36
37 def __or__(self, other: "stop_base") -> "stop_any":
38 return stop_any(self, other)
39
40
41StopBaseT = typing.Union[stop_base, typing.Callable[["RetryCallState"], bool]]
42
43
45 """Stop if any of the stop conditions is valid."""
46
47 def __init__(self, *stops: stop_base) -> None:
48 self.stops = stops
49
50 def __call__(self, retry_state: "RetryCallState") -> bool:
51 return any(x(retry_state) for x in self.stops)
52
53
55 """Stop if all the stop conditions are valid."""
56
57 def __init__(self, *stops: stop_base) -> None:
58 self.stops = stops
59
60 def __call__(self, retry_state: "RetryCallState") -> bool:
61 return all(x(retry_state) for x in self.stops)
62
63
65 """Never stop."""
66
67 def __call__(self, retry_state: "RetryCallState") -> bool:
68 return False
69
70
71stop_never = _stop_never()
72
73
75 """Stop when the given event is set."""
76
77 def __init__(self, event: "threading.Event") -> None:
78 self.event = event
79
80 def __call__(self, retry_state: "RetryCallState") -> bool:
81 return self.event.is_set()
82
83
85 """Stop when the previous attempt >= max_attempt_number."""
86
87 def __init__(self, max_attempt_number: int) -> None:
88 self.max_attempt_number = max_attempt_number
89
90 def __call__(self, retry_state: "RetryCallState") -> bool:
92
93
95 """Stop when the time from the first attempt >= max_delay."""
96
97 def __init__(self, max_delay: _utils.time_unit_type) -> None:
98 self.max_delay = _utils.to_seconds(max_delay)
99
100 def __call__(self, retry_state: "RetryCallState") -> bool:
102 raise RuntimeError("__call__() called but seconds_since_start is not set")
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:67
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:90
None __init__(self, int max_attempt_number)
Definition stop.py:87
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:100
None __init__(self, _utils.time_unit_type max_delay)
Definition stop.py:97
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:60
None __init__(self, *stop_base stops)
Definition stop.py:57
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:50
None __init__(self, *stop_base stops)
Definition stop.py:47
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:31
"stop_all" __and__(self, "stop_base" other)
Definition stop.py:34
"stop_any" __or__(self, "stop_base" other)
Definition stop.py:37
bool __call__(self, "RetryCallState" retry_state)
Definition stop.py:80
None __init__(self, "threading.Event" event)
Definition stop.py:77
for i