F006 - Content Processing Pipeline
Objective
Extract and process text content from uploaded manuscripts (PDF, DOCX, TXT), detecting structure, chapters, and metadata for analysis.
Quick Implementation
Using MyStoryFlow Components
- Background job processing with @mystoryflow/shared
- Progress tracking from @mystoryflow/ui
- Error handling patterns from @mystoryflow/shared
- Database transaction management via @mystoryflow/database
New Requirements
- PDF text extraction library
- DOCX parsing library
- Chapter/section detection logic
- Metadata extraction
MVP Implementation
1. Package Installation
```bash
# In packages/manuscript-analysis directory
npm install pdf-parse mammoth textract
```

2. Database Schema

```sql
-- Extracted content storage (in analyzer schema)
CREATE TABLE analyzer.manuscript_content (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
manuscript_id UUID REFERENCES analyzer.manuscripts(id) UNIQUE,
raw_text TEXT NOT NULL,
word_count INTEGER NOT NULL,
character_count INTEGER NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP DEFAULT NOW()
);
-- Chapter/section structure
CREATE TABLE analyzer.manuscript_structure (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
manuscript_id UUID REFERENCES analyzer.manuscripts(id),
type VARCHAR(50) NOT NULL, -- 'chapter', 'section', 'part'
title VARCHAR(255),
sequence_number INTEGER NOT NULL,
start_position INTEGER NOT NULL,
end_position INTEGER NOT NULL,
word_count INTEGER NOT NULL,
content TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- Processing jobs
CREATE TABLE analyzer.extraction_jobs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
manuscript_id UUID REFERENCES analyzer.manuscripts(id),
status VARCHAR(50) DEFAULT 'pending',
progress INTEGER DEFAULT 0,
error_message TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
-- Indexes
CREATE INDEX idx_manuscript_content_manuscript_id ON analyzer.manuscript_content(manuscript_id);
CREATE INDEX idx_manuscript_structure_manuscript_id ON analyzer.manuscript_structure(manuscript_id);
CREATE INDEX idx_extraction_jobs_status ON analyzer.extraction_jobs(status);
```
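For reference, reading the extracted structure back is a plain query. The sketch below is illustrative only: it assumes supabase-js v2, that the `analyzer` schema is exposed to the API, and hypothetical environment variable names; table and column names match the DDL above.

```typescript
import { createClient } from '@supabase/supabase-js'

// Assumed environment values; adjust to your project configuration
const supabase = createClient(process.env.SUPABASE_URL!, process.env.SUPABASE_ANON_KEY!)

// Fetch a manuscript's chapters in reading order
async function getChapters(manuscriptId: string) {
  const { data, error } = await supabase
    .schema('analyzer')
    .from('manuscript_structure')
    .select('sequence_number, title, word_count')
    .eq('manuscript_id', manuscriptId)
    .order('sequence_number', { ascending: true })
  if (error) throw error
  return data
}
```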
3. Content Extractor Service

```typescript
// packages/manuscript-analysis/src/services/content-extractor.ts
import { getSupabaseBrowserClient } from '@mystoryflow/database'
import { downloadFromB2 } from '@mystoryflow/shared/storage'
import { trackAIUsage } from '@mystoryflow/analytics'
import pdf from 'pdf-parse'
import mammoth from 'mammoth'
interface ExtractedContent {
text: string
wordCount: number
characterCount: number
metadata: {
title?: string
author?: string
pageCount?: number
language?: string
}
chapters: Chapter[]
}
interface Chapter {
title: string
number: number
content: string
startPosition: number
endPosition: number
wordCount: number
}
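// Pipeline overview: create an extraction job, download the source file from
// B2, extract raw text for the detected file type, run heuristic chapter
// detection, persist content and structure, then mark the manuscript extracted.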
export class ContentExtractor {
private supabase = getSupabaseBrowserClient()
async extractContent(manuscriptId: string): Promise<void> {
// Create extraction job
const { data: job } = await this.supabase
      // supabase-js addresses non-public schemas via .schema(), not dotted table names
      .schema('analyzer')
      .from('extraction_jobs')
.insert({ manuscript_id: manuscriptId })
.select()
      .single()
    if (!job) throw new Error('Failed to create extraction job')
    try {
// Update job status
await this.updateJobStatus(job.id, 'processing', 10)
// Get manuscript details
const { data: manuscript } = await this.supabase
        .schema('analyzer')
        .from('manuscripts')
.select('*')
.eq('id', manuscriptId)
.single()
if (!manuscript) throw new Error('Manuscript not found')
// Download file from Backblaze B2
const fileData = await downloadFromB2({
bucket: 'story-analyzer',
path: manuscript.storage_path
})
if (!fileData) throw new Error('File not found')
await this.updateJobStatus(job.id, 'processing', 30)
// Extract content based on file type
let extracted: ExtractedContent
switch (manuscript.file_type) {
case 'application/pdf':
extracted = await this.extractFromPDF(fileData)
break
case 'application/vnd.openxmlformats-officedocument.wordprocessingml.document':
extracted = await this.extractFromDOCX(fileData)
break
case 'text/plain':
extracted = await this.extractFromTXT(fileData)
break
default:
throw new Error('Unsupported file type')
}
await this.updateJobStatus(job.id, 'processing', 60)
// Detect chapters and structure
extracted.chapters = this.detectChapters(extracted.text)
await this.updateJobStatus(job.id, 'processing', 80)
// Save extracted content
await this.saveExtractedContent(manuscriptId, extracted)
// Update manuscript status
await this.supabase
        .schema('analyzer')
        .from('manuscripts')
.update({
status: 'extracted',
word_count: extracted.wordCount,
metadata: { ...manuscript.metadata, ...extracted.metadata }
})
.eq('id', manuscriptId)
// Track extraction operation
await trackAIUsage({
userId: manuscript.user_id,
operation: 'content-extraction',
metadata: {
manuscriptId,
wordCount: extracted.wordCount,
fileType: manuscript.file_type
}
})
await this.updateJobStatus(job.id, 'completed', 100)
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error)
      await this.updateJobStatus(job.id, 'failed', 0, message)
throw error
}
}
private async extractFromPDF(fileData: Blob): Promise<ExtractedContent> {
const buffer = await fileData.arrayBuffer()
const data = await pdf(Buffer.from(buffer))
return {
text: data.text,
wordCount: this.countWords(data.text),
characterCount: data.text.length,
metadata: {
pageCount: data.numpages,
title: data.info?.Title,
author: data.info?.Author
},
chapters: []
}
}
private async extractFromDOCX(fileData: Blob): Promise<ExtractedContent> {
    const arrayBuffer = await fileData.arrayBuffer()
    // mammoth's Node API expects a Buffer, not a raw ArrayBuffer
    const result = await mammoth.extractRawText({ buffer: Buffer.from(arrayBuffer) })
return {
text: result.value,
wordCount: this.countWords(result.value),
characterCount: result.value.length,
metadata: {},
chapters: []
}
}
private async extractFromTXT(fileData: Blob): Promise<ExtractedContent> {
const text = await fileData.text()
return {
text,
wordCount: this.countWords(text),
characterCount: text.length,
metadata: {},
chapters: []
}
}
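  // Heuristic chapter detection: scan line by line for heading patterns
  // ("Chapter 12", "IV. Title", "Part 2") and slice the text between
  // consecutive headings. Only plain text is available here, so formatting
  // cues such as font size or page breaks cannot be used.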
private detectChapters(text: string): Chapter[] {
const chapters: Chapter[] = []
    // Common chapter heading patterns. Note: no global flag, because a /g/
    // regex keeps lastIndex state across exec() calls and would skip matches.
    const chapterPatterns = [
      /^Chapter\s+(\d+|[IVXLCDM]+)(?:\s*[:\-.]\s*(.+))?$/i,
      /^(\d+|[IVXLCDM]+)\.\s+(.+)$/i,
      /^Part\s+(\d+|[IVXLCDM]+)(?:\s*[:\-.]\s*(.+))?$/i
    ]
    // Walk the text line by line, tracking each line's character offset
    const lines = text.split('\n')
    let currentChapter: Chapter | null = null
    let chapterNumber = 0
    let offset = 0
    for (const rawLine of lines) {
      const line = rawLine.trim()
      // Check if the line matches a chapter heading pattern
      let isChapter = false
      for (const pattern of chapterPatterns) {
        const match = pattern.exec(line)
        if (match) {
          // Save the previous chapter before starting a new one
          if (currentChapter) {
            chapters.push(currentChapter)
          }
          chapterNumber++
          currentChapter = {
            title: match[2] || `Chapter ${chapterNumber}`,
            number: chapterNumber,
            content: '',
            // Use the running offset; text.indexOf(line) would return the
            // first occurrence and mislocate repeated headings
            startPosition: offset,
            endPosition: 0,
            wordCount: 0
          }
          isChapter = true
          break
        }
      }
      // Accumulate body text into the current chapter
      if (!isChapter && currentChapter) {
        currentChapter.content += line + '\n'
      }
      offset += rawLine.length + 1 // +1 for the newline consumed by split()
    }
// Save last chapter
if (currentChapter) {
chapters.push(currentChapter)
}
// If no chapters detected, create single chapter
if (chapters.length === 0) {
chapters.push({
title: 'Full Manuscript',
number: 1,
content: text,
startPosition: 0,
endPosition: text.length,
wordCount: this.countWords(text)
})
}
// Calculate end positions and word counts
chapters.forEach((chapter, index) => {
chapter.endPosition = index < chapters.length - 1
? chapters[index + 1].startPosition
: text.length
chapter.wordCount = this.countWords(chapter.content)
})
return chapters
}
private countWords(text: string): number {
return text.trim().split(/\s+/).filter(word => word.length > 0).length
}
private async saveExtractedContent(
manuscriptId: string,
content: ExtractedContent
): Promise<void> {
// Save main content
await this.supabase
      .schema('analyzer')
      .from('manuscript_content')
.insert({
manuscript_id: manuscriptId,
raw_text: content.text,
word_count: content.wordCount,
character_count: content.characterCount,
metadata: content.metadata
})
// Save chapter structure
if (content.chapters.length > 0) {
await this.supabase
        .schema('analyzer')
        .from('manuscript_structure')
.insert(
content.chapters.map(chapter => ({
manuscript_id: manuscriptId,
type: 'chapter',
title: chapter.title,
sequence_number: chapter.number,
start_position: chapter.startPosition,
end_position: chapter.endPosition,
word_count: chapter.wordCount,
content: chapter.content
}))
)
}
}
private async updateJobStatus(
jobId: string,
status: string,
progress: number,
error?: string
): Promise<void> {
    const update: Record<string, unknown> = { status, progress }
    if (status === 'processing') {
      update.started_at = new Date().toISOString()
    }
if (status === 'completed' || status === 'failed') {
update.completed_at = new Date().toISOString()
}
if (error) {
update.error_message = error
}
await this.supabase
      .schema('analyzer')
      .from('extraction_jobs')
.update(update)
.eq('id', jobId)
}
}
```
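A minimal sketch of invoking the extractor directly, assuming it is exported from @mystoryflow/manuscript-analysis as in the API route below; the function name is illustrative:

```typescript
import { ContentExtractor } from '@mystoryflow/manuscript-analysis'

// Kick off extraction for one manuscript and log the outcome. The extractor
// records progress in analyzer.extraction_jobs, so callers can fire-and-forget
// and poll the job table for status.
async function runExtraction(manuscriptId: string) {
  const extractor = new ContentExtractor()
  try {
    await extractor.extractContent(manuscriptId)
    console.log(`Extraction completed for ${manuscriptId}`)
  } catch (err) {
    // extractContent already marks the job as failed; this is just logging
    console.error(`Extraction failed for ${manuscriptId}`, err)
  }
}
```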
4. API Endpoint

```typescript
// apps/analyzer-app/src/app/api/manuscripts/[id]/extract/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { ContentExtractor } from '@mystoryflow/manuscript-analysis'
import { withAuth } from '@mystoryflow/auth'
import { getSupabaseBrowserClient } from '@mystoryflow/database'
export async function POST(
req: NextRequest,
{ params }: { params: { id: string } }
) {
const session = await withAuth(req)
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
try {
const extractor = new ContentExtractor()
    // Start extraction without awaiting the result. Note: serverless hosts
    // may freeze the runtime once the response is sent, so a durable queue
    // or background worker is safer for long extractions.
    extractor.extractContent(params.id).catch(console.error)
return NextResponse.json({
message: 'Extraction started',
manuscriptId: params.id
})
} catch (error) {
return NextResponse.json(
{ error: 'Extraction failed to start' },
{ status: 500 }
)
}
}
// Get extraction status
export async function GET(
req: NextRequest,
{ params }: { params: { id: string } }
) {
const session = await withAuth(req)
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const supabase = getSupabaseBrowserClient()
const { data: job } = await supabase
    .schema('analyzer')
    .from('extraction_jobs')
.select('*')
.eq('manuscript_id', params.id)
.order('created_at', { ascending: false })
.limit(1)
.single()
return NextResponse.json({ job })
}
```
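On the client, the GET endpoint can be polled until the job settles. A minimal sketch, assuming the route above and a browser `fetch`; the 2-second interval is an arbitrary choice:

```typescript
// Poll extraction status until the job completes or fails
async function waitForExtraction(manuscriptId: string): Promise<void> {
  for (;;) {
    const res = await fetch(`/api/manuscripts/${manuscriptId}/extract`)
    const { job } = await res.json()
    if (job?.status === 'completed') return
    if (job?.status === 'failed') throw new Error(job.error_message ?? 'Extraction failed')
    // Job still pending/processing; wait before the next poll
    await new Promise(resolve => setTimeout(resolve, 2000))
  }
}
```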
MVP Acceptance Criteria
- Extract text from PDF files
- Extract text from DOCX files
- Extract text from TXT files
- Detect chapter structure (see the test sketch after this list)
- Count words and characters
- Extract basic metadata
- Progress tracking for extraction
- Error handling and recovery
- AI usage tracking for extraction operations
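Chapter detection is the piece most worth unit-testing before sign-off. A small sketch, assuming Vitest (any Jest-compatible runner works), a hypothetical test-file location relative to the service, and that the Supabase client can be constructed in the test environment; the `as any` cast works around `detectChapters` being private:

```typescript
import { describe, expect, it } from 'vitest'
import { ContentExtractor } from '../src/services/content-extractor'

describe('chapter detection', () => {
  it('splits text on Chapter headings and counts words', () => {
    const text = 'Chapter 1: Beginnings\nIt was a dark night.\n\nChapter 2\nMorning came.\n'
    // Cast to any because detectChapters is private in the service above
    const chapters = (new ContentExtractor() as any).detectChapters(text)
    expect(chapters).toHaveLength(2)
    expect(chapters[0].title).toBe('Beginnings')
    expect(chapters[1].wordCount).toBe(2)
  })
})
```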
Post-MVP Enhancements
- OCR support for scanned PDFs
- Advanced metadata extraction
- Multi-language support
- Footnote/endnote handling
- Image caption extraction
- Table of contents parsing
- Style and formatting preservation
- Parallel processing for large files
Implementation Time
- Development: 1.5 days
- Testing: 0.5 days
- Total: 2 days
Dependencies
- F005-DOCUMENT-UPLOAD must be completed
- Backblaze B2 storage must be configured
- @mystoryflow/shared storage utilities must be available
Next Feature
After completion, proceed to F007-GENRE-DETECTION for AI-powered genre classification.