Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: Only load schemas for allocated subgraphs #36

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 43 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,61 +35,65 @@ Because it is a stateless service, and only consumes from a RabbitMQ queue and a
Autoagora Processor can safely be dynamically scaled horizontally. Note however that it has been observed processing
upwards of 50 queries per second on a single CPU core.

Note that it also needs to connect to the `graph-node`'s database to determine the currently indexed subgraphs, and then
pull their schemas using the `graph-node` query endpoint.
Note that it also needs to connect to the `indexer-agent`'s management API to determine the indexer's current
allocations, and then pull their schemas using the `graph-node` query endpoint.

Configuration:

```txt
usage: autoagora-processor [-h] --graph-postgres-host GRAPH_POSTGRES_HOST --graph-postgres-database
GRAPH_POSTGRES_DATABASE --graph-postgres-username GRAPH_POSTGRES_USERNAME
--graph-postgres-password GRAPH_POSTGRES_PASSWORD --graph-node-query-endpoint
GRAPH_NODE_QUERY_ENDPOINT --rabbitmq-host RABBITMQ_HOST
[--rabbitmq-queue-name RABBITMQ_QUEUE_NAME] [--rabbitmq-queue-limit RABBITMQ_QUEUE_LIMIT]
[--rabbitmq-exchange-name RABBITMQ_EXCHANGE_NAME] [--rabbitmq-username RABBITMQ_USERNAME]
[--rabbitmq-password RABBITMQ_PASSWORD] --postgres-host POSTGRES_HOST --postgres-database
POSTGRES_DATABASE --postgres-username POSTGRES_USERNAME --postgres-password
POSTGRES_PASSWORD [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
usage: autoagora-processor [-h] [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}] --rabbitmq-host
RABBITMQ_HOST [--rabbitmq-queue-name RABBITMQ_QUEUE_NAME]
[--rabbitmq-queue-limit RABBITMQ_QUEUE_LIMIT]
[--rabbitmq-exchange-name RABBITMQ_EXCHANGE_NAME]
[--rabbitmq-username RABBITMQ_USERNAME]
[--rabbitmq-password RABBITMQ_PASSWORD] --postgres-host POSTGRES_HOST
--postgres-database POSTGRES_DATABASE --postgres-username
POSTGRES_USERNAME --postgres-password POSTGRES_PASSWORD
[--postgres-port POSTGRES_PORT] --graph-node-query-endpoint
GRAPH_NODE_QUERY_ENDPOINT --indexer-agent-mgmt-endpoint
INDEXER_AGENT_MGMT_ENDPOINT

options:
-h, --help show this help message and exit
--graph-postgres-host GRAPH_POSTGRES_HOST
URL of the postgres instance use by the graph-nodes. [env var: GRAPH_POSTGRES_HOST] (default:
None)
--graph-postgres-database GRAPH_POSTGRES_DATABASE
Name of the graph-node database. [env var: GRAPH_POSTGRES_DATABASE] (default: None)
--graph-postgres-username GRAPH_POSTGRES_USERNAME
Username for the graph-node databse. [env var: GRAPH_POSTGRES_USERNAME] (default: None)
--graph-postgres-password GRAPH_POSTGRES_PASSWORD
Password for the graph-node database. [env var: GRAPH_POSTGRES_PASSWORD] (default: None)
--graph-node-query-endpoint GRAPH_NODE_QUERY_ENDPOINT
URL of the indexer's graph-node GraphQL query endpoint. [env var: GRAPH_NODE_QUERY_ENDPOINT]
(default: None)
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
[env var: LOG_LEVEL] (default: WARNING)
--rabbitmq-host RABBITMQ_HOST
Hostname of the RabbitMQ server used for queuing the GQL logs. [env var: RABBITMQ_HOST]
(default: None)
Hostname of the RabbitMQ server used for queuing the GQL logs. [env var:
RABBITMQ_HOST] (default: None)
--rabbitmq-queue-name RABBITMQ_QUEUE_NAME
Name of the RabbitMQ queue to pull query-node logs from. [env var: RABBITMQ_QUEUE_NAME]
(default: gql_logs_processor)
Name of the RabbitMQ queue to pull query-node logs from. [env var:
RABBITMQ_QUEUE_NAME] (default: gql_logs_processor)
--rabbitmq-queue-limit RABBITMQ_QUEUE_LIMIT
Size limit of the created RabbitMQ queue. It is discouraged to change that value while the
system is running, because it requires manual destruction of the queue and a restart of the
whole Auto Agora stack. [env var: RABBITMQ_QUEUE_LIMIT] (default: 1000)
Size limit of the created RabbitMQ queue. It is discouraged to change that
value while the system is running, because it requires manual destruction
of the queue and a restart of the whole Auto Agora stack. [env var:
RABBITMQ_QUEUE_LIMIT] (default: 1000)
--rabbitmq-exchange-name RABBITMQ_EXCHANGE_NAME
Name of the RabbitMQ exchange query-node logs are pushed to. [env var: RABBITMQ_EXCHANGE_NAME]
(default: gql_logs)
Name of the RabbitMQ exchange query-node logs are pushed to. [env var:
RABBITMQ_EXCHANGE_NAME] (default: gql_logs)
--rabbitmq-username RABBITMQ_USERNAME
Username to use for the GQL logs RabbitMQ queue. [env var: RABBITMQ_USERNAME] (default: guest)
Username to use for the GQL logs RabbitMQ queue. [env var:
RABBITMQ_USERNAME] (default: guest)
--rabbitmq-password RABBITMQ_PASSWORD
Password to use for the GQL logs RabbitMQ queue. [env var: RABBITMQ_PASSWORD] (default: guest)
Password to use for the GQL logs RabbitMQ queue. [env var:
RABBITMQ_PASSWORD] (default: guest)
--postgres-host POSTGRES_HOST
Host of the postgres instance storing the logs. [env var: POSTGRES_HOST] (default: None)
Host of the postgres instance storing the logs. [env var: POSTGRES_HOST]
(default: None)
--postgres-database POSTGRES_DATABASE
Name of the logs database. [env var: POSTGRES_DATABASE] (default: None)
--postgres-username POSTGRES_USERNAME
Username for the logs databse. [env var: POSTGRES_USERNAME] (default: None)
Username for the logs database. [env var: POSTGRES_USERNAME] (default:
None)
--postgres-password POSTGRES_PASSWORD
Password for the logs database. [env var: POSTGRES_PASSWORD] (default: None)
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
[env var: LOG_LEVEL] (default: WARNING)
Password for the logs database. [env var: POSTGRES_PASSWORD] (default:
None)
--postgres-port POSTGRES_PORT
Port for the logs database. [env var: POSTGRES_PORT] (default: 5432)
--graph-node-query-endpoint GRAPH_NODE_QUERY_ENDPOINT
URL of the indexer's graph-node GraphQL query endpoint. [env var:
GRAPH_NODE_QUERY_ENDPOINT] (default: None)
--indexer-agent-mgmt-endpoint INDEXER_AGENT_MGMT_ENDPOINT
URL to the indexer-agent management GraphQL endpoint. [env var:
INDEXER_AGENT_MGMT_ENDPOINT] (default: None)
```
33 changes: 4 additions & 29 deletions autoagora_processor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,38 +130,13 @@ def init_config():
help="URL of the indexer's graph-node GraphQL query endpoint.",
)
#
# Indexer Agent API endpoint
#
#
argsparser.add_argument(
"--graph-postgres-host",
env_var="GRAPH_POSTGRES_HOST",
required=True,
help="URL of the postgres instance use by the graph-nodes.",
)
argsparser.add_argument(
"--graph-postgres-database",
env_var="GRAPH_POSTGRES_DATABASE",
required=True,
help="Name of the graph-node database.",
)
argsparser.add_argument(
"--graph-postgres-username",
env_var="GRAPH_POSTGRES_USERNAME",
"--indexer-agent-mgmt-endpoint",
env_var="INDEXER_AGENT_MGMT_ENDPOINT",
required=True,
help="Username for the graph-node databse.",
)
argsparser.add_argument(
"--graph-postgres-password",
env_var="GRAPH_POSTGRES_PASSWORD",
required=True,
help="Password for the graph-node database.",
)
argsparser.add_argument(
"--graph-postgres-port",
env_var="GRAPH_POSTGRES_PORT",
default=5432,
required=False,
help="Port for the graph-node database.",
help="URL to the indexer-agent management GraphQL endpoint.",
)
argsparser.parse_args(namespace=args)

