
F006 - Content Processing Pipeline

Objective

Extract text content from uploaded manuscripts (PDF, DOCX, TXT), detect chapter and section structure, and capture basic metadata for downstream analysis.

Quick Implementation

Using MyStoryFlow Components

  • Background job processing with @mystoryflow/shared
  • Progress tracking from @mystoryflow/ui
  • Error handling patterns from @mystoryflow/shared
  • Database transaction management via @mystoryflow/database

New Requirements

  • PDF text extraction library
  • DOCX parsing library
  • Chapter/section detection logic
  • Metadata extraction

MVP Implementation

1. Package Installation

# In the packages/manuscript-analysis directory
npm install pdf-parse mammoth textract

2. Database Schema

-- Extracted content storage (in analyzer schema)
CREATE TABLE analyzer.manuscript_content (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  manuscript_id UUID REFERENCES analyzer.manuscripts(id) UNIQUE,
  raw_text TEXT NOT NULL,
  word_count INTEGER NOT NULL,
  character_count INTEGER NOT NULL,
  metadata JSONB DEFAULT '{}',
  created_at TIMESTAMP DEFAULT NOW()
);

-- Chapter/section structure
CREATE TABLE analyzer.manuscript_structure (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  manuscript_id UUID REFERENCES analyzer.manuscripts(id),
  type VARCHAR(50) NOT NULL, -- 'chapter', 'section', 'part'
  title VARCHAR(255),
  sequence_number INTEGER NOT NULL,
  start_position INTEGER NOT NULL,
  end_position INTEGER NOT NULL,
  word_count INTEGER NOT NULL,
  content TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
);

-- Processing jobs
CREATE TABLE analyzer.extraction_jobs (
  id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  manuscript_id UUID REFERENCES analyzer.manuscripts(id),
  status VARCHAR(50) DEFAULT 'pending',
  progress INTEGER DEFAULT 0,
  error_message TEXT,
  started_at TIMESTAMP,
  completed_at TIMESTAMP,
  created_at TIMESTAMP DEFAULT NOW()
);

-- Indexes (schema-qualified to match the tables above)
CREATE INDEX idx_manuscript_content_manuscript_id ON analyzer.manuscript_content(manuscript_id);
CREATE INDEX idx_manuscript_structure_manuscript_id ON analyzer.manuscript_structure(manuscript_id);
CREATE INDEX idx_extraction_jobs_status ON analyzer.extraction_jobs(status);
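Downstream features read this structure back in reading order. As a quick illustration, here is a minimal sketch of a chapter query, assuming the same getSupabaseBrowserClient helper the extractor service below uses; getChapters is a hypothetical helper name, not part of the existing packages:

// Hypothetical helper: load a manuscript's chapters in reading order.
// Assumes the getSupabaseBrowserClient helper from @mystoryflow/database
// that the extractor service below also uses.
import { getSupabaseBrowserClient } from '@mystoryflow/database'

export async function getChapters(manuscriptId: string) {
  const supabase = getSupabaseBrowserClient()

  const { data, error } = await supabase
    .from('analyzer.manuscript_structure')
    .select('title, sequence_number, word_count, content')
    .eq('manuscript_id', manuscriptId)
    .order('sequence_number', { ascending: true })

  if (error) throw error
  return data ?? []
}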

3. Content Extractor Service

// packages/manuscript-analysis/src/services/content-extractor.ts
import { getSupabaseBrowserClient } from '@mystoryflow/database'
import { downloadFromB2 } from '@mystoryflow/shared/storage'
import { trackAIUsage } from '@mystoryflow/analytics'
import pdf from 'pdf-parse'
import mammoth from 'mammoth'

interface ExtractedContent {
  text: string
  wordCount: number
  characterCount: number
  metadata: {
    title?: string
    author?: string
    pageCount?: number
    language?: string
  }
  chapters: Chapter[]
}

interface Chapter {
  title: string
  number: number
  content: string
  startPosition: number
  endPosition: number
  wordCount: number
}

export class ContentExtractor {
  // NOTE: extraction runs server-side; if the database package
  // distinguishes browser and server clients, use the server client here.
  private supabase = getSupabaseBrowserClient()

  async extractContent(manuscriptId: string): Promise<void> {
    // Create extraction job
    const { data: job } = await this.supabase
      .from('analyzer.extraction_jobs')
      .insert({ manuscript_id: manuscriptId })
      .select()
      .single()

    if (!job) throw new Error('Failed to create extraction job')

    try {
      // Update job status
      await this.updateJobStatus(job.id, 'processing', 10)

      // Get manuscript details
      const { data: manuscript } = await this.supabase
        .from('analyzer.manuscripts')
        .select('*')
        .eq('id', manuscriptId)
        .single()

      if (!manuscript) throw new Error('Manuscript not found')

      // Download file from Backblaze B2
      const fileData = await downloadFromB2({
        bucket: 'story-analyzer',
        path: manuscript.storage_path
      })

      if (!fileData) throw new Error('File not found')

      await this.updateJobStatus(job.id, 'processing', 30)

      // Extract content based on file type
      let extracted: ExtractedContent
      switch (manuscript.file_type) {
        case 'application/pdf':
          extracted = await this.extractFromPDF(fileData)
          break
        case 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
          extracted = await this.extractFromDOCX(fileData)
          break
        case 'text/plain':
          extracted = await this.extractFromTXT(fileData)
          break
        default:
          throw new Error('Unsupported file type')
      }

      await this.updateJobStatus(job.id, 'processing', 60)

      // Detect chapters and structure
      extracted.chapters = this.detectChapters(extracted.text)

      await this.updateJobStatus(job.id, 'processing', 80)

      // Save extracted content
      await this.saveExtractedContent(manuscriptId, extracted)

      // Update manuscript status
      await this.supabase
        .from('analyzer.manuscripts')
        .update({
          status: 'extracted',
          word_count: extracted.wordCount,
          metadata: { ...manuscript.metadata, ...extracted.metadata }
        })
        .eq('id', manuscriptId)

      // Track extraction operation
      await trackAIUsage({
        userId: manuscript.user_id,
        operation: 'content-extraction',
        metadata: {
          manuscriptId,
          wordCount: extracted.wordCount,
          fileType: manuscript.file_type
        }
      })

      await this.updateJobStatus(job.id, 'completed', 100)
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error)
      await this.updateJobStatus(job.id, 'failed', 0, message)
      throw error
    }
  }

  private async extractFromPDF(fileData: Blob): Promise<ExtractedContent> {
    const buffer = await fileData.arrayBuffer()
    const data = await pdf(Buffer.from(buffer))

    return {
      text: data.text,
      wordCount: this.countWords(data.text),
      characterCount: data.text.length,
      metadata: {
        pageCount: data.numpages,
        title: data.info?.Title,
        author: data.info?.Author
      },
      chapters: []
    }
  }

  private async extractFromDOCX(fileData: Blob): Promise<ExtractedContent> {
    const buffer = await fileData.arrayBuffer()
    // mammoth's Node API expects a Buffer, not an ArrayBuffer
    const result = await mammoth.extractRawText({ buffer: Buffer.from(buffer) })

    return {
      text: result.value,
      wordCount: this.countWords(result.value),
      characterCount: result.value.length,
      metadata: {},
      chapters: []
    }
  }

  private async extractFromTXT(fileData: Blob): Promise<ExtractedContent> {
    const text = await fileData.text()

    return {
      text,
      wordCount: this.countWords(text),
      characterCount: text.length,
      metadata: {},
      chapters: []
    }
  }

  private detectChapters(text: string): Chapter[] {
    const chapters: Chapter[] = []

    // Common chapter patterns. These are matched one line at a time, so
    // the global flag is omitted: a /g regex keeps lastIndex state between
    // exec() calls and would silently skip matches.
    const chapterPatterns = [
      /^Chapter\s+(\d+|[IVXLCDM]+)(?:\s*[:\-\.]\s*(.+))?$/i,
      /^(\d+|[IVXLCDM]+)\.\s+(.+)$/i,
      /^Part\s+(\d+|[IVXLCDM]+)(?:\s*[:\-\.]\s*(.+))?$/i
    ]

    const lines = text.split('\n')
    let currentChapter: Chapter | null = null
    let chapterNumber = 0
    // Running character offset: indexOf(line) would return the first
    // occurrence and mis-place chapters whose heading text repeats.
    let position = 0

    for (let i = 0; i < lines.length; i++) {
      const line = lines[i].trim()

      // Check if the line matches a chapter pattern
      let isChapter = false
      for (const pattern of chapterPatterns) {
        const match = pattern.exec(line)
        if (match) {
          // Save the previous chapter
          if (currentChapter) {
            chapters.push(currentChapter)
          }

          chapterNumber++
          currentChapter = {
            title: match[2] || `Chapter ${chapterNumber}`,
            number: chapterNumber,
            content: '',
            startPosition: position,
            endPosition: 0,
            wordCount: 0
          }
          isChapter = true
          break
        }
      }

      // Add content to the current chapter
      if (!isChapter && currentChapter) {
        currentChapter.content += line + '\n'
      }

      position += lines[i].length + 1 // +1 for the newline consumed by split()
    }

    // Save the last chapter
    if (currentChapter) {
      chapters.push(currentChapter)
    }

    // If no chapters were detected, treat the whole text as one chapter
    if (chapters.length === 0) {
      chapters.push({
        title: 'Full Manuscript',
        number: 1,
        content: text,
        startPosition: 0,
        endPosition: text.length,
        wordCount: this.countWords(text)
      })
    }

    // Calculate end positions and word counts
    chapters.forEach((chapter, index) => {
      chapter.endPosition = index < chapters.length - 1
        ? chapters[index + 1].startPosition
        : text.length
      chapter.wordCount = this.countWords(chapter.content)
    })

    return chapters
  }

  private countWords(text: string): number {
    return text.trim().split(/\s+/).filter(word => word.length > 0).length
  }

  private async saveExtractedContent(
    manuscriptId: string,
    content: ExtractedContent
  ): Promise<void> {
    // Save main content
    await this.supabase
      .from('analyzer.manuscript_content')
      .insert({
        manuscript_id: manuscriptId,
        raw_text: content.text,
        word_count: content.wordCount,
        character_count: content.characterCount,
        metadata: content.metadata
      })

    // Save chapter structure
    if (content.chapters.length > 0) {
      await this.supabase
        .from('analyzer.manuscript_structure')
        .insert(
          content.chapters.map(chapter => ({
            manuscript_id: manuscriptId,
            type: 'chapter',
            title: chapter.title,
            sequence_number: chapter.number,
            start_position: chapter.startPosition,
            end_position: chapter.endPosition,
            word_count: chapter.wordCount,
            content: chapter.content
          }))
        )
    }
  }

  private async updateJobStatus(
    jobId: string,
    status: string,
    progress: number,
    error?: string
  ): Promise<void> {
    const update: Record<string, unknown> = { status, progress }

    if (status === 'processing' && progress <= 10) {
      // Record the start time on the first processing update only
      update.started_at = new Date().toISOString()
    }
    if (status === 'completed' || status === 'failed') {
      update.completed_at = new Date().toISOString()
    }
    if (error) {
      update.error_message = error
    }

    await this.supabase
      .from('analyzer.extraction_jobs')
      .update(update)
      .eq('id', jobId)
  }
}
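The chapter heuristics are easy to sanity-check in isolation. A minimal sketch on a synthetic manuscript; it assumes the Supabase client can be constructed in the test environment, and since detectChapters is private above, the cast is a test-only shortcut:

// Quick sanity check for the chapter-detection heuristics.
// The sample text is made up for illustration.
import { ContentExtractor } from '@mystoryflow/manuscript-analysis'

const sample = [
  'Chapter 1: The Farm',
  'Grandma grew up on a farm outside Dayton.',
  '',
  'Chapter 2: The War Years',
  'In 1942 everything changed.'
].join('\n')

// detectChapters is private, so cast to any for this test-only call
const chapters = (new ContentExtractor() as any).detectChapters(sample)

console.log(chapters.map((c: any) => `${c.number}. ${c.title} (${c.wordCount} words)`))
// Expected: two chapters, "The Farm" and "The War Years"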

4. API Endpoint

// apps/analyzer-app/src/app/api/manuscripts/[id]/extract/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { ContentExtractor } from '@mystoryflow/manuscript-analysis'
import { withAuth } from '@mystoryflow/auth'
import { getSupabaseBrowserClient } from '@mystoryflow/database'

export async function POST(
  req: NextRequest,
  { params }: { params: { id: string } }
) {
  const session = await withAuth(req)
  if (!session) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  try {
    const extractor = new ContentExtractor()

    // Start extraction in the background (fire-and-forget). On serverless
    // platforms the process may be frozen once the response is sent, so a
    // queue or background worker is the safer option in production.
    extractor.extractContent(params.id).catch(console.error)

    return NextResponse.json({
      message: 'Extraction started',
      manuscriptId: params.id
    })
  } catch (error) {
    return NextResponse.json(
      { error: 'Extraction failed to start' },
      { status: 500 }
    )
  }
}

// Get extraction status
export async function GET(
  req: NextRequest,
  { params }: { params: { id: string } }
) {
  const session = await withAuth(req)
  if (!session) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const supabase = getSupabaseBrowserClient()
  const { data: job } = await supabase
    .from('analyzer.extraction_jobs')
    .select('*')
    .eq('manuscript_id', params.id)
    .order('created_at', { ascending: false })
    .limit(1)
    .maybeSingle() // null (not an error) when no job exists yet

  return NextResponse.json({ job })
}
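On the client, extraction progress can be surfaced by polling the GET endpoint above until the job reaches a terminal state. A minimal sketch; the 2-second interval and the onProgress callback shape are illustrative choices, not part of the existing packages:

// Poll the extraction status endpoint until the job completes or fails.
export async function pollExtractionStatus(
  manuscriptId: string,
  onProgress: (progress: number) => void
): Promise<void> {
  while (true) {
    const res = await fetch(`/api/manuscripts/${manuscriptId}/extract`)
    const { job } = await res.json()

    if (job) {
      onProgress(job.progress)
      if (job.status === 'completed') return
      if (job.status === 'failed') {
        throw new Error(job.error_message ?? 'Extraction failed')
      }
    }

    // Arbitrary 2-second poll interval for illustration
    await new Promise(resolve => setTimeout(resolve, 2000))
  }
}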

MVP Acceptance Criteria

  • Extract text from PDF files
  • Extract text from DOCX files
  • Extract text from TXT files
  • Detect chapter structure
  • Count words and characters
  • Extract basic metadata
  • Progress tracking for extraction
  • Error handling and recovery
  • AI usage tracking for extraction operations

Post-MVP Enhancements

  • OCR support for scanned PDFs
  • Advanced metadata extraction
  • Multi-language support
  • Footnote/endnote handling
  • Image caption extraction
  • Table of contents parsing
  • Style and formatting preservation
  • Parallel processing for large files

Implementation Time

  • Development: 1.5 days
  • Testing: 0.5 days
  • Total: 2 days

Dependencies

  • F005-DOCUMENT-UPLOAD must be completed
  • Backblaze B2 storage must be configured
  • @mystoryflow/shared storage utilities must be available

Next Feature

After completion, proceed to F007-GENRE-DETECTION for AI-powered genre classification.