import "jsr:@supabase/functions-js/edge-runtime.d.ts";
import { createClient } from "jsr:@supabase/supabase-js@2";
import { Mistral } from "npm:@mistralai/mistralai";
import pLimit from "npm:p-limit";
/**
 * CORS headers shared by every response this function sends
 * (preflight replies and the SSE stream alike).
 */
export const corsHeaders = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers": [
    "authorization",
    "x-client-info",
    "apikey",
    "content-type",
  ].join(", "),
};
/** Mistral API key read from the function's environment. */
const apiKey = Deno.env.get("MISTRAL_API_KEY");
if (!apiKey) {
  // Report the misconfiguration once at startup rather than only as a failed
  // Mistral call in the middle of processing a document.
  console.error("MISTRAL_API_KEY is not set; Mistral requests will fail.");
}
/** Shared Mistral client used for file upload, signed URLs, OCR and chat completion. */
const client = new Mistral({
  apiKey,
});
/**
 * System prompt for the per-page cleanup call.
 *
 * The model is asked to answer with the cleaned Markdown, then a line containing
 * only nine dashes, then a JSON object holding the citations. The request handler
 * splits the answer on that dash separator. The example JSON deliberately contains
 * no comments, because the model tends to copy the example and comments are not
 * valid JSON. Image references must survive verbatim so they can be swapped for
 * inline image data afterwards.
 */
const PROCESSING_PROMPT = `
You are a document processing AI. Your task is to process the Markdown text scanned from a document page and return it in a clean and structured format.
The textual page data should only be returned in valid Markdown format. Use proper headings and subheadings to structure the content.
Any images should be included: keep every image reference exactly as it appears in the input, for example ![img-0.jpeg](img-0.jpeg).
Do not return the Markdown as a code block, only as a raw string.
No data or information should ever be removed, it should only be processed and formatted.
There are in-text citations/references in the text. Remove the citation text from the body (**but most importantly, keep the reference number in the text. use a tag**) and list every citation in the "citations" array of the JSON object below, where "number" is the reference number exactly as it appears in the text and "text" is the citation text.
The Markdown should be human-readable and well-formatted.
Return the final result with the following structure (without code block formatting): the Markdown, then a line containing only the separator ---------, then a JSON object that is strictly valid JSON (no comments, all strings properly escaped):
<Markdown>
---------
{
"citations": [
{
"number": 1,
"text": "Citation text 1"
},
{
"number": 2,
"text": "Citation text 2"
}
]
}
Do not return the JSON object as a code block, only as a raw string.
`;
/** Pushes one Server-Sent Event to the client; the returned promise never rejects. */
type SendEvent = (event: string, data: unknown) => Promise<void>;

/**
 * The fields of a Mistral OCR page that this function reads. Pages are otherwise
 * passed through unchanged, so their remaining fields end up in `ocr_data` too.
 */
interface OcrPage {
  index: number;
  markdown: string;
  images: Array<{ id: string; imageBase64?: string | null }>;
}

/**
 * Edge function entry point.
 *
 * - `OPTIONS`: CORS preflight.
 * - `POST`: multipart form with `access_token`, `refresh_token` and either a `file`
 *   upload or the `id` of an existing `documents` row to reprocess. The handler
 *   returns a `text/event-stream` response right away and reports progress as
 *   `status` / `error` events while `processDocumentRequest` runs.
 * - Any other method: 405.
 */
