HiveData(database_name: str, spark_session: Optional[SparkSession] = None, person_ids: Optional[Iterable[int]] = None, tables_to_load: Optional[Union[Dict[str, Optional[List[str]]], List[str]]] = None, columns_to_load: Optional[Union[Dict[str, Optional[List[str]]], List[str]]] = None, database_type: Optional[str] = 'OMOP', prune_omop_date_columns: bool = True, cache: bool = True)
Bases: BaseData
Spark interface for OMOP data stored in a Hive database.
This class provides simple access to data stored in Hive.
Data is returned as koalas dataframes that match the tables
stored in Hive.
| Parameter | Description | Type | Default |
|---|---|---|---|
| `database_name` | The name of your database in Hive. Ex: `"cse_82727572"` | `str` | *required* |
| `spark_session` | If `None`, a SparkSession will be retrieved or created via `SparkSession.builder.enableHiveSupport().getOrCreate()` | `pyspark.sql.SparkSession` | `None` |
| `person_ids` | An iterable of `person_id` used to define a subset of the database. | `Optional[Iterable[int]]` | `None` |
| `tables_to_load` | *Deprecated.* | `dict` | `None` |
| `columns_to_load` | *Deprecated.* | `dict` | `None` |
| `database_type` | Whether to use the native OMOP schema or to convert I2B2 inputs to OMOP. Must be `'OMOP'` or `'I2B2'`. | `Optional[str]` | `'OMOP'` |
| `prune_omop_date_columns` | In OMOP, most date values are stored both in a `<str>_date` and a `<str>_datetime` column. Koalas has trouble handling the `date` type, so only the `datetime` column is kept. | `bool` | `True` |
| `cache` | Whether to cache each table after preprocessing. This speeds up subsequent calculations, but can be slow or infeasible for very large tables. | `bool` | `True` |
| Attribute | Description | Type |
|---|---|---|
| `person` | Hive data for the `person` table as a koalas dataframe. Other OMOP tables can also be accessed as attributes. | koalas dataframe |
| `available_tables` | Names of the OMOP tables that can be accessed as attributes of this `HiveData` object. | list of str |
Examples:
data = HiveData(database_name="edsomop_prod_a")
data.available_tables
# Out: ["person", "care_site", "condition_occurrence", ... ]
person = data.person
type(person)
# Out: databricks.koalas.frame.DataFrame
person["person_id"].count()
# Out: 12670874
This class can be used to create a subset of data for a given
list of `person_id`. This is useful because the smaller dataset
can then be used to prototype more rapidly.
my_person_ids = [9226726, 2092082, ...]
data = HiveData(
spark_session=spark, database_name="edsomop_prod_a", person_ids=my_person_ids
)
data.person["person_id"].count()
# Out: 1000
tables_to_save = ["person", "visit_occurrence"]
data.persist_tables_to_folder("./cohort_sample_1000", tables=tables_to_save)
# Out: writing /export/home/USER/cohort_sample_1000/person.parquet
# Out: writing /export/home/USER/cohort_sample_1000/visit_occurrence.parquet
# Out: ...
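Beyond the default OMOP setup, the constructor parameters above can also point `HiveData` at an I2B2 source or disable caching. The following is a minimal sketch, not taken from the library's documentation: the database name `"edsprod_i2b2"` is a placeholder, and the import path `eds_scikit.io` is assumed.

```python
from pyspark.sql import SparkSession
from eds_scikit.io import HiveData  # assumed import path

# Reuse an existing Hive-enabled session instead of letting HiveData create one.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

data = HiveData(
    database_name="edsprod_i2b2",  # placeholder name for an I2B2 database
    spark_session=spark,
    database_type="I2B2",          # convert I2B2 inputs to the OMOP schema
    cache=False,                   # skip per-table caching, e.g. for very large tables
)
data.available_tables  # OMOP table names exposed as attributes
```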
Source code in eds_scikit/io/hive.py
def __init__(
self,
database_name: str,
spark_session: Optional[SparkSession] = None,
person_ids: Optional[Iterable[int]] = None,
tables_to_load: Optional[
Union[Dict[str, Optional[List[str]]], List[str]]
] = None,
columns_to_load: Optional[
Union[Dict[str, Optional[List[str]]], List[str]]
] = None,
database_type: Optional[str] = "OMOP",
prune_omop_date_columns: bool = True,
cache: bool = True,
):
"""Spark interface for OMOP data stored in a Hive database.
This class provides a simple access to data stored in Hive.
Data is returned as koalas dataframes that match the tables
stored in Hive.
Parameters
----------
database_name : str
The name of your database in Hive. Ex: "cse_82727572"
spark_session : pyspark.sql.SparkSession
If None, a SparkSession will be retrieved or created via `SparkSession.builder.enableHiveSupport().getOrCreate()`
person_ids : Optional[Iterable[int]]
An iterable of `person_id` that is used to define a subset of the database.
tables_to_load : dict, default=None
*deprecated*
columns_to_load : dict, default=None
*deprecated*
database_type : Optional[str], default='OMOP'
Whether to use the native OMOP schema or to convert I2B2 inputs to OMOP. Must be either 'OMOP' or 'I2B2'.
prune_omop_date_columns : bool, default=True
In OMOP, most date values are stored both in a `<str>_date` and a `<str>_datetime` column.
Koalas has trouble handling the `date` type, so only the `datetime` column is kept.
cache : bool, default=True
Whether to cache each table after preprocessing or not.
This speeds up subsequent calculations, but can be slow or infeasible for very large tables.
Attributes
----------
person : koalas dataframe
Hive data for table `person` as a koalas dataframe.
Other OMOP tables can also be accessed as attributes
available_tables : list of str
names of OMOP tables that can be accessed as attributes with this
HiveData object.
Examples
--------
```python
data = HiveData(database_name="edsomop_prod_a")
data.available_tables
# Out: ["person", "care_site", "condition_occurrence", ... ]
person = data.person
type(person)
# Out: databricks.koalas.frame.DataFrame
person["person_id"].count()
# Out: 12670874
```
This class can be used to create a subset of data for a given
list of `person_id`. This is useful because the smaller dataset
can then be used to prototype more rapidly.
```python
my_person_ids = [9226726, 2092082, ...]
data = HiveData(
spark_session=spark, database_name="edsomop_prod_a", person_ids=my_person_ids
)
data.person["person_id"].count()
# Out: 1000
tables_to_save = ["person", "visit_occurrence"]
data.persist_tables_to_folder("./cohort_sample_1000", tables=tables_to_save)
# Out: writing /export/home/USER/cohort_sample_1000/person.parquet
# Out: writing /export/home/USER/cohort_sample_1000/visit_occurrence.parquet
# Out: ...
```
"""
super().__init__()
if columns_to_load is not None:
logger.warning("'columns_to_load' is deprecated and won't be used")
if tables_to_load is not None:
logger.warning("'tables_to_load' is deprecated and won't be used")
self.spark_session = (
spark_session or SparkSession.builder.enableHiveSupport().getOrCreate()
)
self.database_name = database_name
if database_type not in ["I2B2", "OMOP"]:
raise ValueError(
f"`database_type` must be either 'I2B2' or 'OMOP'. Got {database_type}"
)
self.database_type = database_type
if self.database_type == "I2B2":
self.database_source = "cse" if "cse" in self.database_name else "edsprod"
self.omop_to_i2b2 = settings.i2b2_tables[self.database_source]
self.i2b2_to_omop = defaultdict(list)
for omop_table, i2b2_table in self.omop_to_i2b2.items():
self.i2b2_to_omop[i2b2_table].append(omop_table)
self.prune_omop_date_columns = prune_omop_date_columns
self.cache = cache
self.user = os.environ["USER"]
self.person_ids, self.person_ids_df = self._prepare_person_ids(person_ids)
self.available_tables = self.list_available_tables()
self._tables = {}
persist_tables_to_folder
persist_tables_to_folder(folder: str, person_ids: Optional[Iterable[int]] = None, tables: List[str] = None, overwrite: bool = False) -> None
Save OMOP tables as parquet files in a given folder.
| Parameter | Description | Type | Default |
|---|---|---|---|
| `folder` | Path to the folder where the tables will be written. | `str` | *required* |
| `person_ids` | `person_id` values to keep in the subcohort. | `Optional[Iterable[int]]` | `None` |
| `tables` | List of table names to save. Defaults to `eds_scikit.io.settings.default_tables_to_save`. | `List[str]` | `None` |
| `overwrite` | Whether to overwrite files if `folder` already exists. | `bool` | `False` |
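As a usage sketch (the folder path, cohort and table choices below are placeholders, not values from the library's documentation), the exported files are plain parquet and can be reloaded without Spark, for instance with pandas:

```python
import pandas as pd

# Persist a small cohort to local parquet files, overwriting any previous export.
data.persist_tables_to_folder(
    "./cohort_sample_1000",           # placeholder output folder
    person_ids=[9226726, 2092082],    # placeholder person_id subset
    tables=["person", "visit_occurrence"],
    overwrite=True,
)

# Read one of the exported tables back as a pandas DataFrame.
person = pd.read_parquet("./cohort_sample_1000/person.parquet")
```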
Source code in eds_scikit/io/hive.py
def persist_tables_to_folder(
self,
folder: str,
person_ids: Optional[Iterable[int]] = None,
tables: List[str] = None,
overwrite: bool = False,
) -> None:
"""Save OMOP tables as parquet files in a given folder.
Parameters
----------
folder : str
path to folder where the tables will be written.
person_ids : iterable
person_ids to keep in the subcohort.
tables : list of str, default None
list of table names to save. Default value is
:py:data:`~eds_scikit.io.settings.default_tables_to_save`.
overwrite : bool, default=False
whether to overwrite files if 'folder' already exists.
"""
# Manage tables
if tables is None:
tables = settings.default_tables_to_save
unknown_tables = [
table for table in tables if table not in self.available_tables
]
if unknown_tables:
raise ValueError(
f"The following tables are not available : {str(unknown_tables)}"
)
# Create folder
folder = Path(folder).absolute()
if folder.exists() and overwrite:
shutil.rmtree(folder)
folder.mkdir(parents=True, mode=0o766)
assert os.path.exists(folder) and os.path.isdir(
folder
), f"Folder {folder} not found."
# TODO: remove everything in this folder that is a valid
# omop table. This prevents a user from having a
# folder containing datasets generated from different
# patient subsets.
# TODO: maybe check how much the user wants to persist
# to disk. Set a limit on the number of patients in the cohort ?
if person_ids is not None:
person_ids = self._prepare_person_ids(person_ids, return_df=False)
database_path = self.get_db_path()
for idx, table in enumerate(tables):
if self.database_type == "I2B2":
table_path = self._hdfs_write_orc_to_parquet(
table, person_ids, overwrite
)
else:
table_path = os.path.join(database_path, table)
df = self.get_table_from_parquet(table_path, person_ids=person_ids)
local_file_path = os.path.join(folder, f"{table}.parquet")
df.to_parquet(
local_file_path,
allow_truncated_timestamps=True,
coerce_timestamps="ms",
)
logger.info(
f"({idx+1}/{len(tables)}) Table {table} saved at "
f"{local_file_path} (N={len(df)})."
)
get_db_path
Get the HDFS path of the database
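A minimal usage sketch, assuming a `HiveData` instance named `data`; the returned location string below is only illustrative and depends on the Hive metastore configuration:

```python
# Location registered for the database in the Hive metastore,
# e.g. "hdfs://.../user/hive/warehouse/cse_82727572.db" (illustrative only).
db_path = data.get_db_path()
```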
Source code in eds_scikit/io/hive.py
def get_db_path(self):
"""Get the HDFS path of the database"""
return (
self.spark_session.sql(f"DESCRIBE DATABASE EXTENDED {self.database_name}")
.filter("database_description_item=='Location'")
.collect()[0]
.database_description_value
)