Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions src/file_manipulator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
from src.filler import Filler
from src.llm import LLM
from src.logger import setup_logger
from commonforms import prepare_form

logger = setup_logger(__name__)


class FileManipulator:
def __init__(self):
Expand All @@ -22,26 +25,21 @@ def fill_form(self, user_input: str, fields: list, pdf_form_path: str):
It receives the raw data, runs the PDF filling logic,
and returns the path to the newly created file.
"""
print("[1] Received request from frontend.")
print(f"[2] PDF template path: {pdf_form_path}")
logger.info("Received request from frontend.")
logger.info(f"PDF template path: {pdf_form_path}")

if not os.path.exists(pdf_form_path):
print(f"Error: PDF template not found at {pdf_form_path}")
return None # Or raise an exception
logger.error(f"PDF template not found at {pdf_form_path}")
return None

print("[3] Starting extraction and PDF filling process...")
logger.info("Starting extraction and PDF filling process...")
try:
self.llm._target_fields = fields
self.llm._transcript_text = user_input
output_name = self.filler.fill_form(pdf_form=pdf_form_path, llm=self.llm)

print("\n----------------------------------")
print("✅ Process Complete.")
print(f"Output saved to: {output_name}")

logger.info("Process Complete.")
logger.info(f"Output saved to: {output_name}")
return output_name

except Exception as e:
print(f"An error occurred during PDF generation: {e}")
# Re-raise the exception so the frontend can handle it
raise e
logger.error(f"An error occurred during PDF generation: {e}")
raise e
55 changes: 19 additions & 36 deletions src/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
import os
import requests

from src.logger import setup_logger
logger = setup_logger(__name__)


class LLM:
def __init__(self, transcript_text=None, target_fields=None, json=None):
if json is None:
json = {}
self._transcript_text = transcript_text # str
self._target_fields = target_fields # List, contains the template field.
self._json = json # dictionary
self._transcript_text = transcript_text
self._target_fields = target_fields
self._json = json

def type_check_all(self):
if type(self._transcript_text) is not str:
Expand All @@ -24,10 +27,6 @@ def type_check_all(self):
)

def build_prompt(self, current_field):
"""
This method is in charge of the prompt engineering. It creates a specific prompt for each target field.
@params: current_field -> represents the current element of the json that is being prompted.
"""
prompt = f"""
SYSTEM PROMPT:
You are an AI assistant designed to help fillout json files with information extracted from transcribed voice recordings.
Expand All @@ -41,22 +40,18 @@ def build_prompt(self, current_field):

TEXT: {self._transcript_text}
"""

return prompt

def main_loop(self):
# self.type_check_all()
for field in self._target_fields.keys():
prompt = self.build_prompt(field)
# print(prompt)
# ollama_url = "http://localhost:11434/api/generate"
ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434").rstrip("/")
ollama_url = f"{ollama_host}/api/generate"

payload = {
"model": "mistral",
"prompt": prompt,
"stream": False, # don't really know why --> look into this later.
"stream": False,
}

try:
Expand All @@ -70,24 +65,18 @@ def main_loop(self):
except requests.exceptions.HTTPError as e:
raise RuntimeError(f"Ollama returned an error: {e}")

# parse response
json_data = response.json()
parsed_response = json_data["response"]
# print(parsed_response)
self.add_response_to_json(field, parsed_response)

print("----------------------------------")
print("\t[LOG] Resulting JSON created from the input text:")
print(json.dumps(self._json, indent=2))
print("--------- extracted data ---------")
logger.info("----------------------------------")
logger.info("Resulting JSON created from the input text:")
logger.info(json.dumps(self._json, indent=2))
logger.info("--------- extracted data ---------")

return self

def add_response_to_json(self, field, value):
"""
this method adds the following value under the specified field,
or under a new field if the field doesn't exist, to the json dict
"""
value = value.strip().replace('"', "")
parsed_value = None

Expand All @@ -97,39 +86,33 @@ def add_response_to_json(self, field, value):
if ";" in value:
parsed_value = self.handle_plural_values(value)

if field in self._json.keys():
self._json[field].append(parsed_value)
if field in self._json:
if isinstance(self._json[field], list):
self._json[field].append(parsed_value)
else:
self._json[field] = [self._json[field], parsed_value]
else:
self._json[field] = parsed_value

return

def handle_plural_values(self, plural_value):
    """Split a ';'-separated string into a list of trimmed values.

    Takes a string of the form 'value1; value2; value3; ...; valueN' and
    returns ['value1', 'value2', 'value3', ..., 'valueN'].

    Args:
        plural_value: String containing at least one ';' separator.

    Returns:
        List of the separated values, each with surrounding whitespace
        removed. Empty segments (e.g. from a trailing ';') are kept as
        empty strings.

    Raises:
        ValueError: If ``plural_value`` contains no ';' separator.
    """
    if ";" not in plural_value:
        raise ValueError(
            f"Value is not plural, doesn't have ; separator, Value: {plural_value}"
        )

    logger.info(f"Formatting plural values for JSON, input: {plural_value}")

    # Trim both sides of every element. Stripping only the leading side of
    # elements after the first left trailing spaces in place whenever the
    # input had whitespace before a ';' (e.g. "a ; b").
    values = [item.strip() for item in plural_value.split(";")]

    logger.info(f"Resulting formatted list of values: {values}")
    return values

def get_data(self):
    """Return the dictionary holding the extracted field values.

    The same dict object stored on the instance is returned (no copy), so
    changes made by the caller are visible to this object.
    """
    return self._json
25 changes: 25 additions & 0 deletions src/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import logging


def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return the logger called ``name``, configuring it on first use.

    The first call for a given name sets the logger's level and attaches a
    single ``StreamHandler`` with a timestamped format. Later calls find the
    handler already attached and return the logger unchanged, so repeated
    calls never produce duplicate log lines.

    Args:
        name: Logger name, typically the calling module's ``__name__``.
        level: Threshold applied when the logger is first configured.
            Defaults to ``logging.INFO``. Ignored when the logger already
            has a handler.

    Returns:
        The ``logging.Logger`` registered under ``name``.
    """
    logger = logging.getLogger(name)

    # A logger that already has a handler was configured earlier (by this
    # function or by the application); leave it alone to avoid duplicates.
    if logger.handlers:
        return logger

    logger.setLevel(level)

    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s | %(levelname)s | %(name)s | %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logger.addHandler(handler)

    return logger