Spark joins performance issue

I'm trying to merge historical and incremental data. As part of the incremental data, I'm also receiving deletes. Here is the case:

  • historical data - 100 records (20 columns, id is the key column)
  • incremental data - 10 records (20 columns, id is the key column)

Out of the 10 records in incremental data, only 5 will match with historical data.

Now I want 100 records in the final DataFrame, of which 95 records come from the historical data and 5 records come from the incremental data (wherever the id column matches).

An update timestamp field is available in both the historical and the incremental data.

Below is the approach I tried.

    DF1 - Historical data
    DF2 - Incremental delete dataset
    DF3 = DF1 LEFT ANTI JOIN DF2
    DF4 = DF2 INNER JOIN DF1
    DF5 = DF3 UNION DF4
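
For reference, a minimal PySpark sketch of the pseudocode above (paths and DataFrame names are placeholders; DF4 is interpreted as keeping only the incremental-side columns so the union lines up):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("merge-historical-incremental").getOrCreate()

    # placeholder loads: df1 = historical data, df2 = incremental delete dataset
    df1 = spark.read.parquet("/path/to/historical")
    df2 = spark.read.parquet("/path/to/incremental")

    # DF3: historical rows whose id does not appear in the incremental data
    df3 = df1.join(df2, on="id", how="left_anti")

    # DF4: incremental rows whose id matches a historical row (incremental columns only)
    df4 = df2.join(df1.select("id"), on="id", how="inner")

    # DF5: final result (95 historical rows + 5 incremental rows in the example above)
    df5 = df3.unionByName(df4)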

However, I observed that this has a lot of performance issues, since I'm running these joins on billions of records. Is there a better way to do this?

You can use the cogroup operator combined with a user-defined function to construct the different variations of the join. Suppose we have these two RDDs as an example:

    visits = sc.parallelize([("h", "1.2.3.4"), ("a", "3.4.5.6"), ("h", "1.3.3.1")])
    pageNames = sc.parallelize([("h", "Home"), ("a", "About"), ("o", "Other")])
    cg = visits.cogroup(pageNames).map(lambda x: (x[0], (list(x[1][0]), list(x[1][1]))))
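
To make the structure concrete, collecting cg at this point would give something like the following (ordering may vary):

    cg.collect()
    # e.g. [('a', (['3.4.5.6'], ['About'])),
    #       ('h', (['1.2.3.4', '1.3.3.1'], ['Home'])),
    #       ('o', ([], ['Other']))]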

You can implement an inner join like this:

    innerjoin = cg.flatMap(lambda x: J(x))

Where J is defined as follows:

    def J(x):
        # inner join: for keys present on both sides, emit a pair for every
        # combination of left and right values
        j = []
        k = x[0]
        if x[1][0] != [] and x[1][1] != []:
            for l in x[1][0]:
                for r in x[1][1]:
                    j.append((k, (l, r)))
        return j
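
With the sample RDDs above, collecting the result would give something like this (ordering may vary):

    innerjoin.collect()
    # e.g. [('a', ('3.4.5.6', 'About')),
    #       ('h', ('1.2.3.4', 'Home')),
    #       ('h', ('1.3.3.1', 'Home'))]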

For a right outer join, for example, you just need to change the J function to an roJ function defined as follows:

    def roJ(x):
        # right outer join: emit matched pairs, and for keys present only on the
        # right side emit (None, right value)
        j = []
        k = x[0]
        if x[1][0] != [] and x[1][1] != []:
            for l in x[1][0]:
                for r in x[1][1]:
                    j.append((k, (l, r)))
        elif x[1][1] != []:
            for r in x[1][1]:
                j.append((k, (None, r)))
        return j

And call it like so:

    rightouterjoin = cg.flatMap(lambda x: roJ(x))

And so on for the other types of join you'd wish to implement.
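
Following the same pattern, the left anti join from the question's pseudocode could be sketched like this (laJ is a hypothetical name, not part of the code above):

    def laJ(x):
        # left anti join: keep left values only for keys with no match on the right
        j = []
        k = x[0]
        if x[1][0] != [] and x[1][1] == []:
            for l in x[1][0]:
                j.append((k, l))
        return j

    leftantijoin = cg.flatMap(laJ)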

Performance issues are not just related to the size of your data. They depend on many other parameters, such as the keys you use for partitioning, your partition file sizes, and the cluster configuration you are running your job on. I would recommend going through the official documentation on tuning your Spark jobs and making the necessary changes: https://spark.apache.org/docs/latest/tuning.html
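
For example, two settings that are commonly adjusted for large joins (the values here are purely illustrative, not recommendations for your data):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("historical-incremental-merge")
        # number of partitions used for shuffles in joins/aggregations (illustrative value)
        .config("spark.sql.shuffle.partitions", "2000")
        # max size in bytes of a table that will be broadcast to all workers (illustrative value)
        .config("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))
        .getOrCreate()
    )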

Below is the approach I ended up using.

historical_data.as("a").join(
incr_data.as("b"),
$"a.id" === $"b.id", "full")
.select(historical_data.columns.map(f => expr(s"""case when a.id=b.id then b.${f} else a.${f} end as $f""")): _*)
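
An equivalent sketch in PySpark, in case it helps (a translation of the Scala snippet above, assuming historical_data and incr_data are DataFrames with identical columns):

    from pyspark.sql import functions as F

    a = historical_data.alias("a")
    b = incr_data.alias("b")

    merged = (
        a.join(b, F.col("a.id") == F.col("b.id"), "full")
         .select([
             # take the incremental value where the ids match, otherwise keep the historical value
             F.expr(f"case when a.id = b.id then b.{c} else a.{c} end").alias(c)
             for c in historical_data.columns
         ])
    )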

With the spark.sql.selfJoinAutoResolveAmbiguity option enabled (which it is by default), join will automatically resolve ambiguous join conditions into ones that make sense. See [SPARK-6231]: Join on two tables (generated from same one) is broken.

Comments
  • As you are using DF1 and DF2 more than once, and you are creating new DF3 and DF4 from them, you could try persisting DF1 and DF2 and increasing their number of partitions (depending on your default parallelism and your cluster). You could also look into sort-merge joins, which involve sorting before joining, for better performance, and into repartitioning by (numPartitions, join column) before persisting; that way your DataFrames will be distributed on the join column across more partitions (see the sketch after these comments).
  • Mohammad Murtaza Hashmi, I tried caching and repartitioning, but they didn't help me.
  • Hello, instead of using join operators you can use a cogroup combined with a user-defined function that filters the two lists generated by the cogroup to obtain the different types of joins. If this answer interests you, I will post detailed code for this alternative to the join operator using cogroup below.
  • What is your Spark version? If you are using the latest Spark version, then go for the Delta format.
  • @Ahlam AIS, could you post the solution, please?
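
A minimal sketch of the persist-and-repartition suggestion from the first comment (the partition count and storage level are illustrative, not recommendations):

    from pyspark import StorageLevel

    # repartition both DataFrames on the join column so matching ids land in the
    # same partitions, then persist them because each one is used in two joins
    df1 = historical_data.repartition(2000, "id").persist(StorageLevel.MEMORY_AND_DISK)
    df2 = incr_data.repartition(2000, "id").persist(StorageLevel.MEMORY_AND_DISK)

    # materialize the cache once before reusing the DataFrames
    df1.count()
    df2.count()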