import "jsr:@supabase/functions-js/edge-runtime.d.ts";
import { createClient } from "jsr:@supabase/supabase-js@2";
import { Mistral } from "npm:@mistralai/mistralai";
import pLimit from "npm:p-limit";

/** CORS headers attached to the preflight and SSE responses of this function. */
export const corsHeaders = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers":
    "authorization, x-client-info, apikey, content-type",
};

const apiKey = Deno.env.get("MISTRAL_API_KEY");
const client = new Mistral({
  apiKey: apiKey,
});

// System prompt for the per-page clean-up call. Built by concatenation so the
// exact text (including the space before the line break) survives formatters
// that strip trailing whitespace.
const PROCESSING_PROMPT =
  " You are a document processing AI. " +
  "Your task is to process the Markdown text scanned from a document page and return it in a clean and structured format. " +
  "The textual page data should only be returned in valid Markdown format. " +
  "Use proper headings and subheadings to structure the content. " +
  "Any images should be included. " +
  "Do not return the Markdown as a code block, only as a raw string, without any new lines. " +
  "No data or information should ever be removed, it should only be processed and formatted. " +
  "There are in-text citations/references in the text, remove them from the text and put them into an object where the key is the reference number and the value is the text. " +
  "The Markdown should be human-readable and well-formatted. \n" +
  'Return the final result as a JSON object with the following structure: { "markdown": "", "citations": { "1": "", "2": "" } } ';

/** Callback that queues one server-sent event for the client. Never throws. */
type SendEvent = (event: string, data: unknown) => void;

/** One page of the Mistral OCR response, derived from the client's own types. */
type OcrPage = Awaited<ReturnType<typeof client.ocr.process>>["pages"][number];

/**
 * `JSON.stringify` replacer that keeps the name and message of `Error`
 * instances, which would otherwise serialize to `{}` because those
 * properties are not enumerable.
 */
function serializeErrors(_key: string, value: unknown): unknown {
  if (value instanceof Error) {
    return { ...value, name: value.name, message: value.message };
  }
  return value;
}

/**
 * Creates the SSE transport for one request.
 *
 * @returns `readable` to use as the response body, `sendEvent` to queue
 *   events, and `close` to flush the queued events and end the stream.
 */
function createEventStream(): {
  readable: ReadableStream<Uint8Array>;
  sendEvent: SendEvent;
  close: () => Promise<void>;
} {
  // TransformStream exposes `readable` / `writable`; the readable side must be
  // handed to the Response or nothing ever drains the writes.
  const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
  const writer = writable.getWriter();
  const encoder = new TextEncoder();

  let pendingWrites: Promise<void> = Promise.resolve(); // Tail of the write queue
  let streamClosed = false; // Track if the stream is closed

  const sendEvent: SendEvent = (event, data) => {
    if (streamClosed) {
      console.warn("Attempted to write to a closed stream.");
      return;
    }

    const message = `event: ${event}\ndata: ${JSON.stringify(
      data,
      serializeErrors
    )}\n\n`;
    console.log("Sending event:", message);

    // Write failures (e.g. the client went away) are logged, not propagated,
    // so a broken connection does not abort document processing.
    pendingWrites = pendingWrites
      .then(() => writer.write(encoder.encode(message)))
      .catch((error: unknown) => {
        console.error("Error writing to stream:", error);
      });
  };

  const close = async (): Promise<void> => {
    // Wait for every queued event before closing the stream.
    await pendingWrites;
    streamClosed = true;
    try {
      await writer.close();
    } catch (error) {
      console.error("Error closing stream:", error);
    }
  };

  return { readable, sendEvent, close };
}

/**
 * Sends one OCR page through the chat model to clean up its Markdown and
 * extract citations, then inlines the page's images.
 *
 * @param page - Page object from the OCR response.
 * @param sendEvent - Progress/error reporter.
 * @returns The page with processed `markdown` and `citations`, or `undefined`
 *   when the model returned no usable content for this page.
 * @throws If the model response is not JSON or lacks a string `markdown`.
 */
