Skip to content

Loading Data

Here is a tutorial for loading your data which is the first step in the EDS-TeVa usage workflow.

import os
from datetime import datetime

3 classes are available to facilitate data access:

  • HiveData: Getting data from a Hive cluster, returning Koalas DataFrames.
  • LocalData: Getting data from tables saved on disk, returning Pandas DataFrames.
  • PostgresData: Getting data from a PostGreSQL DB, returning Pandas DataFrames.
from edsteva.io import HiveData, LocalData, PostgresData

Loading from Hive: HiveData

The HiveData class expects two parameters:

  • A SparkSession variable
  • The name of the Database to connect to

Using Spark kernels

All kernels designed to use Spark are configured to expose 3 variables at startup:

  • spark, the current SparkSession
  • sc, the current SparkContext
  • sql, a function to execute SQL code on the Hive Database.

In this case you can just provide the spark variable to HiveData !

If needed, the following snippet allows to create the necessary variables:

from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession

conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
                    .enableHiveSupport() \
                    .getOrCreate()
sql = spark.sql

Alternatively, we propose an all-in-one function that creates the necessary variables adequately configured to use Koalas.

from edsteva import improve_performances

spark, sc, sql = improve_performances()

The class HiveData provides a convenient interface to OMOP data stored in Hive.
The OMOP tables can be accessed as attribute and they are represented as Koalas DataFrames. You simply need to mention your Hive database name.

db_name = "my_db"
data = HiveData(
    database_name=db_name,
    spark_session=spark
    )

By default, only a subset of tables are added as attributes:

data.available_tables
['care_site',
 'concept',
 'condition_occurrence',
 'person',
 'procedure_occurrence',
 'visit_detail',
 'visit_occurrence']

Koalas DataFrames, like Spark DataFrames, rely on a lazy execution plan: As long as no data needs to be specifically collected, saved or displayed, no code is executed. It is simply saved for a later execution.
The main interest of Koalas DataFrames is that you can use (most of) the Pandas API:

person = data.person
person.drop(columns = ['person_id']).head()
location_id year_of_birth month_of_birth day_of_birth birth_datetime death_datetime gender_source_value gender_source_concept_id cdm_source
0 3347087777 1949 8 2 1949-08-02 NaT f 2008119903 ORBIS
1 9818741928 1975 7 6 1975-07-06 NaT m 2008119900 ORBIS
2 3345464435 1990 9 7 1990-09-07 NaT f 2008119903 ORBIS
3 3346060919 1964 5 18 1964-05-18 NaT f 2008119903 ORBIS
4 3347197472 1990 2 2 1990-02-02 NaT m 2008119900 ORBIS
person['is_over_50'] = (person['birth_datetime'] >= datetime(1971,1,1))

stats = (
    person
    .groupby('is_over_50')
    .person_id
    .count()
)

Once data has been sufficiently aggregated, it can be converted back to Pandas, e.g. for plotting.

stats_pd = stats.to_pandas()

Similarily, if you want to work on the Spark DataFrame instead, a similar method is available:

person_spark = person.to_spark()

Persisting/Reading a sample to/from disk: LocalData

Working with Pandas DataFrame is, when possible, more convenient.
You have the possibility to save your database or at least a subset of it.
Doing so allows you to work on it later without having to go through Spark again.

Careful with cohort size

Do not save it if your cohort is big: This saves all available tables on disk.

For instance, let us define a dummy subset of 1000 patients:

visits = data.visit_occurrence

selected_visits = (
    visits
    .loc[visits["stay_source_value"] == "MCO"]
)

sample_patients = (
    selected_visits["person_id"]
    .drop_duplicates()
    .head(1000)
    .to_list()
)

And save every table restricted to this small cohort as a parquet file:

folder= os.path.abspath(MY_FOLDER_PATH)
os.makedirs(folder, exist_ok=True)

tables_to_save = ["person", "visit_detail", "visit_occurrence"]

data.persist_tables_to_folder(folder, 
                              tables=tables_to_save,
                              person_ids=sample_patients)

Once you saved some data to disk, a dedicated class can be used to access it:
The class LocalData can be used to load OMOP data from a folder containing several parquet files. The tables are accessed as attributes and are returned as Pandas DataFrame.

Warning

In this case, the whole table will be loaded into memory on a single jupyter server. Consequently it is advised to only use this for small datasets.

data = LocalData(folder)
data.available_tables
['visit_occurrence', 'visit_detail', 'person']
person = data.person
print(f"type: {type(person)}")
print(f"shape: {person.shape}")
type: <class 'pandas.core.frame.DataFrame'>
shape: (1000, 10)

Loading from PostGres: PostgresData

OMOP data can be stored in a PostreSQL database. The PostgresData class provides a convinient interface to it.

Note

This class relies on the file ~/.pgpass that contains your identifiers for several databases.

data = PostgresData(dbname=DB, 
                    schema="omop", 
                    user=USER)
data.read_sql("select count(*) from person")
count
0 12688670