Files
procat2/markup/tasks.py

135 lines
3.9 KiB
Python

from __future__ import absolute_import, unicode_literals
from celery import task, shared_task
from celery.utils.log import get_task_logger
import datetime
import fileinput
import os
import re
import shutil
import smtplib
import sys
from pathlib import Path
from os.path import basename, dirname, isfile
from email.feedparser import FeedParser
from email.message import EmailMessage
from email.header import decode_header, make_header
import django
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'procat2.settings')
django.setup()
from .utils import clean_path, ensure_dir, set_file_perms, WORKDIR
from .email import reply, reply_missing, reply_no_matches, send_error_email
from .matching import find_marked_products
from .spreadsheet import write_spreadsheet
from procat2.settings import TREE_NAME
logger = get_task_logger(__name__)
def on_fail_handler(self, exc, task_id, args, kwargs, einfo):
    """Report a failed task: print the failure info and email it.

    Passed as the ``on_failure`` option of the tasks in this module.

    Args:
        self: The task the handler is attached to (unused).
        exc: The exception that was raised (unused).
        task_id: Id of the failed task; included in the email subject.
        args: Positional arguments of the failed task (unused).
        kwargs: Keyword arguments of the failed task (unused).
        einfo: Failure information; its ``str()`` form is printed and
            used as the email body.
    """
    failure_text = str(einfo)
    print(failure_text)
    email_subject = f'ERROR: {TREE_NAME} celery task {task_id}'
    send_error_email(email_subject, failure_text)
# @shared_task(on_failure=on_fail_handler)
# def test_fail(x, y):
# test_fail_internal()
# def test_fail_internal():
# raise KeyError()
@shared_task(on_failure=on_fail_handler)
def process_message(path):
    """Parse a stored email message and process every PDF attachment in it.

    Each ``application/pdf`` part is handed to ``process_attachment``.
    If the message contains no such part, ``reply_missing`` is called
    with the sender and subject instead.

    Args:
        path: Filesystem path of the raw email message.

    Raises:
        ValueError: If the message has no ``From`` header.
    """
    parser = FeedParser()
    # Raw mail may contain bytes that are not valid in the locale encoding
    # (e.g. 8bit latin-1 bodies). Decode as UTF-8 and map undecodable bytes
    # to surrogate escapes so reading the file can never raise
    # UnicodeDecodeError.
    with open(path, encoding='utf-8', errors='surrogateescape') as f:
        for line in f:
            parser.feed(line)
    msg = parser.close()

    # msg[...] returns None for an absent header and decode_header() does not
    # accept None, so both headers are checked before decoding.
    raw_from = msg['From']
    if raw_from is None:
        raise ValueError(f'message has no From header: {path}')
    frm = str(make_header(decode_header(raw_from)))

    raw_subject = msg['Subject']
    if raw_subject is None:
        # A message without a Subject is still processable.
        subject = ''
    else:
        subject = str(make_header(decode_header(raw_subject)))

    found_pdf = False
    for attach in msg.walk():
        if attach.get_content_type() == 'application/pdf':
            process_attachment(frm, subject, attach)
            found_pdf = True

    if not found_pdf:
        reply_missing(frm, subject)
