improve_performances(to_add_conf: List[Tuple[str, str]] = [], quiet_spark: bool = True, app_name: str = '') -> Tuple[SparkSession, SparkContext, SparkSession.sql]
(Re)defines various Spark variables with some configuration changes
to improve performance by enabling Arrow.
This has to be done:
- Before launching a SparkContext
- Before importing Koalas
Both points are taken care of by this function.
If a SparkSession already exists, its configuration is copied before
creating a new one.
RETURNS
Tuple of
- A SparkSession
- The associated SparkContext
- The associated ``sql`` object to run SQL queries
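A minimal usage sketch (the import path is assumed from the source file shown below; the extra configuration entry and the query are purely illustrative):

from eds_scikit.io.improve_performance import improve_performances

# Call before importing Koalas, as noted above.
spark, sc, sql = improve_performances(
    to_add_conf=[("spark.executor.memory", "4g")],  # hypothetical extra conf
    app_name="my_study",
)

# The returned ``sql`` callable is spark.sql:
df = sql("SELECT 1 AS check").toPandas()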
Source code in eds_scikit/io/improve_performance.py
def improve_performances(
to_add_conf: List[Tuple[str, str]] = [],
quiet_spark: bool = True,
app_name: str = "",
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
"""
(Re)defines various Spark variables with some configuration changes
to improve performance by enabling Arrow.
This has to be done:
- Before launching a SparkContext
- Before importing Koalas
Both points are taken care of by this function.
If a SparkSession already exists, its configuration is copied before
creating a new one.
Returns
-------
Tuple of
- A SparkSession
- The associated SparkContext
- The associated ``sql`` object to run SQL queries
"""
# Check if a spark Session is up
global spark, sc, sql
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
if quiet_spark:
sc.setLogLevel("ERROR")
conf = sc.getConf()
# Synchronizing TimeZone
tz = os.environ.get("TZ", "UTC")
os.environ["TZ"] = tz
time.tzset()
to_add_conf.extend(
[
("spark.app.name", f"{os.environ.get('USER')}_{app_name}_scikit"),
("spark.sql.session.timeZone", tz),
("spark.sql.execution.arrow.enabled", "true"),
("spark.sql.execution.arrow.pyspark.enabled", "true"),
]
)
for key, value in to_add_conf:
conf.set(key, value)
# Stopping context to add necessary env variables
sc.stop()
spark.stop()
set_env_variables()
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
sc = spark.sparkContext
if quiet_spark:
sc.setLogLevel("ERROR")
sql = spark.sql
koalas_options()
return spark, sc, sql
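Because entries from ``to_add_conf`` are applied last via ``conf.set``, they override values copied from a pre-existing session for the same key. A quick way to check that the settings were applied (a sketch; the keys are the ones set in the function above, and the timezone value depends on your ``TZ`` environment variable):

spark, sc, sql = improve_performances()

# Arrow was enabled explicitly by the function:
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))  # "true"
# Falls back to "UTC" when TZ is unset in the environment:
print(spark.conf.get("spark.sql.session.timeZone"))

Note that ``to_add_conf`` defaults to a mutable list which the function extends in place, so repeated calls relying on the default argument share and accumulate state across calls; passing an explicit list sidesteps this.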