build our own worker fanout and wrapper

This commit is contained in:
timothycarambat 2025-02-14 10:31:06 -08:00
parent c463710b0f
commit 64b3210db2
6 changed files with 275 additions and 131 deletions
collector
processSingleFile/convert/asPDF
utils/OCRLoader
server/storage/models

View file

@ -1,39 +0,0 @@
class NodeCanvasFactory {
constructor() {
this.Canvas = null;
}
async init() {
this.Canvas = await import("canvas");
this.Image = this.Canvas.Image;
}
create(
width,
height,
transparent
) {
const canvas = this.Canvas.createCanvas(width, height);
const context = canvas.getContext("2d", { alpha: transparent });
if (transparent) context.clearRect(0, 0, width, height);
return {
canvas,
context,
};
}
reset(canvasAndContext, width, height) {
canvasAndContext.canvas.width = width;
canvasAndContext.canvas.height = height;
}
destroy(canvasAndContext) {
canvasAndContext.canvas.width = 0;
canvasAndContext.canvas.height = 0;
canvasAndContext.canvas = null;
canvasAndContext.context = null;
}
}
module.exports = NodeCanvasFactory;

View file

@ -1,22 +1,12 @@
const fs = require("fs").promises; const fs = require("fs").promises;
const path = require("path");
const NodeCanvasFactory = require("./CanvasFactory");
class PDFLoader { class PDFLoader {
constructor(filePath, { splitPages = true } = {}) { constructor(filePath, { splitPages = true } = {}) {
this.filePath = filePath; this.filePath = filePath;
this.splitPages = splitPages; this.splitPages = splitPages;
this.metadata = {};
} }
/**
* Loads a PDF file and returns an array of documents.
* This function is reserved to parsing for DIGITAL documents - scanned documents are not supported in this function
* For scanned documents, use the `asOCR` function instead.
* @returns {Promise<{pageContent: string, metadata: object}[]>} An array of documents with page content and metadata.
*/
async load() { async load() {
const documents = [];
const buffer = await fs.readFile(this.filePath); const buffer = await fs.readFile(this.filePath);
const { getDocument, version } = await this.getPdfJS(); const { getDocument, version } = await this.getPdfJS();
@ -28,21 +18,15 @@ class PDFLoader {
}).promise; }).promise;
const meta = await pdf.getMetadata().catch(() => null); const meta = await pdf.getMetadata().catch(() => null);
this.metadata = { const documents = [];
source: this.filePath,
pdf: {
version,
info: meta?.info,
metadata: meta?.metadata,
totalPages: pdf.numPages,
},
};
for (let i = 1; i <= pdf.numPages; i += 1) { for (let i = 1; i <= pdf.numPages; i += 1) {
const page = await pdf.getPage(i); const page = await pdf.getPage(i);
const content = await page.getTextContent(); const content = await page.getTextContent();
if (content.items.length === 0) continue; if (content.items.length === 0) {
continue;
}
let lastY; let lastY;
const textItems = []; const textItems = [];
@ -61,88 +45,46 @@ class PDFLoader {
documents.push({ documents.push({
pageContent: text.trim(), pageContent: text.trim(),
metadata: { metadata: {
...this.metadata, source: this.filePath,
pdf: {
version,
info: meta?.info,
metadata: meta?.metadata,
totalPages: pdf.numPages,
},
loc: { pageNumber: i }, loc: { pageNumber: i },
}, },
}); });
} }
if (this.splitPages) return documents; if (this.splitPages) {
if (documents.length === 0) return []; return documents;
}
if (documents.length === 0) {
return [];
}
return [ return [
{ {
pageContent: documents.map((doc) => doc.pageContent).join("\n\n"), pageContent: documents.map((doc) => doc.pageContent).join("\n\n"),
metadata: this.metadata, metadata: {
source: this.filePath,
pdf: {
version,
info: meta?.info,
metadata: meta?.metadata,
totalPages: pdf.numPages,
},
},
}, },
]; ];
} }
/**
* Loads a PDF file and returns an array of documents.
* This function is reserved to parsing for SCANNED documents - digital documents are not supported in this function
* For digital documents, use the `load` function instead.
* @returns {Promise<{pageContent: string, metadata: object}[]>} An array of documents with page content and metadata.
*/
async asOCR() {
const documents = [];
const pdfjs = await import("pdf-parse/lib/pdf.js/v2.0.550/build/pdf.js");
const buffer = await fs.readFile(this.filePath);
const canvasFactory = new NodeCanvasFactory();
await canvasFactory.init();
global.Image = canvasFactory.Image;
const pdfDocument = await pdfjs.getDocument({
data: new Uint8Array(buffer),
canvasFactory,
}).promise;
async function getPageAsBuffer(pageNumber, scale = 1) {
const page = await pdfDocument.getPage(pageNumber);
const viewport = page.getViewport(scale);
const { canvas, context } = canvasFactory.create(
viewport.width,
viewport.height,
false
);
await page.render({
canvasFactory,
canvasContext: context,
viewport,
}).promise;
return canvas.toBuffer();
}
const { createWorker, setLogging, OEM } = require("tesseract.js");
setLogging(false);
const worker = await createWorker("eng", OEM.LSTM_ONLY, {
cachePath: path.resolve(__dirname, `../../../../storage/tmp`),
});
for (let i = 1; i <= pdfDocument.numPages; i += 1) {
const image = await getPageAsBuffer(i, 5);
const { data } = await worker.recognize(image, {}, "text");
documents.push({
pageContent: data.text,
metadata: {
...this.metadata,
loc: { pageNumber: i },
},
});
}
return documents;
}
async getPdfJS() { async getPdfJS() {
try { try {
const pdfjs = await import("pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js"); const pdfjs = await import("pdf-parse/lib/pdf.js/v1.10.100/build/pdf.js");
return { return { getDocument: pdfjs.getDocument, version: pdfjs.version };
getDocument: pdfjs.getDocument,
version: pdfjs.version,
};
} catch (e) { } catch (e) {
console.error(e); console.error(e);
throw new Error( throw new Error(

View file

@ -7,6 +7,7 @@ const {
const { tokenizeString } = require("../../../utils/tokenizer"); const { tokenizeString } = require("../../../utils/tokenizer");
const { default: slugify } = require("slugify"); const { default: slugify } = require("slugify");
const PDFLoader = require("./PDFLoader"); const PDFLoader = require("./PDFLoader");
const OCRLoader = require("../../../utils/OCRLoader");
async function asPdf({ fullFilePath = "", filename = "" }) { async function asPdf({ fullFilePath = "", filename = "" }) {
const pdfLoader = new PDFLoader(fullFilePath, { const pdfLoader = new PDFLoader(fullFilePath, {
@ -19,9 +20,9 @@ async function asPdf({ fullFilePath = "", filename = "" }) {
if (docs.length === 0) { if (docs.length === 0) {
console.log( console.log(
`[PDFLoader] No text content found for ${filename}. Attempting OCR parse.` `[asPDF] No text content found for ${filename}. Will attempt OCR parse.`
); );
docs = await pdfLoader.asOCR(); docs = await new OCRLoader().ocrPDF(fullFilePath);
} }
for (const doc of docs) { for (const doc of docs) {
@ -35,9 +36,7 @@ async function asPdf({ fullFilePath = "", filename = "" }) {
} }
if (!pageContent.length) { if (!pageContent.length) {
console.error( console.error(`[asPDF] Resulting text content was empty for ${filename}.`);
`[PDFLoader] Resulting text content was empty for ${filename}.`
);
trashFile(fullFilePath); trashFile(fullFilePath);
return { return {
success: false, success: false,

View file

@ -0,0 +1,52 @@
/**
* This is a factory for creating a canvas and context in Node.js
* it is used to create a canvas and context for the PDFLoader for turning the PDF into an image
* so we can later use the image to extract text from the PDF.
*/
class NodeCanvasFactory {
constructor() {
this.CanvasModule = null;
}
async init() {
this.CanvasModule = await import("canvas");
this.Image = this.CanvasModule.Image;
}
/**
* Creates a canvas and context for the PDFLoader
* @param {number} width - The width of the canvas
* @param {number} height - The height of the canvas
* @param {boolean} transparent - Whether the canvas is transparent
* @returns {{canvas: HTMLCanvasElement, context: CanvasRenderingContext2D}} - The canvas and context
*/
create(width, height, transparent = false) {
const canvas = this.CanvasModule.createCanvas(width, height);
const context = canvas.getContext("2d", { alpha: transparent });
if (transparent) context.clearRect(0, 0, width, height);
return {
canvas,
context,
};
}
/**
* Required for the PDFLoader pdfjs interation - do not remove or use directly.
*/
reset(canvasAndContext, width, height) {
canvasAndContext.canvas.width = width;
canvasAndContext.canvas.height = height;
}
/**
* Required for the PDFLoader pdfjs interation - do not remove or use directly.
*/
destroy(canvasAndContext) {
canvasAndContext.canvas.width = 0;
canvasAndContext.canvas.height = 0;
canvasAndContext.canvas = null;
canvasAndContext.context = null;
}
}
module.exports = NodeCanvasFactory;

View file

@ -0,0 +1,190 @@
const fs = require("fs");
const os = require("os");
const path = require("path");
const NodeCanvasFactory = require("./CanvasFactory");
class OCRLoader {
constructor() {
this.cacheDir = path.resolve(
process.env.STORAGE_DIR
? path.resolve(process.env.STORAGE_DIR, `models`, `tesseract`)
: path.resolve(__dirname, `../../../server/storage/models/tesseract`)
);
}
log(text, ...args) {
console.log(`\x1b[36m[OCRLoader]\x1b[0m ${text}`, ...args);
}
/**
* Loads a PDF file and returns an array of documents.
* This function is reserved to parsing for SCANNED documents - digital documents are not supported in this function
* @returns {Promise<{pageContent: string, metadata: object}[]>} An array of documents with page content and metadata.
*/
async ocrPDF(
filePath,
{ maxExecutionTime = 300_000, batchSize = 10, maxWorkers = null } = {}
) {
if (
!filePath ||
!fs.existsSync(filePath) ||
!fs.statSync(filePath).isFile()
) {
this.log(`File ${filePath} does not exist. Skipping OCR.`);
return [];
}
const documentTitle = path.basename(filePath);
this.log(`Starting OCR of ${documentTitle}`);
const pdfjs = await import("pdf-parse/lib/pdf.js/v2.0.550/build/pdf.js");
let buffer = fs.readFileSync(filePath);
const canvasFactory = new NodeCanvasFactory();
await canvasFactory.init();
global.Image = canvasFactory.Image;
const pdfDocument = await pdfjs.getDocument({
data: new Uint8Array(buffer),
canvasFactory,
}).promise;
buffer = null;
const documents = [];
const meta = await pdfDocument.getMetadata().catch(() => null);
const metadata = {
source: filePath,
pdf: {
version: "v2.0.550",
info: meta?.info,
metadata: meta?.metadata,
totalPages: pdfDocument.numPages,
},
};
async function getPageAsBuffer(pageNumber, scale = 1) {
let canvas = null;
let context = null;
try {
const page = await pdfDocument.getPage(pageNumber);
const viewport = page.getViewport(scale);
({ canvas, context } = canvasFactory.create(
viewport.width,
viewport.height
));
await page.render({
canvasFactory,
canvasContext: context,
viewport,
}).promise;
return canvas.toBuffer();
} catch (e) {
this.log(`Error getting page as buffer: ${e.message}`);
return null;
} finally {
canvas = null;
context = null;
}
}
const { createWorker, OEM } = require("tesseract.js");
const BATCH_SIZE = batchSize;
const MAX_EXECUTION_TIME = maxExecutionTime;
const NUM_WORKERS = maxWorkers ?? Math.min(os.cpus().length, 4);
const totalPages = pdfDocument.numPages;
const workerPool = await Promise.all(
Array(NUM_WORKERS)
.fill(0)
.map(() =>
createWorker("eng", OEM.LSTM_ONLY, {
cachePath: this.cacheDir,
})
)
);
const startTime = Date.now();
try {
this.log("Bootstrapping OCR completed successfully!", {
MAX_EXECUTION_TIME_MS: MAX_EXECUTION_TIME,
BATCH_SIZE,
MAX_CONCURRENT_WORKERS: NUM_WORKERS,
TOTAL_PAGES: totalPages,
});
const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => {
reject(
new Error(
`OCR job took too long to complete (${
MAX_EXECUTION_TIME / 1000
} seconds)`
)
);
}, MAX_EXECUTION_TIME);
});
const processPages = async () => {
for (
let startPage = 1;
startPage <= totalPages;
startPage += BATCH_SIZE
) {
const endPage = Math.min(startPage + BATCH_SIZE - 1, totalPages);
const pageNumbers = Array.from(
{ length: endPage - startPage + 1 },
(_, i) => startPage + i
);
this.log(`Working on pages ${startPage} - ${endPage}`);
const pageQueue = [...pageNumbers];
const results = [];
const workerPromises = workerPool.map(async (worker, workerIndex) => {
while (pageQueue.length > 0) {
const pageNum = pageQueue.shift();
this.log(
`\x1b[34m[Worker ${
workerIndex + 1
}]\x1b[0m assigned pg${pageNum}`
);
const imageBuffer = await getPageAsBuffer(pageNum, 5);
const { data } = await worker.recognize(imageBuffer, {}, "text");
this.log(
`\x1b[34m[Worker ${
workerIndex + 1
}]\x1b[0m completed pg${pageNum}`
);
results.push({
pageContent: data.text,
metadata: {
...metadata,
loc: { pageNumber: pageNum },
},
});
}
});
await Promise.all(workerPromises);
documents.push(
...results.sort(
(a, b) => a.metadata.loc.pageNumber - b.metadata.loc.pageNumber
)
);
}
return documents;
};
await Promise.race([timeoutPromise, processPages()]);
} catch (e) {
this.log(`Error: ${e.message}`);
} finally {
global.Image = undefined;
await Promise.all(workerPool.map((worker) => worker.terminate()));
}
this.log(`Completed OCR of ${documentTitle}!`, {
documentsParsed: documents.length,
totalPages: totalPages,
executionTime: `${((Date.now() - startTime) / 1000).toFixed(2)}s`,
});
return documents;
}
}
module.exports = OCRLoader;

View file

@ -7,4 +7,4 @@ novita
mixedbread-ai* mixedbread-ai*
gemini gemini
togetherAi togetherAi
ocr tesseract