def process_attachment(from_address, subject, attachment):
    """Save a PDF attachment into its work directory and process it.

    The work directory is ``WORKDIR/<cleaned sender>/<pdf name stem>``.
    The decoded payload is written there and then passed to
    ``process_pdf``.

    Args:
        from_address: Decoded sender of the email.
        subject: Decoded subject of the email; used as the PDF name when
            the attachment has no usable filename.
        attachment: Message part providing ``get_filename()`` and
            ``get_payload(decode=True)``.
    """
    # get_filename() returns None when the part carries no filename, and
    # decode_header() does not accept None.
    raw_name = attachment.get_filename()
    pdf_name = '' if raw_name is None else str(make_header(decode_header(raw_name)))

    # if pdf name is missing or in UUID format, use email subject
    if not pdf_name or re.match(r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\.pdf', pdf_name):
        pdf_name = f'{subject}.pdf'

    # The filename and subject come from an untrusted email. Collapse path
    # separators and NUL bytes so the name is always a single path component
    # and can neither escape the work directory nor point at a
    # non-existent subdirectory.
    pdf_name = re.sub(r'[\\/\x00]', '_', pdf_name)
    if not pdf_name.strip(' .'):
        # Names such as '', '.' or '..' are not usable file names.
        pdf_name = 'attachment.pdf'
    print(f'Using pdf name: {pdf_name}')

    pdf_base = Path(pdf_name).stem
    workdir = os.path.join(WORKDIR, clean_path(from_address), pdf_base)
    ensure_dir(workdir)

    # write out pdf
    pdf_path = os.path.join(workdir, pdf_name)
    print(f'saving pdf to {pdf_path}')
    with open(pdf_path, 'wb') as att:
        att.write(attachment.get_payload(decode=True))
    set_file_perms(pdf_path)

    process_pdf(pdf_path, from_address, subject, workdir)
@shared_task(on_failure=on_fail_handler)
def process_markup_pdf(pdf_path, user):
    """Copy a PDF into the user's work directory and process it.

    The work directory is
    ``WORKDIR/<cleaned username>/<cleaned pdf name stem>``. The sender
    passed on to ``process_pdf`` is built from the user's full name and
    email, and the subject is the PDF file name.

    Args:
        pdf_path: Path of the PDF to process. If it is not an existing
            file, a message is printed and nothing else happens.
        user: Object providing ``username``, ``get_full_name()`` and
            ``email`` (presumably a Django user -- confirm with callers).
    """
    source = Path(pdf_path)
    if not source.is_file():
        print(f'No pdf - exiting ({pdf_path})')
        return

    user_dir = clean_path(user.username)
    pdf_dir = clean_path(source.stem)
    workdir = os.path.join(WORKDIR, user_dir, pdf_dir)
    ensure_dir(workdir)

    # Work on a private copy inside the work directory.
    pdf_name = source.name
    dest_path = os.path.join(workdir, pdf_name)
    print(f'copying pdf to {dest_path}')
    shutil.copy(pdf_path, dest_path)
    set_file_perms(dest_path)

    sender = f'{user.get_full_name()} <{user.email}>'
    frm = str(make_header(decode_header(sender)))
    subject = str(make_header(decode_header(pdf_name)))
    process_pdf(dest_path, frm, subject, workdir)
def process_pdf(pdf_path, from_address, subject, workdir):
    """Find marked products in a PDF and email the outcome.

    Calls ``reply_no_matches`` when no products are found. Otherwise a
    spreadsheet is written and sent back with ``reply``. If the
    spreadsheet cannot be created, an error email is sent with
    ``send_error_email`` so the failure is not silent.

    Args:
        pdf_path: Path of the PDF to scan.
        from_address: Sender the reply is addressed to.
        subject: Subject of the originating message.
        workdir: Directory passed to the matcher and the spreadsheet
            writer.
    """
    # find matches
    matches = find_marked_products(pdf_path, workdir, debug=0)
    if not matches:
        print('no product matches')
        reply_no_matches(from_address, subject)
        return
    print(f'{len(matches)} product matches')

    # write spreadsheet
    pdf_stem = Path(pdf_path).stem
    xls_path = write_spreadsheet(matches, workdir, pdf_stem)
    if not xls_path:
        # send error: the original only printed here, so a failed
        # spreadsheet left no trace outside the worker log.
        print('error creating spreadsheet')
        send_error_email(
            f'ERROR: {TREE_NAME} spreadsheet creation failed',
            f'Could not create spreadsheet for {pdf_path} '
            f'(from: {from_address}, subject: {subject})',
        )
        return

    # send reply
    print(f'wrote spreadsheet: {xls_path}')
    reply(from_address, subject, xls_path, pdf_path)