2018-12-11 12:38:15 +01:00

418 lines
12 KiB
Python
Executable File

# coding=utf-8
import logging
import os
import re
import uuid
from collections import OrderedDict
import dateutil.parser
from django.conf import settings
from django.db import models
from django.template.defaultfilters import slugify
from django.utils import timezone
from django.utils.text import slugify
from fuzzywuzzy import fuzz
from .managers import LogManager
try:
from django.core.urlresolvers import reverse
except ImportError:
from django.urls import reverse
class MatchingModel(models.Model):
name = models.CharField(max_length=128, unique=True)
slug = models.SlugField(blank=True, editable=False)
automatic_classification = models.BooleanField(
default=False,
help_text="Automatically assign to newly added documents based on "
"current usage in your document collection."
)
class Meta:
abstract = True
ordering = ("name",)
def __str__(self):
return self.name
def save(self, *args, **kwargs):
self.slug = slugify(self.name)
models.Model.save(self, *args, **kwargs)
class Correspondent(MatchingModel):
# This regex is probably more restrictive than it needs to be, but it's
# better safe than sorry.
SAFE_REGEX = re.compile(r"^[\w\- ,.']+$")
class Meta:
ordering = ("name",)
class Tag(MatchingModel):
COLOURS = (
(1, "#a6cee3"),
(2, "#1f78b4"),
(3, "#b2df8a"),
(4, "#33a02c"),
(5, "#fb9a99"),
(6, "#e31a1c"),
(7, "#fdbf6f"),
(8, "#ff7f00"),
(9, "#cab2d6"),
(10, "#6a3d9a"),
(11, "#b15928"),
(12, "#000000"),
(13, "#cccccc")
)
colour = models.PositiveIntegerField(choices=COLOURS, default=1)
is_inbox_tag = models.BooleanField(
default=False,
help_text="Marks this tag as an inbox tag: All newly consumed "
"documents will be tagged with inbox tags."
)
class DocumentType(MatchingModel):
pass
class Document(models.Model):
TYPE_PDF = "pdf"
TYPE_PNG = "png"
TYPE_JPG = "jpg"
TYPE_GIF = "gif"
TYPE_TIF = "tiff"
TYPE_TXT = "txt"
TYPE_CSV = "csv"
TYPE_MD = "md"
TYPES = (TYPE_PDF, TYPE_PNG, TYPE_JPG, TYPE_GIF, TYPE_TIF,
TYPE_TXT, TYPE_CSV, TYPE_MD)
STORAGE_TYPE_UNENCRYPTED = "unencrypted"
STORAGE_TYPE_GPG = "gpg"
STORAGE_TYPES = (
(STORAGE_TYPE_UNENCRYPTED, "Unencrypted"),
(STORAGE_TYPE_GPG, "Encrypted with GNU Privacy Guard")
)
correspondent = models.ForeignKey(
Correspondent,
blank=True,
null=True,
related_name="documents",
on_delete=models.SET_NULL
)
title = models.CharField(max_length=128, blank=True, db_index=True)
document_type = models.ForeignKey(
DocumentType,
blank=True,
null=True,
related_name="documents",
on_delete=models.SET_NULL
)
content = models.TextField(
db_index=True,
blank=True,
help_text="The raw, text-only data of the document. This field is "
"primarily used for searching."
)
file_type = models.CharField(
max_length=4,
editable=False,
choices=tuple([(t, t.upper()) for t in TYPES])
)
tags = models.ManyToManyField(
Tag, related_name="documents", blank=True)
checksum = models.CharField(
max_length=32,
editable=False,
unique=True,
help_text="The checksum of the original document (before it was "
"encrypted). We use this to prevent duplicate document "
"imports."
)
created = models.DateTimeField(
default=timezone.now, db_index=True)
modified = models.DateTimeField(
auto_now=True, editable=False, db_index=True)
storage_type = models.CharField(
max_length=11,
choices=STORAGE_TYPES,
default=STORAGE_TYPE_UNENCRYPTED,
editable=False
)
added = models.DateTimeField(
default=timezone.now, editable=False, db_index=True)
archive_serial_number = models.IntegerField(
blank=True,
null=True,
unique=True,
db_index=True,
help_text="The position of this document in your physical document "
"archive."
)
class Meta:
ordering = ("correspondent", "title")
def __str__(self):
created = self.created.strftime("%Y%m%d%H%M%S")
if self.correspondent and self.title:
return "{}: {} - {}".format(
created, self.correspondent, self.title)
if self.correspondent or self.title:
return "{}: {}".format(created, self.correspondent or self.title)
return str(created)
@property
def source_path(self):
file_name = "{:07}.{}".format(self.pk, self.file_type)
if self.storage_type == self.STORAGE_TYPE_GPG:
file_name += ".gpg"
return os.path.join(
settings.MEDIA_ROOT,
"documents",
"originals",
file_name
)
@property
def source_file(self):
return open(self.source_path, "rb")
@property
def file_name(self):
return slugify(str(self)) + "." + self.file_type
@property
def download_url(self):
return reverse("fetch", kwargs={"kind": "doc", "pk": self.pk})
@property
def thumbnail_path(self):
file_name = "{:07}.png".format(self.pk)
if self.storage_type == self.STORAGE_TYPE_GPG:
file_name += ".gpg"
return os.path.join(
settings.MEDIA_ROOT,
"documents",
"thumbnails",
file_name
)
@property
def thumbnail_file(self):
return open(self.thumbnail_path, "rb")
@property
def thumbnail_url(self):
return reverse("fetch", kwargs={"kind": "thumb", "pk": self.pk})
class Log(models.Model):
LEVELS = (
(logging.DEBUG, "Debugging"),
(logging.INFO, "Informational"),
(logging.WARNING, "Warning"),
(logging.ERROR, "Error"),
(logging.CRITICAL, "Critical"),
)
group = models.UUIDField(blank=True)
message = models.TextField()
level = models.PositiveIntegerField(choices=LEVELS, default=logging.INFO)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
objects = LogManager()
class Meta:
ordering = ("-modified",)
def __str__(self):
return self.message
def save(self, *args, **kwargs):
"""
To allow for the case where we don't want to group the message, we
shouldn't force the caller to specify a one-time group value. However,
allowing group=None means that the manager can't differentiate the
different un-grouped messages, so instead we set a random one here.
"""
if not self.group:
self.group = uuid.uuid4()
models.Model.save(self, *args, **kwargs)
class FileInfo:
# This epic regex *almost* worked for our needs, so I'm keeping it here for
# posterity, in the hopes that we might find a way to make it work one day.
ALMOST_REGEX = re.compile(
r"^((?P<date>\d\d\d\d\d\d\d\d\d\d\d\d\d\dZ){separator})?"
r"((?P<correspondent>{non_separated_word}+){separator})??"
r"(?P<title>{non_separated_word}+)"
r"({separator}(?P<tags>[a-z,0-9-]+))?"
r"\.(?P<extension>[a-zA-Z.-]+)$".format(
separator=r"\s+-\s+",
non_separated_word=r"([\w,. ]|([^\s]-))"
)
)
formats = "pdf|jpe?g|png|gif|tiff?|te?xt|md|csv"
REGEXES = OrderedDict([
("created-correspondent-title-tags", re.compile(
r"^(?P<created>\d\d\d\d\d\d\d\d(\d\d\d\d\d\d)?Z) - "
r"(?P<correspondent>.*) - "
r"(?P<title>.*) - "
r"(?P<tags>[a-z0-9\-,]*)"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
)),
("created-title-tags", re.compile(
r"^(?P<created>\d\d\d\d\d\d\d\d(\d\d\d\d\d\d)?Z) - "
r"(?P<title>.*) - "
r"(?P<tags>[a-z0-9\-,]*)"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
)),
("created-correspondent-title", re.compile(
r"^(?P<created>\d\d\d\d\d\d\d\d(\d\d\d\d\d\d)?Z) - "
r"(?P<correspondent>.*) - "
r"(?P<title>.*)"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
)),
("created-title", re.compile(
r"^(?P<created>\d\d\d\d\d\d\d\d(\d\d\d\d\d\d)?Z) - "
r"(?P<title>.*)"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
)),
("correspondent-title-tags", re.compile(
r"(?P<correspondent>.*) - "
r"(?P<title>.*) - "
r"(?P<tags>[a-z0-9\-,]*)"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
)),
("correspondent-title", re.compile(
r"(?P<correspondent>.*) - "
r"(?P<title>.*)?"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
)),
("title", re.compile(
r"(?P<title>.*)"
r"\.(?P<extension>{})$".format(formats),
flags=re.IGNORECASE
))
])
def __init__(self, created=None, correspondent=None, title=None, tags=(),
extension=None):
self.created = created
self.title = title
self.extension = extension
self.correspondent = correspondent
self.tags = tags
@classmethod
def _get_created(cls, created):
try:
return dateutil.parser.parse("{:0<14}Z".format(created[:-1]))
except ValueError:
return None
@classmethod
def _get_correspondent(cls, name):
if not name:
return None
return Correspondent.objects.get_or_create(name=name, defaults={
"slug": slugify(name)
})[0]
@classmethod
def _get_title(cls, title):
return title
@classmethod
def _get_tags(cls, tags):
r = []
for t in tags.split(","):
r.append(Tag.objects.get_or_create(
slug=slugify(t),
defaults={"name": t}
)[0])
return tuple(r)
@classmethod
def _get_extension(cls, extension):
r = extension.lower()
if r == "jpeg":
return "jpg"
if r == "tif":
return "tiff"
return r
@classmethod
def _mangle_property(cls, properties, name):
if name in properties:
properties[name] = getattr(cls, "_get_{}".format(name))(
properties[name]
)
@classmethod
def from_path(cls, path):
"""
We use a crude naming convention to make handling the correspondent,
title, and tags easier:
"<date> - <correspondent> - <title> - <tags>.<suffix>"
"<correspondent> - <title> - <tags>.<suffix>"
"<correspondent> - <title>.<suffix>"
"<title>.<suffix>"
"""
for regex in cls.REGEXES.values():
m = regex.match(os.path.basename(path))
if m:
properties = m.groupdict()
cls._mangle_property(properties, "created")
cls._mangle_property(properties, "correspondent")
cls._mangle_property(properties, "title")
cls._mangle_property(properties, "tags")
cls._mangle_property(properties, "extension")
return cls(**properties)