Deno.serve((req) => {
  if (req.method === "OPTIONS") {
    console.log("Handling OPTIONS request...");
    return new Response(null, {
      headers: {
        ...corsHeaders,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
      },
    });
  }
  if (req.method !== "POST") {
    console.error("Method not allowed:", req.method);
    // CORS headers let a browser caller read the 405 instead of seeing an opaque failure.
    return new Response("Method not allowed", {
      status: 405,
      headers: { ...corsHeaders, Allow: "POST, OPTIONS" },
    });
  }

  console.log("Processing POST request...");
  const { body, writable } = new TransformStream<Uint8Array, Uint8Array>();
  const writer = writable.getWriter();
  const encoder = new TextEncoder();
  const headers = new Headers({
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    Connection: "keep-alive",
    ...corsHeaders,
  });
  // Set once the pipeline has finished. Page tasks still running after another
  // page failed may try to send events late; those events are dropped.
  let streamClosed = false;

  const sendEvent: SendEvent = async (event, data) => {
    if (streamClosed) {
      console.warn("Attempted to write to a closed stream.");
      return;
    }
    try {
      // The replacer keeps Error details, which JSON.stringify would otherwise emit as {}.
      const message = `event: ${event}\ndata: ${JSON.stringify(data, serializeErrors)}\n\n`;
      console.log("Sending event:", message);
      await writer.write(encoder.encode(message));
    } catch (error) {
      console.error("Error writing to stream:", error);
    }
  };

  void sendEvent("status", {
    message: "Initializing...",
  });

  // Run the pipeline in the background so the SSE response is returned immediately.
  // Awaiting it here would hold back every event until the whole document was done.
  const pipeline = processDocumentRequest(req, sendEvent)
    .finally(async () => {
      streamClosed = true;
      try {
        // close() settles only after every previously queued write has been flushed.
        await writer.close();
      } catch (error) {
        console.error("Error closing stream:", error);
      }
    })
    .catch((error: unknown) => {
      console.error("Unexpected pipeline failure:", error);
    });

  // Where the Supabase Edge Runtime global is available, register the pipeline as
  // background work so it can finish (and clear `is_processing`) even if the client
  // disconnects. It is looked up dynamically because it may be absent elsewhere.
  const edgeRuntime: { waitUntil?: (promise: Promise<unknown>) => void } | undefined =
    Reflect.get(globalThis, "EdgeRuntime");
  edgeRuntime?.waitUntil?.(pipeline);

  return new Response(body, {
    headers,
  });
});

/**
 * `JSON.stringify` replacer that turns `Error` instances into plain objects.
 * An Error's `name` and `message` are not enumerable, so without this they
 * would serialize as `{}`.
 */
function serializeErrors(_key: string, value: unknown): unknown {
  if (value instanceof Error) {
    return { ...value, name: value.name, message: value.message };
  }
  return value;
}

/**
 * Runs the upload → OCR → per-page cleanup → save pipeline for one request and
 * reports progress through `sendEvent`.
 *
 * This never rejects. Any failure is reported as an `error` event. If this request
 * had already set `is_processing` on the document row, the flag is cleared again
 * so the row is not left stuck in the processing state.
 */
