def get_i2b2_table(
    spark_session: SparkSession, db_name: str, db_source: str, table: str
) -> SparkDataFrame:
    """
    Convert a Spark table from i2b2 to OMOP format.

    Parameters
    ----------
    spark_session: SparkSession
        Active Spark session used to run the SQL queries and to build
        DataFrames.
    db_name: str
        Name of the database where the data is stored.
    db_source: str
        Key selecting the table-name mapping in ``i2b2_tables``. The value
        ``"cse"`` triggers specific handling of the row status columns.
    table: str
        Name of the table to extract.

    Returns
    -------
    df: Spark DataFrame
        Spark DataFrame extracted from the i2b2 database given and converted
        to OMOP standard.

    Raises
    ------
    ValueError
        If no column renaming is registered for ``table`` and the table cannot
        be built from scratch.
    AssertionError
        If the additional concepts returned by the registry do not have the
        same number of columns as the ``concept`` table.
    """
    i2b2_table_name = i2b2_tables[db_source][table]

    # Dictionary of omop_col -> i2b2_col
    columns = i2b2_renaming.get(table)

    # Can be None if creating a table from scratch (e.g. concept_relationship)
    if columns is not None:
        query = f"describe {db_name}.{i2b2_table_name}"
        available_columns = set(spark_session.sql(query).toPandas().col_name.tolist())

        # Work on a copy: `columns` is the dict stored in the module-level
        # `i2b2_renaming`, so popping from it directly would silently alter
        # every later call (including calls for another `db_source`).
        columns = dict(columns)
        if db_source == "cse":
            # NOTE(review): the entry is looked up among the dict *keys*, which
            # the comment above describes as OMOP names - confirm the intended
            # entry is actually removed.
            columns.pop("i2b2_action", None)

        # Only select the columns that really exist in the source table
        cols = ", ".join(
            [
                f"{i2b2} AS {omop}"
                for omop, i2b2 in columns.items()
                if i2b2 in available_columns
            ]
        )
        query = f"SELECT {cols} FROM {db_name}.{i2b2_table_name}"
        df = spark_session.sql(query)
    elif table != "concept_relationship":
        # Every other table needs an initial DataFrame; fail with an explicit
        # message instead of an UnboundLocalError further down.
        raise ValueError(
            f"No i2b2 column renaming is defined for table '{table}', "
            "so it cannot be extracted."
        )

    # Special mapping for i2b2 :

    # CIM10
    if table == "condition_occurrence":
        # Keep the characters starting at position 7
        # (presumably drops a fixed-length vocabulary prefix - TODO confirm)
        df = df.withColumn(
            "condition_source_value",
            F.substring(F.col("condition_source_value"), 7, 20),
        )

    # CCAM
    elif table == "procedure_occurrence":
        df = df.withColumn(
            "procedure_source_value",
            F.substring(F.col("procedure_source_value"), 6, 20),
        )

    # Visits
    elif table == "visit_occurrence":
        df = df.withColumn(
            "visit_source_value",
            mapping_dict(visit_type_mapping, "Non Renseigné")(
                F.col("visit_source_value")
            ),
        )
        if db_source == "cse":
            df = df.withColumn("row_status_source_value", F.lit("Actif"))
            df = df.withColumn(
                "visit_occurrence_source_value", df["visit_occurrence_id"]
            )
        else:
            # Status codes -1 and -2 are labelled "supprimé", the rest "Actif"
            df = df.withColumn(
                "row_status_source_value",
                F.when(
                    F.col("row_status_source_value").isin([-1, -2]), "supprimé"
                ).otherwise("Actif"),
            )

        # Retrieve Hospital trigram
        ufr = spark_session.sql(
            f"SELECT * FROM {db_name}.{i2b2_tables[db_source]['visit_detail']}"
        )
        # The trigram is the first 3 characters following the ':' in concept_cd
        ufr = ufr.withColumn(
            "care_site_id",
            F.substring(F.split(F.col("concept_cd"), ":").getItem(1), 1, 3),
        )
        ufr = ufr.withColumnRenamed("encounter_num", "visit_occurrence_id")
        ufr = ufr.drop_duplicates(subset=["visit_occurrence_id"])
        ufr = ufr.select(["visit_occurrence_id", "care_site_id"])
        # Inner join: visits without any row in the visit_detail table are dropped
        df = df.join(ufr, how="inner", on=["visit_occurrence_id"])

    # Patients
    elif table == "person":
        df = df.withColumn(
            "gender_source_value",
            mapping_dict(sex_cd_mapping, "Non Renseigné")(
                F.col("gender_source_value")
            ),
        )

    # Documents
    elif table.startswith("note"):
        df = df.withColumn(
            "note_class_source_value",
            F.substring(F.col("note_class_source_value"), 4, 100),
        )
        if db_source == "cse":
            df = df.withColumn("row_status_source_value", F.lit("Actif"))
        else:
            # Any negative status code is labelled "SUPP"
            df = df.withColumn(
                "row_status_source_value",
                F.when(F.col("row_status_source_value") < 0, "SUPP").otherwise(
                    "Actif"
                ),
            )

    # Hospital trigrams
    elif table == "care_site":
        df = df.withColumn("care_site_type_source_value", F.lit("Hôpital"))
        df = df.withColumn(
            "care_site_source_value",
            F.split(F.col("care_site_source_value"), ":").getItem(1),
        )
        df = df.withColumn(
            "care_site_id", F.substring(F.col("care_site_source_value"), 1, 3)
        )
        df = df.drop_duplicates(subset=["care_site_id"])
        df = df.withColumn(
            "care_site_short_name",
            mapping_dict(dict_code_UFR, "Non Renseigné")(F.col("care_site_id")),
        )

    # UFR
    elif table == "visit_detail":
        df = df.withColumn(
            "care_site_id", F.split(F.col("care_site_id"), ":").getItem(1)
        )
        df = df.withColumn("visit_detail_type_source_value", F.lit("PASS"))
        df = df.withColumn("row_status_source_value", F.lit("Actif"))

    # measurement
    elif table == "measurement":
        df = df.withColumn(
            "measurement_source_concept_id",
            F.substring(F.col("measurement_source_concept_id"), 5, 20),
        ).withColumn("row_status_source_value", F.lit("Validé"))

    # concept
    elif table == "concept":
        df = (
            df.withColumn(
                "concept_source_value",
                # TODO: use regexp_extract to take substring after ':'
                F.substring(F.col("concept_source_value"), 5, 20),
            )
            .withColumn("concept_id", F.col("concept_source_value"))
            .withColumn("concept_code", F.col("concept_id"))
            .withColumn("vocabulary_id", F.lit("ANABIO"))
        )

        # Adding LOINC
        if "get_additional_i2b2_concept" in registry.data.get_all():
            loinc_pd = registry.get("data", "get_additional_i2b2_concept")()
            assert len(loinc_pd.columns) == len(df.columns)
            loinc_pd = loinc_pd[df.columns]  # for columns ordering
            df = df.union(
                spark_session.createDataFrame(loinc_pd, df.schema, verifySchema=False)
            ).cache()

    # fact_relationship
    elif table == "fact_relationship":
        # Retrieve UF information
        df = df.withColumn(
            "fact_id_1",
            F.split(F.col("care_site_source_value"), ":").getItem(1),
        )
        df = df.withColumn("domain_concept_id_1", F.lit(57))  # Care_site domain

        # Retrieve hospital information
        df = df.withColumn("fact_id_2", F.substring(F.col("fact_id_1"), 1, 3))
        df = df.withColumn("domain_concept_id_2", F.lit(57))  # Care_site domain
        df = df.drop_duplicates(subset=["fact_id_1", "fact_id_2"])

        # Only UF-Hospital relationships in i2b2
        df = df.withColumn("relationship_concept_id", F.lit(46233688))  # Included in

    # concept_relationship: built from scratch, optionally filled by the registry
    elif table == "concept_relationship":
        data = []
        schema = T.StructType(
            [
                T.StructField("concept_id_1", T.StringType(), True),
                T.StructField("concept_id_2", T.StringType(), True),
                T.StructField("relationship_id", T.StringType(), True),
            ]
        )
        if "get_additional_i2b2_concept_relationship" in registry.data.get_all():
            data = registry.get("data", "get_additional_i2b2_concept_relationship")()
        df = spark_session.createDataFrame(data, schema).cache()

    return df


def mapping_dict(mapping: Dict[str, str], default: str) -> FunctionUDF:
    """
    Build a Spark UDF that translates column values through a dictionary.

    Parameters
    ----------
    mapping: Dict
        Mapping dictionary (input value -> output value).
    default: str
        Value returned when the input is not found in the mapping dictionary.

    Returns
    -------
    Callable
        Function that maps the values of a Spark DataFrame column.
    """

    # The inner function keeps the name `f` on purpose: presumably PySpark
    # derives the UDF's display name from it - verify before renaming.
    def f(x):
        mapped = mapping.get(x, default)
        return mapped

    mapper = F.udf(f)
    return mapper