omopy.connector

Database CDM access layer for OMOPy — connect to OMOP CDM databases and get lazy, Ibis-backed table access.

This module is the Python equivalent of the R CDMConnector package.

Connection & Factory

cdm_from_con

Factory function to create a CdmReference from a database connection.

cdm_from_con() is the primary entry point for users. It takes an Ibis backend connection (or DuckDB file path), auto-detects the CDM schema and version, and returns a fully-populated CdmReference with lazy table access.

cdm_from_con

cdm_from_con(
    con: IbisConnection | str | Path,
    *,
    cdm_schema: str | None = None,
    write_schema: str | None = None,
    cdm_version: CdmVersion | None = None,
    cdm_name: str | None = None,
    cdm_tables: list[str] | None = None,
) -> CdmReference

Create a CdmReference from a database connection.

This is the main entry point for connecting to an OMOP CDM database. It auto-detects the schema, CDM version, and available tables, then returns a CdmReference with lazy (Ibis-backed) table access.

Parameters

  • con: An Ibis backend connection, or a path to a DuckDB file. If a string or Path is given, it is opened as a DuckDB database.
  • cdm_schema: The database schema containing CDM tables. Auto-detected if None.
  • write_schema: The schema for writing results (cohorts, etc.). Defaults to cdm_schema.
  • cdm_version: The OMOP CDM version. Auto-detected from the cdm_source table if None.
  • cdm_name: Human-readable name. Auto-detected from the cdm_source table if None.
  • cdm_tables: Specific table names to load. If None, loads all standard CDM tables found in the schema.

Returns

CdmReference A CDM reference with lazy database-backed tables.

Examples

>>> cdm = cdm_from_con("synthea.duckdb", cdm_schema="base")
>>> cdm.cdm_name
'dbt-synthea'
>>> cdm["person"].count()
27
>>> # Tables are lazy: no data is fetched until .collect()
>>> person_df = cdm["person"].collect()

connect_duckdb

connect_duckdb(
    database: str | Path,
    *,
    read_only: bool = False,
    **kwargs: Any,
) -> IbisConnection

Connect to a DuckDB database file via Ibis.

Parameters

  • database: Path to the .duckdb file.
  • read_only: Open in read-only mode (default False).
  • **kwargs: Extra keyword arguments forwarded to ibis.duckdb.connect().

Returns

IbisConnection An Ibis DuckDB backend connection.

detect_cdm_schema

detect_cdm_schema(
    con: IbisConnection, *, cdm_schema: str | None = None
) -> str

Detect the database schema containing CDM tables.

Looks for a schema that contains the person table, which is mandatory in every valid OMOP CDM instance.

Parameters

  • con: An Ibis backend connection.
  • cdm_schema: If provided, validates that this schema contains CDM tables and returns it.

Returns

str The schema name (e.g. "base", "cdm", "main").

Raises

ValueError If no schema with CDM tables can be found.
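The detection rule above can be illustrated with a small, backend-free sketch. It assumes a hypothetical pre-fetched mapping of schema name to table names. A real backend would query its own catalog instead. The helper `detect_schema` is illustrative and is not part of omopy.

```python
from __future__ import annotations


def detect_schema(
    tables_by_schema: dict[str, list[str]],
    cdm_schema: str | None = None,
) -> str:
    """Hypothetical sketch: return the schema holding a `person` table."""
    if cdm_schema is not None:
        # Validate an explicitly requested schema instead of searching
        if "person" not in tables_by_schema.get(cdm_schema, []):
            raise ValueError(f"schema {cdm_schema!r} has no CDM tables")
        return cdm_schema
    # Otherwise return the first schema that contains the mandatory person table
    for schema, tables in tables_by_schema.items():
        if "person" in tables:
            return schema
    raise ValueError("no schema with CDM tables found")
```

For example, `detect_schema({"main": ["foo"], "base": ["person", "concept"]})` picks `"base"`. Using `person` as the marker works because every valid OMOP CDM instance must contain it.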

Core Classes

DbSource

DbSource(
    con: IbisConnection,
    cdm_schema: str,
    *,
    write_schema: str | None = None,
    cdm_version: CdmVersion | None = None,
    cdm_name: str | None = None,
)

Database-backed CDM source.

Implements the omopy.generics.cdm_reference.CdmSource protocol. Provides lazy table access via Ibis: no data is fetched until a table is materialised with CdmTable.collect().

Parameters

  • con: An Ibis backend connection (e.g. from ibis.duckdb.connect()).
  • cdm_schema: The database schema containing CDM tables (e.g. "base").
  • write_schema: The schema for writing results (cohorts, etc.). Defaults to cdm_schema.
  • cdm_version: The OMOP CDM version. If None, auto-detected from the cdm_source table.
  • cdm_name: Human-readable name for this CDM. If None, read from cdm_source.

Examples

>>> import ibis
>>> con = ibis.duckdb.connect("synthea.duckdb", read_only=True)
>>> source = DbSource(con, cdm_schema="base")
>>> tables = source.list_tables()
>>> "person" in tables
True

source_type property

source_type: str

Backend identifier derived from the Ibis connection.

connection property

connection: IbisConnection

The underlying Ibis backend connection.

cdm_schema property

cdm_schema: str

The database schema containing CDM tables.

write_schema property

write_schema: str

The database schema for writing results.

catalog property

catalog: str

The database catalog (e.g. DuckDB database name).

cdm_version property

cdm_version: CdmVersion

The detected or specified CDM version.

cdm_name property

cdm_name: str

The CDM source name.

list_tables

list_tables() -> list[str]

Return names of all available tables in the CDM schema.

read_table

read_table(table_name: str) -> CdmTable

Return a lazy CdmTable wrapping an Ibis table expression.

No data is fetched — the returned CdmTable holds a lazy Ibis reference that only executes on .collect().

write_table

write_table(
    table: CdmTable, table_name: str | None = None
) -> None

Write/materialise a table into the write schema.

Supports writing Polars DataFrames, Ibis expressions, and PyArrow tables into the database.

drop_table

drop_table(table_name: str) -> None

Drop a table from the write schema.

CdmSnapshot

Bases: BaseModel

Immutable container for CDM snapshot metadata.

to_dict

to_dict() -> dict[str, str]

Return all fields as a {name: str_value} dict.

to_polars

to_polars() -> pl.DataFrame

Return the snapshot as a single-row Polars DataFrame (all columns are strings).

IbisConnection

IbisConnection module-attribute

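IbisConnection = BaseBackend

Type alias for the Ibis backend base class (ibis.backends.BaseBackend). It is used throughout this module to annotate connection parameters. Any Ibis backend connection, such as one returned by ibis.duckdb.connect(), satisfies it.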

Cohort Generation

generate_concept_cohort_set

generate_concept_cohort_set(
    cdm: CdmReference,
    concept_set: Codelist
    | ConceptSetExpression
    | dict[str, list[int]],
    name: str,
    *,
    limit: Literal["first", "all"] = "first",
    required_observation: tuple[int, int] = (0, 0),
    end: Literal[
        "observation_period_end_date", "event_end_date"
    ]
    | int = "observation_period_end_date",
) -> CdmReference

Generate a cohort from concept sets.

Each concept set becomes one cohort. The function:

  1. Resolves concept IDs (optionally including descendants)
  2. Looks up clinical events in the appropriate domain tables
  3. Constrains to observation periods
  4. Applies required observation time before/after index
  5. Applies limit (first occurrence or all)
  6. Collapses overlapping periods
  7. Creates a CohortTable with settings, attrition, and codelist

Parameters

  • cdm: A CdmReference with database-backed tables.
  • concept_set: Concept sets to generate cohorts from. Can be:
      - Codelist: named mapping of concept ID lists
      - ConceptSetExpression: with descendant/exclude flags
      - dict[str, list[int]]: simple named list of concept IDs
  • name: Name for the cohort table in the CDM.
  • limit: "first" (default) keeps only the first event per person per cohort; "all" keeps all events.
  • required_observation: (prior_days, future_days), the minimum observation time before and after the index date required for inclusion.
  • end: How to set cohort_end_date:
      - "observation_period_end_date" (default): use the observation period end
      - "event_end_date": use the clinical event's end date
      - int: a fixed number of days after the event start date

Returns

CdmReference The CDM with a new CohortTable added under cdm[name].
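Steps 3 to 6 of the pipeline above can be sketched in plain Python for a single concept set. This is an illustration under stated assumptions, not omopy's implementation. Events are hypothetical `(person_id, start, end)` tuples, and each person has one observation period. The cohort end is the event end clipped to the observation period, which corresponds to the `end="event_end_date"` option. `build_cohort` is a hypothetical name.

```python
from __future__ import annotations

from datetime import date


def build_cohort(
    events: list[tuple[int, date, date]],
    obs_periods: dict[int, tuple[date, date]],
    prior_days: int = 0,
    future_days: int = 0,
    limit: str = "first",
) -> list[tuple[int, date, date]]:
    """Hypothetical sketch of steps 3-6; obs_periods maps person_id -> (start, end)."""
    kept = []
    for person_id, start, end in sorted(events):
        period = obs_periods.get(person_id)
        if period is None:
            continue
        obs_start, obs_end = period
        # Step 3: the index (start) date must fall inside the observation period
        if not obs_start <= start <= obs_end:
            continue
        # Step 4: require enough observation time before and after the index date
        if (start - obs_start).days < prior_days or (obs_end - start).days < future_days:
            continue
        # Assumed end="event_end_date": event end, clipped to the observation period
        kept.append((person_id, start, min(end, obs_end)))
    # Step 5: "first" keeps only the earliest qualifying event per person
    if limit == "first":
        first: dict[int, tuple[int, date, date]] = {}
        for record in kept:
            first.setdefault(record[0], record)
        kept = list(first.values())
    # Step 6: merge overlapping periods belonging to the same person
    collapsed: list[tuple[int, date, date]] = []
    for person_id, start, end in kept:
        last = collapsed[-1] if collapsed else None
        if last is not None and last[0] == person_id and start <= last[2]:
            collapsed[-1] = (person_id, last[1], max(last[2], end))
        else:
            collapsed.append((person_id, start, end))
    return collapsed
```

Because events are sorted by person and start date, overlapping periods for the same person are always adjacent. A single pass is therefore enough to collapse them.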

generate_cohort_set

generate_cohort_set(
    cdm: CdmReference,
    cohort_set: dict[str, Any]
    | list[dict[str, Any]]
    | str
    | Path,
    name: str = "cohort",
) -> CdmReference

Generate cohorts from CIRCE JSON definitions.

Parameters

  • cdm: A database-backed CdmReference (from cdm_from_con).
  • cohort_set: One of:
      - a single CIRCE JSON dict (with id, name, json)
      - a list of such dicts
      - a path to a directory of *.json files
      - a path to a single *.json file
      - a JSON string
  • name: Name for the cohort table in the CDM.

Returns

CdmReference The CDM with a new CohortTable added under cdm[name].

CDM Operations

cdm_subset

CDM subsetting and sampling utilities.

Provides functions to create subsets of a CDM by cohort membership or random sampling, equivalent to R's cdmSubsetCohort() and cdmSample().

cdm_subset

cdm_subset(
    cdm: CdmReference, person_ids: list[int]
) -> CdmReference

Subset a CDM to a specific set of person IDs.

Returns a new CdmReference where all clinical tables are filtered to only include the specified persons. Tables remain lazy (Ibis-backed) — the filtering is applied as SQL predicates, not materialised.

Parameters

  • cdm: A CdmReference with database-backed or local tables.
  • person_ids: Explicit list of person_id values to include.

Returns

CdmReference A new CDM with all person-linked tables filtered.

Raises

ValueError If person_ids is empty.
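The lazy-subsetting idea is that the filter is stored as a query rather than as copied rows. SQLite views are a rough stdlib analogue of this, so the sketch below uses them in place of Ibis expressions. `subset_views` and the `_subset` view suffix are hypothetical. Table names are assumed to be trusted identifiers.

```python
import sqlite3


def subset_views(con: sqlite3.Connection, tables: list[str], person_ids: list[int]) -> None:
    """Hypothetical sketch: expose each person-linked table filtered to person_ids."""
    if not person_ids:
        raise ValueError("person_ids must not be empty")
    # Views cannot take bound parameters, so cast each id to int before inlining
    id_list = ", ".join(str(int(pid)) for pid in person_ids)
    for table in tables:
        # A view stores only the filtering query; no rows are copied
        con.execute(
            f"CREATE TEMP VIEW {table}_subset AS "
            f"SELECT * FROM {table} WHERE person_id IN ({id_list})"
        )
```

Querying `condition_occurrence_subset` re-runs the `IN (...)` predicate against the base table on every access. This mirrors how the filtered CDM tables stay lazy.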

cdm_subset_cohort

cdm_subset_cohort(
    cdm: CdmReference,
    cohort_table: str = "cohort",
    cohort_id: list[int] | None = None,
) -> CdmReference

Subset a CDM to individuals in one or more cohorts.

Returns a new CdmReference where all clinical tables are filtered to only include persons who appear in the specified cohort(s). Tables remain lazy (Ibis-backed) — the filtering is applied as SQL predicates, not materialised.

Parameters

  • cdm: A CdmReference with database-backed tables.
  • cohort_table: Name of a cohort table in the CDM.
  • cohort_id: Specific cohort definition IDs to include. If None, all cohorts in the table are used.

Returns

CdmReference A new CDM with all person-linked tables filtered to the cohort subjects.

cdm_sample

cdm_sample(
    cdm: CdmReference, n: int, *, seed: int | None = None
) -> CdmReference

Subset a CDM to a random sample of persons.

Only persons present in both the person table and observation_period table are eligible for sampling.

Parameters

  • cdm: A CdmReference with database-backed tables.
  • n: Number of persons to include.
  • seed: Random seed for reproducibility.

Returns

CdmReference A new CDM with all person-linked tables filtered to the sampled persons.
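The eligibility rule (present in both person and observation_period) plus a seeded draw can be sketched with the standard library. `sample_person_ids` is hypothetical. Returning everyone when `n` exceeds the eligible pool is an assumption, because the behaviour in that case is not documented here.

```python
from __future__ import annotations

import random


def sample_person_ids(
    person_ids: list[int],
    observation_person_ids: list[int],
    n: int,
    seed: int | None = None,
) -> list[int]:
    """Hypothetical sketch of seeded sampling over eligible persons."""
    # Eligible = persons in both person and observation_period; sorting makes
    # the draw depend only on the seed, not on set iteration order
    eligible = sorted(set(person_ids) & set(observation_person_ids))
    rng = random.Random(seed)
    # Assumption: n larger than the eligible pool returns every eligible person
    chosen = rng.sample(eligible, min(n, len(eligible)))
    return sorted(chosen)
```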

cdm_flatten

Flatten a CDM into a single longitudinal observation table.

Provides cdm_flatten() to UNION ALL clinical domain tables into a single table with normalised columns. Equivalent to R's cdmFlatten().

cdm_flatten

cdm_flatten(
    cdm: CdmReference,
    domains: list[str] | None = None,
    *,
    include_concept_name: bool = True,
) -> ir.Table | pl.DataFrame

Flatten a CDM into a single observation table.

Each included domain table is projected to a common schema (person_id, observation_concept_id, start_date, end_date, type_concept_id, domain), and the projections are combined with UNION ALL. Optionally joins concept names from the concept table.

Parameters

  • cdm: A CdmReference.
  • domains: Domains to include. Defaults to ["condition_occurrence", "drug_exposure", "procedure_occurrence"]. Valid values: condition_occurrence, drug_exposure, procedure_occurrence, measurement, visit_occurrence, death, observation.
  • include_concept_name: If True (default), join concept names from the concept table, adding observation_concept_name and type_concept_name columns.

Returns

ir.Table | pl.DataFrame A lazy Ibis table (if DB-backed) or Polars DataFrame with the flattened observations. Distinct rows only.

Raises

  • ValueError: If an invalid domain is specified.
  • KeyError: If a requested domain table is not in the CDM.
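The project-then-UNION ALL shape can be sketched as SQL generation and run against SQLite. The column mapping uses standard OMOP CDM column names for three domains. Using procedure_date as both start and end, and filling the domain column with the source table name, are assumptions. The concept-name join is omitted. `flatten_sql` and `DOMAIN_COLUMNS` are hypothetical names.

```python
import sqlite3

# Assumed mapping onto the common schema: (concept_id, start, end, type_concept_id)
DOMAIN_COLUMNS = {
    "condition_occurrence": ("condition_concept_id", "condition_start_date",
                             "condition_end_date", "condition_type_concept_id"),
    "drug_exposure": ("drug_concept_id", "drug_exposure_start_date",
                      "drug_exposure_end_date", "drug_type_concept_id"),
    "procedure_occurrence": ("procedure_concept_id", "procedure_date",
                             "procedure_date", "procedure_type_concept_id"),
}


def flatten_sql(domains: list[str]) -> str:
    """Hypothetical sketch: build one flattened query over the given domains."""
    invalid = [d for d in domains if d not in DOMAIN_COLUMNS]
    if invalid:
        raise ValueError(f"invalid domain(s): {invalid}")
    parts = []
    for domain in domains:
        concept, start, end, type_concept = DOMAIN_COLUMNS[domain]
        # Project each table onto the same column names and order
        parts.append(
            f"SELECT person_id, {concept} AS observation_concept_id, "
            f"{start} AS start_date, {end} AS end_date, "
            f"{type_concept} AS type_concept_id, '{domain}' AS domain "
            f"FROM {domain}"
        )
    # UNION ALL stacks the projections; the outer DISTINCT drops duplicate rows
    return "SELECT DISTINCT * FROM (" + " UNION ALL ".join(parts) + ")"
```

UNION ALL avoids a per-branch deduplication. A single DISTINCT at the end then yields the "distinct rows only" result.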

copy_cdm_to

copy_cdm_to(
    cdm: CdmReference,
    con: IbisConnection,
    *,
    schema: str,
    overwrite: bool = False,
) -> CdmReference

Copy a CDM from one source to a new database connection.

Collects each table from the source CDM (materialising lazy tables), uploads it to the target connection, and builds a new CdmReference pointing to the target database. Cohort tables have their settings, attrition, and codelist metadata preserved.

Parameters

  • cdm: The source CDM reference to copy.
  • con: Target Ibis backend connection to copy into.
  • schema: Schema in the target database where tables will be created. The schema is created if it does not exist.
  • overwrite: If True, overwrite existing tables. If False (default), raise an error if a table already exists.

Returns

CdmReference A new CDM reference pointing to the tables in the target database.

Raises

ValueError If a table already exists in the target schema and overwrite is False.

Examples

>>> from omopy.connector import cdm_from_con, copy_cdm_to
>>> import ibis
>>> # Load the source CDM
>>> cdm = cdm_from_con("source.duckdb", cdm_schema="base")
>>> # Copy it to a new database
>>> target_con = ibis.duckdb.connect("target.duckdb")
>>> new_cdm = copy_cdm_to(cdm, target_con, schema="cdm")

tbl_group

Table grouping helper.

Provides tbl_group() to retrieve table names by logical group (vocab, clinical, all, default, derived). Equivalent to R's tblGroup().

tbl_group

tbl_group(
    group: str | TableGroup | list[str | TableGroup],
    *,
    cdm_version: str | CdmVersion = CdmVersion.V5_4,
) -> list[str]

Return CDM table names belonging to one or more logical groups.

Parameters

  • group: A group name (or TableGroup enum), or a list of them. Valid values: "vocab", "clinical", "all", "default", "derived".
  • cdm_version: CDM version to use for the lookup. Defaults to 5.4. The table groups are the same in 5.3 and 5.4.

Returns

list[str] Unique table names in the requested group(s), in schema order.

Examples

>>> tbl_group("vocab")
['concept', 'vocabulary', ...]
>>> tbl_group(["clinical", "derived"])
['person', 'observation_period', ..., 'drug_era', ...]
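The "unique names, in schema order" guarantee for multiple groups can be met by filtering a schema-ordered table list against the union of the requested groups. In the sketch below, `SCHEMA_ORDER` and `GROUPS` are heavily abbreviated, illustrative stand-ins and not the real CDM group definitions. `tables_in_groups` is hypothetical.

```python
from __future__ import annotations

# Illustrative, abbreviated table lists (NOT the real CDM group contents)
SCHEMA_ORDER = ["person", "observation_period", "condition_occurrence",
                "drug_exposure", "concept", "vocabulary", "drug_era", "condition_era"]
GROUPS = {
    "clinical": {"person", "observation_period", "condition_occurrence", "drug_exposure"},
    "vocab": {"concept", "vocabulary"},
    "derived": {"drug_era", "condition_era"},
}


def tables_in_groups(groups: str | list[str]) -> list[str]:
    """Hypothetical sketch: union the groups, then emit names in schema order."""
    if isinstance(groups, str):
        groups = [groups]
    unknown = [g for g in groups if g not in GROUPS]
    if unknown:
        raise ValueError(f"unknown group(s): {unknown}")
    wanted = set().union(*(GROUPS[g] for g in groups))
    # Filtering the ordered list both de-duplicates and preserves schema order
    return [t for t in SCHEMA_ORDER if t in wanted]
```

The order in which groups are requested does not matter. Overlapping groups never produce duplicate names.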

snapshot

CDM snapshot — metadata extraction.

Provides snapshot() to extract summary metadata from a CDM, including person count, observation period range, vocabulary version, and CDM source information. Equivalent to R's snapshot().

snapshot

snapshot(cdm: CdmReference) -> CdmSnapshot

Extract summary metadata from a CDM.

Queries the person, observation_period, cdm_source, and vocabulary tables to produce a concise summary of the CDM contents and provenance.

Parameters

cdm A CdmReference (database-backed or local).

Returns

CdmSnapshot A frozen (immutable) Pydantic model containing all metadata fields.

Raises

KeyError If required tables (person, observation_period) are missing from the CDM.
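The core of such a snapshot consists of a few aggregate queries plus a check for the required tables. The sketch below uses SQLite with ISO date strings. It covers only the person and observation_period parts. The output keys are assumed names, not CdmSnapshot's actual fields. `summarise_cdm` is hypothetical.

```python
from __future__ import annotations

import sqlite3


def summarise_cdm(con: sqlite3.Connection) -> dict[str, str]:
    """Hypothetical sketch of a snapshot-style summary (field names assumed)."""
    tables = {r[0] for r in con.execute("SELECT name FROM sqlite_master WHERE type = 'table'")}
    missing = {"person", "observation_period"} - tables
    if missing:
        raise KeyError(f"missing required tables: {sorted(missing)}")
    person_count = con.execute("SELECT COUNT(*) FROM person").fetchone()[0]
    # ISO-formatted date strings sort chronologically, so MIN/MAX give the range
    start, end = con.execute(
        "SELECT MIN(observation_period_start_date), MAX(observation_period_end_date) "
        "FROM observation_period"
    ).fetchone()
    # Every value is returned as a string, mirroring CdmSnapshot.to_dict()
    return {
        "person_count": str(person_count),
        "observation_period_start_date": str(start),
        "observation_period_end_date": str(end),
    }
```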

Compute & Persistence

compute_permanent

compute_permanent(
    expr: Table | CdmTable,
    *,
    name: str,
    con: IbisConnection | None = None,
    schema: str | None = None,
    overwrite: bool = True,
) -> ir.Table

Materialise an Ibis expression to a permanent database table.

Executes the query and stores the result as a persistent table. Returns a lazy Ibis reference to the newly created table.

Parameters

  • expr: An Ibis Table expression or CdmTable to materialise.
  • name: Name for the new table.
  • con: Ibis backend connection. If None, inferred from expr.
  • schema: Database schema for the new table. If None, the default schema is used.
  • overwrite: If True (default), drop any existing table with the same name before creating the new one. If False, raise an error if the table exists.

Returns

ir.Table A lazy Ibis reference to the newly created permanent table.

Raises

  • ValueError: If the table exists and overwrite is False.
  • TypeError: If expr is not an Ibis expression or CdmTable.
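The overwrite semantics map onto plain SQL as "drop if present, or refuse, then CREATE TABLE ... AS". The sketch below uses SQLite as a stand-in for an Ibis backend and ignores schemas. `materialise_table` is a hypothetical helper. The query string and table name are assumed to be trusted.

```python
import sqlite3


def materialise_table(con: sqlite3.Connection, query: str, name: str, overwrite: bool = True) -> None:
    """Hypothetical sketch of compute_permanent-style overwrite handling."""
    exists = con.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone() is not None
    if exists and not overwrite:
        raise ValueError(f"table {name!r} already exists")
    if exists:
        con.execute(f"DROP TABLE {name}")
    # CREATE TABLE ... AS runs the query once and stores its rows persistently
    con.execute(f"CREATE TABLE {name} AS {query}")
```

Later reads hit the stored rows instead of re-running the filter. This is the point of materialising an expensive expression.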

Examples

>>> import ibis
>>> con = ibis.duckdb.connect("my.duckdb")
>>> # DuckDB names the catalog after the file stem, here "my"
>>> concept = con.table("concept", database=("my", "base"))
>>> result = compute_permanent(
...     concept.filter(concept.domain_id == "Drug"),
...     name="drug_concepts",
...     schema="results",
... )

compute_query

compute_query(
    expr: Table | CdmTable,
    *,
    name: str | None = None,
    con: IbisConnection | None = None,
    temporary: bool = True,
    schema: str | None = None,
    overwrite: bool = True,
) -> ir.Table

Execute an Ibis query and store the result in the database.

This is a general-purpose materialisation function. If temporary is True (default), creates a temporary table. Otherwise creates a permanent table in the specified schema.

Parameters

  • expr: An Ibis Table expression or CdmTable.
  • name: Table name. If None, a unique name is generated.
  • con: Ibis backend connection. If None, inferred from expr.
  • temporary: If True (default), create a temporary table.
  • schema: Schema for permanent tables (ignored when temporary=True).
  • overwrite: Whether to overwrite existing tables.

Returns

ir.Table A lazy Ibis reference to the newly created table.
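The temporary-versus-permanent switch and the generated name can be sketched in SQLite. Its `temp` schema holds connection-scoped tables. `compute_query_sketch` is hypothetical. The `tmp_<hex>` naming scheme is an assumption, not omopy's format.

```python
from __future__ import annotations

import sqlite3
import uuid


def compute_query_sketch(
    con: sqlite3.Connection,
    query: str,
    name: str | None = None,
    temporary: bool = True,
) -> str:
    """Hypothetical sketch: materialise query as a temp or permanent table."""
    # Assumed naming scheme for generated names; the prefix keeps it a valid identifier
    name = name or f"tmp_{uuid.uuid4().hex[:12]}"
    # Qualify with the schema so a temp drop never removes a same-named main table
    schema = "temp" if temporary else "main"
    con.execute(f"DROP TABLE IF EXISTS {schema}.{name}")
    con.execute(f"CREATE TABLE {schema}.{name} AS {query}")
    return name
```

Temporary tables disappear when the connection closes. They suit intermediate results, while permanent tables persist for later sessions.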

append_permanent

append_permanent(
    expr: Table | CdmTable,
    *,
    name: str,
    con: IbisConnection | None = None,
    schema: str | None = None,
) -> ir.Table

Append the result of an Ibis expression to an existing table.

If the target table does not exist, it is created (equivalent to compute_permanent with overwrite=False).

Parameters

  • expr: An Ibis Table expression or CdmTable to append.
  • name: Name of the target table.
  • con: Ibis backend connection. If None, inferred from expr.
  • schema: Database schema for the target table.

Returns

ir.Table A lazy Ibis reference to the (now-appended) table.

Examples

>>> # First batch
>>> compute_permanent(batch1, name="results", schema="work")
>>> # Subsequent batches
>>> append_permanent(batch2, name="results", schema="work")
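The create-or-append behaviour reduces to choosing between CREATE TABLE ... AS and INSERT INTO ... SELECT. The sketch below shows this in SQLite. `append_or_create` is a hypothetical helper. The query and table name are assumed trusted.

```python
import sqlite3


def append_or_create(con: sqlite3.Connection, query: str, name: str) -> None:
    """Hypothetical sketch of append_permanent-style batching."""
    exists = con.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?", (name,)
    ).fetchone() is not None
    if exists:
        # Append: insert the query's rows into the existing table
        con.execute(f"INSERT INTO {name} {query}")
    else:
        # First batch: create the table from the query (never overwrites)
        con.execute(f"CREATE TABLE {name} AS {query}")
```

Each batch's query must produce columns compatible with the table created by the first batch.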

Date Helpers

dateadd

dateadd(
    expr: Column | Table,
    date_col: str,
    number: int | str,
    *,
    interval: IntervalUnit = "day",
) -> ir.Column | ir.Table

Add a time interval to a date column.

Works on both Ibis table expressions and individual columns. If expr is a Table, returns a new Table with a computed column added. If it's a Column (date expression), returns a new date Column.

Parameters

  • expr: An Ibis Table or date Column expression.
  • date_col: Name of the date column to modify.
  • number: Number of units to add (may be negative). If a string, it is interpreted as the name of a column containing the number.
  • interval: The unit of time: "day" (default), "month", or "year".

Returns

ir.Column | ir.Table A new date column expression with the interval added, or a new Table containing that computed column when expr is a Table.

Examples

>>> import ibis
>>> t = ibis.table({"start_date": "date", "days_offset": "int64"}, name="events")
>>> # Add 30 days to start_date
>>> new_date = dateadd(t, "start_date", 30)
>>> # Add a per-row number of days taken from the days_offset column
>>> new_date = dateadd(t, "start_date", "days_offset", interval="day")

datediff

datediff(
    table: Table,
    start_col: str,
    end_col: str,
    *,
    interval: IntervalUnit = "day",
) -> ir.Column

Compute the difference between two date columns.

Parameters

  • table: An Ibis Table expression containing both date columns.
  • start_col: Name of the start date column.
  • end_col: Name of the end date column.
  • interval: The unit for the difference: "day" (default), "month", or "year".

Returns

ir.Column An integer column expression with the difference in the specified units.

Notes

For "day" interval, this returns end_col - start_col in whole days. For "month" and "year", it uses calendar-based computation (matching R's clock::date_count_between):

  • Months: (year_end * 12 + month_end) - (year_start * 12 + month_start), adjusted for day-of-month.
  • Years: year_end - year_start, adjusted for month+day.
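The month and year rules in these notes can be worked through with `datetime.date`. The sketch reads "adjusted for day-of-month" as "subtract one if the end day falls before the start day" and applies the analogous rule for years. It assumes start <= end. End-of-month edge cases (e.g. Jan 31 to Feb 29) may not match clock exactly. `month_diff` and `year_diff` are hypothetical names.

```python
from datetime import date


def month_diff(start: date, end: date) -> int:
    """Whole calendar months from start to end (assumes start <= end)."""
    months = (end.year * 12 + end.month) - (start.year * 12 + start.month)
    # The final month is incomplete if end's day-of-month is before start's
    if end.day < start.day:
        months -= 1
    return months


def year_diff(start: date, end: date) -> int:
    """Whole calendar years from start to end (assumes start <= end)."""
    years = end.year - start.year
    # The final year is incomplete if end's (month, day) is before start's
    if (end.month, end.day) < (start.month, start.day):
        years -= 1
    return years
```

For example, 2020-01-15 to 2020-03-14 spans 1 whole month, not 2. A person born 1990-06-15 is 29 on 2020-06-14 and 30 the next day.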

Examples

>>> import ibis
>>> t = ibis.table(
...     {"start_date": "date", "end_date": "date"}, name="periods"
... )
>>> days_diff = datediff(t, "start_date", "end_date")
>>> years_diff = datediff(t, "start_date", "end_date", interval="year")

datepart

datepart(
    table: Table, date_col: str, part: DatePartUnit = "year"
) -> ir.Column

Extract a part (day, month, year) from a date column.

Parameters

  • table: An Ibis Table expression.
  • date_col: Name of the date column.
  • part: The part to extract: "year" (default), "month", or "day".

Returns

ir.Column An integer column expression with the extracted part.

Examples

>>> import ibis
>>> t = ibis.table({"birth_date": "date"}, name="person")
>>> birth_year = datepart(t, "birth_date", "year")
>>> birth_month = datepart(t, "birth_date", "month")

dateadd_polars

dateadd_polars(
    df: DataFrame | LazyFrame,
    date_col: str,
    number: int | str,
    *,
    interval: IntervalUnit = "day",
    result_col: str | None = None,
) -> pl.DataFrame | pl.LazyFrame

Add a time interval to a date column in a Polars DataFrame.

Parameters

  • df: A Polars DataFrame or LazyFrame.
  • date_col: Name of the date column.
  • number: Number of units to add, or the name of a column containing the number.
  • interval: The unit: "day" (default), "month", or "year".
  • result_col: Name for the result column. Defaults to date_col (modified in place).

Returns

pl.DataFrame | pl.LazyFrame The DataFrame with the new/modified date column.

datediff_polars

datediff_polars(
    df: DataFrame | LazyFrame,
    start_col: str,
    end_col: str,
    *,
    interval: IntervalUnit = "day",
    result_col: str = "date_diff",
) -> pl.DataFrame | pl.LazyFrame

Compute date difference in a Polars DataFrame.

Parameters

  • df: A Polars DataFrame or LazyFrame.
  • start_col: Name of the start date column.
  • end_col: Name of the end date column.
  • interval: The unit: "day" (default), "month", or "year".
  • result_col: Name for the result column (default "date_diff").

Returns

pl.DataFrame | pl.LazyFrame The DataFrame with a new integer column containing the difference.

Analytics

summarise_quantile

DBMS-independent quantile estimation.

Provides summarise_quantile() which computes quantiles using a cumulative-sum approach that works on any database backend (including those without native PERCENTILE_CONT).

Equivalent to R's summariseQuantile2().

summarise_quantile

summarise_quantile(
    data: Table | DataFrame | LazyFrame,
    columns: str | list[str],
    probs: list[float],
    *,
    group_by: str | list[str] | None = None,
) -> ir.Table | pl.DataFrame

Compute quantiles for one or more numeric columns.

Uses a cumulative-sum / inverse-CDF approach (equivalent to quantile(type=1) in R) that generates pure SQL and works on all database backends.

Parameters

  • data: An Ibis table, Polars DataFrame, or Polars LazyFrame.
  • columns: Column name(s) to compute quantiles for.
  • probs: Probability values in [0, 1].
  • group_by: Optional grouping column(s).

Returns

ir.Table | pl.DataFrame One row per group (or a single row if ungrouped), with columns named q{pct:02d}_{col} (e.g. q25_age, q50_age).

Raises

ValueError If probs contains values outside [0, 1], or columns is empty.
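The cumulative-sum / inverse-CDF method can be sketched in plain Python for one ungrouped column. R's type 1 quantile is the smallest value whose cumulative count reaches p × n. Frequencies per distinct value are accumulated, and the first value whose running total reaches that target is selected. In SQL the same steps become GROUP BY plus a window SUM, which is why no PERCENTILE_CONT is needed. `quantiles_type1` is hypothetical. Rounding p × 100 for the column name is an assumption.

```python
from __future__ import annotations

from collections import Counter
from itertools import accumulate


def quantiles_type1(values: list[float], probs: list[float], col: str) -> dict[str, float]:
    """Hypothetical sketch of type-1 quantiles via cumulative counts."""
    if any(not 0 <= p <= 1 for p in probs):
        raise ValueError("probs must lie in [0, 1]")
    # Distinct values in ascending order, each with its frequency
    counts = sorted(Counter(values).items())
    distinct = [value for value, _ in counts]
    # Running row count up to and including each distinct value
    cumulative = list(accumulate(freq for _, freq in counts))
    n = cumulative[-1]
    result = {}
    for p in probs:
        # Small tolerance absorbs floating-point error in p * n
        target = p * n - 1e-9
        idx = next(i for i, c in enumerate(cumulative) if c >= target)
        # Assumed naming: round(p * 100) zero-padded to two digits
        result[f"q{round(p * 100):02d}_{col}"] = distinct[idx]
    return result
```

For ages 1 to 10, p = 0.25 gives a target of 2.5. The first cumulative count reaching it is 3, so `q25_age` is 3, matching `quantile(1:10, 0.25, type = 1)` in R.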

compute_data_hash

compute_data_hash

compute_data_hash(cdm: CdmReference) -> pl.DataFrame

Compute an MD5 hash per CDM table for change tracking.

For each standard CDM table, computes the row count and the number of distinct values in a key column, then hashes those values to produce a fingerprint. A change in the hash between runs indicates the data has changed.

Parameters

cdm A CdmReference.

Returns

pl.DataFrame A DataFrame with columns: cdm_name, table_name, table_row_count, unique_column, n_unique_values, table_hash, compute_time_secs.
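The per-table fingerprint described above can be sketched with SQLite and `hashlib`. `table_fingerprint` is hypothetical. The `table|rows|distinct` payload format is an assumption. The returned keys follow the documented output columns.

```python
import hashlib
import sqlite3


def table_fingerprint(con: sqlite3.Connection, table: str, key_column: str) -> dict:
    """Hypothetical sketch: MD5 over a table's row count and distinct-key count."""
    row_count, n_unique = con.execute(
        f"SELECT COUNT(*), COUNT(DISTINCT {key_column}) FROM {table}"
    ).fetchone()
    # Assumed payload format; any stable serialisation of the counts would do
    digest = hashlib.md5(f"{table}|{row_count}|{n_unique}".encode()).hexdigest()
    return {
        "table_name": table,
        "table_row_count": row_count,
        "unique_column": key_column,
        "n_unique_values": n_unique,
        "table_hash": digest,
    }
```

Only the two counts are hashed, so the fingerprint is cheap to compute. It will not change if values are edited in place while both counts stay the same.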

benchmark

CDM benchmark — timed diagnostic queries.

Provides benchmark() to run a set of standard queries against a CDM and measure execution time. Equivalent to R's benchmarkCDMConnector().

benchmark

benchmark(cdm: CdmReference) -> pl.DataFrame

Run benchmark queries against a CDM and return timings.

Executes a standard set of queries (distinct count, group-by, joins, collect) against vocab and clinical tables, timing each.

Parameters

cdm A CdmReference (database-backed recommended for meaningful timings).

Returns

pl.DataFrame A DataFrame with columns: task, time_taken_secs, time_taken_mins, dbms, person_n.
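A timing loop of this kind can be sketched with `time.perf_counter`. Each query is fully fetched so that lazy execution does not hide its cost. SQLite stands in for the CDM backend. `run_benchmark` and the task names are hypothetical. Only the task and timing columns from the documented output are produced.

```python
from __future__ import annotations

import sqlite3
import time


def run_benchmark(con: sqlite3.Connection, queries: dict[str, str]) -> list[dict]:
    """Hypothetical sketch: time each named query end to end."""
    rows = []
    for task, sql in queries.items():
        start = time.perf_counter()
        # Fetch every row so the query is fully executed, not just prepared
        con.execute(sql).fetchall()
        elapsed = time.perf_counter() - start
        rows.append({"task": task, "time_taken_secs": elapsed, "time_taken_mins": elapsed / 60})
    return rows
```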