Query Execution
This page explains how SQL queries are executed against OMOP databases in the OMCP project. The database layer handles connections, validation, and query execution, ensuring secure and efficient data access.
OmopDatabase Class
The core of database connectivity is the OmopDatabase
class in the db.py
module. This class:
- Establishes connections to various database backends
- Validates SQL queries through the
SQLValidator
- Executes safe queries and returns formatted results
Initialization
The OmopDatabase
is initialized with connection details and security parameters:
def __init__(
self,
connection_string: str,
read_only=True,
cdm_schema: str = "cdm",
vocab_schema: str = "vocab",
allow_source_value_columns: bool = False,
allowed_tables: Optional[List[str]] = None,
):
Parameter | Description |
---|---|
connection_string |
Database URI (e.g., duckdb://path/to/file.duckdb ) |
read_only |
Whether to open the connection in read-only mode |
cdm_schema |
Schema name for clinical data tables |
vocab_schema |
Schema name for vocabulary tables |
allow_source_value_columns |
Whether to allow querying source value columns |
allowed_tables |
List of specific tables to allow (defaults to standard OMOP tables) |
Database Connection
The class supports multiple database backends through the Ibis framework, with DuckDB as the current default implementation:
if connection_string.startswith("duckdb://"):
self.conn = ibis.duckdb.connect(
connection_string.replace("duckdb://", ""),
read_only=read_only
)
# Support for other databases (PostgreSQL, etc.) is planned
Executing Queries
The read_query()
method is the primary interface for executing SQL queries:
@lru_cache(maxsize=128)
def read_query(self, query: str) -> str:
"""
Execute a read-only SQL query and return results as CSV
"""
# Validation and execution implementation
This method:
- Validates the SQL through
sql_validator.validate_sql()
- If validation passes, executes the query through Ibis
- Applies row limits to prevent performance issues
- Returns results in CSV format
- Handles errors and exceptions
Caching
Query results are cached using Python's lru_cache
decorator to improve performance:
@lru_cache(maxsize=128)
def read_query(self, query: str) -> str:
# Implementation...
This caches up to 128 recent query results, avoiding redundant database calls.
Information Schema Access
The get_information_schema()
method provides metadata about tables and columns:
@lru_cache(maxsize=128)
def get_information_schema(self) -> Dict[str, List[str]]:
"""Get the information schema of the database."""
# Implementation...
This method returns table schema information as CSV, filtered according to security settings.
Error Handling
When errors occur during execution, specific exception types are raised:
ExceptionGroup
for validation errors (containing detailed validation failure information)QueryError
for execution failuresConnectionError
for database connection issues
Row Limiting
To prevent resource exhaustion, a row limit is applied to all queries:
result = self.conn.sql(query).limit(self.row_limit) # Default: 1000 rows
This limit can be configured during initialization.
Integration with MCP Tools
The database functionality is exposed through MCP tools in main.py
:
Get_Information_Schema
- Calls theget_information_schema()
methodSelect_Query
- Calls theread_query()
method with user-provided SQL
Usage Example
Here's an example of how to query an OMOP database using this system:
# Initialize database connection
db = OmopDatabase(
connection_string="duckdb:///path/to/omop.duckdb",
read_only=True
)
# Execute a query and get results as CSV
try:
results_csv = db.read_query("""
SELECT p.person_id, p.year_of_birth, c.concept_name as gender
FROM person p
JOIN concept c ON p.gender_concept_id = c.concept_id
LIMIT 10
""")
print(results_csv)
except Exception as e:
print(f"Query failed: {e}")
Best Practice
Always use the JOIN syntax to resolve concept IDs to their human-readable names from the concept table rather than using source value columns directly.