mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2025-07-28 18:24:38 -05:00
Feature: Audit Trail (#4425)
Adds new feature for optionally enabling change tracking for possible audit purposes --------- Co-authored-by: shamoon <4887959+shamoon@users.noreply.github.com> Co-authored-by: Trenton Holmes <797416+stumpylog@users.noreply.github.com>
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
from django.conf import settings
|
||||
from django.contrib import admin
|
||||
from guardian.admin import GuardedModelAdmin
|
||||
|
||||
@@ -12,6 +13,10 @@ from documents.models import ShareLink
|
||||
from documents.models import StoragePath
|
||||
from documents.models import Tag
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
from auditlog.admin import LogEntryAdmin
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
|
||||
class CorrespondentAdmin(GuardedModelAdmin):
|
||||
list_display = ("name", "match", "matching_algorithm")
|
||||
@@ -148,3 +153,12 @@ admin.site.register(StoragePath, StoragePathAdmin)
|
||||
admin.site.register(PaperlessTask, TaskAdmin)
|
||||
admin.site.register(Note, NotesAdmin)
|
||||
admin.site.register(ShareLink, ShareLinksAdmin)
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
|
||||
class LogEntryAUDIT(LogEntryAdmin):
|
||||
def has_delete_permission(self, request, obj=None):
|
||||
return False
|
||||
|
||||
admin.site.unregister(LogEntry)
|
||||
admin.site.register(LogEntry, LogEntryAUDIT)
|
||||
|
@@ -20,6 +20,9 @@ from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from multiselectfield import MultiSelectField
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
from auditlog.registry import auditlog
|
||||
|
||||
from documents.data_models import DocumentSource
|
||||
from documents.parsers import get_default_file_extension
|
||||
|
||||
@@ -872,3 +875,11 @@ class ConsumptionTemplate(models.Model):
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}"
|
||||
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
auditlog.register(Document, m2m_fields={"tags"})
|
||||
auditlog.register(Correspondent)
|
||||
auditlog.register(Tag)
|
||||
auditlog.register(DocumentType)
|
||||
auditlog.register(Note)
|
||||
|
@@ -37,6 +37,10 @@ from documents.parsers import DocumentParser
|
||||
from documents.parsers import get_parser_class_for_mime_type
|
||||
from documents.sanity_checker import SanityCheckFailedException
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
import json
|
||||
|
||||
from auditlog.models import LogEntry
|
||||
logger = logging.getLogger("paperless.tasks")
|
||||
|
||||
|
||||
@@ -258,11 +262,37 @@ def update_document_archive_file(document_id):
|
||||
document,
|
||||
archive_filename=True,
|
||||
)
|
||||
oldDocument = Document.objects.get(pk=document.pk)
|
||||
Document.objects.filter(pk=document.pk).update(
|
||||
archive_checksum=checksum,
|
||||
content=parser.get_text(),
|
||||
archive_filename=document.archive_filename,
|
||||
)
|
||||
newDocument = Document.objects.get(pk=document.pk)
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
LogEntry.objects.log_create(
|
||||
instance=oldDocument,
|
||||
changes=json.dumps(
|
||||
{
|
||||
"content": [oldDocument.content, newDocument.content],
|
||||
"archive_checksum": [
|
||||
oldDocument.archive_checksum,
|
||||
newDocument.archive_checksum,
|
||||
],
|
||||
"archive_filename": [
|
||||
oldDocument.archive_filename,
|
||||
newDocument.archive_filename,
|
||||
],
|
||||
},
|
||||
),
|
||||
additional_data=json.dumps(
|
||||
{
|
||||
"reason": "Redo OCR called",
|
||||
},
|
||||
),
|
||||
action=LogEntry.Action.UPDATE,
|
||||
)
|
||||
|
||||
with FileLock(settings.MEDIA_LOCK):
|
||||
create_source_path_directory(document.archive_path)
|
||||
shutil.move(parser.get_archive_path(), document.archive_path)
|
||||
|
@@ -115,6 +115,9 @@ from paperless import version
|
||||
from paperless.db import GnuPG
|
||||
from paperless.views import StandardPagination
|
||||
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
logger = logging.getLogger("paperless.api")
|
||||
|
||||
|
||||
@@ -521,6 +524,18 @@ class DocumentViewSet(
|
||||
user=currentUser,
|
||||
)
|
||||
c.save()
|
||||
# If audit log is enabled make an entry in the log
|
||||
# about this note change
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
LogEntry.objects.log_create(
|
||||
instance=doc,
|
||||
changes=json.dumps(
|
||||
{
|
||||
"Note Added": ["None", c.id],
|
||||
},
|
||||
),
|
||||
action=LogEntry.Action.UPDATE,
|
||||
)
|
||||
|
||||
doc.modified = timezone.now()
|
||||
doc.save()
|
||||
@@ -546,6 +561,17 @@ class DocumentViewSet(
|
||||
return HttpResponseForbidden("Insufficient permissions to delete")
|
||||
|
||||
note = Note.objects.get(id=int(request.GET.get("id")))
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
LogEntry.objects.log_create(
|
||||
instance=doc,
|
||||
changes=json.dumps(
|
||||
{
|
||||
"Note Deleted": [note.id, "None"],
|
||||
},
|
||||
),
|
||||
action=LogEntry.Action.UPDATE,
|
||||
)
|
||||
|
||||
note.delete()
|
||||
|
||||
doc.modified = timezone.now()
|
||||
|
@@ -1,4 +1,5 @@
|
||||
from paperless.celery import app as celery_app
|
||||
from paperless.checks import audit_log_check
|
||||
from paperless.checks import binaries_check
|
||||
from paperless.checks import paths_check
|
||||
from paperless.checks import settings_values_check
|
||||
@@ -8,4 +9,5 @@ __all__ = [
|
||||
"binaries_check",
|
||||
"paths_check",
|
||||
"settings_values_check",
|
||||
"audit_log_check",
|
||||
]
|
||||
|
@@ -5,9 +5,11 @@ import shutil
|
||||
import stat
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.checks import Critical
|
||||
from django.core.checks import Error
|
||||
from django.core.checks import Warning
|
||||
from django.core.checks import register
|
||||
from django.db import connections
|
||||
|
||||
exists_message = "{} is set but doesn't exist."
|
||||
exists_hint = "Create a directory at {}"
|
||||
@@ -195,3 +197,19 @@ def settings_values_check(app_configs, **kwargs):
|
||||
+ _barcode_scanner_validate()
|
||||
+ _email_certificate_validate()
|
||||
)
|
||||
|
||||
|
||||
@register()
|
||||
def audit_log_check(app_configs, **kwargs):
|
||||
db_conn = connections["default"]
|
||||
all_tables = db_conn.introspection.table_names()
|
||||
|
||||
if ("auditlog_logentry" in all_tables) and not (settings.AUDIT_LOG_ENABLED):
|
||||
return [
|
||||
Critical(
|
||||
(
|
||||
"auditlog table was found but PAPERLESS_AUDIT_LOG_ENABLED"
|
||||
" is not active. This setting cannot be disabled after enabling"
|
||||
),
|
||||
),
|
||||
]
|
||||
|
@@ -933,6 +933,11 @@ TIKA_GOTENBERG_ENDPOINT = os.getenv(
|
||||
if TIKA_ENABLED:
|
||||
INSTALLED_APPS.append("paperless_tika.apps.PaperlessTikaConfig")
|
||||
|
||||
AUDIT_LOG_ENABLED = __get_boolean("PAPERLESS_AUDIT_LOG_ENABLED", "NO")
|
||||
if AUDIT_LOG_ENABLED:
|
||||
INSTALLED_APPS.append("auditlog")
|
||||
MIDDLEWARE.append("auditlog.middleware.AuditlogMiddleware")
|
||||
|
||||
|
||||
def _parse_ignore_dates(
|
||||
env_ignore: str,
|
||||
|
@@ -1,11 +1,13 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
from django.test import TestCase
|
||||
from django.test import override_settings
|
||||
|
||||
from documents.tests.utils import DirectoriesMixin
|
||||
from documents.tests.utils import FileSystemAssertsMixin
|
||||
from paperless.checks import audit_log_check
|
||||
from paperless.checks import binaries_check
|
||||
from paperless.checks import debug_mode_check
|
||||
from paperless.checks import paths_check
|
||||
@@ -231,3 +233,35 @@ class TestEmailCertSettingsChecks(DirectoriesMixin, FileSystemAssertsMixin, Test
|
||||
msg = msgs[0]
|
||||
|
||||
self.assertIn("Email cert /tmp/not_actually_here.pem is not a file", msg.msg)
|
||||
|
||||
|
||||
class TestAuditLogChecks(TestCase):
|
||||
def test_was_enabled_once(self):
|
||||
"""
|
||||
GIVEN:
|
||||
- Audit log is not enabled
|
||||
WHEN:
|
||||
- Database tables contain audit log entry
|
||||
THEN:
|
||||
- system check error reported for disabling audit log
|
||||
"""
|
||||
introspect_mock = mock.MagicMock()
|
||||
introspect_mock.introspection.table_names.return_value = ["auditlog_logentry"]
|
||||
with override_settings(AUDIT_LOG_ENABLED=False):
|
||||
with mock.patch.dict(
|
||||
"paperless.checks.connections",
|
||||
{"default": introspect_mock},
|
||||
):
|
||||
msgs = audit_log_check(None)
|
||||
|
||||
self.assertEqual(len(msgs), 1)
|
||||
|
||||
msg = msgs[0]
|
||||
|
||||
self.assertIn(
|
||||
(
|
||||
"auditlog table was found but PAPERLESS_AUDIT_LOG_ENABLED"
|
||||
" is not active."
|
||||
),
|
||||
msg.msg,
|
||||
)
|
||||
|
@@ -21,6 +21,11 @@ omit =
|
||||
paperless/wsgi.py
|
||||
paperless/auth.py
|
||||
|
||||
[coverage:report]
|
||||
exclude_also =
|
||||
if settings.AUDIT_LOG_ENABLED:
|
||||
if AUDIT_LOG_ENABLED:
|
||||
|
||||
[mypy]
|
||||
plugins = mypy_django_plugin.main, mypy_drf_plugin.main, numpy.typing.mypy_plugin
|
||||
check_untyped_defs = true
|
||||
|
Reference in New Issue
Block a user