improve_performances(to_add_conf: List[Tuple[str, str]] = [], quiet_spark: bool = True, app_name: str = '') -> Tuple[SparkSession, SparkContext, SparkSession.sql]
(Re)defines various Spark variables with some configuration changes
to improve performance by enabling Arrow.
This has to be done
- Before launching a SparkContext
- Before importing Koalas
Those two points are taken care of in this function.
If a SparkSession already exists, its configuration will be copied before
creating a new one.
Source code in eds_scikit/io/improve_performance.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def improve_performances(
    to_add_conf: List[Tuple[str, str]] = [],
    quiet_spark: bool = True,
    app_name: str = "",
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.

    This has to be done:

    - Before launching a SparkContext
    - Before importing Koalas

    Those two points are taken care of in this function.
    If a SparkSession already exists, its configuration is copied before
    creating a new one.

    Parameters
    ----------
    to_add_conf : List[Tuple[str, str]], default []
        Extra ``(key, value)`` Spark configuration pairs to set on the new
        session. The list is read but never mutated.
    quiet_spark : bool, default True
        If True, sets the Spark log level to ``"ERROR"``.
    app_name : str, default ""
        Inserted into the generated ``spark.app.name`` configuration value.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """
    # Check if a Spark session is up; if so, reuse its configuration.
    global spark, sc, sql
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    if quiet_spark:
        sc.setLogLevel("ERROR")
    conf = sc.getConf()

    # Synchronizing the time zone between the driver OS and Spark SQL.
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    # Work on a copy: calling .extend() directly on the parameter would
    # mutate both the caller's list and the shared mutable default ([]),
    # so repeated no-arg calls would accumulate duplicate config pairs.
    extra_conf = list(to_add_conf)
    extra_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_{app_name}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )
    for key, value in extra_conf:
        conf.set(key, value)

    # Stopping the context to add necessary env variables before restart.
    sc.stop()
    spark.stop()
    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    if quiet_spark:
        sc.setLogLevel("ERROR")
    sql = spark.sql

    koalas_options()
    return spark, sc, sql