From 129b5bc6c4fbc2d2a2a971f9d5719806344d8ad9 Mon Sep 17 00:00:00 2001
From: iusztinpaul
Date: Wed, 19 Jun 2024 17:24:20 +0300
Subject: [PATCH] feat: Add metadata to ETL
---
.../application/crawlers/dispatcher.py | 5 +++
.../orchestrator/steps/etl/crawl_links.py | 34 ++++++++++++++-----
.../steps/etl/get_or_create_user.py | 18 +++++++++-
3 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/llm_engineering/application/crawlers/dispatcher.py b/llm_engineering/application/crawlers/dispatcher.py
index ec9dfcb..8f0607f 100644
--- a/llm_engineering/application/crawlers/dispatcher.py
+++ b/llm_engineering/application/crawlers/dispatcher.py
@@ -9,6 +9,7 @@
class CrawlerDispatcher:
def __init__(self) -> None:
self._crawlers = {}
+ self._class_to_domain = {}
@classmethod
def build(cls) -> "CrawlerDispatcher":
@@ -33,6 +34,7 @@ def register_github(self) -> "CrawlerDispatcher":
def register(self, domain: str, crawler: type[BaseCrawler]) -> None:
self._crawlers[r"https://(www\.)?{}.com/*".format(re.escape(domain))] = crawler
+ self._class_to_domain[crawler] = domain
def get_crawler(self, url: str) -> BaseCrawler:
for pattern, crawler in self._crawlers.items():
@@ -40,3 +42,6 @@ def get_crawler(self, url: str) -> BaseCrawler:
return crawler()
else:
raise ValueError(f"No crawler found for the provided link: {url}")
+
+ def get_domain(self, crawler: BaseCrawler) -> str:
+ return self._class_to_domain[crawler.__class__]
diff --git a/llm_engineering/interfaces/orchestrator/steps/etl/crawl_links.py b/llm_engineering/interfaces/orchestrator/steps/etl/crawl_links.py
index f393184..ad91d8d 100644
--- a/llm_engineering/interfaces/orchestrator/steps/etl/crawl_links.py
+++ b/llm_engineering/interfaces/orchestrator/steps/etl/crawl_links.py
@@ -1,31 +1,49 @@
from loguru import logger
-from zenml import step
+from typing_extensions import Annotated
+from zenml import get_step_context, step
from llm_engineering.application.crawlers.dispatcher import CrawlerDispatcher
from llm_engineering.domain.documents import UserDocument
@step
-def crawl_links(user: UserDocument, links: list[str]):
+def crawl_links(user: UserDocument, links: list[str]) -> Annotated[list[str], "crawled_links"]:
dispatcher = CrawlerDispatcher.build().register_linkedin().register_medium().register_github()
logger.info(f"Starting to crawl {len(links)} links.")
- successfull_crawls = 0
+ metadata = {}
for link in links:
- successfull_crawls += _crawl_link(dispatcher, link, user)
+ successfull_crawl, crawled_domain = _crawl_link(dispatcher, link, user)
- logger.info(f"Successfully crawled {successfull_crawls} / {len(links)} links.")
+ metadata = _add_to_metadata(metadata, crawled_domain, successfull_crawl)
+ step_context = get_step_context()
+ step_context.add_output_metadata(output_name="crawled_links", metadata=metadata)
-def _crawl_link(dispatcher: CrawlerDispatcher, link: str, user: UserDocument) -> bool:
+ logger.info(f"Successfully crawled {sum(stats['successful'] for stats in metadata.values())} / {len(links)} links.")
+
+ return links
+
+
+def _crawl_link(dispatcher: CrawlerDispatcher, link: str, user: UserDocument) -> tuple[bool, str]:
crawler = dispatcher.get_crawler(link)
+ crawler_domain = dispatcher.get_domain(crawler)
try:
crawler.extract(link=link, user=user)
- return True
+ return (True, crawler_domain)
except Exception as e:
logger.error(f"An error occurred while crowling: {e!s}")
- return False
+ return (False, crawler_domain)
+
+
+def _add_to_metadata(metadata: dict, domain: str, successfull_crawl: bool) -> dict:
+ if domain not in metadata:
+ metadata[domain] = {}
+ metadata[domain]["successful"] = metadata.get(domain, {}).get("successful", 0) + successfull_crawl
+ metadata[domain]["total"] = metadata.get(domain, {}).get("total", 0) + 1
+
+ return metadata
diff --git a/llm_engineering/interfaces/orchestrator/steps/etl/get_or_create_user.py b/llm_engineering/interfaces/orchestrator/steps/etl/get_or_create_user.py
index edeebcd..5b70523 100644
--- a/llm_engineering/interfaces/orchestrator/steps/etl/get_or_create_user.py
+++ b/llm_engineering/interfaces/orchestrator/steps/etl/get_or_create_user.py
@@ -1,6 +1,6 @@
from loguru import logger
from typing_extensions import Annotated
-from zenml import step
+from zenml import get_step_context, step
from llm_engineering.application import utils
from llm_engineering.domain.documents import UserDocument
@@ -14,4 +14,20 @@ def get_or_create_user(user_full_name: str) -> Annotated[UserDocument, "user"]:
user = UserDocument.get_or_create(first_name=first_name, last_name=last_name)
+ step_context = get_step_context()
+ step_context.add_output_metadata(output_name="user", metadata=_get_metadata(user_full_name, user))
+
return user
+
+
+def _get_metadata(user_full_name: str, user: UserDocument) -> dict:
+ return {
+ "query": {
+ "user_full_name": user_full_name,
+ },
+ "retrieved": {
+ "user_id": str(user.id),
+ "first_name": user.first_name,
+ "last_name": user.last_name,
+ },
+ }