// (export metadata: 394 lines, 12 KiB, TypeScript)

import "jsr:@supabase/functions-js/edge-runtime.d.ts";
import { createClient } from "jsr:@supabase/supabase-js@2";
import { Mistral } from "npm:@mistralai/mistralai";
import pLimit from "npm:p-limit";
// Request headers Supabase clients are allowed to send cross-origin.
const ALLOWED_REQUEST_HEADERS =
  "authorization, x-client-info, apikey, content-type";

/** CORS headers attached to every response from this function. */
export const corsHeaders = {
  "Access-Control-Allow-Origin": "*",
  "Access-Control-Allow-Headers": ALLOWED_REQUEST_HEADERS,
};
// Mistral credentials come from the function's environment. Warn loudly at
// boot if the key is missing so a misconfigured deployment is obvious in the
// logs instead of surfacing later as an opaque API auth error.
const apiKey = Deno.env.get("MISTRAL_API_KEY");
if (!apiKey) {
  console.warn("MISTRAL_API_KEY is not set; Mistral API calls will fail.");
}
const client = new Mistral({
  apiKey,
});
// System prompt for the per-page post-processing LLM call: reformat the OCR
// markdown, move in-text citations into a trailing JSON object, and keep the
// reference numbers inline as <sup> tags. The "---------" separator and the
// raw (non code-block) output shape are parsed downstream when the response
// is split into markdown and citations, so do not change the wording casually.
const PROCESSING_PROMPT = `
You are a document processing AI. Your task is to process the Markdown text scanned from a document page and return it in a clean and structured format.
The textual page data should only be returned in valid Markdown format. Use proper headings and subheadings to structure the content.
Any images should be included.
Do not return the Markdown as a code block, only as a raw string, without any new lines.
No data or information should ever be removed, it should only be processed and formatted.
There are in-text citations/references in the text, remove them from the text (**but most importantly, keep the reference number in the text. use a <sup></sup> tag**) and put them into an object where the key is the reference number and the value is the text.
The Markdown should be human-readable and well-formatted. The markdown string should properly sanitized and should not break a JSON parser when returned as the final format.
Return the final result as a text object with the following structure (without code block formatting):
"""
<processed markdown text>
---------
{
"citations": {
"1": "Citation text for reference 1",
"2": "Citation text for reference 2",
// ... more citations
}
}
"""
Do not return the text object as a code block, only as a raw string.
`;
/**
 * Supabase Edge Function entry point.
 *
 * POST: accepts a multipart form with a PDF (`file`) plus Supabase session
 * tokens (`access_token`, `refresh_token`), stores the file, runs Mistral
 * OCR, post-processes every page with an LLM, persists the result to the
 * `documents` table, and streams progress to the caller as Server-Sent
 * Events. OPTIONS: CORS preflight.
 */
