ainfo
Entry points for the ``ainfo`` package.
1"""Entry points for the ``ainfo`` package.""" 2 3from __future__ import annotations 4 5import asyncio 6import hashlib 7import json 8import logging 9from pathlib import Path 10from urllib.parse import urlparse 11 12import typer 13 14__version__ = "1.3.0" 15 16from .chunking import chunk_text, stream_chunks 17from .crawler import crawl as crawl_urls 18from .extraction import extract_information, extract_text, extract_custom 19from .fetching import fetch_data, async_fetch_data 20from .llm_service import LLMService 21from .output import output_results, to_json, json_schema 22from .parsing import parse_data 23from .schemas import ContactDetails 24from .extractors import AVAILABLE_EXTRACTORS 25 26app = typer.Typer() 27logger = logging.getLogger(__name__) 28 29 30@app.callback() 31def cli( 32 verbose: bool = typer.Option( 33 False, "--verbose", "-v", help="Enable verbose logging" 34 ) 35) -> None: 36 """Configure global CLI options such as logging verbosity.""" 37 38 level = logging.DEBUG if verbose else logging.WARNING 39 logging.basicConfig( 40 level=level, format="%(levelname)s: %(message)s", force=True 41 ) 42 43 44@app.command() 45def run( 46 url: str, 47 render_js: bool = typer.Option( 48 False, help="Render pages using a headless browser before extraction", 49 ), 50 use_llm: bool = typer.Option( 51 False, help="Use an LLM instead of regex for contact extraction", 52 ), 53 summarize: bool = typer.Option( 54 False, help="Summarize page content using the LLM", 55 ), 56 summary_language: str = typer.Option( 57 "German", 58 "--summary-language", 59 help="Language used for LLM summaries", 60 envvar="AINFO_SUMMARY_LANGUAGE", 61 ), 62 summary_prompt: str | None = typer.Option( 63 None, 64 "--summary-prompt", 65 help="Custom instruction supplied to the LLM when summarising", 66 envvar="AINFO_SUMMARY_PROMPT", 67 ), 68 summary_prompt_file: Path | None = typer.Option( 69 None, 70 "--summary-prompt-file", 71 help="Read the summary prompt from PATH", 72 ), 73 extract: list[str] = typer.Option( 74 [], "--extract", "-e", help="Additional extractors to run", 75 ), 76 output: Path | None = typer.Option( 77 None, "--output", "-o", help="Write JSON results to PATH.", 78 ), 79 json_output: bool = typer.Option( 80 False, "--json", help="Print extracted data as JSON to stdout", 81 ), 82 include_text: bool = typer.Option( 83 True, 84 "--text/--no-text", 85 help="Include page text in the results", 86 ), 87) -> None: 88 """Fetch ``url`` and display extracted text and optional information.""" 89 90 if summary_prompt is not None and summary_prompt_file is not None: 91 raise typer.BadParameter( 92 "Use either --summary-prompt or --summary-prompt-file, not both" 93 ) 94 95 custom_summary_prompt = summary_prompt 96 if summary_prompt_file is not None: 97 try: 98 custom_summary_prompt = summary_prompt_file.read_text(encoding="utf-8") 99 except OSError as exc: 100 raise typer.BadParameter( 101 f"Unable to read summary prompt file: {exc}" 102 ) from exc 103 104 raw = fetch_data(url, render_js=render_js) 105 document = parse_data(raw, url=url) 106 text: str | None = None 107 if include_text or summarize: 108 text = extract_text(document) 109 110 results: dict[str, object] = {} 111 if include_text and text is not None: 112 results["text"] = text 113 114 needs_llm = summarize or (use_llm and "contacts" in extract) 115 116 if needs_llm: 117 with LLMService() as llm: 118 for name in extract: 119 func = AVAILABLE_EXTRACTORS.get(name) 120 if func is None: 121 raise typer.BadParameter(f"Unknown extractor: {name}") 122 if name == 
"contacts": 123 results[name] = func( 124 document, method="llm" if use_llm else "regex", llm=llm 125 ) 126 else: 127 results[name] = func(document) 128 if summarize and text is not None: 129 results["summary"] = llm.summarize( 130 text, language=summary_language, prompt=custom_summary_prompt 131 ) 132 else: 133 for name in extract: 134 func = AVAILABLE_EXTRACTORS.get(name) 135 if func is None: 136 raise typer.BadParameter(f"Unknown extractor: {name}") 137 if name == "contacts": 138 results[name] = func(document, method="regex", llm=None) 139 else: 140 results[name] = func(document) 141 142 if output is not None: 143 serialisable = { 144 k: (v.model_dump() if isinstance(v, ContactDetails) else v) 145 for k, v in results.items() 146 } 147 output.write_text(json.dumps(serialisable)) 148 149 if json_output: 150 serialisable = { 151 k: (v.model_dump() if isinstance(v, ContactDetails) else v) 152 for k, v in results.items() 153 } 154 typer.echo(json.dumps(serialisable)) 155 else: 156 if include_text and text is not None: 157 typer.echo(text) 158 for name in extract: 159 value = results.get(name) 160 if name == "contacts" and isinstance(value, ContactDetails): 161 output_results(value) 162 else: 163 typer.echo(f"{name}:") 164 if isinstance(value, dict): 165 for key, items in value.items(): 166 typer.echo(f" {key}: {', '.join(items)}") 167 elif isinstance(value, list): 168 for item in value: 169 typer.echo(f" - {item}") 170 elif value is not None: 171 typer.echo(f" {value}") 172 if summarize and "summary" in results: 173 typer.echo("summary:") 174 typer.echo(results["summary"]) 175 176 177@app.command() 178def crawl( 179 url: str, 180 depth: int = 1, 181 render_js: bool = typer.Option( 182 False, help="Render pages using a headless browser before extraction", 183 ), 184 use_llm: bool = typer.Option( 185 False, help="Use an LLM instead of regex for contact extraction", 186 ), 187 extract: list[str] = typer.Option( 188 [], "--extract", "-e", help="Additional extractors to run", 189 ), 190 output: Path | None = typer.Option( 191 None, "--output", "-o", help="Write JSON results to PATH.", 192 ), 193 json_output: bool = typer.Option( 194 False, "--json", help="Print aggregated results as JSON to stdout", 195 ), 196 include_text: bool = typer.Option( 197 True, 198 "--text/--no-text", 199 help="Include page text in the results", 200 ), 201) -> None: 202 """Crawl ``url`` up to ``depth`` levels and extract text and data.""" 203 204 method = "llm" if use_llm else "regex" 205 aggregated_results: dict[str, dict[str, object]] = {} 206 207 async def _crawl(llm: LLMService | None = None) -> None: 208 async for link, raw in crawl_urls(url, depth, render_js=render_js): 209 document = parse_data(raw, url=link) 210 page_results: dict[str, object] = {} 211 text = "" 212 if include_text: 213 text = extract_text(document) 214 page_results["text"] = text 215 for name in extract: 216 func = AVAILABLE_EXTRACTORS.get(name) 217 if func is None: 218 raise typer.BadParameter(f"Unknown extractor: {name}") 219 if name == "contacts": 220 page_results[name] = func(document, method=method, llm=llm) 221 else: 222 page_results[name] = func(document) 223 aggregated_results[link] = page_results 224 if not json_output: 225 typer.echo(f"Results for {link}:") 226 if include_text: 227 typer.echo(text) 228 for name in extract: 229 value = page_results.get(name) 230 if name == "contacts" and isinstance(value, ContactDetails): 231 output_results(value) 232 else: 233 typer.echo(f"{name}: {value}") 234 typer.echo() 235 236 if use_llm: 237 
with LLMService() as llm: 238 asyncio.run(_crawl(llm)) 239 else: 240 asyncio.run(_crawl()) 241 242 if output is not None: 243 serialisable = { 244 url: { 245 k: (v.model_dump() if isinstance(v, ContactDetails) else v) 246 for k, v in res.items() 247 } 248 for url, res in aggregated_results.items() 249 } 250 output.write_text(json.dumps(serialisable)) 251 if json_output: 252 serialisable = { 253 url: { 254 k: (v.model_dump() if isinstance(v, ContactDetails) else v) 255 for k, v in res.items() 256 } 257 for url, res in aggregated_results.items() 258 } 259 typer.echo(json.dumps(serialisable)) 260 261 262async def async_extract_site( 263 url: str, 264 *, 265 depth: int = 0, 266 render_js: bool = False, 267 extract: list[str] | None = None, 268 include_text: bool = False, 269 use_llm: bool = False, 270 llm: LLMService | None = None, 271 dedupe: bool = True, 272) -> dict[str, dict[str, object]]: 273 """Crawl ``url`` up to ``depth`` levels and run extractors on each page. 274 275 Results are returned as a mapping of page URL to the extracted data. 276 Duplicate pages are skipped by comparing a SHA-256 hash of their HTML 277 content. Only pages on the same domain as ``url`` are processed. 278 """ 279 280 extract_names = list(extract or ["contacts"]) 281 method = "llm" if use_llm else "regex" 282 if use_llm and llm is None: 283 msg = "llm service required when use_llm=True" 284 raise ValueError(msg) 285 286 start_domain = urlparse(url).netloc 287 results: dict[str, dict[str, object]] = {} 288 seen_hashes: set[str] = set() 289 290 async for link, raw in crawl_urls(url, depth, render_js=render_js): 291 if urlparse(link).netloc != start_domain: 292 continue 293 294 if dedupe: 295 digest = hashlib.sha256(raw.encode("utf-8", errors="ignore")).hexdigest() 296 if digest in seen_hashes: 297 logger.debug("Skipping %s due to duplicate content hash", link) 298 continue 299 seen_hashes.add(digest) 300 301 document = parse_data(raw, url=link) 302 page_results: dict[str, object] = {} 303 304 if include_text: 305 page_results["text"] = extract_text(document) 306 307 for name in extract_names: 308 func = AVAILABLE_EXTRACTORS.get(name) 309 if func is None: 310 raise ValueError(f"Unknown extractor: {name}") 311 if name == "contacts": 312 page_results[name] = func(document, method=method, llm=llm) 313 else: 314 page_results[name] = func(document) 315 316 results[link] = page_results 317 318 return results 319 320 321def extract_site( 322 url: str, 323 *, 324 depth: int = 0, 325 render_js: bool = False, 326 extract: list[str] | None = None, 327 include_text: bool = False, 328 use_llm: bool = False, 329 llm: LLMService | None = None, 330 dedupe: bool = True, 331) -> dict[str, dict[str, object]] | asyncio.Task[dict[str, dict[str, object]]]: 332 """Synchronously run :func:`async_extract_site` when no event loop exists. 333 334 When called from within a running event loop a task is scheduled instead. 
335 """ 336 337 try: 338 loop = asyncio.get_running_loop() 339 except RuntimeError: 340 if use_llm and llm is None: 341 with LLMService() as managed_llm: 342 return asyncio.run( 343 async_extract_site( 344 url, 345 depth=depth, 346 render_js=render_js, 347 extract=extract, 348 include_text=include_text, 349 use_llm=True, 350 llm=managed_llm, 351 dedupe=dedupe, 352 ) 353 ) 354 return asyncio.run( 355 async_extract_site( 356 url, 357 depth=depth, 358 render_js=render_js, 359 extract=extract, 360 include_text=include_text, 361 use_llm=use_llm, 362 llm=llm, 363 dedupe=dedupe, 364 ) 365 ) 366 else: 367 if use_llm and llm is None: 368 msg = "llm must be provided when use_llm=True inside an event loop" 369 raise RuntimeError(msg) 370 return loop.create_task( 371 async_extract_site( 372 url, 373 depth=depth, 374 render_js=render_js, 375 extract=extract, 376 include_text=include_text, 377 use_llm=use_llm, 378 llm=llm, 379 dedupe=dedupe, 380 ) 381 ) 382 383 384def main() -> None: 385 app() 386 387 388__all__ = [ 389 "main", 390 "run", 391 "crawl", 392 "app", 393 "fetch_data", 394 "async_fetch_data", 395 "parse_data", 396 "extract_information", 397 "extract_text", 398 "extract_custom", 399 "extract_site", 400 "async_extract_site", 401 "output_results", 402 "to_json", 403 "json_schema", 404 "chunk_text", 405 "stream_chunks", 406 "LLMService", 407 "ContactDetails", 408 "__version__", 409]
run

Fetch ``url`` and display extracted text and optional information.
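As a quick illustration (not part of the package itself), the command can also be driven programmatically through Typer's test runner; the URL below is a placeholder and the invocation performs a real fetch:

from typer.testing import CliRunner

from ainfo import app

runner = CliRunner()

# Equivalent to: ainfo run https://example.com --extract contacts --json
# (https://example.com is a placeholder; any reachable page works)
result = runner.invoke(
    app, ["run", "https://example.com", "--extract", "contacts", "--json"]
)
print(result.output)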
crawl

Crawl ``url`` up to ``depth`` levels and extract text and data.
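A similar sketch for the crawl command, again with a placeholder URL and an illustrative depth of 2:

from typer.testing import CliRunner

from ainfo import app

runner = CliRunner()

# Crawl two levels deep and print aggregated contacts as JSON (placeholder URL).
result = runner.invoke(
    app,
    ["crawl", "https://example.com", "--depth", "2", "--extract", "contacts", "--json"],
)
print(result.output)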
def fetch_data(url: str, render_js: bool = False) -> str | asyncio.Task[str]:
    """Fetch raw HTML from ``url``.

    The function adapts to the surrounding asynchronous environment. If no
    event loop is running, the coroutine is executed immediately and the HTML
    is returned. When called while an event loop is already running, the
    coroutine is scheduled on that loop and an :class:`asyncio.Task` is
    returned. For fully asynchronous workflows use :func:`async_fetch_data`.

    Parameters
    ----------
    url:
        The address to retrieve.
    render_js:
        Whether to render the page with a headless browser so that any
        JavaScript on the page executes before the HTML is returned.

    Returns
    -------
    str | asyncio.Task[str]
        The HTML body of the page or a task that resolves to it.
    """

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(_fetch(url, render_js))
    else:
        return loop.create_task(_fetch(url, render_js))
Fetch raw HTML from ``url``.

The function adapts to the surrounding asynchronous environment. If no event loop is running, the coroutine is executed immediately and the HTML is returned. When called while an event loop is already running, the coroutine is scheduled on that loop and an asyncio.Task is returned. For fully asynchronous workflows use async_fetch_data().

Parameters
url: The address to retrieve.
render_js: Whether to render the page with a headless browser so that any JavaScript on the page executes before the HTML is returned.

Returns
str | asyncio.Task[str]: The HTML body of the page or a task that resolves to it.
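A minimal usage sketch covering both the synchronous shortcut and the asynchronous variant; the URL is a placeholder:

import asyncio

from ainfo import async_fetch_data, fetch_data

# Outside an event loop the HTML string is returned directly.
html = fetch_data("https://example.com")  # placeholder URL

# Inside a coroutine, prefer the explicitly asynchronous variant.
async def grab() -> str:
    return await async_fetch_data("https://example.com", render_js=False)

html_async = asyncio.run(grab())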
async def async_fetch_data(url: str, render_js: bool = False) -> str:
    """Fetch raw HTML from ``url`` asynchronously."""

    return await _fetch(url, render_js)
Fetch raw HTML from ``url`` asynchronously.
def parse_data(raw: str, url: str | None = None) -> Document:
    """Parse raw HTML into a :class:`~ainfo.models.Document`.

    Parameters
    ----------
    raw:
        The raw HTML string.
    url:
        Optional source URL associated with the HTML.
    """

    return parse_html(raw, url=url)
Parse raw HTML into an ainfo.models.Document.

Parameters
raw: The raw HTML string.
url: Optional source URL associated with the HTML.
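For example, fetching and parsing a page before extracting its text might look like this (placeholder URL; fetch_data returns a plain string when no event loop is running):

from ainfo import extract_text, fetch_data, parse_data

raw = fetch_data("https://example.com")  # placeholder URL
document = parse_data(raw, url="https://example.com")
print(extract_text(document)[:200])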
def extract_information(
    doc: Document,
    method: str = "regex",
    llm: LLMService | None = None,
    instruction: str | None = None,
    model: str | None = None,
) -> ContactDetails:
    """Extract contact details from a parsed document.

    Parameters
    ----------
    doc:
        Parsed :class:`Document` to process.
    method:
        ``"regex"`` to use the built-in regular expressions or ``"llm"`` to
        delegate extraction to an LLM service.
    llm:
        Instance of :class:`LLMService` required when ``method`` is ``"llm"``.
    """

    logger.info("Extracting contact information using %s", method)
    text = extract_text(doc, content_only=False)
    if method == "llm":
        if llm is None:
            msg = "LLMService instance required when method='llm'"
            raise ValueError(msg)
        instruction = instruction or (
            "Extract any email addresses, phone numbers, street addresses and "
            "social media profiles from the following text. Respond in JSON "
            "with keys 'emails', 'phone_numbers', 'addresses' and "
            "'social_media'."
        )
        response = llm.extract(text, instruction, model=model)
        try:
            data = json.loads(response)
        except Exception:
            data = {}
        return ContactDetails(
            emails=data.get("emails", []),
            phone_numbers=data.get("phone_numbers", []),
            addresses=data.get("addresses", []),
            social_media=data.get("social_media", []),
        )

    # Default to regex based extraction
    return ContactDetails(
        emails=extract_emails(doc),
        phone_numbers=extract_phone_numbers(text),
        addresses=extract_addresses(text),
        social_media=extract_social_profiles(text),
    )
Extract contact details from a parsed document.

Parameters
doc: Parsed Document to process.
method: ``"regex"`` to use the built-in regular expressions or ``"llm"`` to delegate extraction to an LLM service.
llm: Instance of LLMService required when ``method`` is ``"llm"``.
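A short sketch of regex-based contact extraction, which needs no LLM credentials (placeholder URL):

from ainfo import extract_information, fetch_data, parse_data

raw = fetch_data("https://example.com/contact")  # placeholder URL
document = parse_data(raw, url="https://example.com/contact")

# Regex-based extraction returns a ContactDetails model.
contacts = extract_information(document, method="regex")
print(contacts.emails, contacts.phone_numbers)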
def extract_text(
    doc: Document,
    joiner: str = " ",
    as_list: bool = False,
    *,
    content_only: bool = True,
) -> str | list[str]:
    """Extract and clean the main textual content from ``doc``.

    Parameters
    ----------
    doc:
        Parsed :class:`Document` to process.
    joiner:
        String used to join individual text fragments when ``as_list`` is
        ``False``. Defaults to a single space.
    as_list:
        When ``True`` return a list of text fragments instead of a single
        string.
    content_only:
        When ``True`` include only nodes identified as primary content. Set to
        ``False`` to include navigation and footer text as well.
    """

    logger.info("Extracting text from document")
    parts = [
        re.sub(r"\s+", " ", p).strip()
        for p in _gather_text(doc.nodes, content_only=content_only)
    ]
    if as_list:
        return [p for p in parts if p]
    filtered = [p for p in parts if p]
    text = joiner.join(filtered)
    return text.strip()
Extract and clean the main textual content from ``doc``.

Parameters
doc: Parsed Document to process.
joiner: String used to join individual text fragments when ``as_list`` is ``False``. Defaults to a single space.
as_list: When ``True`` return a list of text fragments instead of a single string.
content_only: When ``True`` include only nodes identified as primary content. Set to ``False`` to include navigation and footer text as well.
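A small illustration of the three main switches, using an inline HTML snippet rather than a fetched page:

from ainfo import extract_text, parse_data

# A tiny inline document; real pages usually come from fetch_data().
html = "<html><body><nav>Menu</nav><main><p>Hello   world</p></main></body></html>"
doc = parse_data(html)

main_text = extract_text(doc)                       # whitespace-normalised main content
fragments = extract_text(doc, as_list=True)         # individual cleaned fragments
everything = extract_text(doc, content_only=False)  # navigation/footer text included too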
def extract_custom(
    doc: Document,
    patterns: dict[str, str] | None = None,
    *,
    llm: LLMService | None = None,
    prompt: str | None = None,
    model: str | None = None,
) -> dict[str, list[str]]:
    """Extract arbitrary information from ``doc``.

    The extraction can be performed either using regular expression
    ``patterns`` or delegated to an LLM service when ``llm`` is provided.

    Parameters
    ----------
    doc:
        Parsed :class:`Document` to search.
    patterns:
        Mapping of field names to regular expression patterns. Required when
        ``llm`` is ``None``.
    llm:
        Optional :class:`LLMService` used to perform extraction via a large
        language model.
    prompt:
        Custom prompt supplied to the LLM. It should describe the desired JSON
        structure, for example ``"Extract product names as a list under the key
        'products'"``. If omitted a generic instruction is used.
    model:
        Identifier of the model to use when ``llm`` is provided.

    Returns
    -------
    dict[str, list[str]]
        A mapping of field names to lists of extracted strings.
    """

    logger.info("Extracting custom information")
    text = extract_text(doc)
    if llm is not None:
        instruction = prompt or "Extract the requested information as JSON."
        response = llm.extract(text, instruction, model=model)
        try:
            data = json.loads(response)
        except Exception:
            data = {}
        results: dict[str, list[str]] = {}
        for key, value in data.items():
            if isinstance(value, list):
                results[key] = value
            elif value is not None:
                results[key] = [value]
        return results

    if patterns is None:
        msg = "patterns required when llm is None"
        raise ValueError(msg)

    results: dict[str, list[str]] = {}
    for key, pattern in patterns.items():
        regex = re.compile(pattern, re.IGNORECASE)
        matches = [m.group(0) for m in regex.finditer(text)]
        results[key] = list(dict.fromkeys(matches))
    return results
Extract arbitrary information from ``doc``.

The extraction can be performed either using regular expression ``patterns`` or delegated to an LLM service when ``llm`` is provided.

Parameters
doc: Parsed Document to search.
patterns: Mapping of field names to regular expression patterns. Required when ``llm`` is ``None``.
llm: Optional LLMService used to perform extraction via a large language model.
prompt: Custom prompt supplied to the LLM. It should describe the desired JSON structure, for example "Extract product names as a list under the key 'products'". If omitted a generic instruction is used.
model: Identifier of the model to use when ``llm`` is provided.

Returns
dict[str, list[str]]: A mapping of field names to lists of extracted strings.
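For instance, regex-based custom extraction over a tiny inline document might look like the following; the field name and pattern are made up for illustration:

from ainfo import extract_custom, parse_data

html = "<p>Order ABC-123 shipped. Reference ABC-456 pending.</p>"
doc = parse_data(html)

# Regex mode: field name -> pattern; matches are deduplicated in order.
results = extract_custom(doc, patterns={"order_ids": r"ABC-\d+"})
print(results)  # expected: {'order_ids': ['ABC-123', 'ABC-456']}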
extract_site

Synchronously run async_extract_site() when no event loop exists. When called from within a running event loop, a task is scheduled instead.
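A usage sketch for the synchronous path (no running event loop; placeholder URL):

from ainfo import extract_site

# With no event loop running this blocks and returns a dict keyed by page URL.
results = extract_site(
    "https://example.com",  # placeholder URL
    depth=1,
    extract=["contacts"],
    include_text=False,
)
for page_url, data in results.items():
    print(page_url, data["contacts"])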
async_extract_site

Crawl ``url`` up to ``depth`` levels and run extractors on each page.

Results are returned as a mapping of page URL to the extracted data. Duplicate pages are skipped by comparing a SHA-256 hash of their HTML content. Only pages on the same domain as ``url`` are processed.
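The same crawl from within a coroutine might look like this (placeholder URL):

import asyncio

from ainfo import async_extract_site

async def collect() -> None:
    results = await async_extract_site(
        "https://example.com",  # placeholder URL
        depth=1,
        include_text=True,
    )
    for page_url, data in results.items():
        print(page_url, len(str(data.get("text", ""))))

asyncio.run(collect())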
def output_results(results: Mapping[str, list[str]] | BaseModel) -> None:
    """Pretty-print ``results`` to the console."""

    data = _to_mapping(results)
    for key, values in data.items():
        print(f"{key}:")
        for value in values:
            print(f"  - {value}")
Pretty-print ``results`` to the console.
def to_json(results: Mapping[str, object] | BaseModel, path: str | Path | None = None) -> str:
    """Serialize ``results`` to JSON and optionally write to ``path``.

    Parameters
    ----------
    results:
        A mapping containing the extracted information.
    path:
        Optional path to a file where the JSON representation should be
        written. If omitted, the JSON string is returned without writing to
        disk.

    Returns
    -------
    str
        The JSON representation of ``results``.
    """

    json_data = json.dumps(_serialize(results))
    if path is not None:
        Path(path).write_text(json_data)
    return json_data
Serialize ``results`` to JSON and optionally write to ``path``.

Parameters
results: A mapping containing the extracted information.
path: Optional path to a file where the JSON representation should be written. If omitted, the JSON string is returned without writing to disk.

Returns
str: The JSON representation of ``results``.
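A brief sketch of both call styles; the output filename is arbitrary:

from ainfo import ContactDetails, to_json

details = ContactDetails(emails=["info@example.com"])

# Return the JSON string only, or also write it to a file via ``path``.
json_str = to_json(details)
to_json({"contacts": details.model_dump()}, path="results.json")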
def json_schema(model: type[BaseModel]) -> dict[str, object]:
    """Return the JSON schema for ``model``."""

    return model.model_json_schema()
Return the JSON schema for ``model``.
def chunk_text(text: str, size: int) -> list[str]:
    """Return a list of substrings of ``text`` with at most ``size`` characters."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [text[i : i + size] for i in range(0, len(text), size)]
Return a list of substrings of ``text`` with at most ``size`` characters.
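For example:

from ainfo import chunk_text

print(chunk_text("abcdefghij", 4))  # ['abcd', 'efgh', 'ij']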
def stream_chunks(source: str, size: int) -> Iterator[str]:
    """Yield successive ``size``-sized chunks from ``source``.

    ``source`` may be raw text or a URL. When a URL is supplied the
    referenced page is fetched, parsed and its textual content chunked.
    """
    if size <= 0:
        raise ValueError("size must be positive")

    if source.startswith("http://") or source.startswith("https://"):
        raw = fetch_data(source)
        if isinstance(raw, asyncio.Task):
            raw = asyncio.run(raw)
        doc = parse_data(raw, url=source)
        text = extract_text(doc)
    else:
        text = source

    for i in range(0, len(text), size):
        yield text[i : i + size]
Yield successive ``size``-sized chunks from ``source``.

``source`` may be raw text or a URL. When a URL is supplied the referenced page is fetched, parsed and its textual content chunked.
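A short example with plain text input:

from ainfo import stream_chunks

# Plain text is chunked directly; an http(s) URL would be fetched and parsed first.
for chunk in stream_chunks("The quick brown fox jumps over the lazy dog", 16):
    print(chunk)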
class LLMService:
    """Client for interacting with an LLM via the OpenRouter API."""

    def __init__(self, config: LLMConfig | None = None) -> None:
        self.config = config or LLMConfig()
        configured_language = (
            self.config.summary_language or DEFAULT_SUMMARY_LANGUAGE
        )
        self.summary_language = configured_language.strip() or DEFAULT_SUMMARY_LANGUAGE
        configured_prompt = self.config.summary_prompt
        if configured_prompt is not None and not configured_prompt.strip():
            configured_prompt = None
        self.summary_prompt = configured_prompt
        if not self.config.api_key:
            msg = "OPENROUTER_API_KEY is required to use the LLM service"
            raise RuntimeError(msg)
        headers = {"Authorization": f"Bearer {self.config.api_key}"}
        self._client = httpx.Client(base_url=self.config.base_url, headers=headers)

    # ------------------------------------------------------------------
    # lifecycle management
    # ------------------------------------------------------------------
    def close(self) -> None:
        """Close the underlying :class:`httpx.Client` instance."""

        self._client.close()

    def __enter__(self) -> "LLMService":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        # Do not suppress exceptions
        return False

    def _chat(self, messages: list[dict[str, str]], model: str | None = None) -> str:
        payload = {"model": model or self.config.model, "messages": messages}
        resp = self._client.post("/chat/completions", json=payload, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        return data["choices"][0]["message"]["content"].strip()

    def extract(self, text: str, instruction: str, model: str | None = None) -> str:
        """Return the model's response to ``instruction`` applied to ``text``.

        Parameters
        ----------
        text:
            The content to analyse.
        instruction:
            The instruction or prompt supplied to the model.
        model:
            Optional identifier of the model to use. Defaults to the model
            configured on the service instance.
        """

        prompt = f"{instruction}\n\n{text}"
        return self._chat([{"role": "user", "content": prompt}], model=model)

    def summarize(
        self,
        text: str,
        model: str | None = None,
        language: str | None = None,
        prompt: str | None = None,
    ) -> str:
        """Return a cold-outreach-ready summary of ``text``.

        When ``prompt`` is supplied it takes precedence over any configured
        defaults or language-specific templates.
        """

        chosen_prompt = prompt
        if chosen_prompt is not None and not chosen_prompt.strip():
            chosen_prompt = None
        if chosen_prompt is None:
            configured_prompt = self.summary_prompt
            if configured_prompt is not None and not configured_prompt.strip():
                configured_prompt = None
            chosen_prompt = configured_prompt
        if chosen_prompt is None:
            chosen_prompt = build_summary_prompt(language or self.summary_language)
        return self.extract(text, chosen_prompt, model=model)
Client for interacting with an LLM via the OpenRouter API.
LLMService.close

Close the underlying httpx.Client instance.
LLMService.extract

Return the model's response to ``instruction`` applied to ``text``.

Parameters
text: The content to analyse.
instruction: The instruction or prompt supplied to the model.
model: Optional identifier of the model to use. Defaults to the model configured on the service instance.
LLMService.summarize

Return a cold-outreach-ready summary of ``text``.

When ``prompt`` is supplied it takes precedence over any configured defaults or language-specific templates.
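A usage sketch, assuming LLMConfig picks the API key up from the OPENROUTER_API_KEY environment variable (the constructor's error message suggests this, but the config class is not shown here) and using a placeholder URL:

from ainfo import LLMService, extract_text, fetch_data, parse_data

# Assumes OPENROUTER_API_KEY is set in the environment; LLMService.__init__
# raises a RuntimeError when no API key is configured.
raw = fetch_data("https://example.com")  # placeholder URL
text = extract_text(parse_data(raw))

with LLMService() as llm:  # the context manager closes the underlying httpx client
    print(llm.summarize(text, language="English"))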
class ContactDetails(BaseModel):
    """Standardised contact information extracted from a page."""

    emails: list[str] = Field(
        default_factory=list, description="Email addresses found in the document."
    )
    phone_numbers: list[str] = Field(
        default_factory=list, description="Phone numbers detected in the document."
    )
    addresses: list[Address | str] = Field(
        default_factory=list, description="Street addresses discovered in the document."
    )
    social_media: list[str] = Field(
        default_factory=list,
        description=
            "Social media profile URLs or handles extracted from the document.",
    )
Standardised contact information extracted from a page.
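A small illustration of constructing the model and obtaining its schema via json_schema; the field values are made up:

from ainfo import ContactDetails, json_schema

details = ContactDetails(
    emails=["info@example.com"],
    phone_numbers=["+49 30 1234567"],
)
print(details.model_dump())         # plain dict suitable for json.dumps
print(json_schema(ContactDetails))  # JSON schema of the model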