Skip to content

Commit d03380b

Browse files
committed
added a clean command for migrations
There are situations where you may have rows in your migration table which refer to migration modules whch were deleted for some reason. The command identifies these rows, and optionally deletes them.
1 parent 2eeee4c commit d03380b

File tree

3 files changed

+121
-1
lines changed

3 files changed

+121
-1
lines changed

piccolo/apps/migrations/commands/backwards.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def backwards(
118118
Specify a value of 'all' to undo all of the migrations. Specify a
119119
value of '1' to undo the most recent migration.
120120
:param auto_agree:
121-
Automatically agree to any input prompts.
121+
If true, automatically agree to any input prompts.
122122
:param clean:
123123
If true, the migration files which have been run backwards are deleted
124124
from the disk after completing.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from __future__ import annotations
2+
import typing as t
3+
4+
from piccolo.apps.migrations.commands.base import BaseMigrationManager
5+
from piccolo.apps.migrations.tables import Migration
6+
7+
8+
class CleanMigrationManager(BaseMigrationManager):
9+
def __init__(self, app_name: str, auto_agree: bool = False):
10+
self.app_name = app_name
11+
self.auto_agree = auto_agree
12+
super().__init__()
13+
14+
def get_migration_ids_to_remove(self) -> t.List[str]:
15+
"""
16+
Returns a list of migration ID strings, which are rows in the table,
17+
but don't have a corresponding migration module on disk.
18+
"""
19+
app_config = self.get_app_config(app_name=self.app_name)
20+
21+
migration_module_dict = self.get_migration_modules(
22+
folder_path=app_config.migrations_folder_path
23+
)
24+
25+
# The migration IDs which are in migration modules.
26+
migration_ids = self.get_migration_ids(
27+
migration_module_dict=migration_module_dict
28+
)
29+
30+
migration_ids_to_remove = (
31+
Migration.select(Migration.name)
32+
.where(Migration.app_name == self.app_name)
33+
.where(Migration.name.not_in(migration_ids))
34+
.output(as_list=True)
35+
.run_sync()
36+
)
37+
return migration_ids_to_remove
38+
39+
def run(self):
40+
print("Checking the migration table ...")
41+
42+
# Make sure the migration table exists, otherwise we'll get an error.
43+
self.create_migration_table()
44+
45+
migration_ids_to_remove = self.get_migration_ids_to_remove()
46+
47+
if migration_ids_to_remove:
48+
id_string = "\n".join(migration_ids_to_remove)
49+
print(
50+
"The following migrations exist in the table, not not in "
51+
"modules:"
52+
)
53+
print(id_string)
54+
confirm = (
55+
"y"
56+
if self.auto_agree
57+
else input("Would you like to delete these rows? (y/N)")
58+
)
59+
if confirm == "y":
60+
Migration.delete().where(
61+
Migration.name.is_in(migration_ids_to_remove)
62+
).run_sync()
63+
print("Deleted")
64+
else:
65+
print("Cancelled")
66+
else:
67+
print(
68+
"No migrations exist in the table, which aren't also in "
69+
"modules."
70+
)
71+
72+
73+
def clean(app_name: str, auto_agree: bool = False):
74+
"""
75+
Identifies any rows in the migration table which have no corresponding
76+
migration module on disk, and then optionally deletes those rows.
77+
78+
:param app_name:
79+
The name of the app to check.
80+
:param auto_agree:
81+
If true, automatically agree to any input prompts.
82+
83+
"""
84+
CleanMigrationManager(app_name=app_name, auto_agree=auto_agree).run()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from unittest import TestCase
2+
3+
from piccolo.apps.migrations.commands.clean import clean
4+
from piccolo.apps.migrations.tables import Migration
5+
6+
7+
class TestCleanMigrationCommand(TestCase):
8+
def test_clean(self):
9+
Migration.create_table(if_not_exists=True).run_sync()
10+
11+
real_migration_ids = [
12+
"2020-12-17T18:44:30",
13+
"2020-12-17T18:44:39",
14+
"2020-12-17T18:44:44",
15+
]
16+
17+
orphaned_migration_id = "2010-01-101T00:00:00"
18+
19+
migration_ids = real_migration_ids + [orphaned_migration_id]
20+
21+
Migration.insert(
22+
*[Migration(name=i, app_name="example_app") for i in migration_ids]
23+
).run_sync()
24+
25+
clean(app_name="example_app", auto_agree=True)
26+
27+
remaining_rows = (
28+
Migration.select(Migration.name)
29+
.where(Migration.app_name == "example_app")
30+
.output(as_list=True)
31+
.order_by(Migration.name)
32+
.run_sync()
33+
)
34+
self.assertEqual(remaining_rows, real_migration_ids)
35+
36+
Migration.alter().drop_table(if_exists=True).run_sync()

0 commit comments

Comments
 (0)