async function processPage(page: OcrPage, sendEvent: SendEvent) {
  console.log(`Processing page ${page.index} with Mistral...`);

  const response = await client.chat.complete({
    model: "mistral-small-latest",
    messages: [
      {
        role: "system",
        content: [
          {
            type: "text",
            text: PROCESSING_PROMPT,
          },
        ],
      },
      {
        role: "user",
        content: [
          {
            type: "text",
            text: page.markdown,
          },
        ],
      },
    ],
  });

  // Covers both a missing and an empty `choices` array.
  const choice = response.choices?.[0];
  if (!choice) {
    console.error("No choices in response for page:", page.index);
    sendEvent("error", {
      message: `No choices in response for page ${page.index}`,
    });
    return undefined;
  }

  console.log("Response received for page:", page.index);
  sendEvent("status", {
    message: `Response received for page ${page.index}`,
  });

  const imageData: Record<string, string> = {};
  if (page.images.length > 0) {
    console.log(
      `Processing ${page.images.length} images for page ${page.index}...`
    );
    sendEvent("status", {
      message: `Processing images for page ${page.index}`,
    });

    for (const img of page.images) {
      // Skip images without data so "undefined" is never written into the Markdown.
      if (img.imageBase64) {
        imageData[img.id] = img.imageBase64;
      }
    }
  }

  // The SDK may return either a plain string or a list of content chunks.
  const content = choice.message.content;
  const rawText =
    typeof content === "string"
      ? content
      : (content ?? [])
          .map((chunk) => (chunk.type === "text" ? chunk.text : ""))
          .join("");

  if (!rawText) {
    console.error("Message content is undefined for page:", page.index);
    sendEvent("error", {
      message: `Message content is undefined for page ${page.index}`,
    });
    return undefined;
  }

  const payload: unknown = JSON.parse(rawText);
  if (
    typeof payload !== "object" ||
    payload === null ||
    typeof (payload as { markdown?: unknown }).markdown !== "string"
  ) {
    throw new Error(
      `Model response for page ${page.index} has no "markdown" string`
    );
  }
  const { markdown: processedMarkdown, citations } = payload as {
    markdown: string;
    citations?: unknown;
  };

  console.log("Generating Markdown for page:", page.index);
  sendEvent("status", {
    message: `Generating Markdown for page ${page.index}`,
  });

  const markdown = replaceImagesInMarkdown(processedMarkdown, imageData);
  return {
    ...page,
    markdown,
    citations,
  };
}

/**
 * Runs the full pipeline for one upload: authenticate, store the PDF, insert
 * the document row, OCR the file with Mistral, post-process every page, and
 * save the result on the document row.
 *
 * @param formData - Parsed multipart body with `file`, `access_token` and
 *   `refresh_token` fields.
 * @param sendEvent - Progress/error reporter.
 * @throws On invalid input or when any step fails; the caller reports it.
 */
async function processDocument(
  formData: FormData,
  sendEvent: SendEvent
): Promise<void> {
  const supabaseUrl = Deno.env.get("SUPABASE_URL");
  const supabaseAnonKey = Deno.env.get("SUPABASE_ANON_KEY");
  if (!supabaseUrl || !supabaseAnonKey) {
    throw new Error("SUPABASE_URL and SUPABASE_ANON_KEY must be set");
  }
  const supabase = createClient(supabaseUrl, supabaseAnonKey);

  const file = formData.get("file");
  const accessToken = formData.get("access_token");
  const refreshToken = formData.get("refresh_token");
  if (!(file instanceof File)) {
    throw new Error('Form field "file" must be a file upload');
  }
  if (typeof accessToken !== "string" || typeof refreshToken !== "string") {
    throw new Error(
      'Form fields "access_token" and "refresh_token" are required'
    );
  }

  const fileName = file.name;
  const uuid = crypto.randomUUID();

  const {
    data: { user },
    error: sessionError,
  } = await supabase.auth.setSession({
    access_token: accessToken,
    refresh_token: refreshToken,
  });

  if (sessionError || !user) {
    console.error("Error setting session:", sessionError);
    sendEvent("error", {
      message: "Error setting session",
      error: sessionError,
    });
    throw new Error("Setting session failed");
  }

  console.log("Generated UUID:", uuid);
  sendEvent("status", {
    message: "Generated UUID",
    uuid,
  });

  console.log("Authenticated user:", user);
  sendEvent("status", {
    message: "Authenticated user",
    user,
  });

  const { data: storageData, error: storageError } = await supabase.storage
    .from("documents")
    .upload(`${user.id}/${uuid}.pdf`, file);

  if (storageError) {
    console.error("Error uploading file to storage:", storageError);
    sendEvent("error", {
      message: "Error uploading file to storage",
      error: storageError,
    });
    throw new Error("File upload failed");
  }

  console.log("File uploaded to storage:", storageData);
  sendEvent("status", {
    message: "File uploaded to storage",
    storageData,
  });

  const { error: docError } = await supabase.from("documents").insert({
    id: uuid,
    file_name: file.name,
    owner: user.id,
    raw_file: storageData.id,
    is_processing: true,
  });

  if (docError) {
    console.error("Error inserting document record:", docError);
    sendEvent("error", {
      message: "Error inserting document record",
      error: docError,
    });
    throw new Error("Document record insertion failed");
  }

  console.log("Document record inserted successfully.");
  sendEvent("status", {
    message: "Document record inserted successfully",
  });

  console.log("Uploading file to Mistral...");
  sendEvent("status", {
    message: "Uploading file to Mistral...",
  });

  const uploaded_pdf = await client.files.upload({
    file: {
      fileName,
      content: file,
    },
    purpose: "ocr",
  });

  console.log("File uploaded to Mistral:", uploaded_pdf);
  sendEvent("status", {
    message: "File uploaded to Mistral",
    uploaded_pdf,
  });

  const signedUrl = await client.files.getSignedUrl({
    fileId: uploaded_pdf.id,
  });

  console.log("Generated signed URL:", signedUrl);
  sendEvent("status", {
    message: "Generated signed URL",
    signedUrl,
  });

  console.log("Processing OCR...");
  sendEvent("status", {
    message: "Processing OCR...",
  });

  const ocrResponse = await client.ocr.process({
    model: "mistral-ocr-latest",
    document: {
      type: "document_url",
      documentUrl: signedUrl.url,
    },
    // Image data must be requested explicitly; without it there is nothing to
    // inline into the Markdown image references.
    includeImageBase64: true,
  });

  console.log("OCR response received:", ocrResponse);
  sendEvent("status", {
    message: "OCR response received",
    ocrResponse,
  });

  // At most two pages are sent to the chat model concurrently.
  const limit = pLimit(2);
  const pageTasks = ocrResponse.pages.map((page) => {
    console.log("Processing page:", page.index);
    sendEvent("status", {
      message: `Processing page ${page.index}`,
    });
    return limit(() => processPage(page, sendEvent));
  });

  console.log("Waiting for all pages to be processed...");
  sendEvent("status", {
    message: "Waiting for all pages to be processed...",
  });

  const results = await Promise.all(pageTasks);

  console.log("All pages processed. Results:", results);
  sendEvent("status", {
    message: "All pages processed",
    results,
  });

  // Drop pages that produced no result (already reported as error events) so
  // the stored data contains no holes; `filter` also yields a fresh array, so
  // sorting does not mutate `results`.
  const sortedResults = results
    .filter((page): page is NonNullable<typeof page> => page != null)
    .sort((a, b) => a.index - b.index);

  console.log("Sorted results:", sortedResults);
  sendEvent("status", { message: "Sorted results", sortedResults });

  const { error } = await supabase
    .from("documents")
    .update({
      ocr_data: sortedResults,
      is_processing: false,
    })
    .eq("id", uuid);

  if (error) {
    console.error("Error updating document record:", error);
    sendEvent("error", {
      message: "Error updating document record",
      error,
    });
    throw new Error("Document record update failed");
  }
}

