Tags down


Does it make sense to write a UDAF to perform a rolling regression on a spark dataframe?

By : Puneet Goel
Date : October 18 2020, 03:08 PM
seems to work fine
After doing a good bit of research I was planning on creating a window object, making a UDF that specified how I wanted my linear regression to occur (using the spark ml linear regression inside the function
code :

Share : facebook icon twitter icon

implement sum aggregator on custom case class on spark sql dataframe (UDAF)

By : Faber
Date : March 29 2020, 07:55 AM
it helps some times As of Spark 1.4, I don't think UDAF are supported.
Please have a look at the following tickets for more information:

Spark: How change DataFrame to LibSVM and perform logistic regression

By : Anonymous
Date : March 29 2020, 07:55 AM
will help you I would say don't load it into DataFrame in the first place and simply use MLUtils.loadLibSVMFile but if for some reason this is not on an option you can convert to RDD[String] and use the same map logic as used by loadLibSVMFile
code :
import org.apache.spark.sql.Row
import org.apache.spark.mllib.regression.LabeledPoint

  .map{ case Row(line: String) => line }
  .filter(line => !(line.isEmpty || line.startsWith("#")))
  .map { line => ??? }

How to write a Spark UDAF which simply do row collection?

By : stewmo77
Date : March 29 2020, 07:55 AM
This might help you If I understood your question correct, following shall be your solution :
code :
class CollectorUDAF() extends UserDefinedAggregateFunction {

  // Input Data Type Schema
  def inputSchema: StructType = new StructType().add("value", DataTypes.DoubleType).add("y", DataTypes.DoubleType)

  // Intermediate Schema
  val bufferFields : util.ArrayList[StructField] = new util.ArrayList[StructField]
  val bufferStructField : StructField = DataTypes.createStructField("array", DataTypes.createArrayType(DataTypes.StringType, true), true)
  def bufferSchema: StructType = DataTypes.createStructType(bufferFields)

  // Returned Data Type .
  def dataType: DataType = DataTypes.createArrayType(DataTypes.DoubleType)

  // Self-explaining
  def deterministic = true

  // This function is called whenever key changes
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0, new java.util.ArrayList[Double])

  // Iterate over each entry of a group
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    val DoubleList: util.ArrayList[Double]  = new util.ArrayList[Double](buffer.getList(0))
    buffer.update(0, DoubleList)

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getList(0).toArray() ++ buffer2.getList(0).toArray())
  // Called after all the entries are exhausted.
  def evaluate(buffer: Row) = {

Spark Dataframe UDAF issue

By : Luanpn
Date : March 29 2020, 07:55 AM
it should still fix some issue Please post code details here and steps to replicate the issue.
And also make sure that spark and scala version are correct in your project.

UDAF merge rows where are first orderdby in a Spark DataSet/Dataframe

By : Data Processing
Date : March 29 2020, 07:55 AM
hope this fix your issue Here is a sollution with Spark 2's groupByKey (used with an untyped Dataset).The advantage of groupByKey is that you have access to the group (you get an Iterator[Row] in mapGroups):
code :
 df.groupByKey(r => r.getAs[Int]("ID"))
      .mapGroups{case(id,rows) => {
        val sorted = rows
          .map(r => (r.getAs[String]("Word"),r.getAs[java.sql.Timestamp]("Timestamp")))

         sorted.map(_._1).mkString(" "),
Related Posts Related Posts :
  • Is there a way I can get the Gatling "Report ID"?
  • class needs to be abstract since method in trait is not defined error
  • How to mock or stub AWS SDK S3 bucket calls in Scala or Java?
  • Multipart Form Errors with Lagom
  • Possible ways to check if a value exists in a sequence scala
  • How convert Spark dataframe column from Array[Int] to linalg.Vector?
  • What is the best way to write Scala codes for "if else" logic?
  • spark flatten records using a key column
  • Scala: create instance by type parameter
  • Spark Scala: Count Consecutive Months
  • How to use fixture-context objects with async specs in ScalaTest?
  • Problems with Source.fromURL when trying to close it in finally block
  • Scala: custom grammar/parser combinators
  • Convert a groupByKey to reduceByKey
  • Displaying output under a certain format
  • How to write a nested query?
  • shadow
    Privacy Policy - Terms - Contact Us © soohba.com