feat(datafabric): add fetch_ontology tool to DF inner SQL agent#911
sankalp-uipath wants to merge 18 commits
Pull request overview
Adds an optional fetch_ontology inner tool to the Data Fabric SQL sub-agent so the inner LLM can retrieve a configured ontology’s OWL schema from the QueryEngine REST API and use it to generate semantically-correct SQL.
Changes:
- Introduces an ontology REST client (`fetch_ontology_owl`) with name validation and size limiting.
- Adds a `fetch_ontology` leaf tool with an instance-level cache and wires it into the inner Data Fabric subgraph alongside `execute_sql`.
- Threads `ontology_name`/`folder_key` into the Data Fabric tool construction path (with an env-var fallback).
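As a rough illustration of the "name validation and size limiting" item above, a client helper could guard the path segment and cap the payload along these lines. The regex, the 512 KiB cap, and both function names are assumptions made for this sketch; the PR excerpt does not show the real values.

```python
import re

# Hypothetical limits: the PR does not state the real pattern or cap.
_NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,128}")
MAX_OWL_BYTES = 512 * 1024


def validate_ontology_name(name: str) -> str:
    """Return the name unchanged if it is safe to place in a URL path."""
    if not _NAME_PATTERN.fullmatch(name):
        raise ValueError(f"Invalid ontology name: {name!r}")
    return name


def cap_payload(text: str, max_bytes: int = MAX_OWL_BYTES) -> str:
    """Truncate text so its UTF-8 encoding is at most max_bytes long."""
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return text
    # errors="ignore" drops a multi-byte character cut in half by the slice.
    return raw[:max_bytes].decode("utf-8", errors="ignore")
```

Rejecting anything outside a small character set keeps separators such as `/` and `..` out of the endpoint path, and capping by encoded bytes rather than characters bounds what is handed to the LLM regardless of the text's script.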
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/uipath_langchain/agent/tools/datafabric_tool/ontology_fetch_tool.py` | New leaf tool (`fetch_ontology`) and cached fetcher wrapper for inner SQL agent use. |
| `src/uipath_langchain/agent/tools/datafabric_tool/ontology_client.py` | New client helper to fetch OWL content via `EntitiesService.request_async`, including name validation and payload cap. |
| `src/uipath_langchain/agent/tools/datafabric_tool/models.py` | Adds an intentionally empty args schema (`OntologyFetchInput`) for the new tool. |
| `src/uipath_langchain/agent/tools/datafabric_tool/datafabric_tool.py` | Plumbs `ontology_name` / `folder_key` into the query handler creation (currently with env-var fallback). |
| `src/uipath_langchain/agent/tools/datafabric_tool/datafabric_subgraph.py` | Adds optional `fetch_ontology` tool binding and dispatch-by-tool-name inside the inner subgraph. |
| The result is cached on this instance. Because the instance lives as long | ||
| as the compiled sub-graph (which the handler caches), repeated calls across | ||
| queries hit the API at most once, surviving the per-query reset of the | ||
| inner sub-graph state. |
| safe_name = _validate_ontology_name(ontology_name) | ||
| # Same datafabric_ service the entities calls target; matches the | ||
| # QueryEngine ontology route GET /ontologies/{ontologyName}/files/{fileType}. | ||
| endpoint = f"datafabric_/api/ontologies/{safe_name}/files/owl" |
These need to be stitched into uipath-python.
…logy_file (drop local client)
| results = await asyncio.gather( | ||
| *[self._execute_tool_call(tc) for tc in last.tool_calls] | ||
| ) | ||
| tool_messages = [msg for msg, _ in results] | ||
| all_succeeded = bool(results) and all(success for _, success in results) | ||
| # End as soon as ANY tool call is a terminal success (a row-returning | ||
| # execute_sql). `any` not `all`: a non-terminal tool (e.g. fetch_ontology) | ||
| # co-issued in the same turn must not prevent a successful SQL from ending | ||
| # the loop. | ||
| any_succeeded = any(success for _, success in results) |
Instead of `any_succeeded`, check the feature flag (FF) to determine which graph gets constructed.
| ToolMessage( | ||
| content=str(result), | ||
| tool_call_id=tool_call["id"], | ||
| name="execute_sql", | ||
| name=name, | ||
| ), |
| # Ontologies are first-class bindings, mirroring entity_set: a LIST, each | ||
| # carrying its own folderId so it is resolved from its own folder (entities | ||
| # may also span several folders). Empty → no fetch tool added. Config comes | ||
| # only from the agent definition (the binding), never from process env. | ||
| entity_folders = { |
| out = await graph.tool_node(DataFabricSubgraphState(messages=[ai])) | ||
| # SQL returned rows → terminal, even though fetch_ontology (non-terminal) | ||
| # was co-issued in the same turn. This is the all()->any() fix. | ||
| assert out["last_tool_success"] is True | ||
| assert len(out["messages"]) == 2 | ||
|
|
…age.status to match host node
| import logging | ||
| from typing import Any | ||
|
|
| return self._cached | ||
| if not self._ontologies: | ||
| return "No ontologies are configured for this agent." | ||
| blocks = [await self._fetch_one(name, folder) for name, folder in self._ontologies] |
| tool_messages = [msg for msg, _ in results] | ||
| return { | ||
| "messages": tool_messages, | ||
| "iteration_count": state.iteration_count + len(last.tool_calls), |
| # End as soon as ANY tool call is a terminal success (a row-returning | ||
| # execute_sql). `any` not `all`: a non-terminal tool (e.g. fetch_ontology) | ||
| # co-issued in the same turn must not prevent a successful SQL from ending | ||
| # the loop. | ||
| any_succeeded = any(success for _, success in results) | ||
| # When short-circuiting to END, return ONLY the terminal-success | ||
| # ToolMessages so the outer agent's result is the query rows — not a | ||
| # co-issued fetch_ontology's OWL. On a non-terminal turn keep all messages | ||
| # so the inner LLM can use them on its next pass. | ||
| if any_succeeded: | ||
| tool_messages = [msg for msg, success in results if success] | ||
| else: | ||
| tool_messages = [msg for msg, _ in results] |
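The selection rule in the excerpt above can be isolated into a small pure function for illustration. `ToolResult` and `select_messages` are hypothetical names, and plain strings stand in for `ToolMessage` objects; this is a sketch of the rule, not the sub-graph's actual code.

```python
from typing import NamedTuple


class ToolResult(NamedTuple):
    message: str   # stand-in for a ToolMessage
    success: bool  # True only for a terminal success (row-returning SQL)


def select_messages(results: list[ToolResult]) -> tuple[list[str], bool]:
    """Apply the any()-based terminal rule to one batch of tool results."""
    any_succeeded = any(r.success for r in results)
    if any_succeeded:
        # Terminal turn: keep only the successful messages (the query rows).
        messages = [r.message for r in results if r.success]
    else:
        # Non-terminal turn: keep everything for the inner LLM's next pass.
        messages = [r.message for r in results]
    return messages, any_succeeded
```

For a batch of `[ToolResult("owl", False), ToolResult("rows", True)]` this returns `(["rows"], True)`. An empty batch yields `([], False)`, since `any()` over nothing is false.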
| self._entities_service = entities_service | ||
| self._ontologies = ontologies | ||
| self._cached: str | None = None |
| async def __call__(self, **_kwargs: Any) -> str: | ||
| """Fetch all configured ontologies (cached), concatenated for the LLM.""" | ||
| if self._cached is not None: | ||
| return self._cached | ||
| if not self._ontologies: | ||
| return "No ontologies are configured for this agent." | ||
| # Fetch all ontologies concurrently — each fetch is independent; order is | ||
| # preserved by gather, so the concatenation is deterministic. | ||
| blocks = await asyncio.gather( | ||
| *(self._fetch_one(name, folder) for name, folder in self._ontologies) | ||
| ) | ||
| self._cached = "\n\n".join(blocks) | ||
| return self._cached |
| tool = create_datafabric_query_tool(resource, MagicMock()) # type: ignore[arg-type] | ||
|
|
||
| assert tool.coroutine._ontologies == [("library", "f1")] |
| tool = create_datafabric_query_tool(resource, MagicMock()) # type: ignore[arg-type] | ||
|
|
||
| assert tool.coroutine._ontologies == [("finance", "f2")] |
| tool = create_datafabric_query_tool(resource, MagicMock()) # type: ignore[arg-type] | ||
|
|
||
| assert tool.coroutine._ontologies == [] |
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| return { | ||
| "messages": tool_messages, | ||
| "iteration_count": state.iteration_count + len(last.tool_calls), | ||
| "last_tool_success": all_succeeded, | ||
| "last_tool_success": any_succeeded, | ||
| } |
| # Inner toolset: always execute_sql; optionally an LLM-decided | ||
| # fetch_ontology tool when one or more ontologies are configured. | ||
| inner_tools: list[BaseTool] = [self._execute_sql_tool] | ||
| if ontologies: |
Check how the `EnabledNewLlmClients` feature flag is implemented, and make sure our feature is behind a feature flag in the same way.
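One way to read this request is that the tool list should depend on a flag as well as on the configured ontologies. The sketch below only illustrates that gating; `build_inner_tool_names` and `ontology_flag_enabled` are hypothetical, and how the flag value is actually looked up is not shown in this PR.

```python
def build_inner_tool_names(
    ontologies: list[tuple[str, str]],
    ontology_flag_enabled: bool,
) -> list[str]:
    """Return the inner tool names to bind for one sub-graph build."""
    names = ["execute_sql"]  # always present
    # Bind fetch_ontology only when the flag is on AND ontologies exist.
    if ontology_flag_enabled and ontologies:
        names.append("fetch_ontology")
    return names
```

With the flag off, the result is `["execute_sql"]` even when ontologies are configured, which would leave the existing graph shape unchanged for anyone outside the rollout.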
| # fetch_ontology tool when one or more ontologies are configured. | ||
| inner_tools: list[BaseTool] = [self._execute_sql_tool] | ||
| if ontologies: | ||
| inner_tools.append( |
This doesn't update the subgraph, correct?
| entity set) as ``ontologySet`` items. Each carries its own ``folderId``, so | ||
| it is fetched from its own folder. | ||
| """ | ||
| items = getattr(resource, "ontology_set", None) or [] |
Same as the other PR: `ontology_set`?
What
Adds a `fetch_ontology` tool to the Data Fabric inner SQL agent. When a context has a nested `ontologySet`, the inner ReAct loop can fetch the ontology's OWL schema from QueryEngine to ground its SQL.
- `datafabric_tool.py`: `resolve_context_ontologies(resource)` maps the context's nested `ontology_set` → `(name, folder_key)` pairs, passed to the tool factory.
- `datafabric_subgraph.py`: `fetch_ontology` is bound only when the context has ontologies. It is non-terminal (loops back); only a row-returning `execute_sql` ends the loop. Terminal-batch logic is `any(...)` not `all(...)`, so a co-issued `execute_sql` + `fetch_ontology` batch still terminates on the SQL rows.
Why
Giving the LLM the ontology's class/property names and value formats lets it write SQL against the real schema instead of guessing from entity names alone.
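The `resolve_context_ontologies(resource)` mapping described under What could look roughly like the following. The `getattr(resource, "ontology_set", None) or []` line comes from the diff; the item attribute names `name` and `folder_id`, and the choice to skip incomplete items, are assumptions for this sketch.

```python
from types import SimpleNamespace
from typing import Any


def resolve_context_ontologies(resource: Any) -> list[tuple[str, str]]:
    """Map a context's nested ontology_set to (name, folder_key) pairs."""
    items = getattr(resource, "ontology_set", None) or []
    pairs: list[tuple[str, str]] = []
    for item in items:
        # Attribute names are assumed; the real model may differ.
        name = getattr(item, "name", None)
        folder_key = getattr(item, "folder_id", None)
        if name and folder_key:  # assumed policy: skip incomplete bindings
            pairs.append((name, folder_key))
    return pairs


# Usage with stand-in objects instead of the real resource model.
resource = SimpleNamespace(
    ontology_set=[SimpleNamespace(name="library", folder_id="f1")]
)
print(resolve_context_ontologies(resource))
```

A resource without an `ontology_set` attribute resolves to an empty list, which matches the tests in the diff that expect `_ontologies == []` when nothing is configured.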
Notes
- … uipath 2.11.11, nested `ontologySet` model). Unit/lint CI stays red until #1728 merges and publishes 2.11.11; then `uv lock` (range `uipath<2.12.0`) turns them green. Do not merge a `.dev` pin.
- … `feat/datafabric-ontology-r2rml-grounding`).