import "jsr:@supabase/functions-js/edge-runtime.d.ts";
import { createClient } from "jsr:@supabase/supabase-js@2";
import { Mistral } from "npm:@mistralai/mistralai";
import pLimit from "npm:p-limit";

/** CORS headers shared by the preflight response and the SSE response. */
export const corsHeaders = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers":
    "authorization, x-client-info, apikey, content-type",
};

const apiKey = Deno.env.get("MISTRAL_API_KEY");
const client = new Mistral({
  apiKey: apiKey,
});

/**
 * System prompt sent with every OCR'd page. The text goes to the model
 * verbatim, so any edit here changes model behaviour.
 */
const PROCESSING_PROMPT = ` You are a document processing AI. Your task is to process the Markdown text scanned from a document page and return it in a clean and structured format. The textual page data should only be returned in valid Markdown format. Use proper headings and subheadings to structure the content. **Do not add headings if they do not exist in the original text.** Any images should be included. Do not return the Markdown as a code block, only as a raw string, without any new lines. No data or information should ever be removed, it should only be processed and formatted. There are in-text citations/references in the text, remove them from the text (**but most importantly, keep the reference number in the text. use a tag**) and put them into an object where the key is the reference number and the value is the text. The Markdown should be human-readable and well-formatted. The markdown string should properly sanitized and should not break a JSON parser when returned as the final format. Return the final result as a text object with the following structure (without code block formatting): """ --------- { "citations": [ { "number": 1, // The number as it appears in the text "text": "Citation text 1" // Ensure any JSON-breaking characters are properly escaped }, { "number": 2, "text": "Citation text 2" } ] } """ Do not return the text object as a code block, only as a raw string. 
`;

/** A run of nine or more dashes; the prompt asks the model to put one before the citations JSON. */
const CITATION_SEPARATOR = /-{9,}/g;

/** Emits one server-sent event. Never throws and does not need to be awaited. */
type SendEvent = (event: string, data: unknown) => void;

/** One page of the OCR response, derived from the SDK call so it follows the SDK's own typing. */
type OcrPage = Awaited<ReturnType<typeof client.ocr.process>>["pages"][number];

/** Result of splitting a model reply into page text and citations. */
interface PageOutput {
  content: string;
  citations: unknown;
  /** True when a separator was found but no citations JSON could be parsed after it. */
  malformedCitations: boolean;
}

/**
 * Reads a required environment variable.
 *
 * @throws Error when the variable is unset or empty.
 */
function requireEnv(name: string): string {
  const value = Deno.env.get(name);
  if (!value) {
    throw new Error(`Missing environment variable: ${name}`);
  }
  return value;
}

/**
 * Makes an error survive `JSON.stringify`. `message` is not an enumerable
 * property of `Error`, so a plain stringify of a thrown error loses it.
 * Non-`Error` values are returned untouched.
 */
function serializeError(error: unknown): unknown {
  if (error instanceof Error) {
    return { ...error, name: error.name, message: error.message };
  }
  return error;
}

/** Type guard that drops `undefined` entries while keeping the element type. */
function isDefined<T>(value: T | undefined): value is T {
  return value !== undefined;
}

/**
 * Splits a model reply into the page Markdown and the citations that follow
 * the last dash separator.
 *
 * The reply is model output and therefore untrusted: when the text after the
 * separator is not parseable JSON (for example because the dashes belonged to
 * the document itself), the whole reply is kept as content so nothing is lost.
 *
 * @param raw The chat completion text for one page.
 * @returns The page content, the citations (an empty array when none were
 *   supplied) and whether citation parsing failed.
 */
function parsePageOutput(raw: string): PageOutput {
  const separators = [...raw.matchAll(CITATION_SEPARATOR)];
  const last = separators[separators.length - 1];
  if (last === undefined || last.index === undefined) {
    return { content: raw.trim(), citations: [], malformedCitations: false };
  }

  const head = raw.slice(0, last.index).trim();
  const tail = raw.slice(last.index + last[0].length).trim();
  if (tail === "") {
    return { content: head, citations: [], malformedCitations: false };
  }

  // Take the outermost {...} so code fences or quote marks around the JSON are ignored.
  const start = tail.indexOf("{");
  const end = tail.lastIndexOf("}");
  if (start !== -1 && end > start) {
    try {
      // A successfully parsed string that starts with "{" is always an object.
      const parsed: { citations?: unknown } = JSON.parse(
        tail.slice(start, end + 1),
      );
      return {
        content: head,
        citations: parsed.citations || [],
        malformedCitations: false,
      };
    } catch {
      // Fall through and keep the full reply as content.
    }
  }

  return { content: raw.trim(), citations: [], malformedCitations: true };
}

