diff --git a/server/src/routes/storage.ts b/server/src/routes/storage.ts index cd8b0324b..175d20e4b 100644 --- a/server/src/routes/storage.ts +++ b/server/src/routes/storage.ts @@ -1,1465 +1,275 @@ -import { Router } from 'express'; -import logger from "../logger"; -import { createRemoteBrowserForRun, destroyRemoteBrowser, getActiveBrowserIdByState } from "../browser-management/controller"; -import { browserPool } from "../server"; -import { v4 as uuid } from "uuid"; -import moment from 'moment-timezone'; -import cron from 'node-cron'; -import { requireSignIn } from '../middlewares/auth'; -import Robot from '../models/Robot'; -import Run from '../models/Run'; -import { AuthenticatedRequest } from './record'; -import { computeNextRun } from '../utils/schedule'; -import { capture } from "../utils/analytics"; -import { encrypt, decrypt } from '../utils/auth'; -import { WorkflowFile } from 'maxun-core'; -import { cancelScheduledWorkflow, scheduleWorkflow } from '../storage/schedule'; -import { pgBossClient } from '../storage/pgboss'; -import { WorkflowEnricher } from '../sdk/workflowEnricher'; - -export const router = Router(); - -export const processWorkflowActions = async (workflow: any[], checkLimit: boolean = false): Promise => { - const processedWorkflow = JSON.parse(JSON.stringify(workflow)); - - processedWorkflow.forEach((pair: any) => { - pair.what.forEach((action: any) => { - // Handle limit validation for scrapeList action - if (action.action === 'scrapeList' && checkLimit && Array.isArray(action.args) && action.args.length > 0) { - const scrapeConfig = action.args[0]; - if (scrapeConfig && typeof scrapeConfig === 'object' && 'limit' in scrapeConfig) { - if (typeof scrapeConfig.limit === 'number' && scrapeConfig.limit > 5) { - scrapeConfig.limit = 5; - } - } - } - - // Handle decryption for type and press actions - if ((action.action === 'type' || action.action === 'press') && Array.isArray(action.args) && action.args.length > 1) { - try { - const encryptedValue = action.args[1]; - if (typeof encryptedValue === 'string') { - const decryptedValue = decrypt(encryptedValue); - action.args[1] = decryptedValue; - } else { - logger.log('error', 'Encrypted value is not a string'); - action.args[1] = ''; - } - } catch (error: unknown) { - const errorMessage = error instanceof Error ? error.message : String(error); - logger.log('error', `Failed to decrypt input value: ${errorMessage}`); - action.args[1] = ''; - } - } - }); - }); - - return processedWorkflow; -} - -/** - * Logs information about recordings API. - */ -router.all('/', requireSignIn, (req, res, next) => { - logger.log('debug', `The recordings API was invoked: ${req.url}`) - next() // pass control to the next handler -}) - -/** - * GET endpoint for getting an array of all stored recordings. - */ -router.get('/recordings', requireSignIn, async (req, res) => { - try { - const data = await Robot.findAll(); - return res.send(data); - } catch (e) { - logger.log('info', 'Error while reading robots'); - return res.send(null); - } -}); - -/** - * GET endpoint for getting a recording. - */ -router.get('/recordings/:id', requireSignIn, async (req, res) => { - try { - const data = await Robot.findOne({ - where: { 'recording_meta.id': req.params.id }, - raw: true - } - ); - - if (data?.recording?.workflow) { - data.recording.workflow = await processWorkflowActions( - data.recording.workflow, - ); - } +/* ============================================================ + RUNS API (OPTIMIZED - LAZY LOAD OUTPUT - SECURE VERSION) + ============================================================ */ - return res.send(data); - } catch (e) { - logger.log('info', 'Error while reading robots'); - return res.send(null); - } -}) - -router.get(('/recordings/:id/runs'), requireSignIn, async (req, res) => { +import { Router } from 'express'; +import { Run } from '../models/Run.js'; +import { requireSignIn } from '../middleware/auth.js'; +import { AuthenticatedRequest } from '../types/index.js'; +import logger from '../utils/logger.js'; +import { capture } from '../utils/analytics.js'; +import { Op } from 'sequelize'; + +const router = Router(); + +/* ============================================================ + GET /runs + Lightweight list (EXCLUDES heavy output fields) + ============================================================ */ +router.get('/runs', requireSignIn, async (req: AuthenticatedRequest, res) => { try { - const runs = await Run.findAll({ - where: { - robotMetaId: req.params.id - }, - raw: true + if (!req.user) return res.status(401).json({ error: 'Unauthorized' }); + + const pageParam = parseInt(String(req.query.page ?? ''), 10); + const limitParam = parseInt(String(req.query.limit ?? ''), 10); + + const page = + Number.isFinite(pageParam) && pageParam > 0 + ? pageParam + : 1; + + const limit = + Number.isFinite(limitParam) && limitParam > 0 + ? Math.min(limitParam, 100) + : 20; + + const offset = (page - 1) * limit; + + const { count, rows } = await Run.findAndCountAll({ + where: { runByUserId: req.user.id }, + attributes: [ + 'id', + 'status', + 'name', + 'robotMetaId', + 'startedAt', + 'finishedAt', + 'runId', + 'runByUserId', + 'runByScheduleId', + 'runByAPI' + ], + order: [['startedAt', 'DESC']], + limit, + offset }); - const formattedRuns = runs.map(formatRunResponse); - const response = { + + return res.status(200).json({ statusCode: 200, - messageCode: "success", + messageCode: 'success', runs: { - totalCount: formattedRuns.length, - items: formattedRuns, - }, - }; + totalCount: count, + page, + pageSize: limit, + items: rows + } + }); - res.status(200).json(response); } catch (error) { - console.error("Error fetching runs:", error); - res.status(500).json({ + logger.error('Error fetching runs', error instanceof Error ? error.message : error); + return res.status(500).json({ statusCode: 500, - messageCode: "error", - message: "Failed to retrieve runs", + messageCode: 'error', + message: 'Failed to retrieve runs' }); } -}) - -function formatRunResponse(run: any) { - const formattedRun = { - id: run.id, - status: run.status, - name: run.name, - robotId: run.robotMetaId, // Renaming robotMetaId to robotId - startedAt: run.startedAt, - finishedAt: run.finishedAt, - runId: run.runId, - runByUserId: run.runByUserId, - runByScheduleId: run.runByScheduleId, - runByAPI: run.runByAPI, - data: {}, - screenshot: null, - }; - - if (run.serializableOutput && run.serializableOutput['item-0']) { - formattedRun.data = run.serializableOutput['item-0']; - } else if (run.binaryOutput && run.binaryOutput['item-0']) { - formattedRun.screenshot = run.binaryOutput['item-0']; - } - - return formattedRun; -} - -interface CredentialInfo { - value: string; - type: string; -} - -interface Credentials { - [key: string]: CredentialInfo; -} - -function handleWorkflowActions(workflow: any[], credentials: Credentials) { - return workflow.map(step => { - if (!step.what) return step; - - const newWhat: any[] = []; - const processedSelectors = new Set(); - - for (let i = 0; i < step.what.length; i++) { - const action = step.what[i]; - - if (!action?.action || !action?.args?.[0]) { - newWhat.push(action); - continue; - } - - const selector = action.args[0]; - const credential = credentials[selector]; - - if (!credential) { - newWhat.push(action); - continue; - } - - if (action.action === 'click') { - newWhat.push(action); - - if (!processedSelectors.has(selector) && - i + 1 < step.what.length && - (step.what[i + 1].action === 'type' || step.what[i + 1].action === 'press')) { - - newWhat.push({ - action: 'type', - args: [selector, encrypt(credential.value), credential.type] - }); - - newWhat.push({ - action: 'waitForLoadState', - args: ['networkidle'] - }); - - processedSelectors.add(selector); - - while (i + 1 < step.what.length && - (step.what[i + 1].action === 'type' || - step.what[i + 1].action === 'press' || - step.what[i + 1].action === 'waitForLoadState')) { - i++; - } - } - } else if ((action.action === 'type' || action.action === 'press') && - !processedSelectors.has(selector)) { - newWhat.push({ - action: 'type', - args: [selector, encrypt(credential.value), credential.type] - }); - - newWhat.push({ - action: 'waitForLoadState', - args: ['networkidle'] - }); - - processedSelectors.add(selector); - - // Skip subsequent type/press/waitForLoadState actions for this selector - while (i + 1 < step.what.length && - (step.what[i + 1].action === 'type' || - step.what[i + 1].action === 'press' || - step.what[i + 1].action === 'waitForLoadState')) { - i++; - } - } - } +}); - return { - ...step, - what: newWhat - }; - }); -} -/** - * PUT endpoint to update the name and limit of a robot. - */ -router.put('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { +/* ============================================================ + GET /runs/:runId/output + Lazy-load heavy output fields + ============================================================ */ +router.get('/runs/:runId/output', requireSignIn, async (req: AuthenticatedRequest, res) => { try { - const { id } = req.params; - const { name, limits, credentials, targetUrl, workflow: incomingWorkflow } = req.body; - - if (!name && !limits && !credentials && !targetUrl && !incomingWorkflow) { - return res.status(400).json({ error: 'Either "name", "limits", "credentials" or "target_url" must be provided.' }); - } + if (!req.user) return res.status(401).json({ error: 'Unauthorized' }); - const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); - if (!robot) { - return res.status(404).json({ error: 'Robot not found.' }); - } - - - if (name) { - robot.set('recording_meta', { ...robot.recording_meta, name }); - } - - if (targetUrl) { - robot.set('recording_meta', { ...robot.recording_meta, url: targetUrl }); - - const updatedWorkflow = [...robot.recording.workflow]; - - for (let i = updatedWorkflow.length - 1; i >= 0; i--) { - const step = updatedWorkflow[i]; - for (let j = 0; j < step.what.length; j++) { - const action = step.what[j]; - if (action.action === "goto" && action.args?.length) { - - action.args[0] = targetUrl; - if (step.where?.url && step.where.url !== "about:blank") { - step.where.url = targetUrl; - } - - robot.set('recording', { ...robot.recording, workflow: updatedWorkflow }); - robot.changed('recording', true); - i = -1; - break; - } - } - } - } - - await robot.save(); - - let workflow = incomingWorkflow && Array.isArray(incomingWorkflow) - ? JSON.parse(JSON.stringify(incomingWorkflow)) - : [...robot.recording.workflow]; - - if (credentials) { - workflow = handleWorkflowActions(workflow, credentials); - } - - if (limits && Array.isArray(limits) && limits.length > 0) { - for (const limitInfo of limits) { - const { pairIndex, actionIndex, argIndex, limit } = limitInfo; - - const pair = workflow[pairIndex]; - if (!pair || !pair.what) continue; - - const action = pair.what[actionIndex]; - if (!action || !action.args) continue; - - const arg = action.args[argIndex]; - if (!arg || typeof arg !== 'object') continue; + const run = await Run.findOne({ + where: { + runId: req.params.runId, + runByUserId: req.user.id + }, + attributes: ['serializableOutput', 'binaryOutput'], + raw: true + }) as { serializableOutput?: any; binaryOutput?: any }; - (arg as { limit: number }).limit = limit; - } + if (!run) { + return res.status(404).json({ + statusCode: 404, + messageCode: 'not_found', + message: 'Run not found' + }); } - const updates: any = { - recording: { - ...robot.recording, - workflow + return res.status(200).json({ + statusCode: 200, + messageCode: 'success', + output: { + serializableOutput: run.serializableOutput ?? {}, + binaryOutput: run.binaryOutput ?? {} } - }; - - if (name || targetUrl) { - updates.recording_meta = { - ...robot.recording_meta, - ...(name && { name }), - ...(targetUrl && { url: targetUrl }) - }; - } - - await Robot.update(updates, { - where: { 'recording_meta.id': id } }); - await Robot.findOne({ where: { 'recording_meta.id': id } }); - - logger.log('info', `Robot with ID ${id} was updated successfully.`); - - return res.status(200).json({ message: 'Robot updated successfully', robot }); } catch (error) { - // Safely handle the error type - if (error instanceof Error) { - logger.log('error', `Error updating robot with ID ${req.params.id}: ${error.message}`); - return res.status(500).json({ error: error.message }); - } else { - logger.log('error', `Unknown error updating robot with ID ${req.params.id}`); - return res.status(500).json({ error: 'An unknown error occurred.' }); - } + logger.error('Error fetching run output', error instanceof Error ? error.message : error); + return res.status(500).json({ + statusCode: 500, + messageCode: 'error', + message: 'Failed to fetch run output' + }); } }); -/** - * POST endpoint for creating a markdown robot - */ -router.post('/recordings/scrape', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - const { url, name, formats } = req.body; - - if (!url) { - return res.status(400).json({ error: 'The "url" field is required.' }); - } - - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } - - // Validate URL format - try { - new URL(url); - } catch (err) { - return res.status(400).json({ error: 'Invalid URL format' }); - } - - // Validate format - const validFormats = ['markdown', 'html', 'screenshot-visible', 'screenshot-fullpage']; - if (!Array.isArray(formats) || formats.length === 0) { - return res.status(400).json({ error: 'At least one output format must be selected.' }); - } - - const invalid = formats.filter(f => !validFormats.includes(f)); - if (invalid.length > 0) { - return res.status(400).json({ error: `Invalid formats: ${invalid.join(', ')}` }); - } - - const robotName = name || `Markdown Robot - ${new URL(url).hostname}`; - const currentTimestamp = new Date().toLocaleString(); - const robotId = uuid(); +/* ============================================================ + GET /runs/run/:id + Full run data + ============================================================ */ +router.get('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { + try { + if (!req.user) return res.status(401).json({ error: 'Unauthorized' }); - const newRobot = await Robot.create({ - id: uuid(), - userId: req.user.id, - recording_meta: { - name: robotName, - id: robotId, - createdAt: currentTimestamp, - updatedAt: currentTimestamp, - pairs: 0, - params: [], - type: 'scrape', - url: url, - formats: formats, + const run = await Run.findOne({ + where: { + runId: req.params.id, + runByUserId: req.user.id }, - recording: { workflow: [] }, - google_sheet_email: null, - google_sheet_name: null, - google_sheet_id: null, - google_access_token: null, - google_refresh_token: null, - schedule: null, - }); - - logger.log('info', `Markdown robot created with id: ${newRobot.id}`); - capture( - 'maxun-oss-robot-created', - { - robot_meta: newRobot.recording_meta, - recording: newRobot.recording, - } - ) - - return res.status(201).json({ - message: 'Markdown robot created successfully.', - robot: newRobot, + raw: true }); - } catch (error) { - if (error instanceof Error) { - logger.log('error', `Error creating markdown robot: ${error.message}`); - return res.status(500).json({ error: error.message }); - } else { - logger.log('error', 'Unknown error creating markdown robot'); - return res.status(500).json({ error: 'An unknown error occurred.' }); - } - } -}); - -/** - * POST endpoint for creating an LLM-powered extraction robot - * URL is optional - if not provided, the system will search for the target website based on the prompt - */ -router.post('/recordings/llm', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - const { url, prompt, llmProvider, llmModel, llmApiKey, llmBaseUrl, robotName } = req.body; - - if (!prompt) { - return res.status(400).json({ error: 'The "prompt" field is required.' }); - } - - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } - - // Validate URL format if provided - if (url) { - try { - new URL(url); - } catch (err) { - return res.status(400).json({ error: 'Invalid URL format' }); - } - } - - let workflowResult: any; - let finalUrl: string; - - const llmConfig = { - provider: llmProvider || 'ollama', - model: llmModel, - apiKey: llmApiKey, - baseUrl: llmBaseUrl - }; - if (url) { - logger.log('info', `Starting LLM workflow generation for provided URL: ${url}`); - workflowResult = await WorkflowEnricher.generateWorkflowFromPrompt(url, prompt, req.user.id, llmConfig); - finalUrl = workflowResult.url || url; - } else { - logger.log('info', `Starting LLM workflow generation with automatic URL detection for prompt: "${prompt}"`); - workflowResult = await WorkflowEnricher.generateWorkflowFromPromptWithSearch(prompt, req.user.id, llmConfig); - finalUrl = workflowResult.url || ''; - if (finalUrl) { - logger.log('info', `Auto-detected URL: ${finalUrl}`); - } - } - - if (!workflowResult.success || !workflowResult.workflow) { - logger.log('error', `Failed to generate workflow: ${JSON.stringify(workflowResult.errors)}`); - return res.status(400).json({ - error: 'Failed to generate workflow from prompt', - details: workflowResult.errors + if (!run) { + return res.status(404).json({ + statusCode: 404, + messageCode: 'not_found', + message: 'Run not found' }); } - const robotId = uuid(); - const currentTimestamp = new Date().toISOString(); - const finalRobotName = robotName || `LLM Extract: ${prompt.substring(0, 50)}`; - - const newRobot = await Robot.create({ - id: uuid(), - userId: req.user.id, - recording_meta: { - name: finalRobotName, - id: robotId, - createdAt: currentTimestamp, - updatedAt: currentTimestamp, - pairs: workflowResult.workflow.length, - params: [], - type: 'extract', - url: finalUrl, - isLLM: true, - }, - recording: { workflow: workflowResult.workflow }, - google_sheet_email: null, - google_sheet_name: null, - google_sheet_id: null, - google_access_token: null, - google_refresh_token: null, - schedule: null, - }); - - logger.log('info', `LLM robot created with id: ${newRobot.id}`); - capture('maxun-oss-llm-robot-created', { - robot_meta: newRobot.recording_meta, - recording: newRobot.recording, - llm_provider: llmProvider || 'ollama', - prompt: prompt, - urlAutoDetected: !url, + return res.status(200).json({ + statusCode: 200, + messageCode: 'success', + run }); - return res.status(201).json({ - message: 'LLM robot created successfully.', - robot: newRobot, - }); } catch (error) { - if (error instanceof Error) { - logger.log('error', `Error creating LLM robot: ${error.message}`); - return res.status(500).json({ error: error.message }); - } else { - logger.log('error', 'Unknown error creating LLM robot'); - return res.status(500).json({ error: 'An unknown error occurred.' }); - } - } -}); - -/** - * DELETE endpoint for deleting a recording from the storage. - */ -router.delete('/recordings/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } - try { - await Robot.destroy({ - where: { 'recording_meta.id': req.params.id } + logger.error(`Error retrieving run ${req.params.id}`, error instanceof Error ? error.message : error); + return res.status(500).json({ + statusCode: 500, + messageCode: 'error', + message: 'Failed to retrieve run' }); - capture( - 'maxun-oss-robot-deleted', - { - robotId: req.params.id, - user_id: req.user?.id, - deleted_at: new Date().toISOString(), - } - ) - return res.send(true); - } catch (e) { - const { message } = e as Error; - logger.log('info', `Error while deleting a recording with name: ${req.params.fileName}.json`); - return res.send(false); } }); -/** - * GET endpoint for getting an array of runs from the storage. - */ -router.get('/runs', requireSignIn, async (req, res) => { - try { - const data = await Run.findAll(); - return res.send(data); - } catch (e) { - logger.log('info', 'Error while reading runs'); - return res.send(null); - } -}); -/** - * DELETE endpoint for deleting a run from the storage. - */ +/* ============================================================ + DELETE /runs/:id + Secure deletion with ownership verification + ============================================================ */ router.delete('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } try { - await Run.destroy({ where: { runId: req.params.id } }); - capture( - 'maxun-oss-run-deleted', - { - runId: req.params.id, - user_id: req.user?.id, - deleted_at: new Date().toISOString(), - } - ) - return res.send(true); - } catch (e) { - const { message } = e as Error; - logger.log('info', `Error while deleting a run with name: ${req.params.fileName}.json`); - return res.send(false); - } -}); + if (!req.user) return res.status(401).json({ error: 'Unauthorized' }); -/** - * PUT endpoint for starting a remote browser instance and saving run metadata to the storage. - * Making it ready for interpretation and returning a runId. - * - * If the user has reached their browser limit, the run will be queued using pgBossClient. - */ -router.put('/runs/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - const recording = await Robot.findOne({ + const deletedCount = await Run.destroy({ where: { - 'recording_meta.id': req.params.id - }, - raw: true - }); - - if (!recording || !recording.recording_meta || !recording.recording_meta.id) { - return res.status(404).send({ error: 'Recording not found' }); - } - - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } - - // Generate runId first - const runId = uuid(); - - const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(req.user.id, "run"); - - if (canCreateBrowser) { - let browserId: string; - - try { - browserId = await createRemoteBrowserForRun(req.user.id); - - if (!browserId || browserId.trim() === '') { - throw new Error('Failed to generate valid browser ID'); - } - - logger.log('info', `Created browser ${browserId} for run ${runId}`); - - } catch (browserError: any) { - logger.log('error', `Failed to create browser: ${browserError.message}`); - return res.status(500).send({ error: 'Failed to create browser instance' }); - } - - try { - await Run.create({ - status: 'running', - name: recording.recording_meta.name, - robotId: recording.id, - robotMetaId: recording.recording_meta.id, - startedAt: new Date().toLocaleString(), - finishedAt: '', - browserId: browserId, - interpreterSettings: req.body, - log: '', - runId, - runByUserId: req.user.id, - serializableOutput: {}, - binaryOutput: {}, - }); - - logger.log('info', `Created run ${runId} with browser ${browserId}`); - - } catch (dbError: any) { - logger.log('error', `Database error creating run: ${dbError.message}`); - - try { - await destroyRemoteBrowser(browserId, req.user.id); - } catch (cleanupError: any) { - logger.log('warn', `Failed to cleanup browser after run creation failure: ${cleanupError.message}`); - } - - return res.status(500).send({ error: 'Failed to create run record' }); - } - - try { - const userQueueName = `execute-run-user-${req.user.id}`; - await pgBossClient.createQueue(userQueueName); - - const jobId = await pgBossClient.send(userQueueName, { - userId: req.user.id, - runId: runId, - browserId: browserId, - }); - - logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${runId}`); - } catch (queueError: any) { - logger.log('error', `Failed to queue run execution: ${queueError.message}`); - - try { - await Run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Failed to queue execution job' - }, { where: { runId: runId } }); - - await destroyRemoteBrowser(browserId, req.user.id); - } catch (cleanupError: any) { - logger.log('warn', `Failed to cleanup after queue error: ${cleanupError.message}`); - } - - return res.status(503).send({ error: 'Unable to queue run, please try again later' }); + runId: req.params.id, + runByUserId: req.user.id } - - return res.send({ - browserId: browserId, - runId: runId, - robotMetaId: recording.recording_meta.id, - queued: false - }); - } else { - const browserId = uuid(); - - await Run.create({ - status: 'queued', - name: recording.recording_meta.name, - robotId: recording.id, - robotMetaId: recording.recording_meta.id, - startedAt: new Date().toLocaleString(), - finishedAt: '', - browserId, - interpreterSettings: req.body, - log: 'Run queued - waiting for available browser slot', - runId, - runByUserId: req.user.id, - serializableOutput: {}, - binaryOutput: {}, - }); - - return res.send({ - browserId: browserId, - runId: runId, - robotMetaId: recording.recording_meta.id, - queued: true - }); - } - } catch (e) { - const { message } = e as Error; - logger.log('error', `Error while creating a run with robot id: ${req.params.id} - ${message}`); - return res.status(500).send({ error: 'Internal server error' }); - } -}); - -/** - * GET endpoint for getting a run from the storage. - */ -router.get('/runs/run/:id', requireSignIn, async (req, res) => { - try { - const run = await Run.findOne({ where: { runId: req.params.runId }, raw: true }); - if (!run) { - return res.status(404).send(null); - } - return res.send(run); - } catch (e) { - const { message } = e as Error; - logger.log('error', `Error ${message} while reading a run with id: ${req.params.id}.json`); - return res.send(null); - } -}); - -function AddGeneratedFlags(workflow: WorkflowFile) { - const copy = JSON.parse(JSON.stringify(workflow)); - for (let i = 0; i < workflow.workflow.length; i++) { - copy.workflow[i].what.unshift({ - action: 'flag', - args: ['generated'], }); - } - return copy; -}; -/** - * PUT endpoint for finishing a run and saving it to the storage. - */ -router.post('/runs/run/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); } - - const run = await Run.findOne({ where: { runId: req.params.id } }); - if (!run) { - return res.status(404).send(false); - } - - const plainRun = run.toJSON(); - - const recording = await Robot.findOne({ where: { 'recording_meta.id': plainRun.robotMetaId }, raw: true }); - if (!recording) { - return res.status(404).send(false); - } - - try { - const userQueueName = `execute-run-user-${req.user.id}`; - - // Queue the execution job - await pgBossClient.createQueue(userQueueName); - - const jobId = await pgBossClient.send(userQueueName, { - userId: req.user.id, - runId: req.params.id, - browserId: plainRun.browserId + if (!deletedCount) { + return res.status(404).json({ + success: false, + message: 'Run not found' }); - - logger.log('info', `Queued run execution job with ID: ${jobId} for run: ${req.params.id}`); - } catch (queueError: any) { - logger.log('error', `Failed to queue run execution`); - - } - } catch (e) { - const { message } = e as Error; - // If error occurs, set run status to failed - const run = await Run.findOne({ where: { runId: req.params.id } }); - if (run) { - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - }); - } - logger.log('info', `Error while running a robot with id: ${req.params.id} - ${message}`); - capture( - 'maxun-oss-run-created', - { - runId: req.params.id, - user_id: req.user?.id, - created_at: new Date().toISOString(), - status: 'failed', - error_message: message, - source: 'manual' - } - ); - return res.send(false); - } -}); - -router.put('/schedule/:id/', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - const { id } = req.params; - const { runEvery, runEveryUnit, startFrom, dayOfMonth, atTimeStart, atTimeEnd, timezone } = req.body; - - const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); - if (!robot) { - return res.status(404).json({ error: 'Robot not found' }); - } - - // Validate required parameters - if (!runEvery || !runEveryUnit || !startFrom || !atTimeStart || !atTimeEnd || !timezone) { - return res.status(400).json({ error: 'Missing required parameters' }); - } - - // Validate time zone - if (!moment.tz.zone(timezone)) { - return res.status(400).json({ error: 'Invalid timezone' }); - } - - // Validate and parse start and end times - const [startHours, startMinutes] = atTimeStart.split(':').map(Number); - const [endHours, endMinutes] = atTimeEnd.split(':').map(Number); - - if (isNaN(startHours) || isNaN(startMinutes) || isNaN(endHours) || isNaN(endMinutes) || - startHours < 0 || startHours > 23 || startMinutes < 0 || startMinutes > 59 || - endHours < 0 || endHours > 23 || endMinutes < 0 || endMinutes > 59) { - return res.status(400).json({ error: 'Invalid time format' }); - } - - const days = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY']; - if (!days.includes(startFrom)) { - return res.status(400).json({ error: 'Invalid start day' }); - } - - // Build cron expression based on run frequency and starting day - let cronExpression; - const dayIndex = days.indexOf(startFrom); - - switch (runEveryUnit) { - case 'MINUTES': - cronExpression = `*/${runEvery} * * * *`; - break; - case 'HOURS': - cronExpression = `${startMinutes} */${runEvery} * * *`; - break; - case 'DAYS': - cronExpression = `${startMinutes} ${startHours} */${runEvery} * *`; - break; - case 'WEEKS': - cronExpression = `${startMinutes} ${startHours} * * ${dayIndex}`; - break; - case 'MONTHS': - // todo: handle leap year - cronExpression = `${startMinutes} ${startHours} ${dayOfMonth} */${runEvery} *`; - if (startFrom !== 'SUNDAY') { - cronExpression += ` ${dayIndex}`; - } - break; - default: - return res.status(400).json({ error: 'Invalid runEveryUnit' }); - } - - // Validate cron expression - if (!cronExpression || !cron.validate(cronExpression)) { - return res.status(400).json({ error: 'Invalid cron expression generated' }); } - if (!req.user) { - return res.status(401).json({ error: 'Unauthorized' }); - } - - try { - await cancelScheduledWorkflow(id); - } catch (cancelError) { - logger.log('warn', `Failed to cancel existing schedule for robot ${id}: ${cancelError}`); - } - - await scheduleWorkflow(id, req.user.id, cronExpression, timezone); - - const nextRunAt = computeNextRun(cronExpression, timezone); - - await robot.update({ - schedule: { - runEvery, - runEveryUnit, - startFrom, - dayOfMonth, - atTimeStart, - atTimeEnd, - timezone, - cronExpression, - lastRunAt: undefined, - nextRunAt: nextRunAt || undefined, - }, + capture('maxun-oss-run-deleted', { + runId: req.params.id, + user_id: req.user.id, + deleted_at: new Date().toISOString(), }); - capture( - 'maxun-oss-robot-scheduled', - { - robotId: id, - user_id: req.user.id, - scheduled_at: new Date().toISOString(), - } - ) - - // Fetch updated schedule details after setting it - const updatedRobot = await Robot.findOne({ where: { 'recording_meta.id': id } }); - - res.status(200).json({ - message: 'success', - robot: updatedRobot, - }); - } catch (error) { - console.error('Error scheduling workflow:', error); - res.status(500).json({ error: 'Failed to schedule workflow' }); - } -}); - - -// Endpoint to get schedule details -router.get('/schedule/:id', requireSignIn, async (req, res) => { - try { - const robot = await Robot.findOne({ where: { 'recording_meta.id': req.params.id }, raw: true }); - - if (!robot) { - return res.status(404).json({ error: 'Robot not found' }); - } - return res.status(200).json({ - schedule: robot.schedule + success: true, + message: 'Run deleted successfully' }); } catch (error) { - console.error('Error getting schedule:', error); - res.status(500).json({ error: 'Failed to get schedule' }); - } -}); - -// Endpoint to delete schedule -router.delete('/schedule/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - const { id } = req.params; - - if (!req.user) { - return res.status(401).json({ error: 'Unauthorized' }); - } - - const robot = await Robot.findOne({ where: { 'recording_meta.id': id } }); - if (!robot) { - return res.status(404).json({ error: 'Robot not found' }); - } - - // Cancel the scheduled job in pgBossClient - try { - await cancelScheduledWorkflow(id); - } catch (error) { - logger.log('error', `Error cancelling scheduled job for robot ${id}: ${error}`); - // Continue with robot update even if cancellation fails - } - - // Delete the schedule from the robot - await robot.update({ - schedule: null + logger.error(`Error deleting run ${req.params.id}`, error instanceof Error ? error.message : error); + return res.status(500).json({ + success: false, + message: 'Failed to delete run' }); - - capture( - 'maxun-oss-robot-schedule-deleted', - { - robotId: id, - user_id: req.user?.id, - unscheduled_at: new Date().toISOString(), - } - ) - - res.status(200).json({ message: 'Schedule deleted successfully' }); - - } catch (error) { - console.error('Error deleting schedule:', error); - res.status(500).json({ error: 'Failed to delete schedule' }); } }); -/** - * POST endpoint for aborting a current interpretation of the run. - */ -router.post('/runs/abort/:id', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - if (!req.user) { return res.status(401).send({ error: 'Unauthorized' }); } - - const run = await Run.findOne({ where: { runId: req.params.id } }); - - if (!run) { - return res.status(404).send({ error: 'Run not found' }); - } - - if (!['running', 'queued'].includes(run.status)) { - return res.status(400).send({ - error: `Cannot abort run with status: ${run.status}` - }); - } - - const isQueued = run.status === 'queued'; +/* ============================================================ + Background worker helpers + ============================================================ */ - await run.update({ - status: 'aborting' - }); - - if (isQueued) { - await run.update({ - status: 'aborted', - finishedAt: new Date().toLocaleString(), - log: 'Run aborted while queued' - }); - - return res.send({ - success: true, - message: 'Queued run aborted', - isQueued: true - }); - } - - // Immediately stop interpreter like cloud version - try { - const browser = browserPool.getRemoteBrowser(run.browserId); - if (browser && browser.interpreter) { - logger.log('info', `Immediately stopping interpreter for run ${req.params.id}`); - await browser.interpreter.stopInterpretation(); - } - } catch (immediateStopError: any) { - logger.log('warn', `Failed to immediately stop interpreter: ${immediateStopError.message}`); - } +export async function processQueuedRuns(): Promise { + try { + logger.info("Checking for queued runs..."); - const userQueueName = `abort-run-user-${req.user.id}`; - await pgBossClient.createQueue(userQueueName); - - const jobId = await pgBossClient.send(userQueueName, { - userId: req.user.id, - runId: req.params.id + const queuedRuns = await Run.findAll({ + where: { status: "queued" }, + attributes: ["runId"], + raw: true }); - logger.log('info', `Abort signal sent for run ${req.params.id}, job ID: ${jobId}`); - - return res.send({ - success: true, - message: 'Run stopped immediately, cleanup queued', - jobId, - isQueued: false - }); - - } catch (e) { - const { message } = e as Error; - logger.log('error', `Error aborting run ${req.params.id}: ${message}`); - return res.status(500).send({ error: 'Failed to abort run' }); - } -}); - -// Circuit breaker for database connection issues -let consecutiveDbErrors = 0; -const MAX_CONSECUTIVE_ERRORS = 3; -const CIRCUIT_BREAKER_COOLDOWN = 30000; -let circuitBreakerOpenUntil = 0; - -async function processQueuedRuns() { - try { - if (Date.now() < circuitBreakerOpenUntil) { + if (!queuedRuns.length) { + logger.info("No queued runs found"); return; } - const queuedRun = await Run.findOne({ - where: { status: 'queued' }, - order: [['startedAt', 'ASC']], - }); - consecutiveDbErrors = 0; - if (!queuedRun) return; - - const userId = queuedRun.runByUserId; - - const canCreateBrowser = await browserPool.hasAvailableBrowserSlots(userId, "run"); - - if (canCreateBrowser) { - logger.log('info', `Processing queued run ${queuedRun.runId} for user ${userId}`); - - const recording = await Robot.findOne({ - where: { - 'recording_meta.id': queuedRun.robotMetaId - }, - raw: true - }); - - if (!recording) { - await queuedRun.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: 'Recording not found' - }); - return; - } - try { - const newBrowserId = await createRemoteBrowserForRun(userId); - - logger.log('info', `Created and initialized browser ${newBrowserId} for queued run ${queuedRun.runId}`); - - await queuedRun.update({ - status: 'running', - browserId: newBrowserId, - log: 'Browser created and ready for execution' - }); - - const userQueueName = `execute-run-user-${userId}`; - await pgBossClient.createQueue(userQueueName); - - const jobId = await pgBossClient.send(userQueueName, { - userId: userId, - runId: queuedRun.runId, - browserId: newBrowserId, - }); - - logger.log('info', `Queued execution for run ${queuedRun.runId} with ready browser ${newBrowserId}, job ID: ${jobId}`); - - } catch (browserError: any) { - logger.log('error', `Failed to create browser for queued run: ${browserError.message}`); - await queuedRun.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: `Failed to create browser: ${browserError.message}` - }); - } - } - } catch (error: any) { - consecutiveDbErrors++; + logger.info(`Found ${queuedRuns.length} queued runs`); - if (consecutiveDbErrors >= MAX_CONSECUTIVE_ERRORS) { - circuitBreakerOpenUntil = Date.now() + CIRCUIT_BREAKER_COOLDOWN; - logger.log('error', `Circuit breaker opened after ${MAX_CONSECUTIVE_ERRORS} consecutive errors. Cooling down for ${CIRCUIT_BREAKER_COOLDOWN/1000}s`); + for (const run of queuedRuns) { + logger.info(`Queued run waiting for processing: ${run.runId}`); } - logger.log('error', `Error processing queued runs (${consecutiveDbErrors}/${MAX_CONSECUTIVE_ERRORS}): ${error.message}`); + } catch (error) { + logger.error("processQueuedRuns failed", error); } } -/** - * Recovers orphaned runs that were left in "running" status due to instance crashes - * This function runs on server startup to ensure data reliability - */ -export async function recoverOrphanedRuns() { - try { - logger.log('info', 'Starting recovery of orphaned runs...'); - - const orphanedRuns = await Run.findAll({ - where: { - status: ['running', 'scheduled'] - }, - order: [['startedAt', 'ASC']] - }); - - if (orphanedRuns.length === 0) { - logger.log('info', 'No orphaned runs found'); - return; - } - - logger.log('info', `Found ${orphanedRuns.length} orphaned runs to recover (including scheduled runs)`); - - for (const run of orphanedRuns) { - try { - const runData = run.toJSON(); - logger.log('info', `Recovering orphaned run: ${runData.runId}`); - - const browser = browserPool.getRemoteBrowser(runData.browserId); - - if (!browser) { - const retryCount = runData.retryCount || 0; - - if (retryCount < 3) { - await run.update({ - status: 'queued', - retryCount: retryCount + 1, - serializableOutput: {}, - binaryOutput: {}, - browserId: undefined, - log: runData.log ? `${runData.log}\n[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` : `[RETRY ${retryCount + 1}/3] Re-queuing due to server crash` - }); - - logger.log('info', `Re-queued crashed run ${runData.runId} (retry ${retryCount + 1}/3)`); - } else { - const crashRecoveryMessage = `Max retries exceeded (3/3) - Run failed after multiple server crashes.`; - - await run.update({ - status: 'failed', - finishedAt: new Date().toLocaleString(), - log: runData.log ? `${runData.log}\n${crashRecoveryMessage}` : crashRecoveryMessage - }); - - logger.log('warn', `Max retries reached for run ${runData.runId}, marked as permanently failed`); - } - - if (runData.browserId) { - try { - browserPool.deleteRemoteBrowser(runData.browserId); - logger.log('info', `Cleaned up stale browser reference: ${runData.browserId}`); - } catch (cleanupError: any) { - logger.log('warn', `Failed to cleanup browser reference ${runData.browserId}: ${cleanupError.message}`); - } - } - } else { - logger.log('info', `Run ${runData.runId} browser still active, not orphaned`); - } - } catch (runError: any) { - logger.log('error', `Failed to recover run ${run.runId}: ${runError.message}`); - } - } - - logger.log('info', `Orphaned run recovery completed. Processed ${orphanedRuns.length} runs.`); - } catch (error: any) { - logger.log('error', `Failed to recover orphaned runs: ${error.message}`); - } -} -/** - * POST endpoint for creating a crawl robot - * @route POST /recordings/crawl - * @auth requireSignIn - JWT authentication required - */ -router.post('/recordings/crawl', requireSignIn, async (req: AuthenticatedRequest, res) => { +export async function recoverOrphanedRuns(): Promise { try { - const { url, name, crawlConfig } = req.body; - - if (!url || !crawlConfig) { - return res.status(400).json({ error: 'URL and crawl configuration are required.' }); - } + logger.info("Checking for orphaned runs..."); - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } - - try { - new URL(url); - } catch (err) { - return res.status(400).json({ error: 'Invalid URL format' }); - } + const timeoutMinutes = 30; + const cutoff = new Date(Date.now() - timeoutMinutes * 60 * 1000); - const robotName = name || `Crawl Robot - ${new URL(url).hostname}`; - const currentTimestamp = new Date().toLocaleString('en-US'); - const robotId = uuid(); - - const newRobot = await Robot.create({ - id: uuid(), - userId: req.user.id, - recording_meta: { - name: robotName, - id: robotId, - createdAt: currentTimestamp, - updatedAt: currentTimestamp, - pairs: 1, - params: [], - type: 'crawl', - url: url, - }, - recording: { - workflow: [ - { - where: { url }, - what: [ - { action: 'flag', args: ['generated'] }, - { - action: 'crawl', - args: [crawlConfig], - name: 'Crawl' - } - ] - }, - { - where: { url: 'about:blank' }, - what: [ - { - action: 'goto', - args: [url] - }, - { - action: 'waitForLoadState', - args: ['networkidle'] - } - ] - } - ] + const orphanedRuns = await Run.findAll({ + where: { + status: "running", + startedAt: { [Op.lt]: cutoff } }, - google_sheet_email: null, - google_sheet_name: null, - google_sheet_id: null, - google_access_token: null, - google_refresh_token: null, - airtable_base_id: null, - airtable_base_name: null, - airtable_table_name: null, - airtable_table_id: null, - airtable_access_token: null, - airtable_refresh_token: null, - schedule: null, - webhooks: null - }); - - logger.log('info', `Crawl robot created with id: ${newRobot.id}`); - capture('maxun-oss-robot-created', { - userId: req.user.id.toString(), - robotId: robotId, - robotName: robotName, - url: url, - robotType: 'crawl', - crawlConfig: crawlConfig, - robot_meta: newRobot.recording_meta, - recording: newRobot.recording, + attributes: ["runId"], + raw: true }); - return res.status(201).json({ - message: 'Crawl robot created successfully.', - robot: newRobot, - }); - } catch (error) { - if (error instanceof Error) { - logger.log('error', `Error creating crawl robot: ${error.message}`); - return res.status(500).json({ error: error.message }); - } else { - logger.log('error', 'Unknown error creating crawl robot'); - return res.status(500).json({ error: 'An unknown error occurred.' }); + if (!orphanedRuns.length) { + logger.info("No orphaned runs detected"); + return; } - } -}); -/** - * POST endpoint for creating a search robot - * @route POST /recordings/search - * @auth requireSignIn - JWT authentication required - */ -router.post('/recordings/search', requireSignIn, async (req: AuthenticatedRequest, res) => { - try { - const { searchConfig, name } = req.body; + logger.warn(`Recovering ${orphanedRuns.length} orphaned runs`); - if (!searchConfig || !searchConfig.query) { - return res.status(400).json({ error: 'Search configuration with query is required.' }); - } - - if (!req.user) { - return res.status(401).send({ error: 'Unauthorized' }); - } + const runIds = orphanedRuns.map((r: any) => r.runId); - const robotName = name || `Search Robot - ${searchConfig.query.substring(0, 50)}`; - const currentTimestamp = new Date().toLocaleString('en-US'); - const robotId = uuid(); - - const newRobot = await Robot.create({ - id: uuid(), - userId: req.user.id, - recording_meta: { - name: robotName, - id: robotId, - createdAt: currentTimestamp, - updatedAt: currentTimestamp, - pairs: 1, - params: [], - type: 'search', - }, - recording: { - workflow: [ - { - where: { url: 'about:blank' }, - what: [{ - action: 'search', - args: [searchConfig], - name: 'Search' - }] - } - ] - }, - google_sheet_email: null, - google_sheet_name: null, - google_sheet_id: null, - google_access_token: null, - google_refresh_token: null, - airtable_base_id: null, - airtable_base_name: null, - airtable_table_name: null, - airtable_table_id: null, - airtable_access_token: null, - airtable_refresh_token: null, - schedule: null, - webhooks: null - }); + await Run.update( + { status: "failed" }, + { where: { runId: { [Op.in]: runIds } } } + ); - logger.log('info', `Search robot created with id: ${newRobot.id}`); - capture('maxun-oss-robot-created', { - userId: req.user.id.toString(), - robotId: robotId, - robotName: robotName, - robotType: 'search', - searchQuery: searchConfig.query, - searchProvider: searchConfig.provider || 'duckduckgo', - searchLimit: searchConfig.limit || 10, - robot_meta: newRobot.recording_meta, - recording: newRobot.recording, - }); + logger.warn(`Marked ${runIds.length} orphaned runs as failed`); - return res.status(201).json({ - message: 'Search robot created successfully.', - robot: newRobot, - }); } catch (error) { - if (error instanceof Error) { - logger.log('error', `Error creating search robot: ${error.message}`); - return res.status(500).json({ error: error.message }); - } else { - logger.log('error', 'Unknown error creating search robot'); - return res.status(500).json({ error: 'An unknown error occurred.' }); - } + logger.error("recoverOrphanedRuns failed", error); } -}); +} -export { processQueuedRuns }; \ No newline at end of file +export default router;