async function processDocumentRequest(req: Request, sendEvent: SendEvent): Promise<void> {
  let supabase: ReturnType<typeof createClient> | undefined;
  let uuid: string = crypto.randomUUID();
  // True while this request owns an `is_processing = true` flag on the document row.
  let processingFlagSet = false;
  try {
    supabase = createClient(
      Deno.env.get("SUPABASE_URL"),
      Deno.env.get("SUPABASE_ANON_KEY")
    );
    const supabaseServer = createClient(
      Deno.env.get("SUPABASE_URL"),
      Deno.env.get("SUPABASE_SERVICE_ROLE_KEY")
    );
    const formData = await req.formData();
    const accessToken = formData.get("access_token");
    const refreshToken = formData.get("refresh_token");
    // Validate the tokens before handing them to setSession.
    if (typeof accessToken !== "string" || typeof refreshToken !== "string") {
      console.error("Access token or refresh token not found in form data.");
      void sendEvent("error", {
        message: "Access token or refresh token not found in form data",
      });
      throw new Error("Tokens not found");
    }

    const {
      data: { user },
      error: sessionError,
    } = await supabase.auth.setSession({
      access_token: accessToken,
      refresh_token: refreshToken,
    });
    if (sessionError || !user) {
      console.error("Error setting session:", sessionError);
      void sendEvent("error", {
        message: "Error setting session",
        error: sessionError,
      });
      throw new Error("Setting session failed");
    }

    const reprocessing = formData.has("id");
    let file: File;
    if (reprocessing) {
      console.log("Reprocessing document...");
      console.log("File ID found in form data.");
      void sendEvent("status", {
        message: "File ID found in form data.",
      });
      const docId = formData.get("id");
      const { data: documentData, error: documentError } = await supabase
        .from("documents")
        .select("*")
        .eq("id", docId)
        .single();
      if (documentError) {
        console.error("Error fetching document record:", documentError);
        void sendEvent("error", {
          message: "Error fetching document record",
          error: documentError,
        });
        throw new Error("Document record fetch failed");
      }
      if (!documentData) {
        console.error("Document record not found.");
        void sendEvent("error", {
          message: "Document record not found",
        });
        throw new Error("Document record not found");
      }
      uuid = documentData.id;

      // NOTE(review): PostgREST resolves `from("storage.objects")` as a table name;
      // confirm this works in this project (supabase-js usually needs
      // `.schema("storage").from("objects")` with the schema exposed).
      const { data: storageData, error: storageError } = await supabaseServer
        .from("storage.objects")
        .select("name")
        .eq("id", documentData.raw_file)
        .single();
      if (storageError) {
        console.error("Error fetching file name:", storageError);
        void sendEvent("error", {
          message: "Error fetching file name",
          error: storageError,
        });
        throw new Error("Storage data fetch failed");
      }

      const { data: fileData, error: fileError } = await supabase.storage
        .from("documents")
        .download(storageData.name);
      if (fileError || !fileData) {
        console.error("Error downloading file from storage:", fileError);
        void sendEvent("error", {
          message: "Error downloading file from storage",
          error: fileError,
        });
        throw new Error("File download failed");
      }
      console.log("File downloaded from storage:", fileData);
      void sendEvent("status", {
        message: "File downloaded from storage",
        fileData,
      });
      // Give the downloaded Blob the stored file name; a bare Blob would be
      // uploaded to Mistral under the name "blob".
      file = new File([fileData], documentData.file_name || `${uuid}.pdf`, {
        type: fileData.type,
      });
    } else {
      const uploaded = formData.get("file");
      if (!(uploaded instanceof File)) {
        console.error("File not found in form data.");
        void sendEvent("error", {
          message: "File not found in form data",
        });
        throw new Error("File not found");
      }
      file = uploaded;
    }
    const fileName = file.name;

    console.log("Generated UUID:", uuid);
    void sendEvent("status", {
      message: "Generated UUID",
      uuid,
    });
    console.log("Authenticated user:", user);
    void sendEvent("status", {
      message: "Authenticated user",
      user,
    });

    if (!reprocessing) {
      const { data: storageData, error: storageError } = await supabase.storage
        .from("documents")
        .upload(`${user.id}/${uuid}.pdf`, file);
      if (storageError) {
        console.error("Error uploading file to storage:", storageError);
        void sendEvent("error", {
          message: "Error uploading file to storage",
          error: storageError,
        });
        throw new Error("File upload failed");
      }
      console.log("File uploaded to storage:", storageData);
      void sendEvent("status", {
        message: "File uploaded to storage",
        storageData,
      });
      const { error: docError } = await supabase.from("documents").insert({
        id: uuid,
        file_name: file.name,
        owner: user.id,
        raw_file: storageData.id,
        is_processing: true,
      });
      if (docError) {
        console.error("Error inserting document record:", docError);
        void sendEvent("error", {
          message: "Error inserting document record",
          error: docError,
        });
        throw new Error("Document record insertion failed");
      }
      processingFlagSet = true;
      console.log("Document record inserted successfully.");
      void sendEvent("status", {
        message: "Document record inserted successfully",
      });
    } else {
      console.log("Reprocessing document...");
      void sendEvent("status", {
        message: "Reprocessing document",
      });
      const { error: docError } = await supabase
        .from("documents")
        .update({
          is_processing: true,
        })
        .eq("id", uuid);
      if (docError) {
        console.error("Error updating document record:", docError);
        void sendEvent("error", {
          message: "Error updating document record",
          error: docError,
        });
        throw new Error("Document record update failed");
      }
      processingFlagSet = true;
      console.log("Document record updated successfully.");
      void sendEvent("status", {
        message: "Document record updated successfully",
      });
    }

    console.log("Uploading file to Mistral...");
    void sendEvent("status", {
      message: "Uploading file to Mistral...",
    });
    const uploaded_pdf = await client.files.upload({
      file: {
        fileName,
        content: file,
      },
      purpose: "ocr",
    });
    console.log("File uploaded to Mistral:", uploaded_pdf);
    void sendEvent("status", {
      message: "File uploaded to Mistral",
      uploaded_pdf,
    });
    const signedUrl = await client.files.getSignedUrl({
      fileId: uploaded_pdf.id,
    });
    console.log("Generated signed URL:", signedUrl);
    void sendEvent("status", {
      message: "Generated signed URL",
      signedUrl,
    });

    console.log("Processing OCR...");
    void sendEvent("status", {
      message: "Processing OCR...",
    });
    const ocrResponse = await client.ocr.process({
      model: "mistral-ocr-latest",
      document: {
        type: "document_url",
        documentUrl: signedUrl.url,
      },
      // Without this, page images carry no base64 payload and cannot be embedded.
      includeImageBase64: true,
    });
    console.log("OCR response received:", ocrResponse);
    void sendEvent("status", {
      message: "OCR response received",
      ocrResponse,
    });

    // At most two pages are sent to the chat model at the same time.
    const limit = pLimit(2);
    const pagePromises = ocrResponse.pages.map((page) => {
      console.log("Processing page:", page.index);
      void sendEvent("status", {
        message: `Processing page ${page.index}`,
      });
      return limit(() => processPage(page, sendEvent));
    });

    console.log("Waiting for all pages to be processed...");
    void sendEvent("status", {
      message: "Waiting for all pages to be processed...",
    });
    const results = await Promise.all(pagePromises);
    console.log("All pages processed. Results:", results);
    void sendEvent("status", {
      message: "All pages processed",
      results,
    });
    const sortedResults = [...results].sort((a, b) => a.index - b.index);
    console.log("Sorted results:", sortedResults);
    void sendEvent("status", { message: "Sorted results", sortedResults });

    const { error } = await supabase
      .from("documents")
      .update({
        ocr_data: sortedResults,
        is_processing: false,
      })
      .eq("id", uuid);
    if (error) {
      console.error("Error updating document record:", error);
      void sendEvent("error", {
        message: "Error updating document record",
        error,
      });
      throw new Error("Document record update failed");
    }
    processingFlagSet = false;
    console.log("Closing SSE stream...");
  } catch (error) {
    console.error("Error during processing:", error);
    void sendEvent("error", {
      message: "Error during processing",
      error,
    });
    if (processingFlagSet && supabase) {
      // Best effort: do not leave the document marked as processing forever.
      try {
        const { error: resetError } = await supabase
          .from("documents")
          .update({ is_processing: false })
          .eq("id", uuid);
        if (resetError) {
          console.error("Error resetting is_processing flag:", resetError);
        }
      } catch (resetError) {
        console.error("Error resetting is_processing flag:", resetError);
      }
    }
  }
}