/**
 * Cleans up one OCR page with the chat model and inlines its images.
 *
 * @param page The OCR page to process.
 * @param sendEvent Progress/error reporter for the SSE stream.
 * @returns The page with rewritten `markdown` and extracted `citations`, or
 *   `undefined` when the model returned no usable text (an `error` event is
 *   emitted in that case).
 */
async function processPage(page: OcrPage, sendEvent: SendEvent) {
  console.log(`Processing page ${page.index} with Mistral...`);
  const response = await client.chat.complete({
    model: "mistral-small-latest",
    messages: [
      {
        role: "system",
        content: [{ type: "text", text: PROCESSING_PROMPT }],
      },
      {
        role: "user",
        content: [{ type: "text", text: page.markdown }],
      },
    ],
  });

  const choice = response.choices?.[0];
  if (!choice) {
    console.error("No choices in response for page:", page.index);
    sendEvent("error", {
      message: `No choices in response for page ${page.index}`,
    });
    return undefined;
  }

  console.log("Response received for page:", page.index);
  sendEvent("status", {
    message: `Response received for page ${page.index}`,
  });

  const imageData: Record<string, string> = {};
  if (page.images.length > 0) {
    console.log(
      `Processing ${page.images.length} images for page ${page.index}...`,
    );
    sendEvent("status", {
      message: `Processing images for page ${page.index}`,
    });
    for (const img of page.images) {
      // Skip images without a payload so their placeholders stay intact.
      if (typeof img.imageBase64 === "string") {
        imageData[img.id] = img.imageBase64;
      }
    }
  }

  const rawContent = choice.message.content;
  if (typeof rawContent !== "string" || rawContent === "") {
    console.error("Message content is undefined for page:", page.index);
    sendEvent("error", {
      message: `Message content is undefined for page ${page.index}`,
    });
    return undefined;
  }

  console.log(`[${page.index}] ${rawContent}`);
  const { content, citations, malformedCitations } = parsePageOutput(
    rawContent,
  );
  console.log(`[${page.index}] Citations: ${JSON.stringify(citations)}`);
  if (malformedCitations) {
    console.warn(`[${page.index}] Citations could not be parsed; keeping raw text.`);
    sendEvent("status", {
      message: `Citations for page ${page.index} could not be parsed`,
    });
  }

  console.log("Generating Markdown for page:", page.index);
  sendEvent("status", {
    message: `Generating Markdown for page ${page.index}`,
  });
  const markdown = replaceImagesInMarkdown(content, imageData);

  return {
    ...page,
    markdown,
    citations,
  };
}

/**
 * Runs the whole document pipeline: authenticate, store or re-download the
 * PDF, OCR it with Mistral, post-process every page and persist the result in
 * the `documents` table.
 *
 * @param formData The parsed request body (`access_token`, `refresh_token`
 *   and either `file` or the `id` of a document to reprocess).
 * @param sendEvent Progress/error reporter for the SSE stream.
 * @throws Error on any failed step; an `error` event describing the step has
 *   already been emitted when it does.
 */
