Made the use of BulkTransactionsClient easily overridable
keul committed May 11, 2023
1 parent e93cfd3 commit abc1a09
Showing 1 changed file with 44 additions and 42 deletions.
86 changes: 44 additions & 42 deletions stac_fastapi/sqlalchemy/transactions.py
@@ -21,6 +21,48 @@
logger = logging.getLogger(__name__)


+@attr.s
+class BulkTransactionsClient(BaseBulkTransactionsClient):
+    """Postgres bulk transactions."""
+
+    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
+    debug: bool = attr.ib(default=False)
+    item_table: Type[database.Item] = attr.ib(default=database.Item)
+    item_serializer: Type[serializers.Serializer] = attr.ib(
+        default=serializers.ItemSerializer
+    )
+
+    def __attrs_post_init__(self):
+        """Create sqlalchemy engine."""
+        self.engine = self.session.writer.cached_engine
+
+    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
+        """Preprocess items to match data model.
+        # TODO: dedup with GetterDict logic (ref #58)
+        """
+        db_model = self.item_serializer.stac_to_db(item)
+        return self.item_serializer.row_to_dict(db_model)
+
+    def bulk_item_insert(
+        self, items: Items, chunk_size: Optional[int] = None, **kwargs
+    ) -> str:
+        """Bulk item insertion using sqlalchemy core.
+        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
+        """
+        # Use items.items because schemas.Items is a model with an items key
+        processed_items = [self._preprocess_item(item) for item in items]
+        return_msg = f"Successfully added {len(processed_items)} items."
+        if chunk_size:
+            for chunk in self._chunks(processed_items, chunk_size):
+                self.engine.execute(self.item_table.__table__.insert(), chunk)
+            return return_msg
+
+        self.engine.execute(self.item_table.__table__.insert(), processed_items)
+        return return_msg
+
+
@attr.s
class TransactionsClient(BaseTransactionsClient):
"""Transactions extension specific CRUD operations."""
@@ -34,6 +76,7 @@ class TransactionsClient(BaseTransactionsClient):
    collection_serializer: Type[serializers.Serializer] = attr.ib(
        default=serializers.CollectionSerializer
    )
+    bulk_client_cls = attr.ib(default=BulkTransactionsClient)

    def create_item(
        self,
@@ -46,7 +89,7 @@ def create_item(

        # If a feature collection is posted
        if item["type"] == "FeatureCollection":
-            bulk_client = BulkTransactionsClient(session=self.session)
+            bulk_client = self.bulk_client_cls(session=self.session)
            bulk_client.bulk_item_insert(items=item["features"])
            return None

@@ -158,44 +201,3 @@ def delete_collection(
        query.delete()
        return self.collection_serializer.db_to_stac(data, base_url=base_url)


-@attr.s
-class BulkTransactionsClient(BaseBulkTransactionsClient):
-    """Postgres bulk transactions."""
-
-    session: Session = attr.ib(default=attr.Factory(Session.create_from_env))
-    debug: bool = attr.ib(default=False)
-    item_table: Type[database.Item] = attr.ib(default=database.Item)
-    item_serializer: Type[serializers.Serializer] = attr.ib(
-        default=serializers.ItemSerializer
-    )
-
-    def __attrs_post_init__(self):
-        """Create sqlalchemy engine."""
-        self.engine = self.session.writer.cached_engine
-
-    def _preprocess_item(self, item: stac_types.Item) -> stac_types.Item:
-        """Preprocess items to match data model.
-        # TODO: dedup with GetterDict logic (ref #58)
-        """
-        db_model = self.item_serializer.stac_to_db(item)
-        return self.item_serializer.row_to_dict(db_model)
-
-    def bulk_item_insert(
-        self, items: Items, chunk_size: Optional[int] = None, **kwargs
-    ) -> str:
-        """Bulk item insertion using sqlalchemy core.
-        https://docs.sqlalchemy.org/en/13/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow
-        """
-        # Use items.items because schemas.Items is a model with an items key
-        processed_items = [self._preprocess_item(item) for item in items]
-        return_msg = f"Successfully added {len(processed_items)} items."
-        if chunk_size:
-            for chunk in self._chunks(processed_items, chunk_size):
-                self.engine.execute(self.item_table.__table__.insert(), chunk)
-            return return_msg
-
-        self.engine.execute(self.item_table.__table__.insert(), processed_items)
-        return return_msg
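
The point of the new bulk_client_cls attribute is that a deployment can swap in a custom bulk client without touching create_item: subclass BulkTransactionsClient and hand the subclass to TransactionsClient. A minimal sketch of the pattern (the LoggingBulkClient name and its logging behaviour are hypothetical, not part of this commit; the import paths assume the stac-fastapi sqlalchemy package layout):

import logging

from stac_fastapi.sqlalchemy.session import Session
from stac_fastapi.sqlalchemy.transactions import (
    BulkTransactionsClient,
    TransactionsClient,
)

logger = logging.getLogger(__name__)


class LoggingBulkClient(BulkTransactionsClient):
    """Hypothetical subclass: log each item id before it is inserted."""

    def _preprocess_item(self, item):
        # Items arrive as STAC Item dicts, so .get() is safe here.
        logger.info("bulk-inserting item %s", item.get("id"))
        return super()._preprocess_item(item)


# create_item() now instantiates self.bulk_client_cls when it receives a
# FeatureCollection, so the subclass's preprocessing runs on every bulk insert.
client = TransactionsClient(
    session=Session.create_from_env(),
    bulk_client_cls=LoggingBulkClient,
)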
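
The bulk client can also be driven directly; bulk_item_insert takes an optional chunk_size that splits the core insert into batches (see the sqlalchemy performance FAQ linked in its docstring). Continuing the sketch above, with a placeholder features list standing in for real STAC Item dicts:

# Placeholder payload; in practice these are full STAC Item dicts that
# item_serializer.stac_to_db() can handle.
features = [{"id": "demo-item", "type": "Feature"}]

bulk = LoggingBulkClient(session=Session.create_from_env())
# With chunk_size set, rows are inserted in batches of 500 via sqlalchemy core.
message = bulk.bulk_item_insert(items=features, chunk_size=500)
print(message)  # "Successfully added 1 items."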
