improve_performances(
    to_add_conf: List[Tuple[str, str]] = None,
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]
(Re)defines various Spark variables with configuration changes
to improve performance by enabling Arrow.
This has to be done:
- before launching a SparkContext
- before importing Koalas
Both points are taken care of by this function.
If a SparkSession already exists, its configuration is copied before
a new one is created.
RETURNS
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
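A minimal usage sketch (assuming edsteva is importable in a PySpark environment; the extra configuration pair passed via ``to_add_conf`` is a hypothetical example):

from edsteva import improve_performances

# Call before importing Koalas, as noted above.
spark, sc, sql = improve_performances(
    to_add_conf=[("spark.executor.memory", "4g")],  # hypothetical extra conf
)

# The returned ``sql`` helper is spark.sql.
sql("SELECT 1 AS check").show()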
Source code in edsteva/__init__.py
def improve_performances(
    to_add_conf: List[Tuple[str, str]] = None,
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with configuration changes
    to improve performance by enabling Arrow.

    This has to be done:
    - before launching a SparkContext
    - before importing Koalas

    Both points are taken care of by this function.
    If a SparkSession already exists, its configuration is copied
    before a new one is created.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """
    # Check if a Spark session is up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing the time zone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    if to_add_conf is None:
        to_add_conf = []

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping the context to add the necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql
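For reference, a minimal sketch of the stop-and-rebuild pattern the function relies on, written in plain PySpark and independent of edsteva (the Arrow setting shown is one of the options the function sets):

from pyspark.sql import SparkSession

# Grab (or create) the current session and copy its configuration.
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Stop the running context so the amended configuration can take effect,
# then rebuild the session from it.
spark.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()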