async function processDocument(
  formData: FormData,
  sendEvent: SendEvent,
): Promise<void> {
  // Logs, reports and builds the error for one failed step; callers `throw` the result.
  const failure = (
    eventMessage: string,
    thrownMessage: string,
    cause?: unknown,
  ): Error => {
    if (cause === undefined) {
      console.error(`${eventMessage}.`);
      sendEvent("error", { message: eventMessage });
    } else {
      console.error(`${eventMessage}:`, cause);
      sendEvent("error", {
        message: eventMessage,
        error: serializeError(cause),
      });
    }
    return new Error(thrownMessage);
  };

  const supabase = createClient(
    requireEnv("SUPABASE_URL"),
    requireEnv("SUPABASE_ANON_KEY"),
  );

  // Validate the tokens before they are used to establish the session.
  const accessToken = formData.get("access_token");
  const refreshToken = formData.get("refresh_token");
  if (typeof accessToken !== "string" || typeof refreshToken !== "string") {
    throw failure(
      "Access token or refresh token not found in form data",
      "Tokens not found",
    );
  }

  const {
    data: { user },
    error: sessionError,
  } = await supabase.auth.setSession({
    access_token: accessToken,
    refresh_token: refreshToken,
  });
  if (sessionError) {
    throw failure(
      "Error setting session",
      "Setting session failed",
      sessionError,
    );
  }
  if (!user) {
    throw failure("User not found in session", "Setting session failed");
  }

  const reprocessing = formData.has("id");
  let uuid: string = crypto.randomUUID();
  // True while the document row says `is_processing: true` because of this run.
  let markedProcessing = false;

  try {
    let file: File | null = null;

    if (reprocessing) {
      console.log("Reprocessing document...");
      console.log("File ID found in form data.");
      sendEvent("status", { message: "File ID found in form data." });

      const docId = formData.get("id");
      // The form data itself is not logged: it carries the session tokens.
      console.log("Document ID:", docId);

      const { data: documentData, error: documentError } = await supabase
        .from("documents")
        .select("*")
        .eq("id", docId)
        .single();
      if (documentError) {
        throw failure(
          "Error fetching document record",
          "Document record fetch failed",
          documentError,
        );
      }
      if (!documentData) {
        throw failure("Document record not found", "Document record not found");
      }

      uuid = documentData.id;
      // Early marker only; the checked update further down reports failures.
      await supabase
        .from("documents")
        .update({ is_processing: true })
        .eq("id", uuid);
      markedProcessing = true;

      const { data: fileData, error: fileError } = await supabase.storage
        .from("documents")
        .download(`${user.id}/${uuid}.pdf`);
      if (fileError) {
        throw failure(
          "Error downloading file from storage",
          "File download failed",
          fileError,
        );
      }
      console.log("File downloaded from storage:", fileData);
      sendEvent("status", {
        message: "File downloaded from storage",
        fileData,
      });

      // The download is a bare Blob; give it the stored name for the OCR upload.
      const storedName =
        typeof documentData.file_name === "string" &&
          documentData.file_name !== ""
          ? documentData.file_name
          : `${uuid}.pdf`;
      file = new File([fileData], storedName, { type: fileData.type });
    } else {
      const entry = formData.get("file");
      if (entry instanceof File) {
        file = entry;
      }
    }

    if (!file) {
      throw failure("File not found in form data", "File not found");
    }
    const fileName = file.name;

    console.log("Generated UUID:", uuid);
    sendEvent("status", { message: "Generated UUID", uuid });

    console.log("Authenticated user:", user);
    sendEvent("status", { message: "Authenticated user", user });

    if (!reprocessing) {
      const { data: storageData, error: storageError } = await supabase
        .storage
        .from("documents")
        .upload(`${user.id}/${uuid}.pdf`, file);
      if (storageError) {
        throw failure(
          "Error uploading file to storage",
          "File upload failed",
          storageError,
        );
      }
      console.log("File uploaded to storage:", storageData);
      sendEvent("status", {
        message: "File uploaded to storage",
        storageData,
      });

      const { error: docError } = await supabase.from("documents").insert({
        id: uuid,
        file_name: file.name,
        owner: user.id,
        raw_file: storageData.id,
        is_processing: true,
      });
      if (docError) {
        throw failure(
          "Error inserting document record",
          "Document record insertion failed",
          docError,
        );
      }
      markedProcessing = true;
      console.log("Document record inserted successfully.");
      sendEvent("status", {
        message: "Document record inserted successfully",
      });
    } else {
      console.log("Reprocessing document...");
      sendEvent("status", { message: "Reprocessing document" });

      const { error: docError } = await supabase
        .from("documents")
        .update({ is_processing: true })
        .eq("id", uuid);
      if (docError) {
        throw failure(
          "Error updating document record",
          "Document record update failed",
          docError,
        );
      }
      console.log("Document record updated successfully.");
      sendEvent("status", {
        message: "Document record updated successfully",
      });
    }

    console.log("Uploading file to Mistral...");
    sendEvent("status", { message: "Uploading file to Mistral..." });
    const uploaded_pdf = await client.files.upload({
      file: {
        fileName,
        content: file,
      },
      purpose: "ocr",
    });
    console.log("File uploaded to Mistral:", uploaded_pdf);
    sendEvent("status", {
      message: "File uploaded to Mistral",
      uploaded_pdf,
    });

    const signedUrl = await client.files.getSignedUrl({
      fileId: uploaded_pdf.id,
    });
    console.log("Generated signed URL:", signedUrl);
    sendEvent("status", { message: "Generated signed URL", signedUrl });

    console.log("Processing OCR...");
    sendEvent("status", { message: "Processing OCR..." });
    const ocrResponse = await client.ocr.process({
      model: "mistral-ocr-latest",
      document: {
        type: "document_url",
        documentUrl: signedUrl.url,
      },
      // The image inlining below reads `imageBase64` from every page image.
      // NOTE(review): assumes the API omits that field unless asked — confirm
      // against the Mistral OCR reference.
      includeImageBase64: true,
    });
    console.log("OCR response received:", ocrResponse);
    sendEvent("status", {
      message: "OCR response received",
      ocrResponse,
    });

    // At most two pages are sent to the chat model at a time.
    const limit = pLimit(2);
    const pageTasks = ocrResponse.pages.map((page) => {
      console.log("Processing page:", page.index);
      sendEvent("status", { message: `Processing page ${page.index}` });
      return limit(() => processPage(page, sendEvent));
    });

    console.log("Waiting for all pages to be processed...");
    sendEvent("status", {
      message: "Waiting for all pages to be processed...",
    });
    // Pages that produced no text were already reported; drop their empty slots.
    const results = (await Promise.all(pageTasks)).filter(isDefined);
    console.log("All pages processed. Results:", results);
    sendEvent("status", { message: "All pages processed", results });

    const sortedResults = [...results].sort((a, b) => a.index - b.index);
    console.log("Sorted results:", sortedResults);
    sendEvent("status", { message: "Sorted results", sortedResults });

    const { error } = await supabase
      .from("documents")
      .update({
        ocr_data: sortedResults,
        is_processing: false,
      })
      .eq("id", uuid);
    if (error) {
      throw failure(
        "Error updating document record",
        "Document record update failed",
        error,
      );
    }
    markedProcessing = false;
  } catch (error) {
    // Best effort: do not leave the row flagged as processing after a failure.
    if (markedProcessing) {
      try {
        const { error: resetError } = await supabase
          .from("documents")
          .update({ is_processing: false })
          .eq("id", uuid);
        if (resetError) {
          console.error("Error clearing processing flag:", resetError);
        }
      } catch (resetError) {
        console.error("Error clearing processing flag:", resetError);
      }
    }
    throw error;
  }
}