/**
 * HTTP entry point.
 *
 * - `OPTIONS`: CORS preflight.
 * - `POST`: multipart upload; responds with a `text/event-stream` carrying
 *   `status` and `error` events while the document is processed.
 * - anything else: 405.
 */
Deno.serve(async (req) => {
  if (req.method === "OPTIONS") {
    console.log("Handling OPTIONS request...");
    return new Response(null, {
      headers: {
        ...corsHeaders,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
      },
    });
  }

  if (req.method === "POST") {
    console.log("Processing POST request...");

    const { readable, sendEvent, close } = createEventStream();

    // Set up the SSE response
    const headers = new Headers({
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
      ...corsHeaders,
    });

    // Start streaming updates
    sendEvent("status", {
      message: "Initializing...",
    });

    // Read the request body before responding; a parse failure is reported
    // through the event stream like any other processing error.
    const [formDataOutcome] = await Promise.allSettled([req.formData()]);

    const run = async (): Promise<void> => {
      try {
        if (formDataOutcome.status === "rejected") {
          throw formDataOutcome.reason;
        }
        await processDocument(formDataOutcome.value, sendEvent);
        console.log("Closing SSE stream...");
      } catch (error) {
        console.error("Error during processing:", error);
        sendEvent("error", {
          message: "Error during processing",
          error,
        });
      } finally {
        await close();
      }
    };

    // Deliberately not awaited: the Response must be returned first so events
    // reach the client while processing is still running.
    void run();

    return new Response(readable, {
      headers,
    });
  }

  console.error("Method not allowed:", req.method);
  return new Response("Method not allowed", {
    status: 405,
  });
});

/**
 * Replaces every `![<id>](<id>)` image reference in the Markdown with
 * `![<id>](<base64>)`.
 *
 * Matching is literal (no regular expression), so characters such as `.` in
 * an image id are not treated as wildcards and `$` sequences in the
 * replacement are not interpreted.
 *
 * @param markdownStr - Markdown text containing image references.
 * @param imagesDict - Map from image id to its base64 payload.
 * @returns The Markdown with all known image references rewritten.
 */
function replaceImagesInMarkdown(
  markdownStr: string,
  imagesDict: Record<string, string>
): string {
  console.log("Replacing images in Markdown...");
  for (const [imgName, base64Str] of Object.entries(imagesDict)) {
    markdownStr = markdownStr
      .split(`![${imgName}](${imgName})`)
      .join(`![${imgName}](${base64Str})`);
  }
  console.log("Image replacement complete.");
  return markdownStr;
}