/**
 * Sends one OCR page to the chat model for cleanup and citation extraction.
 *
 * Always resolves to a page object. If the model gives no usable answer, the
 * page falls back to its raw OCR Markdown with an empty citation list. This
 * keeps the page in `ocr_data` instead of storing `null` for it.
 */
async function processPage<P extends OcrPage>(
  page: P,
  sendEvent: SendEvent
): Promise<P & { markdown: string; citations: unknown[] }> {
  console.log(`Processing page ${page.index} with Mistral...`);
  const response = await client.chat.complete({
    model: "mistral-small-latest",
    messages: [
      {
        role: "system",
        content: [
          {
            type: "text",
            text: PROCESSING_PROMPT,
          },
        ],
      },
      {
        role: "user",
        content: [
          {
            type: "text",
            text: page.markdown,
          },
        ],
      },
    ],
  });

  const imageData: Record<string, string | null | undefined> = {};
  for (const img of page.images) {
    imageData[img.id] = img.imageBase64;
  }
  const fallback = () => ({
    ...page,
    markdown: replaceImagesInMarkdown(page.markdown, imageData),
    citations: [] as unknown[],
  });

  const choice = response.choices?.[0];
  if (!choice) {
    console.error("No choices in response for page:", page.index);
    void sendEvent("error", {
      message: `No choices in response for page ${page.index}`,
    });
    return fallback();
  }
  console.log("Response received for page:", page.index);
  void sendEvent("status", {
    message: `Response received for page ${page.index}`,
  });
  if (page.images.length > 0) {
    console.log(`Processing ${page.images.length} images for page ${page.index}...`);
    void sendEvent("status", {
      message: `Processing images for page ${page.index}`,
    });
  }

  const text = messageContentToText(choice.message?.content);
  if (!text) {
    console.error("Message content is undefined for page:", page.index);
    void sendEvent("error", {
      message: `Message content is undefined for page ${page.index}`,
    });
    return fallback();
  }
  console.log(`[${page.index}] ${text}`);
  const { content, citations } = parseModelOutput(text);
  console.log("Generating Markdown for page:", page.index);
  void sendEvent("status", {
    message: `Generating Markdown for page ${page.index}`,
  });
  const markdown = replaceImagesInMarkdown(content, imageData);
  return {
    ...page,
    markdown,
    citations,
  };
}

