From 90e009e2f2a850110ab43f9f8c804065401e1c4c Mon Sep 17 00:00:00 2001 From: iusztinpaul Date: Fri, 9 Aug 2024 19:52:45 +0300 Subject: [PATCH] feat: Create unified pipeline --- configs/digital_data_etl_alex_vesa.yaml | 2 + configs/digital_data_etl_maxime_labonne.yaml | 4 +- configs/digital_data_etl_paul_iusztin.yaml | 4 +- configs/end_to_end_data.yaml | 103 ++++++++++++++++++ configs/export_artifact_to_json.yaml | 4 +- configs/feature_engineering.yaml | 4 +- configs/generate_instruct_datasets.yaml | 6 +- configs/training.yaml | 9 +- .../application/preprocessing/dispatchers.py | 4 +- llm_engineering/domain/base/vector.py | 2 +- pipelines/__init__.py | 2 + pipelines/digital_data_etl.py | 6 +- pipelines/end_to_end_data.py | 33 ++++++ pipelines/feature_engineering.py | 10 +- pipelines/generate_instruct_datasets.py | 8 +- pyproject.toml | 1 + run.py | 18 ++- .../feature_engineering/load_to_vector_db.py | 9 +- 18 files changed, 208 insertions(+), 21 deletions(-) create mode 100644 configs/end_to_end_data.yaml create mode 100644 pipelines/end_to_end_data.py diff --git a/configs/digital_data_etl_alex_vesa.yaml b/configs/digital_data_etl_alex_vesa.yaml index 95644df..a16aff1 100644 --- a/configs/digital_data_etl_alex_vesa.yaml +++ b/configs/digital_data_etl_alex_vesa.yaml @@ -2,6 +2,8 @@ settings: docker: parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest skip_build: True + orchestrator.aws-stack-skgh: + synchronous: false parameters: user_full_name: Alex Vesa # [First Name(s)] [Last Name] diff --git a/configs/digital_data_etl_maxime_labonne.yaml b/configs/digital_data_etl_maxime_labonne.yaml index 5e96f2c..4be733e 100644 --- a/configs/digital_data_etl_maxime_labonne.yaml +++ b/configs/digital_data_etl_maxime_labonne.yaml @@ -2,7 +2,9 @@ settings: docker: parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest skip_build: True - + orchestrator.aws-stack-skgh: + synchronous: false + parameters: user_full_name: Maxime Labonne # [First Name(s)] [Last Name] links: diff --git a/configs/digital_data_etl_paul_iusztin.yaml b/configs/digital_data_etl_paul_iusztin.yaml index 91b0cc7..17b8a35 100644 --- a/configs/digital_data_etl_paul_iusztin.yaml +++ b/configs/digital_data_etl_paul_iusztin.yaml @@ -2,7 +2,9 @@ settings: docker: parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest skip_build: True - + orchestrator.aws-stack-skgh: + synchronous: false + parameters: user_full_name: Paul Iusztin # [First Name(s)] [Last Name] links: diff --git a/configs/end_to_end_data.yaml b/configs/end_to_end_data.yaml new file mode 100644 index 0000000..b410d05 --- /dev/null +++ b/configs/end_to_end_data.yaml @@ -0,0 +1,103 @@ +settings: + docker: + parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest + skip_build: True + orchestrator.aws-stack-skgh: + synchronous: false + +parameters: + # Data ETL & Feature engineering pipelines parameters + author_links: + - user_full_name: Paul Iusztin # [First Name(s)] [Last Name] + links: + # Medium (only articles that are not under the paid wall work) + - https://medium.com/decodingml/an-end-to-end-framework-for-production-ready-llm-systems-by-building-your-llm-twin-2cc6bb01141f + - https://medium.com/decodingml/a-real-time-retrieval-system-for-rag-on-social-media-data-9cc01d50a2a0 + - https://medium.com/decodingml/sota-python-streaming-pipelines-for-fine-tuning-llms-and-rag-in-real-time-82eb07795b87 + - 
https://medium.com/decodingml/the-4-advanced-rag-algorithms-you-must-know-to-implement-5d0c7f1199d2 + - https://medium.com/decodingml/architect-scalable-and-cost-effective-llm-rag-inference-pipelines-73b94ef82a99 + # Substack + - https://decodingml.substack.com/p/a-blueprint-for-designing-production?r=1ttoeh + - https://decodingml.substack.com/p/the-difference-between-development?r=1ttoeh + - https://decodingml.substack.com/p/architect-scalable-and-cost-effective?r=1ttoeh + - https://decodingml.substack.com/p/7-tips-to-reduce-your-vram-when-training?r=1ttoeh + - https://decodingml.substack.com/p/using-this-python-package-you-can?r=1ttoeh + - https://decodingml.substack.com/p/the-4-advanced-rag-algorithms-you?r=1ttoeh + - https://decodingml.substack.com/p/problems-deploying-your-ml-models?r=1ttoeh + - https://decodingml.substack.com/p/sota-python-streaming-pipelines-for?r=1ttoeh + - https://decodingml.substack.com/p/ready-for-production-ml-here-are?r=1ttoeh + - https://decodingml.substack.com/p/ready-for-production-ml-here-are?r=1ttoeh + - https://decodingml.substack.com/p/my-ml-monthly-learning-resource-recommendations?r=1ttoeh + - https://decodingml.substack.com/p/an-end-to-end-framework-for-production?r=1ttoeh + - https://decodingml.substack.com/p/upskill-your-llm-knowledge-base-with?r=1ttoeh + - https://decodingml.substack.com/p/want-to-learn-an-end-to-end-framework?r=1ttoeh + - https://decodingml.substack.com/p/my-favorite-way-to-implement-a-configuration?r=1ttoeh + - https://decodingml.substack.com/p/a-real-time-retrieval-system-for?r=1ttoeh + - https://decodingml.substack.com/p/4-key-decoding-strategies-for-llms?r=1ttoeh + - https://decodingml.substack.com/p/dml-new-year-the-new-and-improved?r=1ttoeh + - https://decodingml.substack.com/p/dml-8-types-of-mlops-tools-that-must?r=1ttoeh + - https://decodingml.substack.com/p/dml-this-is-what-you-need-to-build?r=1ttoeh + - https://decodingml.substack.com/p/dml-7-steps-on-how-to-fine-tune-an?r=1ttoeh + - https://decodingml.substack.com/p/dml-how-do-you-generate-a-q-and-a?r=1ttoeh + - https://decodingml.substack.com/p/dml-what-do-you-need-to-fine-tune?r=1ttoeh + - https://decodingml.substack.com/p/dml-why-and-when-do-you-need-to-fine?r=1ttoeh + - https://decodingml.substack.com/p/dml-how-to-implement-a-streaming?r=1ttoeh + - https://decodingml.substack.com/p/dml-why-and-what-do-you-need-a-streaming?r=1ttoeh + - https://decodingml.substack.com/p/dml-unwrapping-the-3-pipeline-design?r=1ttoeh + - https://decodingml.substack.com/p/dml-how-to-design-an-llm-system-for?r=1ttoeh + - https://decodingml.substack.com/p/dml-synced-vector-dbs-a-guide-to?r=1ttoeh + - https://decodingml.substack.com/p/dml-what-is-the-difference-between?r=1ttoeh + - https://decodingml.substack.com/p/dml-7-steps-to-build-a-production?r=1ttoeh + - https://decodingml.substack.com/p/dml-chain-of-thought-reasoning-write?r=1ttoeh + - https://decodingml.substack.com/p/dml-build-and-serve-a-production?r=1ttoeh + - https://decodingml.substack.com/p/dml-4-key-ideas-you-must-know-to?r=1ttoeh + - https://decodingml.substack.com/p/dml-how-to-add-real-time-monitoring?r=1ttoeh + - https://decodingml.substack.com/p/dml-top-6-ml-platform-features-you?r=1ttoeh + - user_full_name: Alex Vesa # [First Name(s)] [Last Name] + links: + # Medium (only articles that are not under the paid wall work) + - https://medium.com/decodingml/the-importance-of-data-pipelines-in-the-era-of-generative-ai-673e1505a861 + - 
https://medium.com/decodingml/the-3nd-out-of-11-lessons-of-the-llm-twin-free-course-ba82752dad5a + - https://medium.com/decodingml/the-role-of-feature-stores-in-fine-tuning-llms-22bd60afd4b9 + # Substack + - https://decodingml.substack.com/p/2-crucial-antipatterns-to-avoid-when?r=1ttoeh + - https://decodingml.substack.com/p/the-role-of-feature-stores-in-fine?r=1ttoeh + - https://decodingml.substack.com/p/change-data-capture-enabling-event?r=1ttoeh + - https://decodingml.substack.com/p/the-importance-of-data-pipelines?r=1ttoeh + - https://decodingml.substack.com/p/steal-my-code-to-solve-real-world?r=1ttoeh + - https://decodingml.substack.com/p/framework-for-measuring-generative?r=1ttoeh + - https://decodingml.substack.com/p/youre-not-digging-deeper-into-concepts?r=1ttoeh + - https://decodingml.substack.com/p/do-not-evaluate-llm-summary-task?r=1ttoeh + - https://decodingml.substack.com/p/dml-introduction-to-deploying-private?r=1ttoeh + - user_full_name: Maxime Labonne # [First Name(s)] [Last Name] + links: + # Substack + - https://maximelabonne.substack.com/p/uncensor-any-llm-with-abliteration-d30148b7d43e + - https://maximelabonne.substack.com/p/create-mixtures-of-experts-with-mergekit-11b318c99562 + - https://maximelabonne.substack.com/p/merge-large-language-models-with-mergekit-2118fb392b54 + - https://maximelabonne.substack.com/p/fine-tune-a-mistral-7b-model-with-direct-preference-optimization-708042745aac + - https://maximelabonne.substack.com/p/exllamav2-the-fastest-library-to-run-llms-32aeda294d26 + - https://maximelabonne.substack.com/p/quantize-llama-models-with-ggml-and-llama-cpp-3612dfbcc172 + - https://maximelabonne.substack.com/p/a-beginners-guide-to-llm-fine-tuning-4bae7d4da672 + - https://maximelabonne.substack.com/p/graph-convolutional-networks-introduction-to-gnns-24b3f60d6c95 + - https://maximelabonne.substack.com/p/4-bit-quantization-with-gptq-36b0f4f02c34 + - https://maximelabonne.substack.com/p/fine-tune-your-own-llama-2-model-in-a-colab-notebook-df9823a04a32 + - https://maximelabonne.substack.com/p/introduction-to-weight-quantization-2494701b9c0c + - https://maximelabonne.substack.com/p/decoding-strategies-in-large-language-models-9733a8f70539 + - https://maximelabonne.substack.com/p/the-art-of-spending-optimizing-your-marketing-budget-with-nonlinear-optimization-6c8a39afb3c2 + - https://maximelabonne.substack.com/p/create-a-bot-to-find-diamonds-in-minecraft-d836606a993a + - https://maximelabonne.substack.com/p/constraint-programming-67ac16fa0c81 + - https://maximelabonne.substack.com/p/how-to-design-the-most-powerful-graph-neural-network-3d18b07a6e66 + - https://maximelabonne.substack.com/p/introduction-to-graphsage-in-python-a9e7f9ecf9d7 + - https://maximelabonne.substack.com/p/graph-attention-networks-in-python-975736ac5c0c + - https://maximelabonne.substack.com/p/integer-programming-vs-linear-programming-in-python-f1be5bb4e60e + - https://maximelabonne.substack.com/p/introduction-to-linear-programming-in-python-9261e7eb44b + - https://maximelabonne.substack.com/p/what-is-a-tensor-in-deep-learning-6dedd95d6507 + - https://maximelabonne.substack.com/p/efficiently-iterating-over-rows-in-a-pandas-dataframe-7dd5f9992c01 + - https://maximelabonne.substack.com/p/q-learning-for-beginners-2837b777741 + - https://maximelabonne.substack.com/p/how-to-start-machine-learning-for-developers-in-2022-390af12b193f + # Generate instruct dataset pipeline parameters + test_split_size: 0.1 + push_to_huggingface: false + dataset_id: pauliusztin/llmtwin + mock: true \ No newline at end of 
file diff --git a/configs/export_artifact_to_json.yaml b/configs/export_artifact_to_json.yaml index 655f853..01a26ed 100644 --- a/configs/export_artifact_to_json.yaml +++ b/configs/export_artifact_to_json.yaml @@ -2,7 +2,9 @@ settings: docker: parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest skip_build: True - + orchestrator.aws-stack-skgh: + synchronous: false + parameters: artifact_names: - raw_documents diff --git a/configs/feature_engineering.yaml b/configs/feature_engineering.yaml index 505d457..089a2bf 100644 --- a/configs/feature_engineering.yaml +++ b/configs/feature_engineering.yaml @@ -2,7 +2,9 @@ settings: docker: parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest skip_build: True - + orchestrator.aws-stack-skgh: + synchronous: false + parameters: author_full_names: - Alex Vesa diff --git a/configs/generate_instruct_datasets.yaml b/configs/generate_instruct_datasets.yaml index 7167e04..7a881c9 100644 --- a/configs/generate_instruct_datasets.yaml +++ b/configs/generate_instruct_datasets.yaml @@ -2,9 +2,11 @@ settings: docker: parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest skip_build: True - + orchestrator.aws-stack-skgh: + synchronous: false + parameters: test_split_size: 0.1 push_to_huggingface: false dataset_id: pauliusztin/llmtwin - mock: false + mock: true diff --git a/configs/training.yaml b/configs/training.yaml index e7b6487..77dd372 100644 --- a/configs/training.yaml +++ b/configs/training.yaml @@ -1 +1,8 @@ -parameters: \ No newline at end of file +settings: + docker: + parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest + skip_build: True + orchestrator.aws-stack-skgh: + synchronous: false + +parameters: diff --git a/llm_engineering/application/preprocessing/dispatchers.py b/llm_engineering/application/preprocessing/dispatchers.py index 322e921..bad80a4 100644 --- a/llm_engineering/application/preprocessing/dispatchers.py +++ b/llm_engineering/application/preprocessing/dispatchers.py @@ -47,7 +47,7 @@ def dispatch(cls, data_model: NoSQLBaseDocument) -> VectorBaseDocument: clean_model = handler.clean(data_model) logger.info( - "Data cleaned successfully.", + "Document cleaned successfully.", data_category=data_category, cleaned_content_len=len(clean_model.content), ) @@ -78,7 +78,7 @@ def dispatch(cls, data_model: VectorBaseDocument) -> list[VectorBaseDocument]: chunk_models = handler.chunk(data_model) logger.info( - "Cleaned content chunked successfully.", + "Document chunked successfully.", num=len(chunk_models), data_category=data_category, ) diff --git a/llm_engineering/domain/base/vector.py b/llm_engineering/domain/base/vector.py index 2928a9a..68c81e4 100644 --- a/llm_engineering/domain/base/vector.py +++ b/llm_engineering/domain/base/vector.py @@ -90,7 +90,7 @@ def bulk_insert(cls: Type[T], documents: list["VectorBaseDocument"]) -> bool: def _bulk_insert(cls: Type[T], documents: list["VectorBaseDocument"]) -> None: points = [doc.to_point() for doc in documents] - connection.upsert(collection_name=cls.get_collection_name(), points=points, wait=False) + connection.upsert(collection_name=cls.get_collection_name(), points=points) @classmethod def bulk_find(cls: Type[T], limit: int = 10, **kwargs) -> tuple[list[T], UUID | None]: diff --git a/pipelines/__init__.py b/pipelines/__init__.py index a2c1656..f9ce086 100644 --- a/pipelines/__init__.py +++ b/pipelines/__init__.py @@ -1,4 +1,5 @@ from .digital_data_etl import digital_data_etl +from 
.end_to_end_data import end_to_end_data from .export_artifact_to_json import export_artifact_to_json from .feature_engineering import feature_engineering from .generate_instruct_datasets import generate_instruct_datasets @@ -6,6 +7,7 @@ __all__ = [ "generate_instruct_datasets", + "end_to_end_data", "export_artifact_to_json", "digital_data_etl", "feature_engineering", diff --git a/pipelines/digital_data_etl.py b/pipelines/digital_data_etl.py index 0b5b041..f51e450 100644 --- a/pipelines/digital_data_etl.py +++ b/pipelines/digital_data_etl.py @@ -4,6 +4,8 @@ @pipeline -def digital_data_etl(user_full_name: str, links: list[str]) -> None: +def digital_data_etl(user_full_name: str, links: list[str]) -> str: user = get_or_create_user(user_full_name) - crawl_links(user=user, links=links) + last_step = crawl_links(user=user, links=links) + + return last_step.invocation_id diff --git a/pipelines/end_to_end_data.py b/pipelines/end_to_end_data.py new file mode 100644 index 0000000..294c043 --- /dev/null +++ b/pipelines/end_to_end_data.py @@ -0,0 +1,33 @@ +from zenml import pipeline + +from .digital_data_etl import digital_data_etl +from .feature_engineering import feature_engineering +from .generate_instruct_datasets import generate_instruct_datasets + + +@pipeline +def end_to_end_data( + author_links: list[dict[str, str | list[str]]], + test_split_size: float = 0.1, + push_to_huggingface: bool = False, + dataset_id: str | None = None, + mock: bool = False, +) -> None: + wait_for_ids = [] + for author_data in author_links: + last_step_invocation_id = digital_data_etl( + user_full_name=author_data["user_full_name"], links=author_data["links"] + ) + + wait_for_ids.append(last_step_invocation_id) + + author_full_names = [author_data["user_full_name"] for author_data in author_links] + wait_for_ids = feature_engineering(author_full_names=author_full_names, wait_for=wait_for_ids) + + generate_instruct_datasets( + test_split_size=test_split_size, + push_to_huggingface=push_to_huggingface, + dataset_id=dataset_id, + mock=mock, + wait_for=wait_for_ids, + ) diff --git a/pipelines/feature_engineering.py b/pipelines/feature_engineering.py index 8d58b35..c7d2a85 100644 --- a/pipelines/feature_engineering.py +++ b/pipelines/feature_engineering.py @@ -4,11 +4,13 @@ @pipeline -def feature_engineering(author_full_names: list[str]) -> None: - raw_documents = fe_steps.query_data_warehouse(author_full_names) +def feature_engineering(author_full_names: list[str], wait_for: str | list[str] | None = None) -> list[str]: + raw_documents = fe_steps.query_data_warehouse(author_full_names, after=wait_for) cleaned_documents = fe_steps.clean_documents(raw_documents) - fe_steps.load_to_vector_db(cleaned_documents) + last_step_1 = fe_steps.load_to_vector_db(cleaned_documents) embedded_documents = fe_steps.chunk_and_embed(cleaned_documents) - fe_steps.load_to_vector_db(embedded_documents) + last_step_2 = fe_steps.load_to_vector_db(embedded_documents) + + return [last_step_1.invocation_id, last_step_2.invocation_id] diff --git a/pipelines/generate_instruct_datasets.py b/pipelines/generate_instruct_datasets.py index 10492ea..aa4c537 100644 --- a/pipelines/generate_instruct_datasets.py +++ b/pipelines/generate_instruct_datasets.py @@ -5,9 +5,13 @@ @pipeline def generate_instruct_datasets( - test_split_size: float = 0.1, push_to_huggingface: bool = False, dataset_id: str | None = None, mock: bool = False + test_split_size: float = 0.1, + push_to_huggingface: bool = False, + dataset_id: str | None = None, + mock: bool = False, + 
wait_for: str | list[str] | None = None, ) -> None: - cleaned_documents = cd_steps.query_feature_store() + cleaned_documents = cd_steps.query_feature_store(after=wait_for) prompts = cd_steps.create_prompts(documents=cleaned_documents) instruct_dataset = cd_steps.generate(prompts=prompts, test_split_size=test_split_size, mock=mock) if push_to_huggingface: diff --git a/pyproject.toml b/pyproject.toml index 55d3f13..7d73e34 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -80,6 +80,7 @@ run-preprocessing-pipeline = [ "run-feature-engineering-pipeline", "run-generate-instruct-datasets-pipeline", ] +run-end-to-end-data-pipeline = "python run.py --no-cache --run-end-to-end-data" run-export-artifact-to-json-pipeline = "python run.py --no-cache --run-export-artifact-to-json" run-training-pipeline = "python run.py --no-cache --run-training" diff --git a/run.py b/run.py index f9a1d7a..55ff478 100644 --- a/run.py +++ b/run.py @@ -7,6 +7,7 @@ from llm_engineering import settings from pipelines import ( digital_data_etl, + end_to_end_data, export_artifact_to_json, feature_engineering, generate_instruct_datasets, @@ -49,6 +50,12 @@ default=False, help="Disable caching for the pipeline run.", ) +@click.option( + "--run-end-to-end-data", + is_flag=True, + default=False, + help="Whether to run all the data pipelines in one go.", +) @click.option( "--run-etl", is_flag=True, @@ -92,6 +99,7 @@ ) def main( no_cache: bool = False, + run_end_to_end_data: bool = False, run_etl: bool = False, etl_config_filename: str = "digital_data_etl_paul_iusztin.yaml", run_export_artifact_to_json: bool = False, @@ -101,7 +109,8 @@ def main( export_settings: bool = False, ) -> None: assert ( - run_etl + run_end_to_end_data + or run_etl or run_export_artifact_to_json or run_feature_engineering or run_generate_instruct_datasets @@ -118,6 +127,13 @@ def main( } root_dir = Path(__file__).resolve().parent + if run_end_to_end_data: + run_args_end_to_end = {} + pipeline_args["config_path"] = root_dir / "configs" / "end_to_end_data.yaml" + assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}" + pipeline_args["run_name"] = f"end_to_end_data_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}" + end_to_end_data.with_options(**pipeline_args)(**run_args_end_to_end) + if run_etl: run_args_etl = {} pipeline_args["config_path"] = root_dir / "configs" / etl_config_filename diff --git a/steps/feature_engineering/load_to_vector_db.py b/steps/feature_engineering/load_to_vector_db.py index 58846c7..054bb53 100644 --- a/steps/feature_engineering/load_to_vector_db.py +++ b/steps/feature_engineering/load_to_vector_db.py @@ -9,11 +9,16 @@ @step def load_to_vector_db( documents: Annotated[list, "documents"], -) -> None: +) -> Annotated[bool, "success"]: logger.info(f"Loading {len(documents)} documents into the vector database.") grouped_documents = VectorBaseDocument.group_by_class(documents) for document_class, documents in grouped_documents.items(): logger.info(f"Loading documents into {document_class.get_collection_name()}") for documents_batch in utils.misc.batch(documents, size=4): - document_class.bulk_insert(documents_batch) + try: + document_class.bulk_insert(documents_batch) + except Exception: + return False + + return True
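
Note on the ordering pattern introduced by this patch: the unified end_to_end_data pipeline sequences its sub-pipelines through ZenML's `after` argument, collecting the `invocation_id` values returned by the last step of each sub-pipeline (see the changes to digital_data_etl.py and feature_engineering.py above) and passing them on as non-data dependencies. The sketch below is a minimal, self-contained illustration of that pattern, not code from this repository: the step and pipeline names (ingest, build_features, unified_data) are hypothetical stand-ins, and it assumes a ZenML version in which calling a step inside a pipeline returns a step artifact exposing invocation_id and in which `after` accepts one or more invocation IDs, which is the same usage this patch relies on.

    from zenml import pipeline, step


    @step
    def ingest(source: str) -> str:
        # Hypothetical stand-in for crawl_links: "collect" one source.
        return f"ingested:{source}"


    @step
    def build_features(sources: list[str]) -> int:
        # Hypothetical stand-in for the feature engineering steps.
        return len(sources)


    @pipeline
    def unified_data(sources: list[str]) -> None:
        wait_for_ids = []
        for source in sources:
            # Each step call returns a step artifact; its invocation_id is
            # collected so it can later be used as a pure ordering dependency.
            ingested = ingest(source)
            wait_for_ids.append(ingested.invocation_id)

        # `after` declares a non-data dependency: build_features starts only
        # once every ingest invocation above has finished.
        build_features(sources=sources, after=wait_for_ids)

With the pyproject.toml entry added in this patch, the whole flow is launched through the new task, which expands to `python run.py --no-cache --run-end-to-end-data`; run.py then loads configs/end_to_end_data.yaml, where `synchronous: false` on the AWS orchestrator means the client submits the run and returns without blocking until it finishes.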