
edsteva

koalas_options

koalas_options() -> None

Set necessary options to optimise Koalas

Source code in edsteva/__init__.py
def koalas_options() -> None:
    """
    Set necessary options to optimise Koalas
    """

    # Reloading Koalas to use the new configuration
    ks = sys.modules.get("databricks.koalas", None)

    if ks is not None:
        importlib.reload(ks)

    else:  # pragma: no cover
        import databricks.koalas as ks

    ks.set_option("compute.default_index_type", "distributed")
    ks.set_option("compute.ops_on_diff_frames", True)

improve_performances

improve_performances(
    to_add_conf: List[Tuple[str, str]] = None,
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]

(Re)defines various Spark variables with configuration changes that improve performance by enabling Arrow. This has to be done:
- before launching a SparkContext;
- before importing Koalas.
Both points are taken care of in this function. If a SparkSession already exists, its configuration is copied before a new one is created.

RETURNS
Tuple of
- A SparkSession
- The associated SparkContext
- The associated ``sql`` object to run SQL queries
Source code in edsteva/__init__.py
def improve_performances(
    to_add_conf: List[Tuple[str, str]] = None,
    quiet_spark: bool = True,
) -> Tuple[SparkSession, SparkContext, SparkSession.sql]:
    """
    (Re)defines various Spark variables with configuration changes
    to improve performance by enabling Arrow.
    This has to be done:
    - before launching a SparkContext
    - before importing Koalas
    Both points are taken care of in this function.
    If a SparkSession already exists, its configuration is copied
    before a new one is created.

    Returns
    -------
    Tuple of
    - A SparkSession
    - The associated SparkContext
    - The associated ``sql`` object to run SQL queries
    """

    # Check whether a Spark session is already up
    global spark, sc, sql

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    conf = sc.getConf()

    # Synchronizing TimeZone
    tz = os.environ.get("TZ", "UTC")
    os.environ["TZ"] = tz
    time.tzset()

    if to_add_conf is None:
        to_add_conf = []

    to_add_conf.extend(
        [
            ("spark.app.name", f"{os.environ.get('USER')}_scikit"),
            ("spark.sql.session.timeZone", tz),
            ("spark.sql.execution.arrow.enabled", "true"),
            ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ]
    )

    for key, value in to_add_conf:
        conf.set(key, value)

    # Stopping context to add necessary env variables
    sc.stop()
    spark.stop()

    set_env_variables()

    spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

    sc = spark.sparkContext

    if quiet_spark:
        sc.setLogLevel("ERROR")

    sql = spark.sql

    koalas_options()

    return spark, sc, sql
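
A hedged usage sketch follows; the configuration pairs passed through to_add_conf are illustrative examples of standard Spark settings, not values the library requires.

from edsteva import improve_performances

# Illustrative extra Spark settings; any valid (key, value)
# configuration pairs can be passed through ``to_add_conf``.
extra_conf = [
    ("spark.executor.memory", "4g"),
    ("spark.sql.shuffle.partitions", "200"),
]

spark, sc, sql = improve_performances(to_add_conf=extra_conf)

# ``sql`` is simply ``spark.sql``, so queries run directly:
sql("SELECT 1 AS one").show()

Because the function stops any existing SparkContext before rebuilding it with the merged configuration, it should be called once, early in the session, before any Koalas import.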