

IO: Getting Data

Three classes are available to facilitate data access:

  • HiveData: Getting data from a Hive cluster, returning Koalas DataFrames
  • PandasData: Getting data from tables saved on disk, returning Pandas DataFrames
  • PostgresData: Getting data from a PostgreSQL DB, returning Pandas DataFrames
from eds_scikit.io import HiveData, PandasData, PostgresData

Loading from Hive: HiveData

The HiveData class expects two parameters:

  • A SparkSession variable
  • The name of the Database to connect to

Using Spark kernels

All kernels designed to use Spark are configured to expose 3 variables at startup:

  • spark, the current SparkSession
  • sc, the current SparkContext
  • sql, a function to execute SQL code on the Hive Database.

In this case, you can simply provide the spark variable to HiveData!
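For instance, here is a minimal sketch of such a call (DB_NAME is a placeholder for your own Hive database name):

from eds_scikit.io import HiveData

DB_NAME = "my_omop_db"  # placeholder: replace with your Hive database name

# `spark` is the SparkSession already exposed by the kernel
data = HiveData(DB_NAME, spark)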

Working with an I2B2 database

To use the built-in I2B2 to OMOP connector, specify database_type="I2B2" when instantiating HiveData.

If needed, the following snippet lets you create the necessary variables yourself:

from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession

conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
                    .enableHiveSupport() \
                    .getOrCreate()
sql = spark.sql

The class HiveData provides a convenient interface to OMOP data stored in Hive.
The OMOP tables can be accessed as attributes and are represented as Koalas DataFrames. You simply need to provide your Hive database name.

data = HiveData(
    "cse_210038_20221219",#DB_NAME,
    spark,
    database_type="I2B2",
)

By default, only a subset of tables is added as attributes:

data.available_tables
['concept',
 'visit_detail',
 'note_deid',
 'person',
 'care_site',
 'visit_occurrence',
 'measurement',
 'procedure_occurrence',
 'condition_occurrence',
 'fact_relationship',
 'concept_relationship']

Koalas DataFrames, like Spark DataFrames, rely on a lazy execution plan: as long as no data needs to be collected, saved or displayed, no computation is triggered; the operations are simply recorded for later execution.
The main advantage of Koalas DataFrames is that you can use (most of) the Pandas API:

person = data.person
person.drop(columns = ['person_id']).head()
  birth_datetime death_datetime gender_source_value cdm_source
0     1946-06-04            NaT                   m      ORBIS
1     1940-01-21     2018-05-07                   m      ORBIS
2     1979-04-25            NaT                   m      ORBIS
3     2007-10-13            NaT                   f      ORBIS
4            ...            ...                 ...        ...
from datetime import datetime

person['born_after_1970'] = (person['birth_datetime'] >= datetime(1971, 1, 1))

stats = (
    person
    .groupby('born_after_1970')
    .person_id
    .count()
)

Once data has been sufficiently aggregated, it can be converted back to Pandas, e.g. for plotting.

stats_pd = stats.to_pandas()
stats_pd
born_after_1970
True     132794
False     66808
Name: person_id, dtype: int64
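
For instance, a minimal plotting sketch (assuming matplotlib is available in your environment):

import matplotlib.pyplot as plt

# stats_pd is a small Pandas Series, so the usual plotting tools apply
stats_pd.plot.bar()
plt.ylabel("Number of patients")
plt.show()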

Similarly, if you want to work with the underlying Spark DataFrame instead, a dedicated method is available:

person_spark = person.to_spark()
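
From there, you can use the Spark API directly, for instance through Spark SQL. A minimal sketch (the view name is arbitrary):

# Register the Spark DataFrame as a temporary view and query it with Spark SQL
person_spark.createOrReplaceTempView("person_sample")

spark.sql(
    "SELECT gender_source_value, COUNT(*) AS n "
    "FROM person_sample "
    "GROUP BY gender_source_value"
).show()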

Persisting/Reading a sample to/from disk: PandasData

Working with Pandas DataFrames is, when possible, more convenient.
You can save your database, or at least a subset of it, to disk.
Doing so allows you to work on it later without having to go through Spark again.

Careful with cohort size

Do not save it if your cohort is large: this writes all available tables to disk.

For instance, let us define a dummy subset of 1000 patients:

visits = data.visit_occurrence

selected_visits = (
    visits
    .loc[visits["visit_source_value"] == "urgence"]
)

sample_patients = (
    selected_visits["person_id"]
    .drop_duplicates()
    .head(1000)
)

And save a selection of tables, restricted to this small cohort, as Parquet files:

import os

MY_FOLDER_PATH = "./test_cohort"
folder = os.path.abspath(MY_FOLDER_PATH)

tables_to_save = ["person", "visit_detail", "visit_occurrence"]

data.persist_tables_to_folder(
    folder,
    tables=tables_to_save,
    person_ids=sample_patients
)
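
You can then check what was written to the folder. A minimal sketch (the exact file layout produced by persist_tables_to_folder is an assumption here):

# List the files written for the selected tables
# (the one-file-per-table layout is an assumption, not a documented guarantee)
for file_name in sorted(os.listdir(folder)):
    print(file_name)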

Once you have saved some data to disk, a dedicated class can be used to access it:
the PandasData class loads OMOP data from a folder containing several Parquet files. The tables are accessed as attributes and returned as Pandas DataFrames.

Warning: in this case, each table is loaded entirely into memory on a single Jupyter server. Consequently, it is advised to only use this for small datasets.

data = PandasData(folder)
data.available_tables
['visit_detail', 'visit_occurrence', 'person']
person = data.person
print(f"type: {type(person)}")
print(f"shape: {person.shape}")
type: <class 'pandas.core.frame.DataFrame'>
shape: (1000, 5)

Loading from PostgreSQL: PostgresData

OMOP data can be stored in a PostgreSQL database. The PostgresData class provides a convenient interface to it.

Note: this class relies on the ~/.pgpass file, which contains your credentials for one or several databases.
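
Each line of ~/.pgpass follows the standard PostgreSQL format (the values below are placeholders), and the file must only be readable by you (chmod 600 ~/.pgpass):

hostname:port:database:username:password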

data = PostgresData(
    dbname=DB, 
    schema="omop", 
    user=USER,
)

data.read_sql("select count(*) from person")