def koalas_options() -> None:
    """Set necessary options to optimise Koalas."""
    # Reloading Koalas to use the new configuration
    ks = sys.modules.get("databricks.koalas", None)
    if ks is not None:
        importlib.reload(ks)
    else:  # pragma: no cover
        import databricks.koalas as ks
    # Use a distributed default index instead of building it on the driver
    ks.set_option("compute.default_index_type", "distributed")
    # Allow operations between two different Koalas DataFrames
    ks.set_option("compute.ops_on_diff_frames", True)
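
# Illustrative sketch, not part of the module's API: once ``koalas_options()``
# has been called, Koalas DataFrames use a distributed default index and
# column assignments across different frames are permitted, e.g.:
#
#     import databricks.koalas as ks
#     kdf_a = ks.DataFrame({"x": [1, 2, 3]})
#     kdf_b = ks.DataFrame({"y": [10, 20, 30]})
#     kdf_a["y"] = kdf_b["y"]  # requires compute.ops_on_diff_frames=True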
def improve_performances(
    to_add_conf: List[Tuple[str, str]] = None,
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.

    This has to be done
    - before launching a SparkContext
    - before importing Koalas
    Those two points are taken care of in this function.

    If a SparkSession already exists, it will copy its configuration before
    creating a new one.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """
    # Check if a Spark Session is up
    global spark, sc, sql
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing the time zone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    if to_add_conf is None:
        to_add_conf = []

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping context to add necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = (
        SparkSession.builder.enableHiveSupport()
        .config(conf=conf)
        .getOrCreate()
    )

    sc = spark.sparkContext
    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql
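
# Illustrative usage sketch (assumes a working Spark installation and that
# ``set_env_variables`` is defined elsewhere in this module); the extra
# config key below is only an example of what ``to_add_conf`` accepts:
#
#     spark, sc, sql = improve_performances(
#         to_add_conf=[("spark.sql.shuffle.partitions", "200")],
#         quiet_spark=True,
#     )
#     sql("SELECT 1 AS sanity_check").show()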