infrahub_sdk.client
Classes​
InfrahubClient​
GraphQL Client to interact with Infrahub.
Methods:
get​
get(self, kind: type[SchemaType], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaType | None
get​
get(self, kind: type[SchemaType], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaType
get​
get(self, kind: type[SchemaType], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaType
get​
get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNode | None
get​
get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNode
get​
get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNode
get​
get(self, kind: str | type[SchemaType], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> InfrahubNode | SchemaType | None
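The overloads above differ only in the type of `kind` and in the value of `raise_when_missing`, which lets a type checker narrow the return type (no `None` when `raise_when_missing` is `True`). A minimal sketch of that pattern follows. The `lookup` function, the `NodeNotFound` exception, and the in-memory `_NODES` table are hypothetical stand-ins for illustration, not part of the SDK.

```python
from typing import Dict, Literal, Optional, overload


class NodeNotFound(Exception):
    """Hypothetical stand-in for the error raised when no node matches."""


# Hypothetical in-memory data standing in for the Infrahub backend.
_NODES: Dict[str, str] = {"abc-123": "atl1-edge1"}


@overload
def lookup(id: str, raise_when_missing: Literal[False]) -> Optional[str]: ...
@overload
def lookup(id: str, raise_when_missing: Literal[True]) -> str: ...
@overload
def lookup(id: str, raise_when_missing: bool = ...) -> str: ...


def lookup(id: str, raise_when_missing: bool = True) -> Optional[str]:
    """Return the node for ``id``; raise or return None when it is missing."""
    node = _NODES.get(id)
    if node is None and raise_when_missing:
        raise NodeNotFound(id)
    return node
```

With this shape, a call that passes `raise_when_missing=False` is typed as possibly returning `None`, while the default call is typed as always returning a value.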
delete​
delete(self, kind: str | type[SchemaType], id: str, branch: str | None = None) -> None
create​
create(self, kind: str | type[SchemaType], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNode | SchemaType
get_version​
get_version(self) -> str
Return the Infrahub version.
get_user​
get_user(self) -> dict
Return user information.
get_user_permissions​
get_user_permissions(self) -> dict
Return user permissions.
count​
count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, query_name: str | None = None, **kwargs: Any) -> int
Return the number of nodes of a given kind.
all​
all(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[SchemaType]
all​
all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[InfrahubNode]
all​
all(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None) -> list[InfrahubNode] | list[SchemaType]
Retrieve all nodes of a given kind.
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to pre-fetch related node data.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting disable=True enhances performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
- query_name: If provided, it is used as the GraphQL operation name; otherwise All_<kind> is used.
Returns:
- list[InfrahubNode]: List of Nodes
filters​
filters(self, kind: type[SchemaType], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[SchemaType]
filters​
filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[InfrahubNode]
filters​
filters(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> list[InfrahubNode] | list[SchemaType]
Retrieve nodes of a given kind based on provided filters.
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to pre-fetch related node data.
- partial_match: Allow partial match of filter criteria for the query.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting disable=True enhances performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
- query_name: If provided, it is used as the GraphQL operation name; otherwise Filters_<kind> is used.
- **kwargs: Additional filter criteria for the query.
Returns:
- list[InfrahubNode]: List of Nodes that match the given filters.
clone​
clone(self, branch: str | None = None) -> InfrahubClient
Return a cloned version of the client using the same configuration.
execute_graphql​
execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, tracker: str | None = None) -> dict
Execute a GraphQL query (or mutation).
If retry_on_failure is True, the query will retry until the server becomes reachable.
Args:
- query: GraphQL query to execute; can be a query or a mutation.
- variables: Variables to pass along with the GraphQL query. Defaults to None.
- branch_name: Name of the branch on which the query will be executed. Defaults to None.
- at: Time when the query should be executed. Defaults to None.
- timeout: Timeout in seconds for the query. Defaults to None.
Returns:
- The GraphQL data payload (response["data"]).
Raises:
- GraphQLError: When the GraphQL response contains errors.
- ServerNotReachableError: If the server is not reachable after exhausting retries.
- AuthenticationError: If the server returns a 401 or 403 response.
- URLNotFoundError: If the server returns a 404 response.
- Error: If the response is unexpectedly missing.
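The error conditions listed for `execute_graphql` can be pictured as two checks: one on the HTTP status code and one on the GraphQL response body. The sketch below is an illustration of that mapping only. The exception classes are local stand-ins that borrow the names from the Raises list, and `check_status` and `extract_data` are hypothetical helpers, not SDK functions.

```python
from typing import Any, Dict


class GraphQLError(Exception):
    """Local stand-in; only the name is taken from the Raises list."""


class AuthenticationError(Exception):
    """Local stand-in; only the name is taken from the Raises list."""


class URLNotFoundError(Exception):
    """Local stand-in; only the name is taken from the Raises list."""


def check_status(status_code: int) -> None:
    """Translate the HTTP status codes named in the Raises list into exceptions."""
    if status_code in (401, 403):
        raise AuthenticationError(f"HTTP {status_code}")
    if status_code == 404:
        raise URLNotFoundError("HTTP 404")


def extract_data(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return response["data"], or raise if the response reports errors."""
    if response.get("errors"):
        raise GraphQLError(response["errors"])
    return response["data"]
```

A caller would typically catch these exceptions separately, since an authentication failure and a malformed query call for different remedies.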
refresh_login​
refresh_login(self) -> None
login​
login(self, refresh: bool = False) -> None
query_gql_query​
query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None) -> dict
create_diff​
create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str
get_diff_summary​
get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> list[NodeDiff]
get_diff_tree​
get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None
Get complete diff tree with metadata and nodes.
Returns None if no diff exists.
Raises:
- ValueError: If from_time is later than to_time.
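The time-range rule can be checked on the caller's side before requesting a diff. This is a minimal sketch of the documented condition; `check_time_range` is a hypothetical helper, and treating a missing bound as "no constraint" is an assumption based on both arguments being optional.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def check_time_range(from_time: Optional[datetime], to_time: Optional[datetime]) -> None:
    """Raise ValueError when both bounds are given and from_time is later than to_time."""
    if from_time is not None and to_time is not None and from_time > to_time:
        raise ValueError("from_time must not be later than to_time")


# Use timezone-aware values on both sides; Python refuses to compare
# naive and aware datetimes.
now = datetime.now(timezone.utc)
check_time_range(now - timedelta(hours=1), now)  # valid range, no exception
check_time_range(None, now)  # open-ended range, no exception
```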
allocate_next_ip_address​
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaType | None
allocate_next_ip_address​
allocate_next_ip_address(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNode | None
allocate_next_ip_address​
allocate_next_ip_address(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNode | SchemaType | None
Allocate a new IP address by using the provided resource pool.
Args:
- resource_pool: Node corresponding to the pool to allocate resources from.
- identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
- prefix_length: Length of the prefix to set on the address to allocate.
- address_type: Kind of the address to allocate.
- data: A key/value map used to set attribute values on the allocated address.
- branch: Name of the branch to allocate from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- tracker: Tracker string passed along with the underlying GraphQL request.
Returns:
- Node corresponding to the allocated resource.
Raises:
- ValueError: If resource_pool is not a CoreIPAddressPool.
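The `identifier` argument makes allocation idempotent: asking again with the same identifier returns the same resource instead of consuming a new one. The sketch below illustrates that behaviour with a hypothetical in-memory `AddressPool` built on the standard `ipaddress` module. It is not how the Infrahub server implements resource pools.

```python
import ipaddress
from typing import Dict


class AddressPool:
    """Hypothetical in-memory pool; not the SDK's CoreIPAddressPool."""

    def __init__(self, network: str) -> None:
        # iter() makes this work whether hosts() returns a generator or a list.
        self._free = iter(ipaddress.IPv4Network(network).hosts())
        self._by_identifier: Dict[str, ipaddress.IPv4Address] = {}

    def allocate(self, identifier: str) -> ipaddress.IPv4Address:
        """Return the address bound to ``identifier``, allocating one on first use."""
        if identifier not in self._by_identifier:
            self._by_identifier[identifier] = next(self._free)
        return self._by_identifier[identifier]


pool = AddressPool("192.0.2.0/29")
first = pool.allocate("web-1")
second = pool.allocate("web-2")
again = pool.allocate("web-1")  # same identifier, same address as ``first``
```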
allocate_next_ip_prefix​
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaType | None
allocate_next_ip_prefix​
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNode | None
allocate_next_ip_prefix​
allocate_next_ip_prefix(self, resource_pool: CoreNode, kind: type[SchemaType] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNode | SchemaType | None
Allocate a new IP prefix by using the provided resource pool.
Args:
- resource_pool: Node corresponding to the pool to allocate resources from.
- identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
- prefix_length: Length of the prefix to allocate.
- member_type: Member type of the prefix to allocate.
- prefix_type: Kind of the prefix to allocate.
- data: A key/value map used to set attribute values on the allocated prefix.
- branch: Name of the branch to allocate from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- tracker: Tracker string passed along with the underlying GraphQL request.
Returns:
- Node corresponding to the allocated resource.
Raises:
- ValueError: If resource_pool is not a CoreIPPrefixPool.
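To see what `prefix_length` controls, it can help to carve prefixes out of a supernet locally. The sketch below uses a hypothetical in-memory `PrefixPool` built on the standard `ipaddress` module; the first-fit strategy is an assumption chosen for illustration and says nothing about the order in which the Infrahub server hands out prefixes.

```python
import ipaddress
from typing import Dict, List


class PrefixPool:
    """Hypothetical in-memory pool; not the SDK's CoreIPPrefixPool."""

    def __init__(self, supernet: str) -> None:
        self._supernet = ipaddress.IPv4Network(supernet)
        self._allocated: List[ipaddress.IPv4Network] = []
        self._by_identifier: Dict[str, ipaddress.IPv4Network] = {}

    def allocate(self, identifier: str, prefix_length: int) -> ipaddress.IPv4Network:
        """Return the first free subnet of the requested length (first fit)."""
        if identifier in self._by_identifier:
            return self._by_identifier[identifier]
        for candidate in self._supernet.subnets(new_prefix=prefix_length):
            if not any(candidate.overlaps(used) for used in self._allocated):
                self._allocated.append(candidate)
                self._by_identifier[identifier] = candidate
                return candidate
        raise ValueError("no free prefix of the requested length")


pool = PrefixPool("10.0.0.0/24")
site_a = pool.allocate("site-a", prefix_length=26)
site_b = pool.allocate("site-b", prefix_length=25)  # skips the half used by site-a
```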
create_batch​
create_batch(self, return_exceptions: bool = False) -> InfrahubBatch
get_list_repositories​
get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]
repository_update_commit​
repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool
convert_object_type​
convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNode
Convert a given node to another kind on a given branch.
The keys of fields_mapping are target field names, and its values indicate how to fill in those fields.
Any mandatory target field without an equivalent field in the source kind should be specified in this
mapping. See https://docs.infrahub.app/guides/object-conversion for more information.
InfrahubClientSync​
Methods:
get​
get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaTypeSync | None
get​
get(self, kind: type[SchemaTypeSync], raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaTypeSync
get​
get(self, kind: type[SchemaTypeSync], raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> SchemaTypeSync
get​
get(self, kind: str, raise_when_missing: Literal[False], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNodeSync | None
get​
get(self, kind: str, raise_when_missing: Literal[True], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNodeSync
get​
get(self, kind: str, raise_when_missing: bool = ..., at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., id: str | None = ..., hfid: list[str] | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., populate_store: bool = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> InfrahubNodeSync
get​
get(self, kind: str | type[SchemaTypeSync], raise_when_missing: bool = True, at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, id: str | None = None, hfid: list[str] | None = None, include: list[str] | None = None, exclude: list[str] | None = None, populate_store: bool = True, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync | None
delete​
delete(self, kind: str | type[SchemaTypeSync], id: str, branch: str | None = None) -> None
create​
create(self, kind: str | type[SchemaTypeSync], data: dict | None = None, branch: str | None = None, timeout: int | None = None, **kwargs: Any) -> InfrahubNodeSync | SchemaTypeSync
get_version​
get_version(self) -> str
Return the Infrahub version.
get_user​
get_user(self) -> dict
Return user information.
get_user_permissions​
get_user_permissions(self) -> dict
Return user permissions.
clone​
clone(self, branch: str | None = None) -> InfrahubClientSync
Return a cloned version of the client using the same configuration.
execute_graphql​
execute_graphql(self, query: str, variables: dict | None = None, branch_name: str | None = None, at: str | Timestamp | None = None, timeout: int | None = None, tracker: str | None = None) -> dict
Execute a GraphQL query (or mutation).
If retry_on_failure is True, the query will retry until the server becomes reachable.
Args:
- query: GraphQL query to execute; can be a query or a mutation.
- variables: Variables to pass along with the GraphQL query. Defaults to None.
- branch_name: Name of the branch on which the query will be executed. Defaults to None.
- at: Time when the query should be executed. Defaults to None.
- timeout: Timeout in seconds for the query. Defaults to None.
Returns:
- The GraphQL data payload (response["data"]).
Raises:
- GraphQLError: When the GraphQL response contains errors.
- ServerNotReachableError: If the server is not reachable after exhausting retries.
- AuthenticationError: If the server returns a 401 or 403 response.
- URLNotFoundError: If the server returns a 404 response.
- Error: If the response is unexpectedly missing.
count​
count(self, kind: str | type[SchemaType], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, partial_match: bool = False, query_name: str | None = None, **kwargs: Any) -> int
Return the number of nodes of a given kind.
all​
all(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[SchemaTypeSync]
all​
all(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ...) -> list[InfrahubNodeSync]
all​
all(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None) -> list[InfrahubNodeSync] | list[SchemaTypeSync]
Retrieve all nodes of a given kind.
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to pre-fetch related node data.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting disable=True enhances performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
- query_name: If provided, it is used as the GraphQL operation name; otherwise All_<kind> is used.
Returns:
- list[InfrahubNodeSync]: List of Nodes
filters​
filters(self, kind: type[SchemaTypeSync], at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[SchemaTypeSync]
filters​
filters(self, kind: str, at: Timestamp | None = ..., branch: str | None = ..., timeout: int | None = ..., populate_store: bool = ..., offset: int | None = ..., limit: int | None = ..., include: list[str] | None = ..., exclude: list[str] | None = ..., fragment: bool = ..., prefetch_relationships: bool = ..., partial_match: bool = ..., property: bool = ..., parallel: bool = ..., order: Order | None = ..., include_metadata: bool = ..., query_name: str | None = ..., **kwargs: Any) -> list[InfrahubNodeSync]
filters​
filters(self, kind: str | type[SchemaTypeSync], at: Timestamp | None = None, branch: str | None = None, timeout: int | None = None, populate_store: bool = True, offset: int | None = None, limit: int | None = None, include: list[str] | None = None, exclude: list[str] | None = None, fragment: bool = False, prefetch_relationships: bool = False, partial_match: bool = False, property: bool = False, parallel: bool = False, order: Order | None = None, include_metadata: bool = False, query_name: str | None = None, **kwargs: Any) -> list[InfrahubNodeSync] | list[SchemaTypeSync]
Retrieve nodes of a given kind based on provided filters.
Args:
- kind: Kind of the nodes to query.
- at: Time of the query. Defaults to Now.
- branch: Name of the branch to query from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- populate_store: Flag to indicate whether to populate the store with the retrieved nodes.
- offset: The offset for pagination.
- limit: The limit for pagination.
- include: List of attributes or relationships to include in the query.
- exclude: List of attributes or relationships to exclude from the query.
- fragment: Flag to use GraphQL fragments for generic schemas.
- prefetch_relationships: Flag to indicate whether to pre-fetch related node data.
- partial_match: Allow partial match of filter criteria for the query.
- parallel: Whether to use parallel processing for the query.
- order: Ordering related options. Setting disable=True enhances performance.
- include_metadata: If True, includes node_metadata and relationship_metadata in the query.
- query_name: If provided, it is used as the GraphQL operation name; otherwise Filters_<kind> is used.
- **kwargs: Additional filter criteria for the query.
Returns:
- list[InfrahubNodeSync]: List of Nodes that match the given filters.
create_batch​
create_batch(self, return_exceptions: bool = False) -> InfrahubBatchSync
Create a batch to execute multiple queries concurrently.
The batch is executed using a thread pool, so the execution order is not guaranteed. Using such a batch to manipulate objects that depend on each other is not recommended.
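The ordering caveat follows directly from how thread pools work: tasks finish in whatever order the workers complete them. The sketch below shows this with the standard `concurrent.futures` module. `run_batch` is a hypothetical helper, not the SDK's `InfrahubBatchSync`, and it assumes `return_exceptions` behaves like the flag of the same name in `asyncio.gather` (exceptions are returned as results instead of being raised), which the reference does not confirm.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, List, Tuple


def run_batch(
    tasks: List[Callable[[], Any]],
    return_exceptions: bool = False,
    max_workers: int = 4,
) -> Iterator[Tuple[int, Any]]:
    """Run tasks in a thread pool and yield (task index, result) as each one finishes."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(task): index for index, task in enumerate(tasks)}
        for future in as_completed(futures):
            index = futures[future]
            try:
                result = future.result()
            except Exception as exc:
                if not return_exceptions:
                    raise
                result = exc
            yield index, result


# Results arrive in completion order, so key them by task index
# rather than relying on their position.
results = dict(run_batch([lambda: 1, lambda: 2, lambda: 1 / 0], return_exceptions=True))
```

Because completion order is arbitrary, a task that creates an object and another that references it should not share one batch.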
get_list_repositories​
get_list_repositories(self, branches: dict[str, BranchData] | None = None, kind: str = 'CoreGenericRepository') -> dict[str, RepositoryData]
query_gql_query​
query_gql_query(self, name: str, variables: dict | None = None, update_group: bool = False, subscribers: list[str] | None = None, params: dict | None = None, branch_name: str | None = None, at: str | None = None, timeout: int | None = None, tracker: str | None = None) -> dict
create_diff​
create_diff(self, branch: str, name: str, from_time: datetime, to_time: datetime, wait_until_completion: bool = True) -> bool | str
get_diff_summary​
get_diff_summary(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> list[NodeDiff]
get_diff_tree​
get_diff_tree(self, branch: str, name: str | None = None, from_time: datetime | None = None, to_time: datetime | None = None, timeout: int | None = None, tracker: str | None = None) -> DiffTreeData | None
Get complete diff tree with metadata and nodes.
Returns None if no diff exists.
Raises:
- ValueError: If from_time is later than to_time.
allocate_next_ip_address​
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaTypeSync | None
allocate_next_ip_address​
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., address_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNodeSync | None
allocate_next_ip_address​
allocate_next_ip_address(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, address_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNodeSync | SchemaTypeSync | None
Allocate a new IP address by using the provided resource pool.
Args:
- resource_pool: Node corresponding to the pool to allocate resources from.
- identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
- prefix_length: Length of the prefix to set on the address to allocate.
- address_type: Kind of the address to allocate.
- data: A key/value map used to set attribute values on the allocated address.
- branch: Name of the branch to allocate from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- tracker: Tracker string passed along with the underlying GraphQL request.
Returns:
- Node corresponding to the allocated resource.
Raises:
- ValueError: If resource_pool is not a CoreIPAddressPool.
allocate_next_ip_prefix​
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync], identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> SchemaTypeSync | None
allocate_next_ip_prefix​
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: None = ..., identifier: str | None = ..., prefix_length: int | None = ..., member_type: str | None = ..., prefix_type: str | None = ..., data: dict[str, Any] | None = ..., branch: str | None = ..., timeout: int | None = ..., tracker: str | None = ...) -> CoreNodeSync | None
allocate_next_ip_prefix​
allocate_next_ip_prefix(self, resource_pool: CoreNodeSync, kind: type[SchemaTypeSync] | None = None, identifier: str | None = None, prefix_length: int | None = None, member_type: str | None = None, prefix_type: str | None = None, data: dict[str, Any] | None = None, branch: str | None = None, timeout: int | None = None, tracker: str | None = None) -> CoreNodeSync | SchemaTypeSync | None
Allocate a new IP prefix by using the provided resource pool.
Args:
- resource_pool: Node corresponding to the pool to allocate resources from.
- identifier: Value used to perform idempotent allocation; the same resource is returned for a given identifier.
- prefix_length: Length of the prefix to allocate.
- member_type: Member type of the prefix to allocate.
- prefix_type: Kind of the prefix to allocate.
- data: A key/value map used to set attribute values on the allocated prefix.
- branch: Name of the branch to allocate from. Defaults to default_branch.
- timeout: Overrides default timeout used when querying the GraphQL API. Specified in seconds.
- tracker: Tracker string passed along with the underlying GraphQL request.
Returns:
- Node corresponding to the allocated resource.
Raises:
- ValueError: If resource_pool is not a CoreIPPrefixPool.
repository_update_commit​
repository_update_commit(self, branch_name: str, repository_id: str, commit: str, is_read_only: bool = False) -> bool
refresh_login​
refresh_login(self) -> None
login​
login(self, refresh: bool = False) -> None
convert_object_type​
convert_object_type(self, node_id: str, target_kind: str, branch: str | None = None, fields_mapping: dict[str, ConversionFieldInput] | None = None) -> InfrahubNodeSync
Convert a given node to another kind on a given branch.
The keys of fields_mapping are target field names, and its values indicate how to fill in those fields.
Any mandatory target field without an equivalent field in the source kind should be specified in this
mapping. See https://docs.infrahub.app/guides/object-conversion for more information.
ProcessRelationsNode​
ProxyConfig​
ProxyConfigSync​
ProcessRelationsNodeSync​
BaseClient​
Base class for InfrahubClient and InfrahubClientSync.
Methods:
request_context​
request_context(self) -> RequestContext | None
request_context​
request_context(self, request_context: RequestContext) -> None
start_tracking​
start_tracking(self, identifier: str | None = None, params: dict[str, Any] | None = None, delete_unused_nodes: bool = False, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> Self
set_context_properties​
set_context_properties(self, identifier: str, params: dict[str, str] | None = None, delete_unused_nodes: bool = True, reset: bool = True, group_type: str | None = None, group_params: dict[str, Any] | None = None, branch: str | None = None) -> None
Functions​
handle_relogin​
handle_relogin(func: Callable[..., Coroutine[Any, Any, httpx.Response]]) -> Callable[..., Coroutine[Any, Any, httpx.Response]]
handle_relogin_sync​
handle_relogin_sync(func: Callable[..., httpx.Response]) -> Callable[..., httpx.Response]
get_kind_as_string​
get_kind_as_string(kind: str | type[SchemaType | SchemaTypeSync]) -> str