/**
 * HTTP entry point.
 *
 * - `OPTIONS`: CORS preflight.
 * - `POST`: multipart form upload; answers with a `text/event-stream` whose
 *   `status` / `error` events report pipeline progress.
 * - anything else: 405.
 */
Deno.serve(async (req: Request): Promise<Response> => {
  if (req.method === "OPTIONS") {
    console.log("Handling OPTIONS request...");
    return new Response(null, {
      headers: {
        ...corsHeaders,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
      },
    });
  }

  if (req.method !== "POST") {
    console.error("Method not allowed:", req.method);
    return new Response("Method not allowed", {
      status: 405,
    });
  }

  console.log("Processing POST request...");

  // `readable` becomes the response body; events are pushed through `writable`.
  const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
  const writer = writable.getWriter();
  const encoder = new TextEncoder();

  // Set up the SSE response
  const headers = new Headers({
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    ...corsHeaders,
  });

  let streamClosed = false; // Set once the stream has been asked to close

  const sendEvent: SendEvent = (event, data) => {
    if (streamClosed) {
      console.warn("Attempted to write to a closed stream.");
      return;
    }
    let message: string;
    try {
      message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
    } catch (error) {
      // Unserialisable payload: report it and keep the pipeline running.
      console.error("Error writing to stream:", error);
      return;
    }
    console.log("Sending event:", message);
    // Writes queue inside the stream; a failed write must not break the pipeline.
    void writer.write(encoder.encode(message)).catch((error: unknown) => {
      console.error("Error writing to stream:", error);
    });
  };

  // Start streaming updates
  sendEvent("status", { message: "Initializing..." });

  // Parse the body before responding so it is never read after the response has started.
  let formData: FormData | undefined;
  let formDataError: unknown;
  try {
    formData = await req.formData();
  } catch (error) {
    formDataError = error;
  }

  const run = async (): Promise<void> => {
    try {
      if (formData === undefined) {
        throw formDataError;
      }
      await processDocument(formData, sendEvent);
      console.log("Closing SSE stream...");
    } catch (error) {
      console.error("Error during processing:", error);
      sendEvent("error", {
        message: "Error during processing",
        error: serializeError(error),
      });
    } finally {
      // close() is queued behind the writes already issued, so no event is dropped.
      streamClosed = true;
      try {
        await writer.close();
      } catch (error) {
        console.error("Error closing stream:", error);
      }
    }
  };

  // Not awaited on purpose: the response must be returned first so the client
  // receives events while the pipeline is still running.
  void run();

  return new Response(readable, {
    headers,
  });
});

/**
 * Replaces `![id](id)` image placeholders with `![id](<data>)`.
 *
 * Matching is literal, so ids containing regex metacharacters (such as the
 * dot in a file name) and payloads containing `$` are handled verbatim.
 *
 * @param markdownStr Markdown containing the placeholders.
 * @param imagesDict Map from image id to the string that replaces the link target.
 * @returns The Markdown with every known placeholder replaced.
 */
function replaceImagesInMarkdown(
  markdownStr: string,
  imagesDict: Record<string, string>,
): string {
  console.log("Replacing images in Markdown...");
  let result = markdownStr;
  for (const [imgName, base64Str] of Object.entries(imagesDict)) {
    const placeholder = `![${imgName}](${imgName})`;
    result = result.split(placeholder).join(`![${imgName}](${base64Str})`);
  }
  console.log("Image replacement complete.");
  return result;
}