
eds_scikit.io.improve_performance

koalas_options

koalas_options() -> None

Set necessary options to optimise Koalas

Source code in eds_scikit/io/improve_performance.py
def koalas_options() -> None:
    """
    Set necessary options to optimise Koalas
    """

    # Reloading Koalas to use the new configuration
    ks = load_koalas()

    ks.set_option("compute.default_index_type", "distributed")
    ks.set_option("compute.ops_on_diff_frames", True)
    ks.set_option("display.max_rows", 50)

pyarrow_fix

pyarrow_fix()

Fixes the error 'pyarrow has no attribute open_stream', which stems from PySpark 2's incompatibility with pyarrow > 0.17

Source code in eds_scikit/io/improve_performance.py
def pyarrow_fix():
    """
    Fixing the error 'pyarrow has no attribute open_stream' due to PySpark 2 incompatibility with pyarrow > 0.17
    """

    # Setting path to our patched pyarrow module
    pyarrow.open_stream = pyarrow.ipc.open_stream

    sys.path.insert(
        0, (Path(__file__).parent / "package-override").absolute().as_posix()
    )
    os.environ["PYTHONPATH"] = ":".join(sys.path)

    # Setting this path for Pyspark executors
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()

    conf = spark.sparkContext.getConf()
    conf.set(
        "spark.executorEnv.PYTHONPATH",
        f"{Path(__file__).parent.parent}/package-override:{conf.get('spark.executorEnv.PYTHONPATH')}",
    )
    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    sql = spark.sql
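
As a hedged sketch of intended use (the assertion is only illustrative), pyarrow_fix() is called before any Arrow-based conversion so that the top-level pyarrow.open_stream attribute expected by PySpark 2 is available again:

from eds_scikit.io.improve_performance import pyarrow_fix

# Patch pyarrow and propagate the package-override path to Spark executors
pyarrow_fix()

import pyarrow

# The removed top-level attribute is now aliased to pyarrow.ipc.open_stream
assert pyarrow.open_stream is pyarrow.ipc.open_stream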

improve_performances

improve_performances(to_add_conf: List[Tuple[str, str]] = [], quiet_spark: bool = True, app_name: str = '') -> Tuple[SparkSession, SparkContext, SparkSession.sql]

(Re)defines various Spark variables with configuration changes that improve performance by enabling Arrow. This has to be done before launching a SparkContext and before importing Koalas; both points are taken care of by this function. If a SparkSession already exists, its configuration is copied before creating a new one.

RETURNS
Tuple of
- A SparkSession
- The associated SparkContext
- The associated sql object to run SQL queries
Source code in eds_scikit/io/improve_performance.py
def improve_performances(
    to_add_conf: List[Tuple[str, str]] = [],
    quiet_spark: bool = True,
    app_name: str = "",
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with some configuration changes
    to improve performance by enabling Arrow.
    This has to be done
    - Before launching a SparkContext
    - Before importing Koalas
    Both points are taken care of in this function.
    If a SparkSession already exists, it will copy its configuration before
    creating a new one.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """

    # Check if a spark Session is up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing TimeZone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_{app_name}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping context to add necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql
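
A usage sketch, assuming a Hive-enabled Spark 2 cluster; the spark.executor.memory value and the app name are arbitrary examples, not recommendations. Extra Spark settings are passed as (key, value) pairs through to_add_conf, and the returned triple replaces any previously created session:

from eds_scikit.io.improve_performance import improve_performances

spark, sc, sql = improve_performances(
    to_add_conf=[("spark.executor.memory", "4g")],  # hypothetical extra setting
    quiet_spark=True,
    app_name="my_cohort",
)

# Arrow-enabled conversions and the Koalas options are now in place
one = sql("SELECT 1 AS one").toPandas()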