/**
 * Flattens a chat message's content to plain text. The content may be a string
 * or a list of content chunks, in which case only `text` chunks are kept.
 * Returns `undefined` when there is no text at all.
 */
function messageContentToText(content: unknown): string | undefined {
  if (typeof content === "string") {
    return content || undefined;
  }
  if (!Array.isArray(content)) {
    return undefined;
  }
  const text = content
    .map((chunk: { type?: unknown; text?: unknown } | null) =>
      chunk?.type === "text" && typeof chunk.text === "string" ? chunk.text : ""
    )
    .join("");
  return text || undefined;
}

/** Removes a ```markdown / ```md / ``` fence wrapped around the whole text, then trims. */
function stripMarkdownFence(text: string): string {
  const trimmed = text.trim();
  const fenced = /^```(?:markdown|md)?[ \t]*\r?\n([\s\S]*?)\r?\n```$/i.exec(trimmed);
  return (fenced?.[1] ?? trimmed).trim();
}

/**
 * Splits the model's answer into page Markdown and citations.
 *
 * The separator is the last run of nine or more dashes that still has a `{` after
 * it. The `{...}` span after it must parse as a JSON object; otherwise the whole
 * answer is kept as Markdown. This way a horizontal rule is never mistaken for the
 * separator, and malformed citations never fail the document or drop page text.
 */
function parseModelOutput(text: string): { content: string; citations: unknown[] } {
  const body = stripMarkdownFence(text);
  const separatorPattern = /-{9,}/g;
  let separatorStart = -1;
  let separatorEnd = -1;
  for (
    let match = separatorPattern.exec(body);
    match !== null;
    match = separatorPattern.exec(body)
  ) {
    const end = match.index + match[0].length;
    if (body.indexOf("{", end) !== -1) {
      separatorStart = match.index;
      separatorEnd = end;
    }
  }
  if (separatorStart === -1) {
    return { content: body, citations: [] };
  }

  const tail = body.slice(separatorEnd);
  // Parse only the outermost {...} so stray fences or quotes around the JSON are ignored.
  const jsonStart = tail.indexOf("{");
  const jsonEnd = tail.lastIndexOf("}");
  if (jsonEnd < jsonStart) {
    return { content: body, citations: [] };
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(tail.slice(jsonStart, jsonEnd + 1));
  } catch (error) {
    console.warn("Could not parse citations JSON from model output:", error);
    return { content: body, citations: [] };
  }
  if (typeof parsed !== "object" || parsed === null) {
    return { content: body, citations: [] };
  }
  const citations = (parsed as { citations?: unknown }).citations;
  return {
    content: stripMarkdownFence(body.slice(0, separatorStart)),
    citations: Array.isArray(citations) ? citations : [],
  };
}
/**
 * Replaces Markdown image references of the form `![id](id)` with inline data URIs.
 *
 * @param markdownStr - Markdown for one page.
 * @param imagesDict - Map from image id (e.g. `img-0.jpeg`) to its payload. A payload
 *   is either already a `data:` URI, which is used as-is, or raw base64, which is
 *   wrapped in a data URI whose MIME type is guessed from the id's extension
 *   (default `image/png`). Entries with no payload are skipped, so the reference is
 *   kept instead of becoming a broken link.
 * @returns The Markdown with every matching reference rewritten.
 */
function replaceImagesInMarkdown(
  markdownStr: string,
  imagesDict: Record<string, string | null | undefined>
): string {
  console.log("Replacing images in Markdown...");
  let result = markdownStr;
  for (const [imgName, base64Str] of Object.entries(imagesDict)) {
    if (!base64Str) {
      continue;
    }
    let dataUri = base64Str;
    if (!base64Str.startsWith("data:")) {
      const ext = /\.([a-z0-9]+)$/i.exec(imgName)?.[1]?.toLowerCase();
      const mime =
        ext === "jpg" || ext === "jpeg" ? "image/jpeg" : ext ? `image/${ext}` : "image/png";
      dataUri = `data:${mime};base64,${base64Str}`;
    }
    // Literal split/join rather than a RegExp: ids contain metacharacters such as
    // "." or "(", and the replacement must not be scanned for "$" patterns.
    result = result
      .split(`![${imgName}](${imgName})`)
      .join(`![${imgName}](${dataUri})`);
  }
  console.log("Image replacement complete.");
  return result;
}