117 lines
3.2 KiB
Python
117 lines
3.2 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
from celery import task, shared_task
|
|
from celery.utils.log import get_task_logger
|
|
|
|
import datetime
|
|
import fileinput
|
|
import os
|
|
import re
|
|
import shutil
|
|
import smtplib
|
|
import sys
|
|
|
|
from pathlib import Path
|
|
from os.path import basename, dirname, isfile
|
|
|
|
from email.feedparser import FeedParser
|
|
from email.message import EmailMessage
|
|
from email.header import decode_header, make_header
|
|
|
|
import django
|
|
from django.conf import settings
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'procat2.settings')
|
|
django.setup()
|
|
|
|
from django.contrib.auth.models import User
|
|
|
|
from .utils import clean_path, ensure_dir, set_file_perms, MARKUP_WORK_DIR
|
|
from .email import reply, reply_missing, reply_no_matches, send_error_email
|
|
from .matching import find_marked_products
|
|
from .spreadsheet import write_spreadsheet
|
|
from procat2.settings import TREE_NAME
|
|
|
|
logger = get_task_logger(__name__)
|
|
|
|
|
|
def on_fail_handler(self, exc, task_id, args, kwargs, einfo):
|
|
"""Send an email if a task throws an exception."""
|
|
print(str(einfo))
|
|
send_error_email(f'ERROR: {TREE_NAME} celery task {task_id}', str(einfo))
|
|
|
|
|
|
# @shared_task(on_failure=on_fail_handler)
|
|
# def test_fail(x, y):
|
|
# test_fail_internal()
|
|
|
|
# def test_fail_internal():
|
|
# raise KeyError()
|
|
|
|
|
|
def workdir(username, pdf_path):
|
|
stem = clean_path(Path(pdf_path).stem)
|
|
return os.path.join(MARKUP_WORK_DIR, clean_path(username), stem)
|
|
|
|
|
|
def dest_spreadsheet_path(dest_pdf_path):
|
|
path = Path(dest_pdf_path)
|
|
return os.path.join(path.parent, f"{path.stem}.xlsx")
|
|
|
|
|
|
def work_pdf_path(workdir, pdf_path):
|
|
return os.path.join(workdir, Path(pdf_path).name)
|
|
|
|
|
|
def work_spreadsheet_path(username, pdf_path):
|
|
stem = Path(pdf_path).stem
|
|
return os.path.join(workdir(username, pdf_path), f"{stem}.xlsx")
|
|
|
|
|
|
@shared_task(on_failure=on_fail_handler)
|
|
def process_markup_pdf(pdf_path, username):
|
|
if not Path(pdf_path).is_file():
|
|
print(f'No pdf - exiting ({pdf_path})')
|
|
return
|
|
|
|
user = User.objects.get(username=username)
|
|
|
|
work_dir = workdir(user.username, pdf_path)
|
|
ensure_dir(work_dir)
|
|
|
|
work_pdf = work_pdf_path(work_dir, pdf_path)
|
|
|
|
print(f'copying pdf from {pdf_path}')
|
|
print(f'copying pdf to {work_pdf}')
|
|
shutil.copy(pdf_path, work_pdf)
|
|
set_file_perms(work_pdf)
|
|
|
|
frm = str(make_header(decode_header(f'{user.get_full_name()} <{user.email}>')))
|
|
subject = str(make_header(decode_header(Path(pdf_path).stem)))
|
|
|
|
# find matches
|
|
matches = find_marked_products(work_pdf, work_dir, debug=0)
|
|
if not matches:
|
|
print('no product matches')
|
|
# reply_no_matches(frm, subject)
|
|
return
|
|
|
|
print(f'{len(matches)} product matches')
|
|
|
|
# write spreadsheet
|
|
work_xls_path = work_spreadsheet_path(username, pdf_path)
|
|
write_spreadsheet(matches, work_xls_path)
|
|
|
|
if not work_xls_path:
|
|
# TODO send error
|
|
print(f'error creating spreadsheet')
|
|
return
|
|
|
|
dest_xls_path = dest_spreadsheet_path(pdf_path)
|
|
|
|
print(f'wrote spreadsheet: {work_xls_path}')
|
|
print(f'copying xls to {dest_xls_path}')
|
|
|
|
shutil.copy(work_xls_path, dest_xls_path)
|
|
set_file_perms(dest_xls_path)
|
|
|
|
reply(frm, subject, dest_xls_path, pdf_path)
|