forked from learn-co-curriculum/python-p3-freebie-tracker
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlanghelpers.py
More file actions
2306 lines (1793 loc) · 66.9 KB
/
langhelpers.py
File metadata and controls
2306 lines (1793 loc) · 66.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# util/langhelpers.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: allow-untyped-defs, allow-untyped-calls
"""Routines to help with the creation, loading and introspection of
modules, classes, hierarchies, attributes, functions, and methods.
"""
from __future__ import annotations
import collections
import enum
from functools import update_wrapper
import inspect
import itertools
import operator
import re
import sys
import textwrap
import threading
import types
from types import CodeType
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import FrozenSet
from typing import Generic
from typing import Iterator
from typing import List
from typing import Mapping
from typing import NoReturn
from typing import Optional
from typing import overload
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
import warnings
from . import _collections
from . import compat
from ._has_cy import HAS_CYEXTENSION
from .typing import Literal
from .. import exc
_T = TypeVar("_T")
_T_co = TypeVar("_T_co", covariant=True)
_F = TypeVar("_F", bound=Callable[..., Any])
_MP = TypeVar("_MP", bound="memoized_property[Any]")
_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
_HP = TypeVar("_HP", bound="hybridproperty[Any]")
_HM = TypeVar("_HM", bound="hybridmethod[Any]")
if compat.py314:
# vendor a minimal form of get_annotations per
# https://github.com/python/cpython/issues/133684#issuecomment-2863841891
from annotationlib import call_annotate_function # type: ignore
from annotationlib import Format
def _get_and_call_annotate(obj, format): # noqa: A002
annotate = getattr(obj, "__annotate__", None)
if annotate is not None:
ann = call_annotate_function(annotate, format, owner=obj)
if not isinstance(ann, dict):
raise ValueError(f"{obj!r}.__annotate__ returned a non-dict")
return ann
return None
# this is ported from py3.13.0a7
_BASE_GET_ANNOTATIONS = type.__dict__["__annotations__"].__get__ # type: ignore # noqa: E501
def _get_dunder_annotations(obj):
if isinstance(obj, type):
try:
ann = _BASE_GET_ANNOTATIONS(obj)
except AttributeError:
# For static types, the descriptor raises AttributeError.
return {}
else:
ann = getattr(obj, "__annotations__", None)
if ann is None:
return {}
if not isinstance(ann, dict):
raise ValueError(
f"{obj!r}.__annotations__ is neither a dict nor None"
)
return dict(ann)
def _vendored_get_annotations(
obj: Any, *, format: Format # noqa: A002
) -> Mapping[str, Any]:
"""A sparse implementation of annotationlib.get_annotations()"""
try:
ann = _get_dunder_annotations(obj)
except Exception:
pass
else:
if ann is not None:
return dict(ann)
# But if __annotations__ threw a NameError, we try calling __annotate__
ann = _get_and_call_annotate(obj, format)
if ann is None:
# If that didn't work either, we have a very weird object:
# evaluating
# __annotations__ threw NameError and there is no __annotate__.
# In that case,
# we fall back to trying __annotations__ again.
ann = _get_dunder_annotations(obj)
if ann is None:
if isinstance(obj, type) or callable(obj):
return {}
raise TypeError(f"{obj!r} does not have annotations")
if not ann:
return {}
return dict(ann)
def get_annotations(obj: Any) -> Mapping[str, Any]:
# FORWARDREF has the effect of giving us ForwardRefs and not
# actually trying to evaluate the annotations. We need this so
# that the annotations act as much like
# "from __future__ import annotations" as possible, which is going
# away in future python as a separate mode
return _vendored_get_annotations(obj, format=Format.FORWARDREF)
elif compat.py310:
def get_annotations(obj: Any) -> Mapping[str, Any]:
return inspect.get_annotations(obj)
else:
def get_annotations(obj: Any) -> Mapping[str, Any]:
# it's been observed that cls.__annotations__ can be non present.
# it's not clear what causes this, running under tox py37/38 it
# happens, running straight pytest it doesnt
# https://docs.python.org/3/howto/annotations.html#annotations-howto
if isinstance(obj, type):
ann = obj.__dict__.get("__annotations__", None)
else:
ann = getattr(obj, "__annotations__", None)
if ann is None:
return _collections.EMPTY_DICT
else:
return cast("Mapping[str, Any]", ann)
def md5_hex(x: Any) -> str:
x = x.encode("utf-8")
m = compat.md5_not_for_security()
m.update(x)
return cast(str, m.hexdigest())
class safe_reraise:
"""Reraise an exception after invoking some
handler code.
Stores the existing exception info before
invoking so that it is maintained across a potential
coroutine context switch.
e.g.::
try:
sess.commit()
except:
with safe_reraise():
sess.rollback()
TODO: we should at some point evaluate current behaviors in this regard
based on current greenlet, gevent/eventlet implementations in Python 3, and
also see the degree to which our own asyncio (based on greenlet also) is
impacted by this. .rollback() will cause IO / context switch to occur in
all these scenarios; what happens to the exception context from an
"except:" block if we don't explicitly store it? Original issue was #2703.
"""
__slots__ = ("_exc_info",)
_exc_info: Union[
None,
Tuple[
Type[BaseException],
BaseException,
types.TracebackType,
],
Tuple[None, None, None],
]
def __enter__(self) -> None:
self._exc_info = sys.exc_info()
def __exit__(
self,
type_: Optional[Type[BaseException]],
value: Optional[BaseException],
traceback: Optional[types.TracebackType],
) -> NoReturn:
assert self._exc_info is not None
# see #2703 for notes
if type_ is None:
exc_type, exc_value, exc_tb = self._exc_info
assert exc_value is not None
self._exc_info = None # remove potential circular references
raise exc_value.with_traceback(exc_tb)
else:
self._exc_info = None # remove potential circular references
assert value is not None
raise value.with_traceback(traceback)
def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
seen: Set[Any] = set()
stack = [cls]
while stack:
cls = stack.pop()
if cls in seen:
continue
else:
seen.add(cls)
stack.extend(cls.__subclasses__())
yield cls
def string_or_unprintable(element: Any) -> str:
if isinstance(element, str):
return element
else:
try:
return str(element)
except Exception:
return "unprintable element %r" % element
def clsname_as_plain_name(
cls: Type[Any], use_name: Optional[str] = None
) -> str:
name = use_name or cls.__name__
return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name))
def method_is_overridden(
instance_or_cls: Union[Type[Any], object],
against_method: Callable[..., Any],
) -> bool:
"""Return True if the two class methods don't match."""
if not isinstance(instance_or_cls, type):
current_cls = instance_or_cls.__class__
else:
current_cls = instance_or_cls
method_name = against_method.__name__
current_method: types.MethodType = getattr(current_cls, method_name)
return current_method != against_method
def decode_slice(slc: slice) -> Tuple[Any, ...]:
"""decode a slice object as sent to __getitem__.
takes into account the 2.5 __index__() method, basically.
"""
ret: List[Any] = []
for x in slc.start, slc.stop, slc.step:
if hasattr(x, "__index__"):
x = x.__index__()
ret.append(x)
return tuple(ret)
def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
used_set = set(used)
for base in bases:
pool = itertools.chain(
(base,),
map(lambda i: base + str(i), range(1000)),
)
for sym in pool:
if sym not in used_set:
used_set.add(sym)
yield sym
break
else:
raise NameError("exhausted namespace for symbol base %s" % base)
def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
"""Call the given function given each nonzero bit from n."""
while n:
b = n & (~n + 1)
yield fn(b)
n ^= b
_Fn = TypeVar("_Fn", bound="Callable[..., Any]")
# this seems to be in flux in recent mypy versions
def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
"""A signature-matching decorator factory."""
def decorate(fn: _Fn) -> _Fn:
if not inspect.isfunction(fn) and not inspect.ismethod(fn):
raise Exception("not a decoratable function")
# Python 3.14 defer creating __annotations__ until its used.
# We do not want to create __annotations__ now.
annofunc = getattr(fn, "__annotate__", None)
if annofunc is not None:
fn.__annotate__ = None # type: ignore[union-attr]
try:
spec = compat.inspect_getfullargspec(fn)
finally:
fn.__annotate__ = annofunc # type: ignore[union-attr]
else:
spec = compat.inspect_getfullargspec(fn)
# Do not generate code for annotations.
# update_wrapper() copies the annotation from fn to decorated.
# We use dummy defaults for code generation to avoid having
# copy of large globals for compiling.
# We copy __defaults__ and __kwdefaults__ from fn to decorated.
empty_defaults = (None,) * len(spec.defaults or ())
empty_kwdefaults = dict.fromkeys(spec.kwonlydefaults or ())
spec = spec._replace(
annotations={},
defaults=empty_defaults,
kwonlydefaults=empty_kwdefaults,
)
names = (
tuple(cast("Tuple[str, ...]", spec[0]))
+ cast("Tuple[str, ...]", spec[1:3])
+ (fn.__name__,)
)
targ_name, fn_name = _unique_symbols(names, "target", "fn")
metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
metadata.update(format_argspec_plus(spec, grouped=False))
metadata["name"] = fn.__name__
if inspect.iscoroutinefunction(fn):
metadata["prefix"] = "async "
metadata["target_prefix"] = "await "
else:
metadata["prefix"] = ""
metadata["target_prefix"] = ""
# look for __ positional arguments. This is a convention in
# SQLAlchemy that arguments should be passed positionally
# rather than as keyword
# arguments. note that apply_pos doesn't currently work in all cases
# such as when a kw-only indicator "*" is present, which is why
# we limit the use of this to just that case we can detect. As we add
# more kinds of methods that use @decorator, things may have to
# be further improved in this area
if "__" in repr(spec[0]):
code = (
"""\
%(prefix)sdef %(name)s%(grouped_args)s:
return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
"""
% metadata
)
else:
code = (
"""\
%(prefix)sdef %(name)s%(grouped_args)s:
return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
"""
% metadata
)
env: Dict[str, Any] = {
targ_name: target,
fn_name: fn,
"__name__": fn.__module__,
}
decorated = cast(
types.FunctionType,
_exec_code_in_env(code, env, fn.__name__),
)
decorated.__defaults__ = fn.__defaults__
decorated.__kwdefaults__ = fn.__kwdefaults__ # type: ignore
return update_wrapper(decorated, fn) # type: ignore[return-value]
return update_wrapper(decorate, target) # type: ignore[return-value]
def _exec_code_in_env(
code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
) -> Callable[..., Any]:
exec(code, env)
return env[fn_name] # type: ignore[no-any-return]
_PF = TypeVar("_PF")
_TE = TypeVar("_TE")
class PluginLoader:
def __init__(
self, group: str, auto_fn: Optional[Callable[..., Any]] = None
):
self.group = group
self.impls: Dict[str, Any] = {}
self.auto_fn = auto_fn
def clear(self):
self.impls.clear()
def load(self, name: str) -> Any:
if name in self.impls:
return self.impls[name]()
if self.auto_fn:
loader = self.auto_fn(name)
if loader:
self.impls[name] = loader
return loader()
for impl in compat.importlib_metadata_get(self.group):
if impl.name == name:
self.impls[name] = impl.load
return impl.load()
raise exc.NoSuchModuleError(
"Can't load plugin: %s:%s" % (self.group, name)
)
def register(self, name: str, modulepath: str, objname: str) -> None:
def load():
mod = __import__(modulepath)
for token in modulepath.split(".")[1:]:
mod = getattr(mod, token)
return getattr(mod, objname)
self.impls[name] = load
def deregister(self, name: str) -> None:
del self.impls[name]
def _inspect_func_args(fn):
try:
co_varkeywords = inspect.CO_VARKEYWORDS
except AttributeError:
# https://docs.python.org/3/library/inspect.html
# The flags are specific to CPython, and may not be defined in other
# Python implementations. Furthermore, the flags are an implementation
# detail, and can be removed or deprecated in future Python releases.
spec = compat.inspect_getfullargspec(fn)
return spec[0], bool(spec[2])
else:
# use fn.__code__ plus flags to reduce method call overhead
co = fn.__code__
nargs = co.co_argcount
return (
list(co.co_varnames[:nargs]),
bool(co.co_flags & co_varkeywords),
)
@overload
def get_cls_kwargs(
cls: type,
*,
_set: Optional[Set[str]] = None,
raiseerr: Literal[True] = ...,
) -> Set[str]: ...
@overload
def get_cls_kwargs(
cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]: ...
def get_cls_kwargs(
cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
) -> Optional[Set[str]]:
r"""Return the full set of inherited kwargs for the given `cls`.
Probes a class's __init__ method, collecting all named arguments. If the
__init__ defines a \**kwargs catch-all, then the constructor is presumed
to pass along unrecognized keywords to its base classes, and the
collection process is repeated recursively on each of the bases.
Uses a subset of inspect.getfullargspec() to cut down on method overhead,
as this is used within the Core typing system to create copies of type
objects which is a performance-sensitive operation.
No anonymous tuple arguments please !
"""
toplevel = _set is None
if toplevel:
_set = set()
assert _set is not None
ctr = cls.__dict__.get("__init__", False)
has_init = (
ctr
and isinstance(ctr, types.FunctionType)
and isinstance(ctr.__code__, types.CodeType)
)
if has_init:
names, has_kw = _inspect_func_args(ctr)
_set.update(names)
if not has_kw and not toplevel:
if raiseerr:
raise TypeError(
f"given cls {cls} doesn't have an __init__ method"
)
else:
return None
else:
has_kw = False
if not has_init or has_kw:
for c in cls.__bases__:
if get_cls_kwargs(c, _set=_set) is None:
break
_set.discard("self")
return _set
def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
"""Return the set of legal kwargs for the given `func`.
Uses getargspec so is safe to call for methods, functions,
etc.
"""
return compat.inspect_getfullargspec(func)[0]
def get_callable_argspec(
fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
) -> compat.FullArgSpec:
"""Return the argument signature for any callable.
All pure-Python callables are accepted, including
functions, methods, classes, objects with __call__;
builtins and other edge cases like functools.partial() objects
raise a TypeError.
"""
if inspect.isbuiltin(fn):
raise TypeError("Can't inspect builtin: %s" % fn)
elif inspect.isfunction(fn):
if _is_init and no_self:
spec = compat.inspect_getfullargspec(fn)
return compat.FullArgSpec(
spec.args[1:],
spec.varargs,
spec.varkw,
spec.defaults,
spec.kwonlyargs,
spec.kwonlydefaults,
spec.annotations,
)
else:
return compat.inspect_getfullargspec(fn)
elif inspect.ismethod(fn):
if no_self and (_is_init or fn.__self__):
spec = compat.inspect_getfullargspec(fn.__func__)
return compat.FullArgSpec(
spec.args[1:],
spec.varargs,
spec.varkw,
spec.defaults,
spec.kwonlyargs,
spec.kwonlydefaults,
spec.annotations,
)
else:
return compat.inspect_getfullargspec(fn.__func__)
elif inspect.isclass(fn):
return get_callable_argspec(
fn.__init__, no_self=no_self, _is_init=True
)
elif hasattr(fn, "__func__"):
return compat.inspect_getfullargspec(fn.__func__)
elif hasattr(fn, "__call__"):
if inspect.ismethod(fn.__call__):
return get_callable_argspec(fn.__call__, no_self=no_self)
else:
raise TypeError("Can't inspect callable: %s" % fn)
else:
raise TypeError("Can't inspect callable: %s" % fn)
def format_argspec_plus(
fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
) -> Dict[str, Optional[str]]:
"""Returns a dictionary of formatted, introspected function arguments.
A enhanced variant of inspect.formatargspec to support code generation.
fn
An inspectable callable or tuple of inspect getargspec() results.
grouped
Defaults to True; include (parens, around, argument) lists
Returns:
args
Full inspect.formatargspec for fn
self_arg
The name of the first positional argument, varargs[0], or None
if the function defines no positional arguments.
apply_pos
args, re-written in calling rather than receiving syntax. Arguments are
passed positionally.
apply_kw
Like apply_pos, except keyword-ish args are passed as keywords.
apply_pos_proxied
Like apply_pos but omits the self/cls argument
Example::
>>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
{'grouped_args': '(self, a, b, c=3, **d)',
'self_arg': 'self',
'apply_kw': '(self, a, b, c=c, **d)',
'apply_pos': '(self, a, b, c, **d)'}
"""
if callable(fn):
spec = compat.inspect_getfullargspec(fn)
else:
spec = fn
args = compat.inspect_formatargspec(*spec)
apply_pos = compat.inspect_formatargspec(
spec[0], spec[1], spec[2], None, spec[4]
)
if spec[0]:
self_arg = spec[0][0]
apply_pos_proxied = compat.inspect_formatargspec(
spec[0][1:], spec[1], spec[2], None, spec[4]
)
elif spec[1]:
# I'm not sure what this is
self_arg = "%s[0]" % spec[1]
apply_pos_proxied = apply_pos
else:
self_arg = None
apply_pos_proxied = apply_pos
num_defaults = 0
if spec[3]:
num_defaults += len(cast(Tuple[Any], spec[3]))
if spec[4]:
num_defaults += len(spec[4])
name_args = spec[0] + spec[4]
defaulted_vals: Union[List[str], Tuple[()]]
if num_defaults:
defaulted_vals = name_args[0 - num_defaults :]
else:
defaulted_vals = ()
apply_kw = compat.inspect_formatargspec(
name_args,
spec[1],
spec[2],
defaulted_vals,
formatvalue=lambda x: "=" + str(x),
)
if spec[0]:
apply_kw_proxied = compat.inspect_formatargspec(
name_args[1:],
spec[1],
spec[2],
defaulted_vals,
formatvalue=lambda x: "=" + str(x),
)
else:
apply_kw_proxied = apply_kw
if grouped:
return dict(
grouped_args=args,
self_arg=self_arg,
apply_pos=apply_pos,
apply_kw=apply_kw,
apply_pos_proxied=apply_pos_proxied,
apply_kw_proxied=apply_kw_proxied,
)
else:
return dict(
grouped_args=args,
self_arg=self_arg,
apply_pos=apply_pos[1:-1],
apply_kw=apply_kw[1:-1],
apply_pos_proxied=apply_pos_proxied[1:-1],
apply_kw_proxied=apply_kw_proxied[1:-1],
)
def format_argspec_init(method, grouped=True):
"""format_argspec_plus with considerations for typical __init__ methods
Wraps format_argspec_plus with error handling strategies for typical
__init__ cases:
.. sourcecode:: text
object.__init__ -> (self)
other unreflectable (usually C) -> (self, *args, **kwargs)
"""
if method is object.__init__:
grouped_args = "(self)"
args = "(self)" if grouped else "self"
proxied = "()" if grouped else ""
else:
try:
return format_argspec_plus(method, grouped=grouped)
except TypeError:
grouped_args = "(self, *args, **kwargs)"
args = grouped_args if grouped else "self, *args, **kwargs"
proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
return dict(
self_arg="self",
grouped_args=grouped_args,
apply_pos=args,
apply_kw=args,
apply_pos_proxied=proxied,
apply_kw_proxied=proxied,
)
def create_proxy_methods(
target_cls: Type[Any],
target_cls_sphinx_name: str,
proxy_cls_sphinx_name: str,
classmethods: Sequence[str] = (),
methods: Sequence[str] = (),
attributes: Sequence[str] = (),
use_intermediate_variable: Sequence[str] = (),
) -> Callable[[_T], _T]:
"""A class decorator indicating attributes should refer to a proxy
class.
This decorator is now a "marker" that does nothing at runtime. Instead,
it is consumed by the tools/generate_proxy_methods.py script to
statically generate proxy methods and attributes that are fully
recognized by typing tools such as mypy.
"""
def decorate(cls):
return cls
return decorate
def getargspec_init(method):
"""inspect.getargspec with considerations for typical __init__ methods
Wraps inspect.getargspec with error handling for typical __init__ cases:
.. sourcecode:: text
object.__init__ -> (self)
other unreflectable (usually C) -> (self, *args, **kwargs)
"""
try:
return compat.inspect_getfullargspec(method)
except TypeError:
if method is object.__init__:
return (["self"], None, None, None)
else:
return (["self"], "args", "kwargs", None)
def unbound_method_to_callable(func_or_cls):
"""Adjust the incoming callable such that a 'self' argument is not
required.
"""
if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__:
return func_or_cls.__func__
else:
return func_or_cls
def generic_repr(
obj: Any,
additional_kw: Sequence[Tuple[str, Any]] = (),
to_inspect: Optional[Union[object, List[object]]] = None,
omit_kwarg: Sequence[str] = (),
) -> str:
"""Produce a __repr__() based on direct association of the __init__()
specification vs. same-named attributes present.
"""
if to_inspect is None:
to_inspect = [obj]
else:
to_inspect = _collections.to_list(to_inspect)
missing = object()
pos_args = []
kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict()
vargs = None
for i, insp in enumerate(to_inspect):
try:
spec = compat.inspect_getfullargspec(insp.__init__)
except TypeError:
continue
else:
default_len = len(spec.defaults) if spec.defaults else 0
if i == 0:
if spec.varargs:
vargs = spec.varargs
if default_len:
pos_args.extend(spec.args[1:-default_len])
else:
pos_args.extend(spec.args[1:])
else:
kw_args.update(
[(arg, missing) for arg in spec.args[1:-default_len]]
)
if default_len:
assert spec.defaults
kw_args.update(
[
(arg, default)
for arg, default in zip(
spec.args[-default_len:], spec.defaults
)
]
)
output: List[str] = []
output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)
if vargs is not None and hasattr(obj, vargs):
output.extend([repr(val) for val in getattr(obj, vargs)])
for arg, defval in kw_args.items():
if arg in omit_kwarg:
continue
try:
val = getattr(obj, arg, missing)
if val is not missing and val != defval:
output.append("%s=%r" % (arg, val))
except Exception:
pass
if additional_kw:
for arg, defval in additional_kw:
try:
val = getattr(obj, arg, missing)
if val is not missing and val != defval:
output.append("%s=%r" % (arg, val))
except Exception:
pass
return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))
class portable_instancemethod:
"""Turn an instancemethod into a (parent, name) pair
to produce a serializable callable.
"""
__slots__ = "target", "name", "kwargs", "__weakref__"
def __getstate__(self):
return {
"target": self.target,
"name": self.name,
"kwargs": self.kwargs,
}
def __setstate__(self, state):
self.target = state["target"]
self.name = state["name"]
self.kwargs = state.get("kwargs", ())
def __init__(self, meth, kwargs=()):
self.target = meth.__self__
self.name = meth.__name__
self.kwargs = kwargs
def __call__(self, *arg, **kw):
kw.update(self.kwargs)
return getattr(self.target, self.name)(*arg, **kw)
def class_hierarchy(cls):
"""Return an unordered sequence of all classes related to cls.
Traverses diamond hierarchies.
Fibs slightly: subclasses of builtin types are not returned. Thus
class_hierarchy(class A(object)) returns (A, object), not A plus every
class systemwide that derives from object.
"""
hier = {cls}
process = list(cls.__mro__)
while process:
c = process.pop()
bases = (_ for _ in c.__bases__ if _ not in hier)
for b in bases:
process.append(b)
hier.add(b)
if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"):
continue
for s in [
_
for _ in (
c.__subclasses__()
if not issubclass(c, type)
else c.__subclasses__(c)
)
if _ not in hier
]:
process.append(s)
hier.add(s)
return list(hier)
def iterate_attributes(cls):
"""iterate all the keys and attributes associated
with a class, without using getattr().
Does not use getattr() so that class-sensitive
descriptors (i.e. property.__get__()) are not called.
"""
keys = dir(cls)
for key in keys:
for c in cls.__mro__:
if key in c.__dict__:
yield (key, c.__dict__[key])
break
def monkeypatch_proxied_specials(
into_cls,
from_cls,
skip=None,
only=None,
name="self.proxy",
from_instance=None,