Loading Data
This tutorial shows how to load your data, which is the first step in the EDS-TeVa usage workflow.
import os
from datetime import datetime
3 classes are available to facilitate data access:
HiveData: Getting data from a Hive cluster, returning Koalas DataFrames.
LocalData: Getting data from tables saved on disk, returning Pandas DataFrames.
PostgresData: Getting data from a PostgreSQL database, returning Pandas DataFrames.
from edsteva.io import HiveData, LocalData, PostgresData
Loading from Hive: HiveData
The HiveData class expects two parameters:
- A SparkSession variable
- The name of the database to connect to
Using Spark kernels
All kernels designed to use Spark are configured to expose 3 variables at startup:
- spark, the current SparkSession
- sc, the current SparkContext
- sql, a function to execute SQL code on the Hive database.
In this case, you can just provide the spark variable to HiveData!
If needed, the following snippet creates the necessary variables:
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()
sql = spark.sql
Alternatively, we propose an all-in-one function that creates the necessary variables adequately configured to use Koalas.
from edsteva import improve_performances
spark, sc, sql = improve_performances()
The HiveData class provides a convenient interface to OMOP data stored in Hive. The OMOP tables can be accessed as attributes and are represented as Koalas DataFrames. You simply need to provide your Hive database name.
db_name = "my_db"
data = HiveData(
    database_name=db_name,
    spark_session=spark,
)
By default, only a subset of tables is added as attributes:
data.available_tables
Koalas DataFrames, like Spark DataFrames, rely on a lazy execution plan: as long as no data needs to be collected, saved, or displayed, no code is executed; the operations are simply recorded for later execution.
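For example, the following sketch builds a filter without triggering any computation (the birth_datetime column follows the examples below; the date value is purely illustrative):
# A minimal illustration of lazy execution.
lazy_filter = data.person[data.person['birth_datetime'] >= datetime(2000, 1, 1)]
# Nothing has been computed yet: only an execution plan was built.
# Data is actually read and processed when results are requested, e.g.:
lazy_filter.head()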
The main interest of Koalas DataFrames is that you can use (most of) the Pandas API:
person = data.person
person.drop(columns=['person_id']).head()
person['is_over_50'] = (person['birth_datetime'] < datetime(1971, 1, 1))  # born before 1971, i.e. over 50 at the time of writing
stats = (
    person
    .groupby('is_over_50')
    .person_id
    .count()
)
Once data has been sufficiently aggregated, it can be converted back to Pandas, e.g. for plotting.
stats_pd = stats.to_pandas()
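For instance, a minimal plot of the aggregated counts (assuming matplotlib is available in your environment):
import matplotlib.pyplot as plt

# stats_pd is a regular Pandas object, so the usual plotting API applies.
stats_pd.plot.bar()
plt.show()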
Similarly, if you want to work on the Spark DataFrame instead, a dedicated method is available:
person_spark = person.to_spark()
Persisting/Reading a sample to/from disk: LocalData
Working with Pandas DataFrames is, when possible, more convenient. You can save your database, or at least a subset of it, to disk: doing so allows you to work on it later without having to go through Spark again.
Careful with cohort size
Do not persist your data if your cohort is large: this saves all available tables to disk.
For instance, let us define a dummy subset of 1000 patients:
visits = data.visit_occurrence
selected_visits = (
    visits
    .loc[visits["stay_source_value"] == "MCO"]
)
sample_patients = (
    selected_visits["person_id"]
    .drop_duplicates()
    .head(1000)
    .to_list()
)
And save every table restricted to this small cohort as a parquet file:
folder = os.path.abspath(MY_FOLDER_PATH)
os.makedirs(folder, exist_ok=True)
tables_to_save = ["person", "visit_detail", "visit_occurrence"]
data.persist_tables_to_folder(
    folder,
    tables=tables_to_save,
    person_ids=sample_patients,
)
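To check that the tables were written, you can simply list the folder contents (a plain Python sanity check, not part of the EDS-TeVa API):
# Each persisted table should now appear as a parquet file in the folder.
sorted(os.listdir(folder))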
Once you have saved some data to disk, a dedicated class can be used to access it: the LocalData class loads OMOP data from a folder containing several parquet files. The tables are accessed as attributes and are returned as Pandas DataFrames.
Warning
In this case, the whole table will be loaded into memory on a single Jupyter server. Consequently, it is advised to use this only for small datasets.
data = LocalData(folder)
data.available_tables
person = data.person
print(f"type: {type(person)}")
print(f"shape: {person.shape}")
Loading from PostgreSQL: PostgresData
OMOP data can be stored in a PostgreSQL database. The PostgresData class provides a convenient interface to it.
Note
This class relies on the file ~/.pgpass, which contains your credentials for the databases you access.
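Each line of ~/.pgpass follows the standard PostgreSQL format hostname:port:database:username:password, for instance (placeholder values):
# ~/.pgpass — one line per database, placeholder values
localhost:5432:my_omop_db:my_user:my_password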
data = PostgresData(
    dbname=DB,
    schema="omop",
    user=USER,
)
data.read_sql("select count(*) from person")
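As the class description above suggests, results are returned as Pandas DataFrames, so a query can be assigned and manipulated directly (a sketch; the query and column names are illustrative OMOP fields):
# Count visits per care site; the result is a regular Pandas DataFrame.
visit_counts = data.read_sql(
    "select care_site_id, count(*) as n_visits from visit_occurrence group by care_site_id"
)
visit_counts.head()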