Expand Down
42 changes: 0 additions & 42 deletions autoagora_processor/db_utils.py

This file was deleted.

50 changes: 50 additions & 0 deletions autoagora_processor/indexer_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2023-, Semiotic AI, Inc.
# SPDX-License-Identifier: Apache-2.0

import logging
from typing import Mapping, Optional, Set

import backoff
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from requests import RequestException

from autoagora_processor.config import args


@backoff.on_exception(backoff.expo, RequestException, max_time=30, logger=logging.root)
def query_indexer_agent(query: str, variables: Optional[Mapping] = None):
client = Client(
transport=RequestsHTTPTransport(args.indexer_agent_mgmt_endpoint),
fetch_schema_from_transport=False,
)
result = client.execute(gql(query), variable_values=variables) # type: ignore
return result


def get_network_allocated_subgraphs(network: str) -> Set[str]:
"""Get the indexer's subgraph allocations for the given Graph network."""

result = query_indexer_agent(
"""
query ($protocolNetwork: String!) {
indexerAllocations (protocolNetwork: $protocolNetwork) {
subgraphDeployment
}
}
""",
variables={
"protocolNetwork": network,
},
)

return set(e["subgraphDeployment"] for e in result["indexerAllocations"])


def get_allocated_subgraphs() -> Set[str]:
"""Get the indexer's subgraph allocations for all Graph networks."""