Deno.serve(async (req) => {
  if (req.method === "OPTIONS") {
    console.log("Handling OPTIONS request...");
    return new Response(null, {
      headers: {
        ...corsHeaders,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
      },
    });
  }
  if (req.method === "POST") {
    console.log("Processing POST request...");
    const { body, writable } = new TransformStream();
    const writer = writable.getWriter();
    // SSE response headers.
    const headers = new Headers({
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
      ...corsHeaders,
    });
    let activeOperations = 0; // SSE writes currently in flight
    let streamClosed = false; // set once the writer is being closed
    // Write one SSE frame. Errors are logged, never thrown: a dropped client
    // connection must not abort the document-processing pipeline.
    const sendEvent = async (event, data) => {
      if (streamClosed) {
        console.warn("Attempted to write to a closed stream.");
        return;
      }
      const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
      console.log("Sending event:", message);
      try {
        activeOperations++;
        await writer.write(new TextEncoder().encode(message));
      } catch (error) {
        console.error("Error writing to stream:", error);
      } finally {
        activeOperations--;
      }
    };
    // Fire-and-forget: progress events are best-effort by design.
    sendEvent("status", {
      message: "Initializing...",
    });
    try {
      const supabase = createClient(
        Deno.env.get("SUPABASE_URL"),
        Deno.env.get("SUPABASE_ANON_KEY")
      );
      const formData = await req.formData();
      const file = formData.get("file");
      const accessToken = formData.get("access_token");
      const refreshToken = formData.get("refresh_token");
      // BUGFIX: formData.get() returns null when the field is absent; the
      // original code crashed with an opaque TypeError on `file.name`.
      if (!(file instanceof File)) {
        sendEvent("error", {
          message: "Missing or invalid 'file' form field",
        });
        throw new Error("Missing or invalid 'file' form field");
      }
      const fileName = file.name;
      const uuid = crypto.randomUUID();
      const {
        data: { user },
        error: sessionError,
      } = await supabase.auth.setSession({
        access_token: accessToken,
        refresh_token: refreshToken,
      });
      if (sessionError) {
        console.error("Error setting session:", sessionError);
        sendEvent("error", {
          message: "Error setting session",
          error: sessionError,
        });
        throw new Error("Setting session failed");
      }
      console.log("Generated UUID:", uuid);
      sendEvent("status", {
        message: "Generated UUID",
        uuid,
      });
      console.log("Authenticated user:", user);
      sendEvent("status", {
        message: "Authenticated user",
        user,
      });
      // Store the raw PDF under <user id>/<uuid>.pdf.
      const { data: storageData, error: storageError } = await supabase.storage
        .from("documents")
        .upload(`${user.id}/${uuid}.pdf`, file);
      if (storageError) {
        console.error("Error uploading file to storage:", storageError);
        sendEvent("error", {
          message: "Error uploading file to storage",
          error: storageError,
        });
        throw new Error("File upload failed");
      }
      console.log("File uploaded to storage:", storageData);
      sendEvent("status", {
        message: "File uploaded to storage",
        storageData,
      });
      // Create the document row up front, flagged as still processing.
      const { error: docError } = await supabase.from("documents").insert({
        id: uuid,
        file_name: file.name,
        owner: user.id,
        raw_file: storageData.id,
        is_processing: true,
      });
      if (docError) {
        console.error("Error inserting document record:", docError);
        sendEvent("error", {
          message: "Error inserting document record",
          error: docError,
        });
        throw new Error("Document record insertion failed");
      }
      console.log("Document record inserted successfully.");
      sendEvent("status", {
        message: "Document record inserted successfully",
      });
      console.log("Uploading file to Mistral...");
      sendEvent("status", {
        message: "Uploading file to Mistral...",
      });
      const uploaded_pdf = await client.files.upload({
        file: {
          fileName,
          content: file,
        },
        purpose: "ocr",
      });
      console.log("File uploaded to Mistral:", uploaded_pdf);
      sendEvent("status", {
        message: "File uploaded to Mistral",
        uploaded_pdf,
      });
      const signedUrl = await client.files.getSignedUrl({
        fileId: uploaded_pdf.id,
      });
      console.log("Generated signed URL:", signedUrl);
      sendEvent("status", {
        message: "Generated signed URL",
        signedUrl,
      });
      console.log("Processing OCR...");
      sendEvent("status", {
        message: "Processing OCR...",
      });
      const ocrResponse = await client.ocr.process({
        model: "mistral-ocr-latest",
        document: {
          type: "document_url",
          documentUrl: signedUrl.url,
        },
      });
      console.log("OCR response received:", ocrResponse);
      sendEvent("status", {
        message: "OCR response received",
        ocrResponse,
      });
      // Post-process pages with at most 2 concurrent LLM calls.
      const limit = pLimit(2);
      const promises = [];
      for (const page of ocrResponse.pages) {
        console.log("Processing page:", page.index);
        sendEvent("status", {
          message: `Processing page ${page.index}`,
        });
        const pagePromise = limit(async () => {
          console.log(`Processing page ${page.index} with Mistral...`);
          const response = await client.chat.complete({
            model: "mistral-small-latest",
            messages: [
              {
                role: "system",
                content: [
                  {
                    type: "text",
                    text: PROCESSING_PROMPT,
                  },
                ],
              },
              {
                role: "user",
                content: [
                  {
                    type: "text",
                    text: page.markdown,
                  },
                ],
              },
            ],
          });
          if (!response.choices) {
            console.error("No choices in response for page:", page.index);
            sendEvent("error", {
              message: `No choices in response for page ${page.index}`,
            });
            return; // resolves to undefined; filtered out below
          }
          console.log("Response received for page:", page.index);
          sendEvent("status", {
            message: `Response received for page ${page.index}`,
          });
          // Collect base64 payloads keyed by image id for placeholder
          // substitution in the processed markdown.
          const imageData = {};
          if (page.images.length > 0) {
            console.log(
              `Processing ${page.images.length} images for page ${page.index}...`
            );
            sendEvent("status", {
              message: `Processing images for page ${page.index}`,
            });
            for (const img of page.images) {
              imageData[img.id] = img.imageBase64;
            }
          }
          if (response.choices[0].message.content) {
            const raw = response.choices[0].message.content;
            console.log(`[${page.index}] ${raw}`);
            // BUGFIX: the original comment promised code-block stripping but
            // never did it; strip a fence the model sometimes adds anyway.
            const cleaned = raw
              .replace(/^\s*```(?:\w+)?\s*/, "")
              .replace(/\s*```\s*$/, "");
            // The prompt asks for "<markdown>---------<citations json>".
            const split = cleaned.split("---------");
            const content = split[0].trim();
            const citationsStr = split[1]?.trim() || "{}";
            // BUGFIX: an unparseable citations blob used to throw here and
            // reject the whole Promise.all; degrade to "no citations".
            let citations = {};
            try {
              citations = JSON.parse(citationsStr).citations || {};
            } catch (parseError) {
              console.error(
                `Invalid citations JSON for page ${page.index}:`,
                parseError
              );
              sendEvent("error", {
                message: `Invalid citations JSON for page ${page.index}`,
              });
            }
            console.log("Generating Markdown for page:", page.index);
            sendEvent("status", {
              message: `Generating Markdown for page ${page.index}`,
            });
            const markdown = replaceImagesInMarkdown(content, imageData);
            return {
              ...page,
              markdown,
              citations,
            };
          } else {
            console.error("Message content is undefined for page:", page.index);
            sendEvent("error", {
              message: `Message content is undefined for page ${page.index}`,
            });
          }
        });
        promises.push(pagePromise);
      }
      console.log("Waiting for all pages to be processed...");
      sendEvent("status", {
        message: "Waiting for all pages to be processed...",
      });
      const results = await Promise.all(promises);
      console.log("All pages processed. Results:", results);
      sendEvent("status", {
        message: "All pages processed",
        results,
      });
      // BUGFIX: failed pages resolve to undefined; drop them before sorting
      // instead of crashing on `a.index`. filter() also returns a fresh
      // array, so `results` itself is not mutated by the sort.
      const sortedResults = results
        .filter((result) => result !== undefined)
        .sort((a, b) => a.index - b.index);
      console.log("Sorted results:", sortedResults);
      sendEvent("status", { message: "Sorted results", sortedResults });
      const { error: updateError } = await supabase
        .from("documents")
        .update({
          ocr_data: sortedResults,
          is_processing: false,
        })
        .eq("id", uuid);
      if (updateError) {
        console.error("Error updating document record:", updateError);
        sendEvent("error", {
          message: "Error updating document record",
          error: updateError,
        });
        throw new Error("Document record update failed");
      }
      console.log("Closing SSE stream...");
    } catch (error) {
      console.error("Error during processing:", error);
      sendEvent("error", {
        message: "Error during processing",
        error,
      });
    } finally {
      // Close the SSE stream once pending writes have drained. A 30s safety
      // valve prevents the interval from spinning forever if a write hangs.
      const startedAt = Date.now();
      const interval = setInterval(() => {
        if (activeOperations === 0 || Date.now() - startedAt > 30_000) {
          clearInterval(interval);
          streamClosed = true;
          // BUGFIX: an unhandled close() rejection used to surface as an
          // uncaught promise error.
          writer
            .close()
            .catch((closeError) =>
              console.error("Error closing stream:", closeError)
            );
        }
      }, 100); // check every 100ms
    }
    return new Response(body, {
      headers,
    });
  }
  console.error("Method not allowed:", req.method);
  return new Response("Method not allowed", {
    status: 405,
  });
});
/**
 * Replace OCR image placeholders of the form `![id](id)` with inline
 * base64 payloads, i.e. `![id](<base64>)`.
 *
 * @param markdownStr - Markdown produced by the OCR/LLM post-processing step.
 * @param imagesDict  - Map from image id to its base64 data string.
 * @returns The markdown with every placeholder substituted.
 */
function replaceImagesInMarkdown(markdownStr, imagesDict) {
  console.log("Replacing images in Markdown...");
  for (const [imgName, base64Str] of Object.entries(imagesDict)) {
    // BUGFIX: the image id was interpolated into `new RegExp` unescaped, so
    // ids containing regex metacharacters (e.g. "img(1).png", or "." matching
    // any character) could over-match or mis-match. split/join performs a
    // literal global replacement with no escaping pitfalls.
    markdownStr = markdownStr
      .split(`![${imgName}](${imgName})`)
      .join(`![${imgName}](${base64Str})`);
  }
  console.log("Image replacement complete.");
  return markdownStr;
}