omopy.connector
Database CDM access layer for OMOPy — connect to OMOP CDM databases and get lazy, Ibis-backed table access.
This module is the Python equivalent of the R CDMConnector package.
Connection & Factory
cdm_from_con
Factory function to create a CdmReference from a database connection.
cdm_from_con() is the primary entry point for users. It takes an Ibis
backend connection (or DuckDB file path), auto-detects the CDM schema and
version, and returns a fully populated CdmReference with lazy table access.
cdm_from_con
cdm_from_con(
con: IbisConnection | str | Path,
*,
cdm_schema: str | None = None,
write_schema: str | None = None,
cdm_version: CdmVersion | None = None,
cdm_name: str | None = None,
cdm_tables: list[str] | None = None,
) -> CdmReference
Create a CdmReference from a database connection.
This is the main entry point for connecting to an OMOP CDM database. It auto-detects the schema, CDM version, and available tables, then returns a CdmReference with lazy (Ibis-backed) table access.
Parameters
con
An Ibis backend connection, or a path to a DuckDB file.
If a string/Path is given, it is opened as a DuckDB database.
cdm_schema
The database schema containing CDM tables. Auto-detected if None.
write_schema
The schema for writing results (cohorts, etc.). Defaults to cdm_schema.
cdm_version
The OMOP CDM version. Auto-detected from cdm_source table if None.
cdm_name
Human-readable name. Auto-detected from cdm_source table if None.
cdm_tables
Specific table names to load. If None, loads all standard CDM tables
found in the schema.
Returns
CdmReference
A CDM reference with lazy, database-backed tables.
Examples
>>> cdm = cdm_from_con("synthea.duckdb", cdm_schema="base")
>>> cdm.cdm_name
'dbt-synthea'
>>> cdm["person"].count()
27
>>> # Tables are lazy: no data is fetched until .collect()
>>> person_df = cdm["person"].collect()
connect_duckdb
detect_cdm_schema
Detect the database schema containing CDM tables.
Looks for a schema that contains the person table, which is
mandatory in every valid OMOP CDM instance.
Parameters
con
An Ibis backend connection.
cdm_schema
If provided, validate that this schema contains CDM tables and return it.
Returns
str
The schema name (e.g. "base", "cdm", "main").
Raises
ValueError
If no schema with CDM tables can be found.
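The detection rule above (find the schema that holds the mandatory person table, or validate an explicitly given one) can be illustrated with a small, self-contained sketch. It is not omopy's implementation: the catalogue is modelled as a plain dict mapping schema names to sets of table names, the helper name find_cdm_schema is hypothetical, and searching schemas in sorted order is an assumption that may differ from how the real function orders candidates.

```python
def find_cdm_schema(schema_tables, cdm_schema=None):
    """Hypothetical stand-in for detect_cdm_schema over an in-memory catalogue.

    schema_tables maps each schema name to the set of table names it holds.
    """
    if cdm_schema is not None:
        # Validate an explicitly requested schema instead of searching.
        if "person" in schema_tables.get(cdm_schema, set()):
            return cdm_schema
        raise ValueError(f"Schema {cdm_schema!r} does not contain CDM tables")
    # person is mandatory in every OMOP CDM, so it marks the CDM schema.
    for schema in sorted(schema_tables):
        if "person" in schema_tables[schema]:
            return schema
    raise ValueError("No schema with CDM tables could be found")


catalogue = {
    "main": {"scratch"},
    "base": {"person", "observation_period", "concept"},
}
print(find_cdm_schema(catalogue))  # base
```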
Core Classes
DbSource
DbSource(
con: IbisConnection,
cdm_schema: str,
*,
write_schema: str | None = None,
cdm_version: CdmVersion | None = None,
cdm_name: str | None = None,
)
Database-backed CDM source.
Implements the CdmSource protocol (omopy.generics.cdm_reference.CdmSource).
Provides lazy table access via Ibis — no data is fetched until a table
is materialised with CdmTable.collect().
Parameters
con
An Ibis backend connection (e.g. from ibis.duckdb.connect()).
cdm_schema
The database schema containing CDM tables (e.g. "base").
write_schema
The schema for writing results (cohorts, etc.). Defaults to cdm_schema.
cdm_version
The OMOP CDM version. If None, auto-detected from cdm_source table.
cdm_name
Human-readable name for this CDM. If None, read from cdm_source.
Examples
>>> import ibis
>>> con = ibis.duckdb.connect("synthea.duckdb", read_only=True)
>>> source = DbSource(con, cdm_schema="base")
>>> tables = source.list_tables()
>>> "person" in tables
True
read_table
Return a lazy CdmTable wrapping an Ibis table expression.
No data is fetched — the returned CdmTable holds a lazy Ibis
reference that only executes on .collect().
write_table
Write/materialise a table into the write schema.
Supports writing Polars DataFrames, Ibis expressions, and PyArrow tables into the database.
CdmSnapshot
IbisConnection
Cohort Generation
generate_concept_cohort_set
generate_concept_cohort_set(
cdm: CdmReference,
concept_set: Codelist
| ConceptSetExpression
| dict[str, list[int]],
name: str,
*,
limit: Literal["first", "all"] = "first",
required_observation: tuple[int, int] = (0, 0),
end: Literal[
"observation_period_end_date", "event_end_date"
]
| int = "observation_period_end_date",
) -> CdmReference
Generate cohorts from concept sets.
Each concept set becomes one cohort. The function:
- Resolves concept IDs (optionally including descendants)
- Looks up clinical events in the appropriate domain tables
- Constrains to observation periods
- Applies required observation time before/after index
- Applies limit (first occurrence or all)
- Collapses overlapping periods
- Creates a CohortTable with settings, attrition, and codelist
Parameters
cdm
A CdmReference with database-backed tables.
concept_set
Concept sets to generate cohorts from. Can be:
- Codelist: named mapping of concept ID lists
- ConceptSetExpression: with descendant/exclude flags
- dict[str, list[int]]: simple named list of concept IDs
name
Name for the cohort table in the CDM.
limit
"first" (default) keeps only the first event per person per cohort.
"all" keeps all events.
required_observation
(prior_days, future_days) — minimum observation time before and
after the index date for inclusion.
end
How to set cohort_end_date:
- "observation_period_end_date" (default): use observation period end
- "event_end_date": use the clinical event's end date
- int: fixed number of days after the event start date
Returns
CdmReference
The CDM with a new CohortTable added under cdm[name].
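The per-person steps listed above (observation-period constraint, required observation, limit, end date, collapsing overlaps) can be sketched in plain Python for a single person. This is a simplified illustration, not the function's actual query: the helper concept_cohort_for_person is hypothetical, events are (start, end) date pairs, there is a single observation period, and capping cohort end dates at the observation period end is an assumption. Steps run in the order the list gives them.

```python
from datetime import date, timedelta


def concept_cohort_for_person(
    events,
    obs_start,
    obs_end,
    *,
    limit="first",
    required_observation=(0, 0),
    end="observation_period_end_date",
):
    """Hypothetical single-person version of the concept-cohort steps.

    events is a list of (event_start, event_end) date pairs.
    """
    prior_days, future_days = required_observation
    entries = []
    for ev_start, ev_end in sorted(events):
        # Keep only events that start inside the observation period.
        if not obs_start <= ev_start <= obs_end:
            continue
        # Require enough observation time before and after the index date.
        if (ev_start - obs_start).days < prior_days:
            continue
        if (obs_end - ev_start).days < future_days:
            continue
        # Pick the cohort end date according to the end rule.
        if end == "observation_period_end_date":
            cohort_end = obs_end
        elif end == "event_end_date":
            cohort_end = ev_end
        else:
            cohort_end = ev_start + timedelta(days=end)
        # Assumption: cohort end dates are capped at the observation period end.
        entries.append((ev_start, min(cohort_end, obs_end)))
    if limit == "first":
        entries = entries[:1]
    # Merge entries whose periods overlap into a single period.
    collapsed = []
    for start, stop in entries:
        if collapsed and start <= collapsed[-1][1]:
            collapsed[-1] = (collapsed[-1][0], max(collapsed[-1][1], stop))
        else:
            collapsed.append((start, stop))
    return collapsed


events = [
    (date(2020, 1, 10), date(2020, 1, 20)),
    (date(2020, 1, 15), date(2020, 2, 1)),
    (date(2020, 6, 1), date(2020, 6, 5)),
]
obs = (date(2020, 1, 1), date(2020, 12, 31))
# Two periods: 2020-01-10 to 2020-02-01 (first two events merged) and 2020-06-01 to 2020-06-05.
print(concept_cohort_for_person(events, *obs, limit="all", end="event_end_date"))
```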
generate_cohort_set
generate_cohort_set(
cdm: CdmReference,
cohort_set: dict[str, Any]
| list[dict[str, Any]]
| str
| Path,
name: str = "cohort",
) -> CdmReference
Generate cohorts from CIRCE JSON definitions.
Parameters
cdm
A database-backed CdmReference (from cdm_from_con).
cohort_set
One of:
- A single CIRCE JSON dict (with id, name, json)
- A list of such dicts
- A path to a directory of *.json files
- A path to a single *.json file
- A JSON string
name
Name for the cohort table in the CDM.
Returns
CdmReference
The CDM with a new CohortTable added under cdm[name].
CDM Operations
cdm_subset
CDM subsetting and sampling utilities.
Provides functions to create subsets of a CDM by cohort membership
or random sampling, equivalent to R's cdmSubsetCohort() and
cdmSample().
cdm_subset
Subset a CDM to a specific set of person IDs.
Returns a new CdmReference where all clinical tables are filtered to only include the specified persons. Tables remain lazy (Ibis-backed) — the filtering is applied as SQL predicates, not materialised.
Parameters
cdm
A CdmReference with database-backed or local tables.
person_ids
Explicit list of person_id values to include.
Returns
CdmReference
A new CDM with all person-linked tables filtered.
Raises
ValueError
If person_ids is empty.
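Conceptually, subsetting applies one person_id filter to every person-linked table. The sketch below shows that idea on plain in-memory rows; it is not the library's code, which expresses the same filter as SQL predicates on lazy Ibis tables. The dict-of-lists CDM and the subset_rows helper are hypothetical, and passing tables without a person_id column (such as vocabulary tables) through unchanged is an assumption.

```python
def subset_rows(cdm, person_ids):
    """Hypothetical in-memory analogue of cdm_subset.

    cdm maps each table name to a list of row dicts.
    """
    if not person_ids:
        raise ValueError("person_ids must not be empty")
    keep = set(person_ids)
    subset = {}
    for name, rows in cdm.items():
        if rows and "person_id" in rows[0]:
            # Person-linked table: keep only rows for the requested persons.
            subset[name] = [row for row in rows if row["person_id"] in keep]
        else:
            # Vocabulary and other non-person tables pass through unchanged.
            subset[name] = rows
    return subset


cdm = {
    "person": [{"person_id": 1}, {"person_id": 2}, {"person_id": 3}],
    "condition_occurrence": [
        {"person_id": 1, "condition_concept_id": 1001},
        {"person_id": 3, "condition_concept_id": 1002},
    ],
    "concept": [{"concept_id": 1001}, {"concept_id": 1002}],
}
small = subset_rows(cdm, [1, 2])
print([row["person_id"] for row in small["person"]])  # [1, 2]
```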
cdm_subset_cohort
cdm_subset_cohort(
cdm: CdmReference,
cohort_table: str = "cohort",
cohort_id: list[int] | None = None,
) -> CdmReference
Subset a CDM to individuals in one or more cohorts.
Returns a new CdmReference where all clinical tables are filtered to only include persons who appear in the specified cohort(s). Tables remain lazy (Ibis-backed) — the filtering is applied as SQL predicates, not materialised.
Parameters
cdm
A CdmReference with database-backed tables.
cohort_table
Name of a cohort table in the CDM.
cohort_id
Specific cohort definition IDs to include. If None, all cohorts in the table are used.
Returns
CdmReference
A new CDM with all person-linked tables filtered to the cohort subjects.
cdm_sample
Subset a CDM to a random sample of persons.
Only persons present in both the person table and
observation_period table are eligible for sampling.
Parameters
cdm
A CdmReference with database-backed tables.
n
Number of persons to include.
seed
Random seed for reproducibility.
Returns
CdmReference
A new CDM with all person-linked tables filtered to the sampled persons.
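The eligibility rule (a person must appear in both person and observation_period) combined with seeded sampling can be sketched with the standard library. This is an illustration only: the helper sample_person_ids is hypothetical, random.Random stands in for whatever sampling omopy actually performs (so the same seed will not reproduce omopy's sample), and capping n at the number of eligible persons is an assumption.

```python
import random


def sample_person_ids(person_ids, observation_person_ids, n, seed=None):
    """Hypothetical sketch of cdm_sample's eligibility and sampling step."""
    # Only persons present in both person and observation_period are eligible.
    eligible = sorted(set(person_ids) & set(observation_person_ids))
    rng = random.Random(seed)
    # Assumption: requesting more persons than are eligible returns all of them.
    return sorted(rng.sample(eligible, min(n, len(eligible))))


ids = sample_person_ids(range(1, 11), [2, 4, 6, 8, 10, 12], n=3, seed=42)
print(ids)
```

Sorting the eligible IDs before sampling keeps the result reproducible for a given seed regardless of set iteration order.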
cdm_flatten
Flatten a CDM into a single longitudinal observation table.
Provides cdm_flatten() to UNION ALL clinical domain tables into a
single table with normalised columns. Equivalent to R's
cdmFlatten().
cdm_flatten
cdm_flatten(
cdm: CdmReference,
domains: list[str] | None = None,
*,
include_concept_name: bool = True,
) -> ir.Table | pl.DataFrame
Flatten a CDM into a single observation table.
Each included domain table is projected to a common schema
(person_id, observation_concept_id, start_date,
end_date, type_concept_id, domain) and then UNION-ALLed
together. Optionally joins concept names from the concept table.
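The projection-and-union step can be pictured with plain rows. The sketch below is a simplified analogue, not omopy's query: it covers only two domains, maps each to the common columns using standard OMOP CDM column names, uses a Python set for the DISTINCT step, and omits the concept-name join. COLUMN_MAP and flatten_rows are hypothetical names.

```python
# Source column for each common output column, per domain (OMOP CDM names).
COLUMN_MAP = {
    "condition_occurrence": {
        "observation_concept_id": "condition_concept_id",
        "start_date": "condition_start_date",
        "end_date": "condition_end_date",
        "type_concept_id": "condition_type_concept_id",
    },
    "drug_exposure": {
        "observation_concept_id": "drug_concept_id",
        "start_date": "drug_exposure_start_date",
        "end_date": "drug_exposure_end_date",
        "type_concept_id": "drug_type_concept_id",
    },
}


def flatten_rows(cdm, domains=("condition_occurrence", "drug_exposure")):
    """Project each domain to a common schema, UNION ALL, then keep distinct rows."""
    combined = []
    for domain in domains:
        mapping = COLUMN_MAP[domain]
        for row in cdm[domain]:
            combined.append((
                row["person_id"],
                row[mapping["observation_concept_id"]],
                row[mapping["start_date"]],
                row[mapping["end_date"]],
                row[mapping["type_concept_id"]],
                domain,
            ))
    # Equivalent of SELECT DISTINCT over the unioned rows, in a stable order.
    return sorted(set(combined))


cond = {"person_id": 1, "condition_concept_id": 1001,
        "condition_start_date": "2020-01-01", "condition_end_date": "2020-01-05",
        "condition_type_concept_id": 9001}
drug = {"person_id": 1, "drug_concept_id": 2001,
        "drug_exposure_start_date": "2020-01-02", "drug_exposure_end_date": "2020-01-30",
        "drug_type_concept_id": 9002}
rows = flatten_rows({"condition_occurrence": [cond, dict(cond)], "drug_exposure": [drug]})
print(len(rows))  # 2 (the duplicated condition row is kept once)
```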
Parameters
cdm
A CdmReference.
domains
Domains to include. Defaults to
["condition_occurrence", "drug_exposure", "procedure_occurrence"].
Valid values: condition_occurrence, drug_exposure,
procedure_occurrence, measurement, visit_occurrence,
death, observation.
include_concept_name
If True (default), join concept names from the concept
table, adding observation_concept_name and
type_concept_name columns.
Returns
ir.Table | pl.DataFrame
A lazy Ibis table (if the CDM is database-backed) or a Polars DataFrame with the flattened observations. Only distinct rows are returned.
Raises
ValueError
If an invalid domain is specified.
KeyError
If a requested domain table is not in the CDM.
copy_cdm_to
copy_cdm_to(
cdm: CdmReference,
con: IbisConnection,
*,
schema: str,
overwrite: bool = False,
) -> CdmReference
Copy a CDM from one source to a new database connection.
Collects each table from the source CDM (materialising lazy tables), uploads it to the target connection, and builds a new CdmReference pointing to the target database. Cohort tables have their settings, attrition, and codelist metadata preserved.
Parameters
cdm
The source CDM reference to copy.
con
Target Ibis backend connection to copy into.
schema
Schema in the target database where tables will be created.
The schema is created if it does not exist.
overwrite
If True, overwrite existing tables. If False (default),
raise an error if a table already exists.
Returns
CdmReference
A new CDM reference pointing to the tables in the target database.
Raises
ValueError
If a table already exists in the target schema and overwrite
is False.
Examples
>>> import ibis
>>> from omopy.connector import cdm_from_con, copy_cdm_to
>>> # Load source CDM
>>> cdm = cdm_from_con("source.duckdb", cdm_schema="base")
>>> # Copy to a new database
>>> target_con = ibis.duckdb.connect("target.duckdb")
>>> new_cdm = copy_cdm_to(cdm, target_con, schema="cdm")
tbl_group
Table grouping helper.
Provides tbl_group() to retrieve table names by logical group
(vocab, clinical, all, default, derived). Equivalent to R's
tblGroup().
tbl_group
tbl_group(
group: str | TableGroup | list[str | TableGroup],
*,
cdm_version: str | CdmVersion = CdmVersion.V5_4,
) -> list[str]
Return CDM table names belonging to one or more logical groups.
Parameters
group
A group name (or TableGroup enum), or a list of them.
Valid values: "vocab", "clinical", "all",
"default", "derived".
cdm_version
CDM version to use for the lookup. Defaults to 5.4.
The set of table groups is the same across 5.3 and 5.4.
Returns
list[str]
Unique table names in the requested group(s), in schema order.
Examples
>>> tbl_group("vocab")
['concept', 'vocabulary', ...]
>>> tbl_group(["clinical", "derived"])
['person', 'observation_period', ..., 'drug_era', ...]
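Combining several groups while keeping each table name once is a small dictionary trick. The sketch below uses a hypothetical, heavily abbreviated GROUPS table (the real groups contain many more tables, include "all" and "default", and are defined per CDM version), and its output follows the order of the group definitions, whereas tbl_group orders names by the CDM schema. Only the merging logic is the point.

```python
# Hypothetical, abbreviated group definitions for illustration only.
GROUPS = {
    "vocab": ["concept", "vocabulary", "concept_relationship"],
    "clinical": ["person", "observation_period", "condition_occurrence"],
    "derived": ["condition_era", "drug_era"],
}


def table_names(group):
    """Return unique table names for one or more groups, in first-seen order."""
    groups = [group] if isinstance(group, str) else list(group)
    for g in groups:
        if g not in GROUPS:
            raise ValueError(f"Unknown table group: {g!r}")
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(name for g in groups for name in GROUPS[g]))


print(table_names(["clinical", "derived"]))
```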
snapshot
CDM snapshot — metadata extraction.
Provides snapshot() to extract summary metadata from a CDM,
including person count, observation period range, vocabulary version,
and CDM source information. Equivalent to R's snapshot().
CdmSnapshot
snapshot
Extract summary metadata from a CDM.
Queries the person, observation_period, cdm_source, and
vocabulary tables to produce a concise summary of the CDM
contents and provenance.
Parameters
cdm
A CdmReference (database-backed or local).
Returns
CdmSnapshot
A frozen dataclass with all metadata fields.
Raises
KeyError
If required tables (person, observation_period) are
missing from the CDM.
Compute & Persistence
compute_permanent
compute_permanent(
expr: Table | CdmTable,
*,
name: str,
con: IbisConnection | None = None,
schema: str | None = None,
overwrite: bool = True,
) -> ir.Table
Materialise an Ibis expression to a permanent database table.
Executes the query and stores the result as a persistent table. Returns a lazy Ibis reference to the newly created table.
Parameters
expr
An Ibis Table expression or CdmTable to materialise.
name
Name for the new table.
con
Ibis backend connection. If None, inferred from expr.
schema
Database schema for the new table. If None, uses default.
overwrite
If True (default), drop an existing table with the same name
before creating. If False, raise an error if the table exists.
Returns
ir.Table
A lazy Ibis reference to the newly created permanent table.
Raises
ValueError
If the table exists and overwrite is False.
TypeError
If expr is not an Ibis expression or CdmTable.
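In plain SQL, materialising a query permanently with these overwrite semantics amounts to an optional DROP followed by CREATE TABLE ... AS SELECT. The sketch below shows that pattern with Python's built-in sqlite3 module. It is not what omopy runs (omopy works through Ibis against the connected backend), the helper name materialise is hypothetical, and the table name is assumed to be a trusted identifier because it is interpolated into the SQL.

```python
import sqlite3


def materialise(con, name, select_sql, overwrite=True):
    """Hypothetical CREATE TABLE AS SELECT with overwrite semantics."""
    exists = con.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone() is not None
    if exists and not overwrite:
        raise ValueError(f"Table {name!r} already exists")
    if exists:
        con.execute(f'DROP TABLE "{name}"')
    # Run the query once and store its result as a new table.
    con.execute(f'CREATE TABLE "{name}" AS {select_sql}')
    return name


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE concept (concept_id INTEGER, domain_id TEXT)")
con.executemany(
    "INSERT INTO concept VALUES (?, ?)", [(1, "Drug"), (2, "Condition"), (3, "Drug")]
)
materialise(con, "drug_concepts", "SELECT * FROM concept WHERE domain_id = 'Drug'")
print(con.execute("SELECT COUNT(*) FROM drug_concepts").fetchone()[0])  # 2
```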
Examples
>>> import ibis
>>> con = ibis.duckdb.connect("my.duckdb")
>>> concept = con.table("concept", database="base")
>>> drug_concepts = concept.filter(concept.domain_id == "Drug")
>>> result = compute_permanent(
...     drug_concepts,
...     name="drug_concepts",
...     schema="results",
... )
compute_query
compute_query(
expr: Table | CdmTable,
*,
name: str | None = None,
con: IbisConnection | None = None,
temporary: bool = True,
schema: str | None = None,
overwrite: bool = True,
) -> ir.Table
Execute an Ibis query and store the result in the database.
This is a general-purpose materialisation function. If temporary
is True (default), creates a temporary table. Otherwise creates
a permanent table in the specified schema.
Parameters
expr
An Ibis Table expression or CdmTable.
name
Table name. If None, a unique name is generated.
con
Ibis backend connection. If None, inferred from expr.
temporary
If True (default), create a temporary table.
schema
Schema for permanent tables (ignored when temporary=True).
overwrite
Whether to overwrite existing tables.
Returns
ir.Table
A lazy Ibis reference to the newly created table.
append_permanent
append_permanent(
expr: Table | CdmTable,
*,
name: str,
con: IbisConnection | None = None,
schema: str | None = None,
) -> ir.Table
Append the result of an Ibis expression to an existing table.
If the target table does not exist, it is created (equivalent to
compute_permanent with overwrite=False).
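The create-if-missing behaviour described above corresponds to a branch between CREATE TABLE ... AS SELECT and INSERT INTO ... SELECT. This sqlite3 sketch, with a hypothetical append_or_create helper, illustrates that logic only; omopy performs the equivalent operation through Ibis, and the query's columns are assumed to match those of the existing target table.

```python
import sqlite3


def append_or_create(con, name, select_sql):
    """Hypothetical append that creates the target table on first use."""
    exists = con.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone() is not None
    if exists:
        # Assumes the query returns the same columns as the existing table.
        con.execute(f'INSERT INTO "{name}" {select_sql}')
    else:
        con.execute(f'CREATE TABLE "{name}" AS {select_sql}')


con = sqlite3.connect(":memory:")
append_or_create(con, "results", "SELECT 1 AS batch")  # creates the table
append_or_create(con, "results", "SELECT 2 AS batch")  # appends a row
print(con.execute("SELECT batch FROM results ORDER BY batch").fetchall())  # [(1,), (2,)]
```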
Parameters
expr
An Ibis Table expression or CdmTable to append.
name
Name of the target table.
con
Ibis backend connection. If None, inferred from expr.
schema
Database schema for the target table.
Returns
ir.Table
A lazy Ibis reference to the target table, including the appended rows.
Examples
>>> # First batch
>>> compute_permanent(batch1, name="results", schema="work")
>>> # Subsequent batches
>>> append_permanent(batch2, name="results", schema="work")
Date Helpers
dateadd
dateadd(
expr: Column | Table,
date_col: str,
number: int | str,
*,
interval: IntervalUnit = "day",
) -> ir.Column | ir.Table
Add a time interval to a date column.
Works on both Ibis table expressions and individual columns. If expr is a Table, returns a new Table with a computed column added. If it's a Column (date expression), returns a new date Column.
Parameters
expr
An Ibis Table or date Column expression.
date_col
Name of the date column to modify.
number
Number of units to add (can be negative). If a string, it is
interpreted as a column name containing the number.
interval
The unit of time: "day" (default), "month", or "year".
Returns
ir.Column | ir.Table
A new date Column expression with the interval added or, if expr is a Table, a new Table with the computed column added.
Examples
>>> import ibis
>>> t = ibis.table({"start_date": "date", "days_offset": "int64"}, name="events")
>>> # Add 30 days to start_date
>>> new_date = dateadd(t, "start_date", 30)
>>> # Add a per-row number of days taken from the days_offset column
>>> new_date = dateadd(t, "start_date", "days_offset", interval="day")
datediff
datediff(
table: Table,
start_col: str,
end_col: str,
*,
interval: IntervalUnit = "day",
) -> ir.Column
Compute the difference between two date columns.
Parameters
table
An Ibis Table expression containing both date columns.
start_col
Name of the start date column.
end_col
Name of the end date column.
interval
The unit for the difference: "day" (default), "month",
or "year".
Returns
ir.Column
An integer column expression with the difference in the specified units.
Notes
For the "day" interval, this returns end_col - start_col in whole days. For "month" and "year", it uses a calendar-based computation (matching R's clock::date_count_between):
- Months: (year_end * 12 + month_end) - (year_start * 12 + month_start), adjusted for day-of-month.
- Years: year_end - year_start, adjusted for month and day.
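The month and year rules above can be made concrete with the standard library. This sketch is an interpretation, not omopy's SQL: "adjusted for day-of-month" is read as counting only completed months (and "adjusted for month and day" as counting only completed years), and truncating toward zero for negative spans is an assumption. The function name calendar_diff is hypothetical.

```python
from datetime import date


def calendar_diff(start, end, interval="day"):
    """Whole-unit difference between two dates (hypothetical sketch)."""
    if interval == "day":
        return (end - start).days
    if interval == "month":
        diff = (end.year * 12 + end.month) - (start.year * 12 + start.month)
        # Do not count the final month unless it has been completed.
        if diff > 0 and end.day < start.day:
            diff -= 1
        elif diff < 0 and end.day > start.day:
            diff += 1
        return diff
    if interval == "year":
        diff = end.year - start.year
        # Do not count the final year unless the anniversary has been reached.
        if diff > 0 and (end.month, end.day) < (start.month, start.day):
            diff -= 1
        elif diff < 0 and (end.month, end.day) > (start.month, start.day):
            diff += 1
        return diff
    raise ValueError(f"Unsupported interval: {interval!r}")


print(calendar_diff(date(2000, 6, 15), date(2020, 6, 14), "year"))  # 19
print(calendar_diff(date(2020, 1, 31), date(2020, 2, 29), "month"))  # 0
```

The second call shows why the day-of-month adjustment matters: the raw month arithmetic gives 1, but no full month has elapsed between 31 January and 29 February.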
Examples
>>> import ibis
>>> t = ibis.table(
...     {"start_date": "date", "end_date": "date"}, name="periods"
... )
>>> days_diff = datediff(t, "start_date", "end_date")
>>> years_diff = datediff(t, "start_date", "end_date", interval="year")
datepart
Extract a part (day, month, year) from a date column.
Parameters¶
table
An Ibis Table expression.
date_col
Name of the date column.
part
The part to extract: "year" (default), "month", or
"day".
Returns
ir.Column
An integer column expression with the extracted part.
Examples
>>> import ibis
>>> t = ibis.table({"birth_date": "date"}, name="person")
>>> birth_year = datepart(t, "birth_date", "year")
>>> birth_month = datepart(t, "birth_date", "month")
dateadd_polars
dateadd_polars(
df: DataFrame | LazyFrame,
date_col: str,
number: int | str,
*,
interval: IntervalUnit = "day",
result_col: str | None = None,
) -> pl.DataFrame | pl.LazyFrame
Add a time interval to a date column in a Polars DataFrame.
Parameters
df
A Polars DataFrame or LazyFrame.
date_col
Name of the date column.
number
Number of units to add, or name of a column containing the number.
interval
The unit: "day" (default), "month", or "year".
result_col
Name for the result column. Defaults to date_col, which overwrites that column in place.
Returns
pl.DataFrame | pl.LazyFrame
The DataFrame (or LazyFrame) with the new or modified date column.
datediff_polars
datediff_polars(
df: DataFrame | LazyFrame,
start_col: str,
end_col: str,
*,
interval: IntervalUnit = "day",
result_col: str = "date_diff",
) -> pl.DataFrame | pl.LazyFrame
Compute date difference in a Polars DataFrame.
Parameters
df
A Polars DataFrame or LazyFrame.
start_col
Name of the start date column.
end_col
Name of the end date column.
interval
The unit: "day" (default), "month", or "year".
result_col
Name for the result column (default "date_diff").
Returns
pl.DataFrame | pl.LazyFrame
The DataFrame (or LazyFrame) with a new integer column containing the difference.
Analytics
summarise_quantile
DBMS-independent quantile estimation.
Provides summarise_quantile() which computes quantiles using a
cumulative-sum approach that works on any database backend (including
those without native PERCENTILE_CONT).
Equivalent to R's summariseQuantile2().
summarise_quantile
summarise_quantile(
data: Table | DataFrame | LazyFrame,
columns: str | list[str],
probs: list[float],
*,
group_by: str | list[str] | None = None,
) -> ir.Table | pl.DataFrame
Compute quantiles for one or more numeric columns.
Uses a cumulative-sum / inverse-CDF approach (equivalent to
quantile(type=1) in R) that generates pure SQL and works on all
database backends.
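The type-1 (inverse empirical CDF) rule behind this approach fits in a few lines: count each distinct value, accumulate the counts, and for each probability p take the smallest value whose cumulative share of rows reaches p. The sketch below is an in-memory illustration, not the generated SQL. Grouping is omitted, the helper type1_quantiles is hypothetical, and computing pct as round(100 * p) for the q{pct:02d}_{col} names is an assumption.

```python
from collections import Counter


def type1_quantiles(values, probs, col="x"):
    """Inverse-CDF (R type 1) quantiles via cumulative counts."""
    if any(p < 0 or p > 1 for p in probs):
        raise ValueError("probs must lie in [0, 1]")
    counts = sorted(Counter(values).items())  # (value, count) in value order
    n = len(values)
    result = {}
    for p in probs:
        cumulative = 0
        for value, count in counts:
            cumulative += count
            # Smallest value whose cumulative share of rows reaches p.
            if cumulative / n >= p:
                result[f"q{round(100 * p):02d}_{col}"] = value
                break
    return result


ages = [23, 35, 35, 41, 50, 52, 60, 61, 70, 75]
print(type1_quantiles(ages, [0.25, 0.5, 0.75], col="age"))
# {'q25_age': 35, 'q50_age': 50, 'q75_age': 61}
```

Comparing cumulative / n with p (rather than cumulative with n * p) avoids a floating-point trap: 0.7 * 10 evaluates to slightly more than 7, while 7 / 10 equals the float 0.7 exactly.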
Parameters
data
An Ibis table, Polars DataFrame, or Polars LazyFrame.
columns
Column name(s) to compute quantiles for.
probs
Probability values in [0, 1].
group_by
Optional grouping column(s).
Returns
ir.Table | pl.DataFrame
One row per group (or a single row if ungrouped), with columns
named q{pct:02d}_{col} (e.g. q25_age, q50_age).
Raises
ValueError
If probs contains values outside [0, 1], or columns is empty.
compute_data_hash
Compute an MD5 hash per CDM table for change tracking.
For each standard CDM table, computes the row count and the number of distinct values in a key column, then hashes those values to produce a fingerprint. A change in the hash between runs indicates the data has changed.
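A fingerprint of this kind can be sketched with hashlib: feed a table's summary numbers into MD5 and compare digests between runs. The exact string omopy hashes is not documented here, so the "row_count|n_unique" payload and the table_fingerprint helper below are assumptions; only the principle (identical summaries give identical hashes) carries over.

```python
import hashlib


def table_fingerprint(row_count, n_unique_values):
    """Hypothetical MD5 fingerprint of a table's summary statistics."""
    payload = f"{row_count}|{n_unique_values}".encode("utf-8")
    return hashlib.md5(payload).hexdigest()


before = table_fingerprint(1000, 250)
after = table_fingerprint(1001, 250)  # one extra row
print(before == after)  # False
```

Because only summary statistics are hashed, an edit that leaves both the row count and the distinct-value count unchanged produces the same fingerprint.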
Parameters
cdm
A CdmReference.
Returns
pl.DataFrame
A DataFrame with columns: cdm_name, table_name,
table_row_count, unique_column, n_unique_values,
table_hash, compute_time_secs.
benchmark
CDM benchmark — timed diagnostic queries.
Provides benchmark() to run a set of standard queries against a CDM
and measure execution time. Equivalent to R's
benchmarkCDMConnector().
benchmark
Run benchmark queries against a CDM and return timings.
Executes a standard set of queries (distinct count, group-by, joins, collect) against vocab and clinical tables, timing each.
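Timing a set of named tasks and reporting seconds and minutes, as in the output columns listed below, follows a simple pattern built on time.perf_counter. The sketch is illustrative only: the time_tasks helper and the two tasks are hypothetical stand-ins for the real queries, and the dbms and person_n columns are omitted.

```python
import time


def time_tasks(tasks):
    """Run each (name, callable) pair and record its wall-clock duration."""
    rows = []
    for name, fn in tasks:
        start = time.perf_counter()
        fn()
        secs = time.perf_counter() - start
        rows.append({"task": name, "time_taken_secs": secs, "time_taken_mins": secs / 60})
    return rows


timings = time_tasks([
    ("sum of squares", lambda: sum(i * i for i in range(100_000))),
    ("sort", lambda: sorted(range(100_000, 0, -1))),
])
for row in timings:
    print(f"{row['task']}: {row['time_taken_secs']:.4f} s")
```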
Parameters
cdm
A CdmReference (a database-backed CDM is recommended for meaningful timings).
Returns
pl.DataFrame
A DataFrame with columns: task, time_taken_secs,
time_taken_mins, dbms, person_n.