diff --git a/CLAUDE.md b/CLAUDE.md index 40bca50..16cfc62 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -10,7 +10,7 @@ The project that has the following components: 1. An ETL flow that fetches data from Snowflake - Refer to `example_data.py` for a sample - - Data is processed in batches of at most 1000 rows + - Data is processed in batches of at most 100 rows - Store the IDs of rows that were processed, next time the flow executes, fetch the next batch - Store the state of processing in an artifact, use Metaflow client to retrieve the state - Include an option for resetting state diff --git a/README.md b/README.md new file mode 100644 index 0000000..d7965fe --- /dev/null +++ b/README.md @@ -0,0 +1,38 @@ +# Company Enrichment Pipeline + +An Outerbounds project that continuously fetches company data from Snowflake, +enriches it with website-derived tags using a local LLM, and provides an +interactive UI for exploring the results. + +## Components + +### Flows + +- **snowflake-etl** - Hourly ETL that fetches company data from Snowflake in + batches of up to 100 rows, tracking progress across runs. +- **company-enricher** - Event-triggered flow that fetches company websites, + uses a local LLM to generate descriptive tags, and merges results across runs. + +### Deployments + +- **company-explorer** - Streamlit dashboard for browsing enriched companies + and filtering by tags. + +### Assets + +- **Data: `company_batch`** - Latest batch of raw company data from Snowflake. +- **Data: `enriched_companies`** - Accumulated enrichment results with tags. +- **Model: `tag_llm`** - Small local LLM used for tag extraction. 
+ +## Running Locally + +```bash +# ETL flow +OBPROJECT_SKIP_PYPI_BASE=1 python flows/snowflake-etl/flow.py run + +# Enrichment flow (normally triggered by ETL event) +OBPROJECT_SKIP_PYPI_BASE=1 python flows/company-enricher/flow.py run + +# Dashboard +streamlit run deployments/company-explorer/app.py +``` diff --git a/data/company-batch/README.md b/data/company-batch/README.md new file mode 100644 index 0000000..8718dd4 --- /dev/null +++ b/data/company-batch/README.md @@ -0,0 +1,4 @@ +# Company Batch + +Raw company data fetched from Snowflake by the Snowflake ETL flow. +Each instance contains up to 100 rows as a pandas DataFrame. diff --git a/data/company-batch/asset_config.toml b/data/company-batch/asset_config.toml new file mode 100644 index 0000000..4084a9c --- /dev/null +++ b/data/company-batch/asset_config.toml @@ -0,0 +1,7 @@ +name = "Company Batch" +id = "company_batch" +description = "Latest batch of raw company data fetched from Snowflake" + +[properties] +source = "free_company_dataset.public.freecompanydataset" +batch_size = "100" diff --git a/data/enriched-companies/README.md b/data/enriched-companies/README.md new file mode 100644 index 0000000..f6989db --- /dev/null +++ b/data/enriched-companies/README.md @@ -0,0 +1,5 @@ +# Enriched Companies + +Accumulated enrichment results produced by the Company Enricher flow. +Each company entry includes the original ID, name, domain, LLM-generated +tags, and whether the website was successfully fetched. 
diff --git a/data/enriched-companies/asset_config.toml b/data/enriched-companies/asset_config.toml new file mode 100644 index 0000000..bc80784 --- /dev/null +++ b/data/enriched-companies/asset_config.toml @@ -0,0 +1,6 @@ +name = "Enriched Companies" +id = "enriched_companies" +description = "Accumulated company data enriched with LLM-generated tags from website content" + +[properties] +format = "list of dicts with keys: id, name, domain, tags, website_fetched" diff --git a/deployments/company-explorer/README.md b/deployments/company-explorer/README.md new file mode 100644 index 0000000..c5737e0 --- /dev/null +++ b/deployments/company-explorer/README.md @@ -0,0 +1,15 @@ +# Company Explorer + +Streamlit dashboard for browsing and exploring enriched company data. + +## Features + +- Search companies by name or domain +- Filter by LLM-generated tags +- View tag distribution chart +- Inspect individual company details + +## Data Source + +Reads the `enriched_companies` data asset produced by the Company Enricher flow, +accessed via the Metaflow client API. diff --git a/deployments/company-explorer/app.py b/deployments/company-explorer/app.py new file mode 100644 index 0000000..e0ee579 --- /dev/null +++ b/deployments/company-explorer/app.py @@ -0,0 +1,103 @@ +"""Streamlit dashboard for exploring enriched company data.""" + +import os + +import streamlit as st +from metaflow import Flow, namespace + +st.set_page_config(page_title="Company Explorer", layout="wide") +st.title("Company Explorer") + + +@st.cache_data(ttl=300) +def load_enriched_data(): + """Load the latest enriched company data from the CompanyEnricher flow.""" + namespace(None) + try: + latest = Flow("CompanyEnricher").latest_successful_run + if latest and hasattr(latest.data, "enriched"): + return latest.data.enriched + except Exception: + pass + return [] + + +data = load_enriched_data() + +if not data: + st.warning("No enriched company data available yet. 
Run the pipeline first.") + st.stop() + +# Collect all unique tags +all_tags = sorted({tag for row in data for tag in row.get("tags", [])}) + +# --- Sidebar filters --- +st.sidebar.header("Filters") + +search = st.sidebar.text_input("Search companies", "") +selected_tags = st.sidebar.multiselect("Filter by tags", all_tags) + +# --- Apply filters --- +filtered = data + +if search: + search_lower = search.lower() + filtered = [ + r for r in filtered + if search_lower in r.get("name", "").lower() + or search_lower in r.get("domain", "").lower() + ] + +if selected_tags: + filtered = [ + r for r in filtered + if any(tag in r.get("tags", []) for tag in selected_tags) + ] + +# --- Metrics --- +col1, col2, col3 = st.columns(3) +col1.metric("Total Companies", len(data)) +col2.metric("Filtered", len(filtered)) +col3.metric("Unique Tags", len(all_tags)) + +# --- Company table --- +st.subheader(f"Companies ({len(filtered)})") + +if filtered: + import pandas as pd + + df = pd.DataFrame([ + { + "Name": r.get("name", ""), + "Domain": r.get("domain", ""), + "Tags": ", ".join(r.get("tags", [])), + "Website Fetched": r.get("website_fetched", False), + } + for r in filtered + ]) + st.dataframe(df, use_container_width=True, hide_index=True) +else: + st.info("No companies match the current filters.") + +# --- Tag distribution --- +st.subheader("Tag Distribution") + +from collections import Counter + +tag_counts = Counter(tag for r in data for tag in r.get("tags", [])) +if tag_counts: + top_tags = tag_counts.most_common(25) + import pandas as pd + + tag_df = pd.DataFrame(top_tags, columns=["Tag", "Count"]) + st.bar_chart(tag_df.set_index("Tag")) + +# --- Company detail --- +st.subheader("Company Detail") + +company_names = [r.get("name", "Unknown") for r in filtered] +if company_names: + selected = st.selectbox("Select a company", company_names) + company = next((r for r in filtered if r.get("name") == selected), None) + if company: + st.json(company) diff --git 
a/deployments/company-explorer/config.yml b/deployments/company-explorer/config.yml new file mode 100644 index 0000000..937c725 --- /dev/null +++ b/deployments/company-explorer/config.yml @@ -0,0 +1,16 @@ +name: company-explorer +port: 8000 +description: Interactive UI for exploring enriched company data and tags + +replicas: + min: 1 + max: 1 + +dependencies: + pypi: + streamlit: "" + pandas: "" + outerbounds: "" + +commands: + - streamlit run deployments/company-explorer/app.py --server.port 8000 diff --git a/flows/company-enricher/README.md b/flows/company-enricher/README.md new file mode 100644 index 0000000..91019ff --- /dev/null +++ b/flows/company-enricher/README.md @@ -0,0 +1,25 @@ +# Company Enricher + +Event-triggered flow that enriches company data with website-derived tags. + +## Behavior + +- Triggered when the Snowflake ETL flow finishes (`@trigger_on_finish(flow='SnowflakeETL')`) +- Splits the batch into up to 10 parallel chunks for processing +- For each company: + 1. Fetches the landing page from the company's domain + 2. Uses TinyLlama 1.1B to generate 5 descriptive tags +- Merges new results with previous runs to build a cumulative dataset +- Registers results as the `enriched_companies` data asset + +## Resources + +Each parallel enrichment task requests 4 CPUs and 16 GB memory +to run the local LLM inference. + +## Running + +```bash +# Normally triggered by event, but can be run manually: +OBPROJECT_SKIP_PYPI_BASE=1 python flows/company-enricher/flow.py run +``` diff --git a/flows/company-enricher/flow.py b/flows/company-enricher/flow.py new file mode 100644 index 0000000..be6929c --- /dev/null +++ b/flows/company-enricher/flow.py @@ -0,0 +1,285 @@ +"""Company enricher flow. + +Triggered by the Snowflake ETL flow. Fetches each company's landing page, +uses a small local LLM to extract descriptive tags, and accumulates results +across runs. 
+""" + +import json + +from metaflow import ( + card, + current, + pypi, + resources, + retry, + step, +) +from metaflow.cards import Markdown, Table +from metaflow import trigger_on_finish +from obproject import ProjectFlow, highlight + +def load_tag_model(): + """Load the LLM pipeline once for reuse across all rows.""" + from transformers import pipeline + + return pipeline( + "text-generation", + model="TinyLlama/TinyLlama-1.1B-Chat-v1.0", + max_new_tokens=100, + do_sample=False, + ) + + +def extract_tags_with_llm(company_name, description, website_text, generator): + """Use a small local LLM to generate 5 descriptive tags for a company.""" + prompt = ( + f"You are a business analyst. Given the following company information, " + f"produce exactly 5 short descriptive tags (2-3 words each) that " + f"categorize this company. Return only a JSON list of strings.\n\n" + f"Company: {company_name}\n" + f"Description: {(description or 'N/A')[:500]}\n" + f"Website content: {(website_text or 'N/A')[:1500]}\n\n" + f"Tags:" + ) + + result = generator(prompt)[0]["generated_text"] + # Parse the generated text after the prompt + generated = result[len(prompt):] + + # Try to extract a JSON list from the output + try: + start = generated.index("[") + end = generated.index("]") + 1 + tags = json.loads(generated[start:end]) + if isinstance(tags, list): + return [str(t).strip() for t in tags[:5]] + except (ValueError, json.JSONDecodeError): + pass + + # Fallback: split by commas or newlines + parts = [p.strip().strip('"').strip("'").strip("-").strip() + for p in generated.replace("\n", ",").split(",")] + tags = [p for p in parts if 2 <= len(p) <= 40][:5] + return tags if tags else ["unclassified"] + + +@trigger_on_finish(flow='SnowflakeETL') +class CompanyEnricher(ProjectFlow): + + @card(type="blank", id="start_card") + @step + def start(self): + """Load the latest company batch and split into chunks for parallel processing.""" + from metaflow import Flow, namespace + + try: + 
batch_df = self.prj.get_data("company_batch") + except Exception: + # Fallback: read directly from SnowflakeETL artifacts + batch_df = None + namespace(None) + try: + latest = Flow("SnowflakeETL").latest_successful_run + if latest and hasattr(latest.data, "batch_df"): + batch_df = latest.data.batch_df + except Exception: + pass + + if batch_df is None or len(batch_df) == 0: + self.chunks = [{}] + current.card["start_card"].append(Markdown("# Company Enricher\n\nNo data to process.")) + self.next(self.enrich, foreach="chunks") + return + + # Convert DataFrame rows to list of dicts for foreach + rows = batch_df.to_dict(orient="records") + + # Split into chunks for 10 parallel tasks + n_chunks = min(10, len(rows)) + chunk_size = (len(rows) + n_chunks - 1) // n_chunks + self.chunks = [ + rows[i : i + chunk_size] for i in range(0, len(rows), chunk_size) + ] + + current.card["start_card"].append( + Markdown( + f"# Company Enricher\n\n" + f"- **Rows to process:** {len(rows)}\n" + f"- **Parallel chunks:** {len(self.chunks)}\n" + ) + ) + self.next(self.enrich, foreach="chunks") + + @retry(times=1) + @resources(cpu=4, memory=16000) + @pypi( + python="3.11", + packages={ + "transformers": ">=4.40", + "torch": ">=2.2", + "requests": ">=2.31", + "beautifulsoup4": ">=4.12", + "accelerate": ">=0.27", + }, + ) + @card(type="blank", id="enrichment", refresh_interval=5) + @step + def enrich(self): + """Fetch website and extract tags for each company in this chunk.""" + from company_utils.scraper import fetch_landing_page + + rows = self.input + self.results = [] + successes = 0 + failures = 0 + total = len([r for r in rows if r]) + + current.card["enrichment"].append( + Markdown(f"## Enriching {total} companies...\n") + ) + + generator = load_tag_model() + + for i, row in enumerate(rows): + if not row: + continue + + company_name = row.get("NAME", row.get("COMPANY_NAME", "Unknown")) + domain = row.get("DOMAIN", row.get("WEBSITE", "")) + description = row.get("DESCRIPTION", 
row.get("INDUSTRY", "")) + company_id = str(row.get("ID", "")) + + website_text = fetch_landing_page(domain) + fetched = website_text is not None + + try: + tags = extract_tags_with_llm(company_name, description, website_text, generator) + successes += 1 + except Exception: + tags = ["error"] + failures += 1 + + self.results.append({ + "id": company_id, + "name": company_name, + "domain": domain, + "tags": tags, + "website_fetched": fetched, + }) + + # Update card with progress + current.card["enrichment"].clear() + current.card["enrichment"].append( + Markdown( + f"## Chunk progress: {i + 1}/{total}\n\n" + f"- Successes: {successes}\n" + f"- Failures: {failures}\n" + f"- Latest: **{company_name}** — {', '.join(tags)}\n" + ) + ) + current.card["enrichment"].refresh() + + # Final card + current.card["enrichment"].clear() + current.card["enrichment"].append( + Markdown( + f"## Chunk results\n\n" + f"- Processed: {len(self.results)}\n" + f"- Successes: {successes}\n" + f"- Failures: {failures}\n" + ) + ) + if self.results: + current.card["enrichment"].append( + Table( + [[r["name"], r["domain"], ", ".join(r["tags"]), str(r["website_fetched"])] + for r in self.results[:20]], + headers=["Company", "Domain", "Tags", "Website Fetched"], + ) + ) + + self.next(self.join) + + @highlight + @card(type="blank", id="join_card") + @step + def join(self, inputs): + """Merge chunk results and combine with previous run data.""" + from metaflow import Flow, namespace + + # Merge results from all parallel tasks + all_results = [] + for inp in inputs: + all_results.extend(inp.results) + + # Fetch previous accumulated results + previous_results = {} + namespace(None) + try: + latest = Flow("CompanyEnricher").latest_successful_run + if latest and hasattr(latest.data, "enriched"): + for r in latest.data.enriched: + previous_results[r["id"]] = r + except Exception: + pass + + # Merge: new results overwrite previous for the same ID + for r in all_results: + previous_results[r["id"]] = r + + 
self.enriched = list(previous_results.values()) + + # Register as asset + self.prj.register_data( + "enriched_companies", + "enriched", + annotations={ + "total_companies": str(len(self.enriched)), + "new_in_batch": str(len(all_results)), + }, + ) + + # Summary stats + total_tags = set() + for r in self.enriched: + total_tags.update(r.get("tags", [])) + + successes = sum(1 for r in all_results if r.get("website_fetched")) + failures = len(all_results) - successes + + current.card["join_card"].append( + Markdown( + f"# Company Enricher - Summary\n\n" + f"- **New companies processed:** {len(all_results)}\n" + f"- **Website fetch successes:** {successes}\n" + f"- **Website fetch failures:** {failures}\n" + f"- **Total accumulated companies:** {len(self.enriched)}\n" + f"- **Unique tags:** {len(total_tags)}\n" + ) + ) + if all_results: + current.card["join_card"].append(Markdown("## Latest batch")) + current.card["join_card"].append( + Table( + [[r["name"], r["domain"], ", ".join(r["tags"])] + for r in all_results[:30]], + headers=["Company", "Domain", "Tags"], + ) + ) + + # Highlight card + self.highlight.title = "Company Enricher" + self.highlight.add_column(str(len(all_results)), "enriched") + self.highlight.add_column(str(len(self.enriched)), "total") + self.highlight.add_column(str(len(total_tags)), "unique tags") + + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + CompanyEnricher() diff --git a/flows/snowflake-etl/README.md b/flows/snowflake-etl/README.md new file mode 100644 index 0000000..e7ba480 --- /dev/null +++ b/flows/snowflake-etl/README.md @@ -0,0 +1,25 @@ +# Snowflake ETL + +Hourly scheduled flow that extracts company data from Snowflake. 
+ +## Behavior + +- Fetches up to 100 rows per run from `free_company_dataset.public.freecompanydataset` +- Tracks processed row IDs across runs using Metaflow artifacts +- Skips previously processed rows on subsequent runs +- Triggers the company-enricher flow on completion via `@trigger_on_finish` +- Registers each batch as the `company_batch` data asset + +## Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `--reset` | `no` | Set to `yes` to clear processing state and start over | + +## Running + +```bash +OBPROJECT_SKIP_PYPI_BASE=1 python flows/snowflake-etl/flow.py run +# Reset state: +OBPROJECT_SKIP_PYPI_BASE=1 python flows/snowflake-etl/flow.py run --reset yes +``` diff --git a/flows/snowflake-etl/flow.py b/flows/snowflake-etl/flow.py new file mode 100644 index 0000000..1fe64a5 --- /dev/null +++ b/flows/snowflake-etl/flow.py @@ -0,0 +1,105 @@ +"""Snowflake ETL flow. + +Fetches company data from Snowflake in batches of 100, tracking which rows +have already been processed across runs. Publishes an event to trigger the +company-enricher flow for each new batch. 
+""" + +from metaflow import ( + Parameter, + card, + current, + pypi, + schedule, + step, +) +from metaflow.cards import Markdown, Table +from obproject import ProjectFlow, highlight + +from company_utils.snowflake import fetch_company_batch + + +@schedule(hourly=True) +class SnowflakeETL(ProjectFlow): + + reset = Parameter( + "reset", + help="Set to 'yes' to reset processing state and re-fetch from scratch", + default="no", + ) + + @highlight + @card(type="blank", id="etl") + @pypi(packages={"snowflake-connector-python[pandas]": ">=3.6.0"}) + @step + def start(self): + from metaflow import Flow, namespace + + # --- Retrieve previously processed IDs --- + processed_ids = set() + if self.reset.lower() != "yes": + namespace(None) + try: + latest = Flow("SnowflakeETL").latest_successful_run + if latest and hasattr(latest.data, "all_processed_ids"): + processed_ids = set(latest.data.all_processed_ids) + except Exception: + pass # First run or no successful runs yet + + self.previous_count = len(processed_ids) + + # --- Fetch next batch --- + df = fetch_company_batch(processed_ids if processed_ids else None) + self.batch_size = len(df) + + if self.batch_size == 0: + self.batch_df = None + self.all_processed_ids = list(processed_ids) + current.card["etl"].append(Markdown("# Snowflake ETL\n\nNo new rows to process.")) + else: + self.batch_df = df + new_ids = set(df["ID"].astype(str).tolist()) + self.all_processed_ids = list(processed_ids | new_ids) + + # Register as data asset + self.prj.register_data( + "company_batch", + "batch_df", + annotations={ + "batch_size": str(self.batch_size), + "total_processed": str(len(self.all_processed_ids)), + }, + ) + + # Build card + current.card["etl"].append( + Markdown( + f"# Snowflake ETL\n\n" + f"- **New rows fetched:** {self.batch_size}\n" + f"- **Previously processed:** {self.previous_count}\n" + f"- **Total processed:** {len(self.all_processed_ids)}\n" + ) + ) + preview = df.head(10) + current.card["etl"].append(Markdown("## 
Preview (first 10 rows)")) + current.card["etl"].append( + Table( + [[str(v) for v in row] for row in preview.values], + headers=list(preview.columns), + ) + ) + + # Highlight card + self.highlight.title = "Snowflake ETL" + self.highlight.add_column(str(self.batch_size), "new rows") + self.highlight.add_column(str(len(self.all_processed_ids)), "total processed") + + self.next(self.end) + + @step + def end(self): + pass + + +if __name__ == "__main__": + SnowflakeETL() diff --git a/models/tag-llm/README.md b/models/tag-llm/README.md new file mode 100644 index 0000000..d04e055 --- /dev/null +++ b/models/tag-llm/README.md @@ -0,0 +1,5 @@ +# Tag Extraction LLM + +Small language model (TinyLlama 1.1B) used by the Company Enricher flow +to generate 5 descriptive tags per company based on the company name, +description, and website landing page content. diff --git a/models/tag-llm/asset_config.toml b/models/tag-llm/asset_config.toml new file mode 100644 index 0000000..7b87949 --- /dev/null +++ b/models/tag-llm/asset_config.toml @@ -0,0 +1,8 @@ +name = "Tag Extraction LLM" +id = "tag_llm" +description = "Small local LLM used to generate descriptive tags for companies" + +[properties] +model = "TinyLlama/TinyLlama-1.1B-Chat-v1.0" +task = "text-generation" +max_new_tokens = "100" diff --git a/obproject.toml b/obproject.toml index 0641949..29cc7a9 100644 --- a/obproject.toml +++ b/obproject.toml @@ -1,7 +1,7 @@ platform = 'dev-yellow.outerbounds.xyz' project = 'ob_agentic_code_example' -title = 'Agentic Code Example' +title = 'Company Enrichment Pipeline' [dev-assets] branch = 'dev' diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c107ca7 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[project] +dependencies = [ + "pandas>=2.0", + "requests>=2.31", + "beautifulsoup4>=4.12", +] diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..f3e0783 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1 @@ +METAFLOW_PACKAGE_POLICY 
= "include" diff --git a/src/company_utils/__init__.py b/src/company_utils/__init__.py new file mode 100644 index 0000000..f3e0783 --- /dev/null +++ b/src/company_utils/__init__.py @@ -0,0 +1 @@ +METAFLOW_PACKAGE_POLICY = "include" diff --git a/src/company_utils/scraper.py b/src/company_utils/scraper.py new file mode 100644 index 0000000..90a6bdc --- /dev/null +++ b/src/company_utils/scraper.py @@ -0,0 +1,29 @@ +import requests +from bs4 import BeautifulSoup + +TIMEOUT = 10 + + +def fetch_landing_page(url): + """Fetch the landing page text content from a URL. + + Returns the visible text stripped of scripts and styles, + or None if the request fails. + """ + if not url: + return None + if not url.startswith("http"): + url = "https://" + url + try: + resp = requests.get(url, timeout=TIMEOUT, headers={ + "User-Agent": "Mozilla/5.0 (compatible; CompanyEnricher/1.0)" + }) + resp.raise_for_status() + soup = BeautifulSoup(resp.text, "html.parser") + for tag in soup(["script", "style", "nav", "footer", "header"]): + tag.decompose() + text = soup.get_text(separator=" ", strip=True) + # Truncate to keep LLM context manageable + return text[:3000] + except Exception: + return None diff --git a/src/company_utils/snowflake.py b/src/company_utils/snowflake.py new file mode 100644 index 0000000..c1ff640 --- /dev/null +++ b/src/company_utils/snowflake.py @@ -0,0 +1,24 @@ +from metaflow import Snowflake + +TABLE = "free_company_dataset.public.freecompanydataset" +BATCH_SIZE = 100 + + +def fetch_company_batch(processed_ids=None): + """Fetch the next batch of company rows from Snowflake. + + Returns a pandas DataFrame of up to BATCH_SIZE rows, excluding any + IDs already in *processed_ids*. 
+ """ + if processed_ids: + id_list = ", ".join(f"'{i}'" for i in processed_ids) + where = f"WHERE id NOT IN ({id_list})" + else: + where = "" + + query = f"SELECT * FROM {TABLE} {where} LIMIT {BATCH_SIZE}" + + with Snowflake(integration="snowflake-test") as cn: + cursor = cn.cursor() + cursor.execute(query) + return cursor.fetch_pandas_all()