Feature: Allow a data only export/import cycle (#6871)

Trenton H 2024-06-01 18:22:59 -07:00 committed by GitHub
parent 04f52f553a
commit 085447e7c4
6 changed files with 331 additions and 234 deletions

View File

@@ -185,6 +185,13 @@ For PostgreSQL, refer to [Upgrading a PostgreSQL Cluster](https://www.postgresql
For MariaDB, refer to [Upgrading MariaDB](https://mariadb.com/kb/en/upgrading/)
You may also use the exporter and importer with the `--data-only` flag, after creating a new database with the updated version of PostgreSQL or MariaDB.
!!! warning
Do not change any settings, especially paths, when doing this, or you risk data loss.
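For example, a data-only cycle around a database engine upgrade might look like the following (a sketch; the `../export` path is illustrative, and on docker installs the commands run inside the webserver container):
```shell
# 1. Export only the database contents from the old installation
document_exporter ../export --data-only

# 2. Recreate the database with the new version of PostgreSQL or MariaDB

# 3. Import the data into the installation backed by the new database
document_importer ../export --data-only
```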
## Downgrading Paperless {#downgrade-paperless}
Downgrades are possible. However, some updates also contain database
@@ -269,6 +276,7 @@ optional arguments:
-sm, --split-manifest
-z, --zip
-zn, --zip-name
--data-only
```
`target` is a folder to which the data gets written. This includes
@@ -327,6 +335,9 @@ If `-z` or `--zip` is provided, the export will be a zip file
in the target directory, named according to the current local date or the
value set in `-zn` or `--zip-name`.
If `--data-only` is provided, only the database will be exported. This option is intended
to facilitate database upgrades without needing to clean documents and thumbnails from the media directory.
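For instance, a zipped, database-only export (target path illustrative) could be produced with:
```shell
document_exporter ../export --data-only --zip
```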
!!! warning
If exporting with the file name format, there may be errors due to
@@ -341,10 +352,15 @@ exporter](#exporter) and imports it into paperless.
The importer works just like the exporter. You point it at a directory,
and the script does the rest of the work:
```
```shell
document_importer source
```
| Option | Required | Default | Description |
| ----------- | -------- | ------- | ------------------------------------------------------------------------- |
| source | Yes | N/A | The directory containing an export |
| --data-only | No | False | If provided, only import data, do not import document files or thumbnails |
When you use the provided docker compose script, put the export inside
the `export` folder in your paperless source directory. Specify
`../export` as the `source`.
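With the docker compose setup, a data-only import might therefore look like this (a sketch; the `webserver` service name assumes the standard compose file):
```shell
docker compose exec -T webserver document_importer ../export --data-only
```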

View File

@@ -5,6 +5,7 @@ import shutil
import tempfile
import time
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Optional
import tqdm
@@ -22,6 +23,9 @@ from filelock import FileLock
from guardian.models import GroupObjectPermission
from guardian.models import UserObjectPermission
if TYPE_CHECKING:
from django.db.models import QuerySet
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import LogEntry
@@ -147,6 +151,13 @@ class Command(BaseCommand):
help="Sets the export zip file name",
)
parser.add_argument(
"--data-only",
default=False,
action="store_true",
help="If set, only the database will be imported, not files",
)
parser.add_argument(
"--no-progress-bar",
default=False,
@@ -154,19 +165,6 @@
help="If set, the progress bar will not be shown",
)
def __init__(self, *args, **kwargs):
BaseCommand.__init__(self, *args, **kwargs)
self.target: Path = None
self.split_manifest = False
self.files_in_export_dir: set[Path] = set()
self.exported_files: list[Path] = []
self.compare_checksums = False
self.use_filename_format = False
self.use_folder_prefix = False
self.delete = False
self.no_archive = False
self.no_thumbnail = False
def handle(self, *args, **options):
self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"]
@@ -177,14 +175,17 @@
self.no_archive: bool = options["no_archive"]
self.no_thumbnail: bool = options["no_thumbnail"]
self.zip_export: bool = options["zip"]
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
self.files_in_export_dir: set[Path] = set()
self.exported_files: set[str] = set()
# If zipping, save the original target for later and
# get a temporary directory for the target instead
temp_dir = None
self.original_target: Optional[Path] = None
self.original_target = self.target
if self.zip_export:
self.original_target = self.target
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
temp_dir = tempfile.TemporaryDirectory(
dir=settings.SCRATCH_DIR,
@@ -202,12 +203,13 @@
raise CommandError("That path doesn't appear to be writable")
try:
# Prevent any ongoing changes in the documents
with FileLock(settings.MEDIA_LOCK):
self.dump(options["no_progress_bar"])
self.dump()
# We've written everything to the temporary directory in this case,
# now make an archive in the original target, with all files stored
if self.zip_export:
if self.zip_export and temp_dir is not None:
shutil.make_archive(
os.path.join(
self.original_target,
@@ -222,7 +224,7 @@
if self.zip_export and temp_dir is not None:
temp_dir.cleanup()
def dump(self, progress_bar_disable=False):
def dump(self):
# 1. Take a snapshot of what files exist in the current export folder
for x in self.target.glob("**/*"):
if x.is_file():
@@ -230,115 +232,59 @@
# 2. Create manifest, containing all correspondents, types, tags, storage paths
# note, documents and ui_settings
manifest_key_to_object_query: dict[str, QuerySet] = {
"correspondents": Correspondent.objects.all(),
"tags": Tag.objects.all(),
"document_types": DocumentType.objects.all(),
"storage_paths": StoragePath.objects.all(),
"mail_accounts": MailAccount.objects.all(),
"mail_rules": MailRule.objects.all(),
"saved_views": SavedView.objects.all(),
"saved_view_filter_rules": SavedViewFilterRule.objects.all(),
"groups": Group.objects.all(),
"users": User.objects.exclude(
username__in=["consumer", "AnonymousUser"],
).all(),
"ui_settings": UiSettings.objects.all(),
"content_types": ContentType.objects.all(),
"permissions": Permission.objects.all(),
"user_object_permissions": UserObjectPermission.objects.all(),
"group_object_permissions": GroupObjectPermission.objects.all(),
"workflow_triggers": WorkflowTrigger.objects.all(),
"workflow_actions": WorkflowAction.objects.all(),
"workflows": Workflow.objects.all(),
"custom_fields": CustomField.objects.all(),
"custom_field_instances": CustomFieldInstance.objects.all(),
"app_configs": ApplicationConfiguration.objects.all(),
"notes": Note.objects.all(),
"documents": Document.objects.order_by("id").all(),
}
if settings.AUDIT_LOG_ENABLED:
manifest_key_to_object_query["log_entries"] = LogEntry.objects.all()
with transaction.atomic():
manifest = json.loads(
serializers.serialize("json", Correspondent.objects.all()),
)
manifest_dict = {}
manifest += json.loads(serializers.serialize("json", Tag.objects.all()))
manifest += json.loads(
serializers.serialize("json", DocumentType.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", StoragePath.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", MailAccount.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", MailRule.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", SavedView.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", SavedViewFilterRule.objects.all()),
)
manifest += json.loads(serializers.serialize("json", Group.objects.all()))
manifest += json.loads(
serializers.serialize(
"json",
User.objects.exclude(username__in=["consumer", "AnonymousUser"]),
),
)
manifest += json.loads(
serializers.serialize("json", UiSettings.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", ContentType.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", Permission.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", UserObjectPermission.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", GroupObjectPermission.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", WorkflowTrigger.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", WorkflowAction.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", Workflow.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", CustomField.objects.all()),
)
manifest += json.loads(
serializers.serialize("json", ApplicationConfiguration.objects.all()),
)
if settings.AUDIT_LOG_ENABLED:
manifest += json.loads(
serializers.serialize("json", LogEntry.objects.all()),
# Build an overall manifest
for key in manifest_key_to_object_query:
manifest_dict[key] = json.loads(
serializers.serialize("json", manifest_key_to_object_query[key]),
)
# These are treated specially and included in the per-document manifest
# if that setting is enabled. Otherwise, they are just exported to the bulk
# manifest
documents = Document.objects.order_by("id")
document_map: dict[int, Document] = {d.pk: d for d in documents}
document_manifest = json.loads(serializers.serialize("json", documents))
notes = json.loads(
serializers.serialize("json", Note.objects.all()),
)
custom_field_instances = json.loads(
serializers.serialize("json", CustomFieldInstance.objects.all()),
)
if not self.split_manifest:
manifest += document_manifest
manifest += notes
manifest += custom_field_instances
document_map: dict[int, Document] = {
d.pk: d for d in manifest_key_to_object_query["documents"]
}
document_manifest = manifest_dict["documents"]
# 3. Export files from each document
for index, document_dict in tqdm.tqdm(
enumerate(document_manifest),
total=len(document_manifest),
disable=progress_bar_disable,
disable=self.no_progress_bar,
):
# 3.1. store files unencrypted
document_dict["fields"]["storage_type"] = Document.STORAGE_TYPE_UNENCRYPTED
@@ -346,102 +292,39 @@
document = document_map[document_dict["pk"]]
# 3.2. generate a unique filename
filename_counter = 0
while True:
if self.use_filename_format:
base_name = generate_filename(
document,
counter=filename_counter,
append_gpg=False,
)
else:
base_name = document.get_public_filename(counter=filename_counter)
if base_name not in self.exported_files:
self.exported_files.append(base_name)
break
else:
filename_counter += 1
base_name = self.generate_base_name(document)
# 3.3. write filenames into manifest
original_name = base_name
if self.use_folder_prefix:
original_name = os.path.join("originals", original_name)
original_target = (self.target / Path(original_name)).resolve()
document_dict[EXPORTER_FILE_NAME] = original_name
if not self.no_thumbnail:
thumbnail_name = base_name + "-thumbnail.webp"
if self.use_folder_prefix:
thumbnail_name = os.path.join("thumbnails", thumbnail_name)
thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
else:
thumbnail_target = None
if not self.no_archive and document.has_archive_version:
archive_name = base_name + "-archive.pdf"
if self.use_folder_prefix:
archive_name = os.path.join("archive", archive_name)
archive_target = (self.target / Path(archive_name)).resolve()
document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
else:
archive_target = None
original_target, thumbnail_target, archive_target = (
self.generate_document_targets(document, base_name, document_dict)
)
# 3.4. write files to target folder
if document.storage_type == Document.STORAGE_TYPE_GPG:
t = int(time.mktime(document.created.timetuple()))
original_target.parent.mkdir(parents=True, exist_ok=True)
with document.source_file as out_file:
original_target.write_bytes(GnuPG.decrypted(out_file))
os.utime(original_target, times=(t, t))
if thumbnail_target:
thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
with document.thumbnail_file as out_file:
thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
os.utime(thumbnail_target, times=(t, t))
if archive_target:
archive_target.parent.mkdir(parents=True, exist_ok=True)
with document.archive_path as out_file:
archive_target.write_bytes(GnuPG.decrypted(out_file))
os.utime(archive_target, times=(t, t))
else:
self.check_and_copy(
document.source_path,
document.checksum,
if not self.data_only:
self.copy_document_files(
document,
original_target,
thumbnail_target,
archive_target,
)
if thumbnail_target:
self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
if archive_target:
self.check_and_copy(
document.archive_path,
document.archive_checksum,
archive_target,
)
if self.split_manifest:
manifest_name = base_name + "-manifest.json"
manifest_name = Path(base_name + "-manifest.json")
if self.use_folder_prefix:
manifest_name = os.path.join("json", manifest_name)
manifest_name = (self.target / Path(manifest_name)).resolve()
manifest_name = Path("json") / manifest_name
manifest_name = (self.target / manifest_name).resolve()
manifest_name.parent.mkdir(parents=True, exist_ok=True)
content = [document_manifest[index]]
content += list(
filter(
lambda d: d["fields"]["document"] == document_dict["pk"],
notes,
manifest_dict["notes"],
),
)
content += list(
filter(
lambda d: d["fields"]["document"] == document_dict["pk"],
custom_field_instances,
manifest_dict["custom_field_instances"],
),
)
manifest_name.write_text(
@@ -451,8 +334,17 @@
if manifest_name in self.files_in_export_dir:
self.files_in_export_dir.remove(manifest_name)
# 4.1 write manifest to target folder
manifest_path = (self.target / Path("manifest.json")).resolve()
# These were exported already
if self.split_manifest:
del manifest_dict["documents"]
del manifest_dict["notes"]
del manifest_dict["custom_field_instances"]
# 4.1 write primary manifest to target folder
manifest = []
for key in manifest_dict:
manifest.extend(manifest_dict[key])
manifest_path = (self.target / "manifest.json").resolve()
manifest_path.write_text(
json.dumps(manifest, indent=2, ensure_ascii=False),
encoding="utf-8",
@@ -461,7 +353,7 @@
self.files_in_export_dir.remove(manifest_path)
# 4.2 write version information to target folder
version_path = (self.target / Path("version.json")).resolve()
version_path = (self.target / "version.json").resolve()
version_path.write_text(
json.dumps(
{"version": version.__full_version_str__},
@@ -491,7 +383,127 @@
else:
item.unlink()
def check_and_copy(self, source, source_checksum, target: Path):
def generate_base_name(self, document: Document) -> str:
"""
Generates a unique name for the document, one which has not already been used in this export
"""
filename_counter = 0
while True:
if self.use_filename_format:
base_name = generate_filename(
document,
counter=filename_counter,
append_gpg=False,
)
else:
base_name = document.get_public_filename(counter=filename_counter)
if base_name not in self.exported_files:
self.exported_files.add(base_name)
break
else:
filename_counter += 1
return base_name
def generate_document_targets(
self,
document: Document,
base_name: str,
document_dict: dict,
) -> tuple[Path, Optional[Path], Optional[Path]]:
"""
Generates the targets for a given document, including the original file, archive file and thumbnail (depending on settings).
"""
original_name = base_name
if self.use_folder_prefix:
original_name = os.path.join("originals", original_name)
original_target = (self.target / Path(original_name)).resolve()
document_dict[EXPORTER_FILE_NAME] = original_name
if not self.no_thumbnail:
thumbnail_name = base_name + "-thumbnail.webp"
if self.use_folder_prefix:
thumbnail_name = os.path.join("thumbnails", thumbnail_name)
thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
else:
thumbnail_target = None
if not self.no_archive and document.has_archive_version:
archive_name = base_name + "-archive.pdf"
if self.use_folder_prefix:
archive_name = os.path.join("archive", archive_name)
archive_target = (self.target / Path(archive_name)).resolve()
document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
else:
archive_target = None
return original_target, thumbnail_target, archive_target
def copy_document_files(
self,
document: Document,
original_target: Path,
thumbnail_target: Optional[Path],
archive_target: Optional[Path],
) -> None:
"""
Copies files from the document storage location to the specified target location.
If the document is encrypted, the files are decrypted before copying them to the target location.
"""
if document.storage_type == Document.STORAGE_TYPE_GPG:
t = int(time.mktime(document.created.timetuple()))
original_target.parent.mkdir(parents=True, exist_ok=True)
with document.source_file as out_file:
original_target.write_bytes(GnuPG.decrypted(out_file))
os.utime(original_target, times=(t, t))
if thumbnail_target:
thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
with document.thumbnail_file as out_file:
thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
os.utime(thumbnail_target, times=(t, t))
if archive_target:
archive_target.parent.mkdir(parents=True, exist_ok=True)
if TYPE_CHECKING:
assert isinstance(document.archive_path, Path)
with document.archive_path as out_file:
archive_target.write_bytes(GnuPG.decrypted(out_file))
os.utime(archive_target, times=(t, t))
else:
self.check_and_copy(
document.source_path,
document.checksum,
original_target,
)
if thumbnail_target:
self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
if archive_target:
if TYPE_CHECKING:
assert isinstance(document.archive_path, Path)
self.check_and_copy(
document.archive_path,
document.archive_checksum,
archive_target,
)
def check_and_copy(
self,
source: Path,
source_checksum: Optional[str],
target: Path,
):
"""
Copies the source to the target if the target doesn't exist, or if the target doesn't appear to match
the source attributes
"""
target = target.resolve()
if target in self.files_in_export_dir:
self.files_in_export_dir.remove(target)
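For context on the manifest records the refactored `dump()` builds: `serializers.serialize("json", queryset)` produces entries with `model`, `pk`, and `fields` keys, which is why the code can index `document_dict["pk"]` and `document_dict["fields"]` directly. A minimal sketch of inspecting that structure (assumes a configured paperless Django shell; `Tag` is just a convenient model to demonstrate with):
```python
import json

from django.core import serializers

from documents.models import Tag

# Serialize a queryset into the same record shape used by manifest.json
records = json.loads(serializers.serialize("json", Tag.objects.all()))

# Each record looks like {"model": "documents.tag", "pk": 1, "fields": {...}}
for record in records:
    print(record["model"], record["pk"], sorted(record["fields"]))
```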

View File

@@ -57,6 +57,7 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument("source")
parser.add_argument(
"--no-progress-bar",
default=False,
@@ -64,11 +65,12 @@
help="If set, the progress bar will not be shown",
)
def __init__(self, *args, **kwargs):
BaseCommand.__init__(self, *args, **kwargs)
self.source = None
self.manifest = None
self.version = None
parser.add_argument(
"--data-only",
default=False,
action="store_true",
help="If set, only the database will be exported, not files",
)
def pre_check(self) -> None:
"""
@@ -82,17 +84,20 @@
if not os.access(self.source, os.R_OK):
raise CommandError("That path doesn't appear to be readable")
for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
if document_dir.exists() and document_dir.is_dir():
for entry in document_dir.glob("**/*"):
if entry.is_dir():
continue
self.stdout.write(
self.style.WARNING(
f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
),
)
break
# Skip this check if operating only on the database
# We can expect data to exist in that case
if not self.data_only:
for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
if document_dir.exists() and document_dir.is_dir():
for entry in document_dir.glob("**/*"):
if entry.is_dir():
continue
self.stdout.write(
self.style.WARNING(
f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
),
)
break
if (
User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
!= 0
@@ -113,6 +118,8 @@
logging.getLogger().handlers[0].level = logging.ERROR
self.source = Path(options["source"]).resolve()
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
self.pre_check()
@@ -149,7 +156,8 @@
else:
self.stdout.write(self.style.NOTICE("No version.json file located"))
self._check_manifest_valid()
if not self.data_only:
self._check_manifest_files_valid()
with (
disable_signal(
@@ -200,13 +208,16 @@
)
raise e
self._import_files_from_manifest(options["no_progress_bar"])
if not self.data_only:
self._import_files_from_manifest()
else:
self.stdout.write(self.style.NOTICE("Data only import completed"))
self.stdout.write("Updating search index...")
call_command(
"document_index",
"reindex",
no_progress_bar=options["no_progress_bar"],
no_progress_bar=self.no_progress_bar,
)
@staticmethod
@@ -216,7 +227,7 @@
"That directory doesn't appear to contain a manifest.json file.",
)
def _check_manifest_valid(self):
def _check_manifest_files_valid(self):
"""
Attempts to verify the manifest is valid. Namely checking the files
referred to exist and the files can be read from
@@ -233,15 +244,15 @@
)
doc_file = record[EXPORTER_FILE_NAME]
doc_path = self.source / doc_file
doc_path: Path = self.source / doc_file
if not doc_path.exists():
raise CommandError(
f'The manifest file refers to "{doc_file}" which does not '
"appear to be in the source directory.",
)
try:
with doc_path.open(mode="rb") as infile:
infile.read(1)
with doc_path.open(mode="rb"):
pass
except Exception as e:
raise CommandError(
f"Failed to read from original file {doc_path}",
@@ -249,21 +260,21 @@
if EXPORTER_ARCHIVE_NAME in record:
archive_file = record[EXPORTER_ARCHIVE_NAME]
doc_archive_path = self.source / archive_file
doc_archive_path: Path = self.source / archive_file
if not doc_archive_path.exists():
raise CommandError(
f"The manifest file refers to {archive_file} which "
f"does not appear to be in the source directory.",
)
try:
with doc_archive_path.open(mode="rb") as infile:
infile.read(1)
with doc_archive_path.open(mode="rb"):
pass
except Exception as e:
raise CommandError(
f"Failed to read from archive file {doc_archive_path}",
) from e
def _import_files_from_manifest(self, progress_bar_disable):
def _import_files_from_manifest(self):
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
@@ -274,7 +285,7 @@
filter(lambda r: r["model"] == "documents.document", self.manifest),
)
for record in tqdm.tqdm(manifest_documents, disable=progress_bar_disable):
for record in tqdm.tqdm(manifest_documents, disable=self.no_progress_bar):
document = Document.objects.get(pk=record["pk"])
doc_file = record[EXPORTER_FILE_NAME]
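As the hunk above shows, the file-copy phase only touches `documents.document` records and resolves each exported name against the source directory. A condensed sketch of that lookup (illustrative; the `EXPORTER_FILE_NAME` value is assumed from `documents/settings.py`):
```python
from pathlib import Path

# Manifest key the exporter writes for each document (assumed value)
EXPORTER_FILE_NAME = "__exported_file_name__"

def document_records(manifest: list[dict]) -> list[dict]:
    # Only documents.document entries carry exported file names
    return [r for r in manifest if r["model"] == "documents.document"]

def resolve_original(record: dict, source: Path) -> Path:
    # Mirrors the importer's `doc_path = self.source / doc_file` lookup
    return source / record[EXPORTER_FILE_NAME]
```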

View File

@@ -37,10 +37,16 @@ from documents.sanity_checker import check_sanity
from documents.settings import EXPORTER_FILE_NAME
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from documents.tests.utils import SampleDirMixin
from documents.tests.utils import paperless_environment
class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
class TestExportImport(
DirectoriesMixin,
FileSystemAssertsMixin,
SampleDirMixin,
TestCase,
):
def setUp(self) -> None:
self.target = Path(tempfile.mkdtemp())
self.addCleanup(shutil.rmtree, self.target)
@@ -139,6 +145,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
@override_settings(PASSPHRASE="test")
def _do_export(
self,
*,
use_filename_format=False,
compare_checksums=False,
delete=False,
@ -146,6 +153,7 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
no_thumbnail=False,
split_manifest=False,
use_folder_prefix=False,
data_only=False,
):
args = ["document_exporter", self.target]
if use_filename_format:
@@ -162,6 +170,8 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
args += ["--split-manifest"]
if use_folder_prefix:
args += ["--use-folder-prefix"]
if data_only:
args += ["--data-only"]
call_command(*args)
@@ -794,3 +804,39 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
manifest = self._do_export(use_filename_format=True)
for obj in manifest:
self.assertNotEqual(obj["model"], "auditlog.logentry")
def test_export_data_only(self):
"""
GIVEN:
- Request to export documents with data only
WHEN:
- Export command is called
THEN:
- No document files are exported
- Manifest and version are exported
"""
shutil.rmtree(self.dirs.media_dir / "documents")
shutil.copytree(
self.SAMPLE_DIR / "documents",
self.dirs.media_dir / "documents",
)
_ = self._do_export(data_only=True)
# Only the manifest and version files should be present in the export directory
self.assertFileCountInDir(self.target, 2)
self.assertIsFile(self.target / "manifest.json")
self.assertIsFile(self.target / "version.json")
shutil.rmtree(self.dirs.media_dir / "documents")
Document.objects.all().delete()
call_command(
"document_importer",
"--no-progress-bar",
"--data-only",
self.target,
)
self.assertEqual(Document.objects.all().count(), 4)

View File

@@ -14,9 +14,15 @@ from documents.settings import EXPORTER_ARCHIVE_NAME
from documents.settings import EXPORTER_FILE_NAME
from documents.tests.utils import DirectoriesMixin
from documents.tests.utils import FileSystemAssertsMixin
from documents.tests.utils import SampleDirMixin
class TestCommandImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
class TestCommandImport(
DirectoriesMixin,
FileSystemAssertsMixin,
SampleDirMixin,
TestCase,
):
def test_check_manifest_exists(self):
"""
GIVEN:
@@ -120,14 +126,14 @@ class TestCommandImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
},
]
with self.assertRaises(CommandError) as cm:
cmd._check_manifest_valid()
cmd._check_manifest_files_valid()
self.assertInt("Failed to read from original file", str(cm.exception))
original_path.chmod(0o444)
archive_path.chmod(0o222)
with self.assertRaises(CommandError) as cm:
cmd._check_manifest_valid()
cmd._check_manifest_files_valid()
self.assertInt("Failed to read from archive file", str(cm.exception))
def test_import_source_not_existing(self):

View File

@@ -156,10 +156,6 @@ class DirectoriesMixin:
they are cleaned up on exit
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.dirs = None
def setUp(self) -> None:
self.dirs = setup_directories()
super().setUp()
@@ -200,6 +196,16 @@ class FileSystemAssertsMixin:
self.assertEqual(hash1, hash2, "File SHA256 mismatch")
def assertFileCountInDir(self, path: Union[PathLike, str], count: int):
path = Path(path).resolve()
self.assertTrue(path.is_dir(), f"Path {path} is not a directory")
files = [x for x in path.iterdir() if x.is_file()]
self.assertEqual(
len(files),
count,
f"Path {path} contains {len(files)} files instead of {count} files",
)
class ConsumerProgressMixin:
"""