networks = ("mainnet", "arbitrum-one")
results = map(get_network_allocated_subgraphs, networks)

return set.union(*results)
1 change: 0 additions & 1 deletion autoagora_processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from hashlib import blake2b
from typing import Any, List, Mapping, Tuple, Union

import configargparse
import pika
from graphql import GraphQLSchema, parse
from graphql.utilities import strip_ignored_characters as gql_strip_ignored_characters
Expand Down
7 changes: 3 additions & 4 deletions autoagora_processor/subgraph_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from collections.abc import Mapping
from typing import Dict, Iterator, Optional

import configargparse
from gql import Client, gql
from gql.transport.exceptions import TransportQueryError
from gql.transport.requests import RequestsHTTPTransport
Expand All @@ -17,7 +16,7 @@
)

from autoagora_processor.config import args
from autoagora_processor.db_utils import get_indexed_subgraphs
from autoagora_processor.indexer_api import get_allocated_subgraphs


class SubgraphSchemas(Mapping):
Expand All @@ -26,7 +25,7 @@ def __init__(self) -> None:

self.schemas: Dict[str, Optional[GraphQLSchema]] = {
subgraph: self.get_schema_from_graph_node(subgraph)
for subgraph in get_indexed_subgraphs()
for subgraph in get_allocated_subgraphs()
}
logging.info("Added schemas for %s subgraphs", len(self.schemas))

Expand Down Expand Up @@ -70,7 +69,7 @@ def get_schema_from_graph_node(subgraph_ipfs_hash: str) -> Optional[GraphQLSchem

def __getitem__(self, subgraph_ipfs_hash: str) -> Optional[GraphQLSchema]:
if subgraph_ipfs_hash not in self.schemas.keys():
assert subgraph_ipfs_hash in get_indexed_subgraphs()
assert subgraph_ipfs_hash in get_allocated_subgraphs()
new_schema = self.get_schema_from_graph_node(subgraph_ipfs_hash)
self.schemas[subgraph_ipfs_hash] = new_schema
if new_schema:
Expand Down
Loading