- Notifications
You must be signed in to change notification settings - Fork 31.7k
/
Copy path_py_abc.py
147 lines (132 loc) · 6.04 KB
/
_py_abc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from_weakrefsetimportWeakSet
defget_cache_token():
"""Returns the current ABC cache token.
The token is an opaque object (supporting equality testing) identifying the
current version of the ABC cache for virtual subclasses. The token changes
with every call to ``register()`` on any ABC.
"""
returnABCMeta._abc_invalidation_counter
classABCMeta(type):
"""Metaclass for defining Abstract Base Classes (ABCs).
Use this metaclass to create an ABC. An ABC can be subclassed
directly, and then acts as a mix-in class. You can also register
unrelated concrete classes (even built-in classes) and unrelated
ABCs as 'virtual subclasses' -- these and their descendants will
be considered subclasses of the registering ABC by the built-in
issubclass() function, but the registering ABC won't show up in
their MRO (Method Resolution Order) nor will method
implementations defined by the registering ABC be callable (not
even via super()).
"""
# A global counter that is incremented each time a class is
# registered as a virtual subclass of anything. It forces the
# negative cache to be cleared before its next use.
# Note: this counter is private. Use `abc.get_cache_token()` for
# external code.
_abc_invalidation_counter=0
def__new__(mcls, name, bases, namespace, **kwargs):
cls=super().__new__(mcls, name, bases, namespace, **kwargs)
# Compute set of abstract method names
abstracts= {name
forname, valueinnamespace.items()
ifgetattr(value, "__isabstractmethod__", False)}
forbaseinbases:
fornameingetattr(base, "__abstractmethods__", set()):
value=getattr(cls, name, None)
ifgetattr(value, "__isabstractmethod__", False):
abstracts.add(name)
cls.__abstractmethods__=frozenset(abstracts)
# Set up inheritance registry
cls._abc_registry=WeakSet()
cls._abc_cache=WeakSet()
cls._abc_negative_cache=WeakSet()
cls._abc_negative_cache_version=ABCMeta._abc_invalidation_counter
returncls
defregister(cls, subclass):
"""Register a virtual subclass of an ABC.
Returns the subclass, to allow usage as a class decorator.
"""
ifnotisinstance(subclass, type):
raiseTypeError("Can only register classes")
ifissubclass(subclass, cls):
returnsubclass# Already a subclass
# Subtle: test for cycles *after* testing for "already a subclass";
# this means we allow X.register(X) and interpret it as a no-op.
ifissubclass(cls, subclass):
# This would create a cycle, which is bad for the algorithm below
raiseRuntimeError("Refusing to create an inheritance cycle")
cls._abc_registry.add(subclass)
ABCMeta._abc_invalidation_counter+=1# Invalidate negative cache
returnsubclass
def_dump_registry(cls, file=None):
"""Debug helper to print the ABC registry."""
print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
print(f"Inv. counter: {get_cache_token()}", file=file)
fornameincls.__dict__:
ifname.startswith("_abc_"):
value=getattr(cls, name)
ifisinstance(value, WeakSet):
value=set(value)
print(f"{name}: {value!r}", file=file)
def_abc_registry_clear(cls):
"""Clear the registry (for debugging or testing)."""
cls._abc_registry.clear()
def_abc_caches_clear(cls):
"""Clear the caches (for debugging or testing)."""
cls._abc_cache.clear()
cls._abc_negative_cache.clear()
def__instancecheck__(cls, instance):
"""Override for isinstance(instance, cls)."""
# Inline the cache checking
subclass=instance.__class__
ifsubclassincls._abc_cache:
returnTrue
subtype=type(instance)
ifsubtypeissubclass:
if (cls._abc_negative_cache_version==
ABCMeta._abc_invalidation_counterand
subclassincls._abc_negative_cache):
returnFalse
# Fall back to the subclass check.
returncls.__subclasscheck__(subclass)
returnany(cls.__subclasscheck__(c) forcin (subclass, subtype))
def__subclasscheck__(cls, subclass):
"""Override for issubclass(subclass, cls)."""
ifnotisinstance(subclass, type):
raiseTypeError('issubclass() arg 1 must be a class')
# Check cache
ifsubclassincls._abc_cache:
returnTrue
# Check negative cache; may have to invalidate
ifcls._abc_negative_cache_version<ABCMeta._abc_invalidation_counter:
# Invalidate the negative cache
cls._abc_negative_cache=WeakSet()
cls._abc_negative_cache_version=ABCMeta._abc_invalidation_counter
elifsubclassincls._abc_negative_cache:
returnFalse
# Check the subclass hook
ok=cls.__subclasshook__(subclass)
ifokisnotNotImplemented:
assertisinstance(ok, bool)
ifok:
cls._abc_cache.add(subclass)
else:
cls._abc_negative_cache.add(subclass)
returnok
# Check if it's a direct subclass
ifclsingetattr(subclass, '__mro__', ()):
cls._abc_cache.add(subclass)
returnTrue
# Check if it's a subclass of a registered class (recursive)
forrclsincls._abc_registry:
ifissubclass(subclass, rcls):
cls._abc_cache.add(subclass)
returnTrue
# Check if it's a subclass of a subclass (recursive)
forsclsincls.__subclasses__():
ifissubclass(subclass, scls):
cls._abc_cache.add(subclass)
returnTrue
# No dice; update negative cache
cls._abc_negative_cache.add(subclass)
returnFalse