forked from piccolo-orm/piccolo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_pool.py
More file actions
80 lines (58 loc) · 2.26 KB
/
test_pool.py
File metadata and controls
80 lines (58 loc) · 2.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import asyncio
from piccolo.engine.postgres import PostgresEngine
from ..base import DBTestCase, postgres_only
from ..example_app.tables import Manager
@postgres_only
class TestPool(DBTestCase):
async def _create_pool(self):
engine: PostgresEngine = Manager._meta.db
await engine.start_connection_pool()
assert engine.pool is not None
await engine.close_connection_pool()
assert engine.pool is None
async def _make_query(self):
await Manager._meta.db.start_connection_pool()
await Manager(name="Bob").save().run()
response = await Manager.select().run()
self.assertTrue("Bob" in [i["name"] for i in response])
await Manager._meta.db.close_connection_pool()
async def _make_many_queries(self):
await Manager._meta.db.start_connection_pool()
await Manager(name="Bob").save().run()
async def get_data():
response = await Manager.select().run()
self.assertEqual(response, [{"id": 1, "name": "Bob"}])
await asyncio.gather(*[get_data() for _ in range(500)])
await Manager._meta.db.close_connection_pool()
def test_creation(self):
"""
Make sure a connection pool can be created.
"""
asyncio.run(self._create_pool())
def test_query(self):
"""
Make several queries using a connection pool.
"""
asyncio.run(self._make_query())
def test_many_queries(self):
"""
Make sure the connection pool is working correctly, and we don't
exceed a connection limit - queries should queue, then succeed.
"""
asyncio.run(self._make_many_queries())
@postgres_only
class TestPoolProxyMethods(DBTestCase):
async def _create_pool(self):
engine: PostgresEngine = Manager._meta.db
# Deliberate typo ('nnn'):
await engine.start_connnection_pool()
assert engine.pool is not None
# Deliberate typo ('nnn'):
await engine.close_connnection_pool()
assert engine.pool is None
def test_proxy_methods(self):
"""
There are some proxy methods, due to some old typos. Make sure they
work, to ensure backwards compatibility.
"""
asyncio.run(self._create_pool())