IO: Getting Data
Three classes are available to facilitate data access:
- HiveData: getting data from a Hive cluster, returning Koalas DataFrames
- PandasData: getting data from tables saved on disk, returning Pandas DataFrames
- PostgresData: getting data from a PostgreSQL database, returning Pandas DataFrames
from eds_scikit.io import HiveData, PandasData, PostgresData
Loading from Hive: HiveData
The HiveData class expects two parameters:
- A SparkSession variable
- The name of the database to connect to
Using Spark kernels
All kernels designed to use Spark are configured to expose 3 variables at startup:
- spark, the current SparkSession
- sc, the current SparkContext
- sql, a function to execute SQL code on the Hive database
In this case you can simply provide the spark variable to HiveData, as sketched below.
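For instance, a minimal instantiation might look like the following sketch. The database name here is only a placeholder to replace with your own; the real call used in this tutorial appears further down.
# Minimal sketch: "my_omop_db" is a placeholder database name
data = HiveData("my_omop_db", spark)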
Working with an I2B2 database
To use the built-in I2B2-to-OMOP connector, specify database_type="I2B2" when instantiating HiveData.
If needed, the following snippet can be used to create the necessary variables:
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()
sql = spark.sql
The HiveData class provides a convenient interface to OMOP data stored in Hive.
The OMOP tables can be accessed as attributes and are represented as Koalas DataFrames.
You simply need to provide your Hive database name.
data = HiveData(
    "cse_210038_20221219",  # DB_NAME
    spark,
    database_type="I2B2",
)
By default, only a subset of tables is added as attributes:
data.available_tables
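If you are unsure whether a given table is exposed, you can inspect this list directly; the loop below is just a convenience sketch.
# Print the names of the tables exposed as attributes
for table_name in data.available_tables:
    print(table_name)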
Koalas DataFrames, like Spark DataFrames, rely on a lazy execution plan: as long as no data needs to be collected, saved or displayed, no code is executed; the operations are simply recorded for later execution.
The main advantage of Koalas DataFrames is that you can use (most of) the Pandas API:
person = data.person
person.drop(columns=['person_id']).head()
from datetime import datetime
# Born before 1971 means older than 50 at the time of writing
person['is_over_50'] = person['birth_datetime'] < datetime(1971, 1, 1)
stats = (
    person
    .groupby('is_over_50')
    .person_id
    .count()
)
Once data has been sufficiently aggregated, it can be converted back to Pandas, e.g. for plotting.
stats_pd = stats.to_pandas()
stats_pd
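For instance, assuming matplotlib is available in your environment, the aggregated counts can be plotted directly from the Pandas object:
# Plot the aggregated counts (matplotlib is assumed to be installed)
ax = stats_pd.plot.bar()
ax.set_ylabel("Number of patients")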
If you want to work on the underlying Spark DataFrame instead, a similar method is available:
person_spark = person.to_spark()
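From there you can use the regular Spark API, for example to register the DataFrame as a temporary view and query it with Spark SQL. The snippet below is only an illustrative sketch:
# Illustrative only: query the Spark DataFrame with Spark SQL
person_spark.createOrReplaceTempView("person_sample")
spark.sql("SELECT COUNT(*) FROM person_sample").show()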
Persisting/Reading a sample to/from disk: PandasData
Working with Pandas DataFrames is, when possible, more convenient.
You can save your database, or at least a subset of it, to disk.
Doing so allows you to work on it later without having to go through Spark again.
Careful with cohort size
Do not save your data if your cohort is big: this writes all available tables to disk.
For instance, let us define a dummy subset of 1000 patients:
visits = data.visit_occurrence
selected_visits = (
    visits
    .loc[visits["visit_source_value"] == "urgence"]
)
sample_patients = (
    selected_visits["person_id"]
    .drop_duplicates()
    .head(1000)
)
And save every table restricted to this small cohort as a parquet file:
import os

MY_FOLDER_PATH = "./test_cohort"
folder = os.path.abspath(MY_FOLDER_PATH)
tables_to_save = ["person", "visit_detail", "visit_occurrence"]
data.persist_tables_to_folder(
    folder,
    tables=tables_to_save,
    person_ids=sample_patients,
)
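You can check what was written by listing the destination folder; the exact layout (one parquet file or one sub-folder per table) may vary, so this is only a quick sanity check:
# Quick sanity check of the persisted files
print(sorted(os.listdir(folder)))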
Once you have saved some data to disk, a dedicated class can be used to access it:
the PandasData class loads OMOP data from a folder containing several parquet files.
The tables are accessed as attributes and are returned as Pandas DataFrames.
Warning: in this case, the whole table is loaded into memory on a single Jupyter server. Consequently, it is advised to only use this for small datasets.
data = PandasData(folder)
data.available_tables
person = data.person
print(f"type: {type(person)}")
print(f"shape: {person.shape}")
Loading from PostgreSQL: PostgresData
OMOP data can be stored in a PostgreSQL database. The PostgresData class provides a convenient interface to it.
Note: this class relies on the ~/.pgpass file, which contains your credentials for one or several databases.
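Each line of ~/.pgpass follows the standard PostgreSQL format hostname:port:database:username:password, for instance (all values below are placeholders):
localhost:5432:my_omop_db:my_user:my_password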
data = PostgresData(
    dbname=DB,
    schema="omop",
    user=USER,
)
data.read_sql("select count(*) from person")
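Any SQL query can be run this way, and the result comes back as a Pandas DataFrame as described above. The query below is only an illustration and assumes the standard OMOP person columns:
# Illustrative query; person_id and birth_datetime are standard OMOP columns
df = data.read_sql("select person_id, birth_datetime from person limit 10")
print(df.shape)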