feat: Create unified pipeline
iusztinpaul committed Aug 9, 2024
1 parent 180ddd7 commit 90e009e
Showing 18 changed files with 208 additions and 21 deletions.
2 changes: 2 additions & 0 deletions configs/digital_data_etl_alex_vesa.yaml
@@ -2,6 +2,8 @@ settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True
  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  user_full_name: Alex Vesa # [First Name(s)] [Last Name]
4 changes: 3 additions & 1 deletion configs/digital_data_etl_maxime_labonne.yaml
@@ -2,7 +2,9 @@ settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True

  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  user_full_name: Maxime Labonne # [First Name(s)] [Last Name]
  links:
4 changes: 3 additions & 1 deletion configs/digital_data_etl_paul_iusztin.yaml
@@ -2,7 +2,9 @@ settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True

  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  user_full_name: Paul Iusztin # [First Name(s)] [Last Name]
  links:
103 changes: 103 additions & 0 deletions configs/end_to_end_data.yaml
@@ -0,0 +1,103 @@
settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True
  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  # Data ETL & Feature engineering pipelines parameters
  author_links:
    - user_full_name: Paul Iusztin # [First Name(s)] [Last Name]
      links:
        # Medium (only articles that are not under the paid wall work)
        - https://medium.com/decodingml/an-end-to-end-framework-for-production-ready-llm-systems-by-building-your-llm-twin-2cc6bb01141f
        - https://medium.com/decodingml/a-real-time-retrieval-system-for-rag-on-social-media-data-9cc01d50a2a0
        - https://medium.com/decodingml/sota-python-streaming-pipelines-for-fine-tuning-llms-and-rag-in-real-time-82eb07795b87
        - https://medium.com/decodingml/the-4-advanced-rag-algorithms-you-must-know-to-implement-5d0c7f1199d2
        - https://medium.com/decodingml/architect-scalable-and-cost-effective-llm-rag-inference-pipelines-73b94ef82a99
        # Substack
        - https://decodingml.substack.com/p/a-blueprint-for-designing-production?r=1ttoeh
        - https://decodingml.substack.com/p/the-difference-between-development?r=1ttoeh
        - https://decodingml.substack.com/p/architect-scalable-and-cost-effective?r=1ttoeh
        - https://decodingml.substack.com/p/7-tips-to-reduce-your-vram-when-training?r=1ttoeh
        - https://decodingml.substack.com/p/using-this-python-package-you-can?r=1ttoeh
        - https://decodingml.substack.com/p/the-4-advanced-rag-algorithms-you?r=1ttoeh
        - https://decodingml.substack.com/p/problems-deploying-your-ml-models?r=1ttoeh
        - https://decodingml.substack.com/p/sota-python-streaming-pipelines-for?r=1ttoeh
        - https://decodingml.substack.com/p/ready-for-production-ml-here-are?r=1ttoeh
        - https://decodingml.substack.com/p/ready-for-production-ml-here-are?r=1ttoeh
        - https://decodingml.substack.com/p/my-ml-monthly-learning-resource-recommendations?r=1ttoeh
        - https://decodingml.substack.com/p/an-end-to-end-framework-for-production?r=1ttoeh
        - https://decodingml.substack.com/p/upskill-your-llm-knowledge-base-with?r=1ttoeh
        - https://decodingml.substack.com/p/want-to-learn-an-end-to-end-framework?r=1ttoeh
        - https://decodingml.substack.com/p/my-favorite-way-to-implement-a-configuration?r=1ttoeh
        - https://decodingml.substack.com/p/a-real-time-retrieval-system-for?r=1ttoeh
        - https://decodingml.substack.com/p/4-key-decoding-strategies-for-llms?r=1ttoeh
        - https://decodingml.substack.com/p/dml-new-year-the-new-and-improved?r=1ttoeh
        - https://decodingml.substack.com/p/dml-8-types-of-mlops-tools-that-must?r=1ttoeh
        - https://decodingml.substack.com/p/dml-this-is-what-you-need-to-build?r=1ttoeh
        - https://decodingml.substack.com/p/dml-7-steps-on-how-to-fine-tune-an?r=1ttoeh
        - https://decodingml.substack.com/p/dml-how-do-you-generate-a-q-and-a?r=1ttoeh
        - https://decodingml.substack.com/p/dml-what-do-you-need-to-fine-tune?r=1ttoeh
        - https://decodingml.substack.com/p/dml-why-and-when-do-you-need-to-fine?r=1ttoeh
        - https://decodingml.substack.com/p/dml-how-to-implement-a-streaming?r=1ttoeh
        - https://decodingml.substack.com/p/dml-why-and-what-do-you-need-a-streaming?r=1ttoeh
        - https://decodingml.substack.com/p/dml-unwrapping-the-3-pipeline-design?r=1ttoeh
        - https://decodingml.substack.com/p/dml-how-to-design-an-llm-system-for?r=1ttoeh
        - https://decodingml.substack.com/p/dml-synced-vector-dbs-a-guide-to?r=1ttoeh
        - https://decodingml.substack.com/p/dml-what-is-the-difference-between?r=1ttoeh
        - https://decodingml.substack.com/p/dml-7-steps-to-build-a-production?r=1ttoeh
        - https://decodingml.substack.com/p/dml-chain-of-thought-reasoning-write?r=1ttoeh
        - https://decodingml.substack.com/p/dml-build-and-serve-a-production?r=1ttoeh
        - https://decodingml.substack.com/p/dml-4-key-ideas-you-must-know-to?r=1ttoeh
        - https://decodingml.substack.com/p/dml-how-to-add-real-time-monitoring?r=1ttoeh
        - https://decodingml.substack.com/p/dml-top-6-ml-platform-features-you?r=1ttoeh
    - user_full_name: Alex Vesa # [First Name(s)] [Last Name]
      links:
        # Medium (only articles that are not under the paid wall work)
        - https://medium.com/decodingml/the-importance-of-data-pipelines-in-the-era-of-generative-ai-673e1505a861
        - https://medium.com/decodingml/the-3nd-out-of-11-lessons-of-the-llm-twin-free-course-ba82752dad5a
        - https://medium.com/decodingml/the-role-of-feature-stores-in-fine-tuning-llms-22bd60afd4b9
        # Substack
        - https://decodingml.substack.com/p/2-crucial-antipatterns-to-avoid-when?r=1ttoeh
        - https://decodingml.substack.com/p/the-role-of-feature-stores-in-fine?r=1ttoeh
        - https://decodingml.substack.com/p/change-data-capture-enabling-event?r=1ttoeh
        - https://decodingml.substack.com/p/the-importance-of-data-pipelines?r=1ttoeh
        - https://decodingml.substack.com/p/steal-my-code-to-solve-real-world?r=1ttoeh
        - https://decodingml.substack.com/p/framework-for-measuring-generative?r=1ttoeh
        - https://decodingml.substack.com/p/youre-not-digging-deeper-into-concepts?r=1ttoeh
        - https://decodingml.substack.com/p/do-not-evaluate-llm-summary-task?r=1ttoeh
        - https://decodingml.substack.com/p/dml-introduction-to-deploying-private?r=1ttoeh
    - user_full_name: Maxime Labonne # [First Name(s)] [Last Name]
      links:
        # Substack
        - https://maximelabonne.substack.com/p/uncensor-any-llm-with-abliteration-d30148b7d43e
        - https://maximelabonne.substack.com/p/create-mixtures-of-experts-with-mergekit-11b318c99562
        - https://maximelabonne.substack.com/p/merge-large-language-models-with-mergekit-2118fb392b54
        - https://maximelabonne.substack.com/p/fine-tune-a-mistral-7b-model-with-direct-preference-optimization-708042745aac
        - https://maximelabonne.substack.com/p/exllamav2-the-fastest-library-to-run-llms-32aeda294d26
        - https://maximelabonne.substack.com/p/quantize-llama-models-with-ggml-and-llama-cpp-3612dfbcc172
        - https://maximelabonne.substack.com/p/a-beginners-guide-to-llm-fine-tuning-4bae7d4da672
        - https://maximelabonne.substack.com/p/graph-convolutional-networks-introduction-to-gnns-24b3f60d6c95
        - https://maximelabonne.substack.com/p/4-bit-quantization-with-gptq-36b0f4f02c34
        - https://maximelabonne.substack.com/p/fine-tune-your-own-llama-2-model-in-a-colab-notebook-df9823a04a32
        - https://maximelabonne.substack.com/p/introduction-to-weight-quantization-2494701b9c0c
        - https://maximelabonne.substack.com/p/decoding-strategies-in-large-language-models-9733a8f70539
        - https://maximelabonne.substack.com/p/the-art-of-spending-optimizing-your-marketing-budget-with-nonlinear-optimization-6c8a39afb3c2
        - https://maximelabonne.substack.com/p/create-a-bot-to-find-diamonds-in-minecraft-d836606a993a
        - https://maximelabonne.substack.com/p/constraint-programming-67ac16fa0c81
        - https://maximelabonne.substack.com/p/how-to-design-the-most-powerful-graph-neural-network-3d18b07a6e66
        - https://maximelabonne.substack.com/p/introduction-to-graphsage-in-python-a9e7f9ecf9d7
        - https://maximelabonne.substack.com/p/graph-attention-networks-in-python-975736ac5c0c
        - https://maximelabonne.substack.com/p/integer-programming-vs-linear-programming-in-python-f1be5bb4e60e
        - https://maximelabonne.substack.com/p/introduction-to-linear-programming-in-python-9261e7eb44b
        - https://maximelabonne.substack.com/p/what-is-a-tensor-in-deep-learning-6dedd95d6507
        - https://maximelabonne.substack.com/p/efficiently-iterating-over-rows-in-a-pandas-dataframe-7dd5f9992c01
        - https://maximelabonne.substack.com/p/q-learning-for-beginners-2837b777741
        - https://maximelabonne.substack.com/p/how-to-start-machine-learning-for-developers-in-2022-390af12b193f
  # Generate instruct dataset pipeline parameters
  test_split_size: 0.1
  push_to_huggingface: false
  dataset_id: pauliusztin/llmtwin
  mock: true
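
This parameters block mirrors the signature of the new end_to_end_data pipeline defined in pipelines/end_to_end_data.py further down: author_links feeds the per-author ETL runs and the feature-engineering stage, while test_split_size, push_to_huggingface, dataset_id, and mock are forwarded to the instruct-dataset generation stage.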
4 changes: 3 additions & 1 deletion configs/export_artifact_to_json.yaml
@@ -2,7 +2,9 @@ settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True

  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  artifact_names:
    - raw_documents
4 changes: 3 additions & 1 deletion configs/feature_engineering.yaml
@@ -2,7 +2,9 @@ settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True

  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  author_full_names:
    - Alex Vesa
6 changes: 4 additions & 2 deletions configs/generate_instruct_datasets.yaml
@@ -2,9 +2,11 @@ settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True

  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
  test_split_size: 0.1
  push_to_huggingface: false
  dataset_id: pauliusztin/llmtwin
  mock: false
  mock: true
9 changes: 8 additions & 1 deletion configs/training.yaml
@@ -1 +1,8 @@
parameters:
settings:
  docker:
    parent_image: 992382797823.dkr.ecr.eu-central-1.amazonaws.com/zenml-voplyj:latest
    skip_build: True
  orchestrator.aws-stack-skgh:
    synchronous: false

parameters:
4 changes: 2 additions & 2 deletions llm_engineering/application/preprocessing/dispatchers.py
@@ -47,7 +47,7 @@ def dispatch(cls, data_model: NoSQLBaseDocument) -> VectorBaseDocument:
        clean_model = handler.clean(data_model)

        logger.info(
            "Data cleaned successfully.",
            "Document cleaned successfully.",
            data_category=data_category,
            cleaned_content_len=len(clean_model.content),
        )
@@ -78,7 +78,7 @@ def dispatch(cls, data_model: VectorBaseDocument) -> list[VectorBaseDocument]:
        chunk_models = handler.chunk(data_model)

        logger.info(
            "Cleaned content chunked successfully.",
            "Document chunked successfully.",
            num=len(chunk_models),
            data_category=data_category,
        )
2 changes: 1 addition & 1 deletion llm_engineering/domain/base/vector.py
@@ -90,7 +90,7 @@ def bulk_insert(cls: Type[T], documents: list["VectorBaseDocument"]) -> bool:
    def _bulk_insert(cls: Type[T], documents: list["VectorBaseDocument"]) -> None:
        points = [doc.to_point() for doc in documents]

        connection.upsert(collection_name=cls.get_collection_name(), points=points, wait=False)
        connection.upsert(collection_name=cls.get_collection_name(), points=points)

    @classmethod
    def bulk_find(cls: Type[T], limit: int = 10, **kwargs) -> tuple[list[T], UUID | None]:
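
Dropping wait=False means the upsert call (connection is presumably a Qdrant client, whose default is to wait until the write is acknowledged) now blocks, so insertion failures surface inside bulk_insert instead of being deferred; the reworked load_to_vector_db step below relies on this to report success or failure.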
2 changes: 2 additions & 0 deletions pipelines/__init__.py
@@ -1,11 +1,13 @@
from .digital_data_etl import digital_data_etl
from .end_to_end_data import end_to_end_data
from .export_artifact_to_json import export_artifact_to_json
from .feature_engineering import feature_engineering
from .generate_instruct_datasets import generate_instruct_datasets
from .training import training

__all__ = [
    "generate_instruct_datasets",
    "end_to_end_data",
    "export_artifact_to_json",
    "digital_data_etl",
    "feature_engineering",
6 changes: 4 additions & 2 deletions pipelines/digital_data_etl.py
@@ -4,6 +4,8 @@


@pipeline
def digital_data_etl(user_full_name: str, links: list[str]) -> None:
def digital_data_etl(user_full_name: str, links: list[str]) -> str:
    user = get_or_create_user(user_full_name)
    crawl_links(user=user, links=links)
    last_step = crawl_links(user=user, links=links)

    return last_step.invocation_id
33 changes: 33 additions & 0 deletions pipelines/end_to_end_data.py
@@ -0,0 +1,33 @@
from zenml import pipeline

from .digital_data_etl import digital_data_etl
from .feature_engineering import feature_engineering
from .generate_instruct_datasets import generate_instruct_datasets


@pipeline
def end_to_end_data(
    author_links: list[dict[str, str | list[str]]],
    test_split_size: float = 0.1,
    push_to_huggingface: bool = False,
    dataset_id: str | None = None,
    mock: bool = False,
) -> None:
    wait_for_ids = []
    for author_data in author_links:
        last_step_invocation_id = digital_data_etl(
            user_full_name=author_data["user_full_name"], links=author_data["links"]
        )

        wait_for_ids.append(last_step_invocation_id)

    author_full_names = [author_data["user_full_name"] for author_data in author_links]
    wait_for_ids = feature_engineering(author_full_names=author_full_names, wait_for=wait_for_ids)

    generate_instruct_datasets(
        test_split_size=test_split_size,
        push_to_huggingface=push_to_huggingface,
        dataset_id=dataset_id,
        mock=mock,
        wait_for=wait_for_ids,
    )
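
For context, a minimal sketch of the ordering mechanism this pipeline relies on: inside a ZenML pipeline, a step call returns an object exposing invocation_id, and a later step call can take an after= argument so it only runs once that invocation has finished (both patterns appear in the code above). The produce and notify steps here are hypothetical, and an initialized ZenML stack is assumed.

from zenml import pipeline, step


@step
def produce() -> int:
    # Hypothetical upstream step; stands in for crawl_links / digital_data_etl.
    return 42


@step
def notify() -> None:
    # Hypothetical downstream step with no data dependency on produce.
    print("upstream work finished")


@pipeline
def ordered_pipeline() -> None:
    first = produce()
    # No artifact flows between the two steps; the after= argument alone
    # forces notify to wait for the invocation behind first.
    notify(after=first.invocation_id)


if __name__ == "__main__":
    ordered_pipeline()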
10 changes: 6 additions & 4 deletions pipelines/feature_engineering.py
@@ -4,11 +4,13 @@


@pipeline
def feature_engineering(author_full_names: list[str]) -> None:
    raw_documents = fe_steps.query_data_warehouse(author_full_names)
def feature_engineering(author_full_names: list[str], wait_for: str | list[str] | None = None) -> list[str]:
    raw_documents = fe_steps.query_data_warehouse(author_full_names, after=wait_for)

    cleaned_documents = fe_steps.clean_documents(raw_documents)
    fe_steps.load_to_vector_db(cleaned_documents)
    last_step_1 = fe_steps.load_to_vector_db(cleaned_documents)

    embedded_documents = fe_steps.chunk_and_embed(cleaned_documents)
    fe_steps.load_to_vector_db(embedded_documents)
    last_step_2 = fe_steps.load_to_vector_db(embedded_documents)

    return [last_step_1.invocation_id, last_step_2.invocation_id]
8 changes: 6 additions & 2 deletions pipelines/generate_instruct_datasets.py
@@ -5,9 +5,13 @@

@pipeline
def generate_instruct_datasets(
    test_split_size: float = 0.1, push_to_huggingface: bool = False, dataset_id: str | None = None, mock: bool = False
    test_split_size: float = 0.1,
    push_to_huggingface: bool = False,
    dataset_id: str | None = None,
    mock: bool = False,
    wait_for: str | list[str] | None = None,
) -> None:
    cleaned_documents = cd_steps.query_feature_store()
    cleaned_documents = cd_steps.query_feature_store(after=wait_for)
    prompts = cd_steps.create_prompts(documents=cleaned_documents)
    instruct_dataset = cd_steps.generate(prompts=prompts, test_split_size=test_split_size, mock=mock)
    if push_to_huggingface:
1 change: 1 addition & 0 deletions pyproject.toml
@@ -80,6 +80,7 @@ run-preprocessing-pipeline = [
    "run-feature-engineering-pipeline",
    "run-generate-instruct-datasets-pipeline",
]
run-end-to-end-data-pipeline = "python run.py --no-cache --run-end-to-end-data"
run-export-artifact-to-json-pipeline = "python run.py --no-cache --run-export-artifact-to-json"

run-training-pipeline = "python run.py --no-cache --run-training"
18 changes: 17 additions & 1 deletion run.py
@@ -7,6 +7,7 @@
from llm_engineering import settings
from pipelines import (
    digital_data_etl,
    end_to_end_data,
    export_artifact_to_json,
    feature_engineering,
    generate_instruct_datasets,
@@ -49,6 +50,12 @@
    default=False,
    help="Disable caching for the pipeline run.",
)
@click.option(
    "--run-end-to-end-data",
    is_flag=True,
    default=False,
    help="Whether to run all the data pipelines in one go.",
)
@click.option(
    "--run-etl",
    is_flag=True,
@@ -92,6 +99,7 @@
)
def main(
    no_cache: bool = False,
    run_end_to_end_data: bool = False,
    run_etl: bool = False,
    etl_config_filename: str = "digital_data_etl_paul_iusztin.yaml",
    run_export_artifact_to_json: bool = False,
@@ -101,7 +109,8 @@ def main(
    export_settings: bool = False,
) -> None:
    assert (
        run_etl
        run_end_to_end_data
        or run_etl
        or run_export_artifact_to_json
        or run_feature_engineering
        or run_generate_instruct_datasets
@@ -118,6 +127,13 @@
    }
    root_dir = Path(__file__).resolve().parent

    if run_end_to_end_data:
        run_args_end_to_end = {}
        pipeline_args["config_path"] = root_dir / "configs" / "end_to_end_data.yaml"
        assert pipeline_args["config_path"].exists(), f"Config file not found: {pipeline_args['config_path']}"
        pipeline_args["run_name"] = f"end_to_end_data_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
        end_to_end_data.with_options(**pipeline_args)(**run_args_end_to_end)

    if run_etl:
        run_args_etl = {}
        pipeline_args["config_path"] = root_dir / "configs" / etl_config_filename
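
With this flag wired in, the unified run can be launched with python run.py --no-cache --run-end-to-end-data, which is the same command the new run-end-to-end-data-pipeline entry in pyproject.toml points to.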
9 changes: 7 additions & 2 deletions steps/feature_engineering/load_to_vector_db.py
@@ -9,11 +9,16 @@
@step
def load_to_vector_db(
    documents: Annotated[list, "documents"],
) -> None:
) -> Annotated[bool, "success"]:
    logger.info(f"Loading {len(documents)} documents into the vector database.")

    grouped_documents = VectorBaseDocument.group_by_class(documents)
    for document_class, documents in grouped_documents.items():
        logger.info(f"Loading documents into {document_class.get_collection_name()}")
        for documents_batch in utils.misc.batch(documents, size=4):
            document_class.bulk_insert(documents_batch)
            try:
                document_class.bulk_insert(documents_batch)
            except Exception:
                return False

    return True
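
The step batches inserts through utils.misc.batch, which is not part of this diff; a minimal sketch of such a helper, assuming it simply yields fixed-size slices of the input list:

from typing import Generator, TypeVar

T = TypeVar("T")


def batch(items: list[T], size: int) -> Generator[list[T], None, None]:
    # Yield consecutive slices of at most `size` elements.
    for i in range(0, len(items), size):
        yield items[i : i + size]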
