The Hadoop ecosystem is evolving rapidly in the incorporation of big data analytical capabilities. One of the latest additions is AMPLab’s Apache Spark. In Spark, MapReduce is only one of the supported data processing paradigms.
Spark has a very rich API, supporting interactive, batch, and near-real-time processing, in your language of choice: Python, Scala, or Java. Moreover, Spark is platform independent. Hadoop YARN is just one of the supported platforms to run Spark applications.
In this post I will talk about another frontend for Spark that is being developed by the guys at AMPLab: SparkR. Its API is (still) not feature-rich as its counterparts in Python, Scala, and Java, but nevertheless it is rich enough to support MapReduce and other processing dataflows using R as your front-end.
We will do that through an example application. Actually, we will port the Logistic Regression code from a previous post. That code already leverages data distribution on HDFS and processing parallelism through Hadoop’s MapReduce. But Spark provides a significant advantage for iterative applications like that one: it has a built-in in-memory distributed cache, allowing for a much more efficient processing.
Before diving in to the code, please notice that the goal of this post is not to show you how to set up the environment to run it. For that purpose, I suggest you follow two guidelines: - Apache Spark Documentation, for hoe to build and/or deploy Spark either as a standalone cluster, or running on top of Hadoop YARN. Actually, at the writing of this post, SparkR is supported only with Spark running as a standalone cluster. - SparkR Documentation
Now let’s dive into the code:
The first step is, of course, load the SparkR library into your R environment:
In local mode, Spark runs in a local thread pool in your JVM. It is a good way to develop and test your application, before deploying it on your cluster. The number 4 between brackets means that Spark will run in 4 local worker threads:
The next step is then to parse each list element and transform it in a meaningful representation for our R Logistic Regression algorithm. This is a typical operation in Spark, easily achieved by using the MapPartitions transformation. In SparkR, this is implemented by the lapplyPartition function, which operates in a very similar way to the R lapply function. Again, the returned RDD is just a pointer to the actual RDD object in your Spark cluster:
Now we have our dataset split into train and test parts, and almost ready to go into the model training. But not quite yet. We still need to transform it into R’s matrix format, which is necessary for the numeric calculations we need when training the model. As before, lapplyPartition takes care of it:
The next function we need is one to balance the train matrix, so that we get the same number of positive and negative examples. This is necessary, because this dataset has 70% of positive and 30% of negative examples:
Before running the training algorithm, we need to instantiate out parameters vector and define both the hypothesis and cost functions. Those will all be local objects in R:
A side note about the indexing [[1]][[2]] in the collect function above: its output is a list with one element (because we emitted only a single key); that element is also a list, whose first element is the key, and the second is the value we want to collect. Thus the notation [[1]][[2]].
Finally we get to the main loop of the Gradient Descent, where we call the train function to operate on the train data RDD until our convergence criteria is satisfied:
Spark has a very rich API, supporting interactive, batch, and near-real-time processing, in your language of choice: Python, Scala, or Java. Moreover, Spark is platform independent. Hadoop YARN is just one of the supported platforms to run Spark applications.
In this post I will talk about another frontend for Spark that is being developed by the guys at AMPLab: SparkR. Its API is (still) not feature-rich as its counterparts in Python, Scala, and Java, but nevertheless it is rich enough to support MapReduce and other processing dataflows using R as your front-end.
We will do that through an example application. Actually, we will port the Logistic Regression code from a previous post. That code already leverages data distribution on HDFS and processing parallelism through Hadoop’s MapReduce. But Spark provides a significant advantage for iterative applications like that one: it has a built-in in-memory distributed cache, allowing for a much more efficient processing.
Before diving in to the code, please notice that the goal of this post is not to show you how to set up the environment to run it. For that purpose, I suggest you follow two guidelines: - Apache Spark Documentation, for hoe to build and/or deploy Spark either as a standalone cluster, or running on top of Hadoop YARN. Actually, at the writing of this post, SparkR is supported only with Spark running as a standalone cluster. - SparkR Documentation
Now let’s dive into the code:
The first step is, of course, load the SparkR library into your R environment:
# load SparkR library into R session
library(SparkR)In local mode, Spark runs in a local thread pool in your JVM. It is a good way to develop and test your application, before deploying it on your cluster. The number 4 between brackets means that Spark will run in 4 local worker threads:
# instantiate a local Spark Context with 4 worker threads
sc <- sparkR.init(master='local[4]')# instantiate a remote Spark Context with 4 executors, each one running with 1 CPU-core and 1 GB of memory
sc <- sparkR.init(master='spark://localhost:7077',
                  appName='sparkr_logistic_regression',
                  sparkEnvir=list(spark.executor.memory='1g',
                                  spark.num.executors='4',
                                  spark.executor.cores='1'))# reference to a input file on HDFS
input_file <- 'hdfs://localhost/tmp/german.data-numeric'
# RDD on Spark: in SparkR RDDs are abstracted as R lists
# this RDD is constructed from a text file and partitioned in 4 chunks in the Spark cluster
input_rdd <- textFile(sc, input_file, minSplits=4)# take 3 sample elements from the RDD, without replacement
takeSample(input_rdd, F, 3L, 0L)[[1]]
[1] "   2   9   4  11   4   5   3   3   4  32   3   2   2   1   1   0   0   1   0   0   0   0   0   1   2 "
[[2]]
[1] "   2  48   4  51   1   3   2   3   3  30   3   1   1   2   1   0   0   1   0   0   1   0   0   0   2 "
[[3]]
[1] "   4  18   4  28   1   4   3   2   2  31   1   2   1   1   1   1   0   1   0   0   1   0   0   1   2 "The next step is then to parse each list element and transform it in a meaningful representation for our R Logistic Regression algorithm. This is a typical operation in Spark, easily achieved by using the MapPartitions transformation. In SparkR, this is implemented by the lapplyPartition function, which operates in a very similar way to the R lapply function. Again, the returned RDD is just a pointer to the actual RDD object in your Spark cluster:
# parse the text on each RDD element (on each partition in parallel), transforming each element in a
# numeric R vector
dataset_rdd <- lapplyPartition(input_rdd, function(part) {
  part <- lapply(part, function(x) unlist(strsplit(x, '\\s')))
  part <- lapply(part, function(x) as.numeric(x[x != '']))
  part
})# take 3 sample elements from the RDD and inspect its content through the R str function
str(takeSample(dataset_rdd, F, 3L, 0L))
})List of 3
 $ : num [1:25] 2 9 4 11 4 5 3 3 4 32 ...
 $ : num [1:25] 2 48 4 51 1 3 2 3 3 30 ...
 $ : num [1:25] 4 18 4 28 1 4 3 2 2 31 ...
})split_dataset <- function(rdd, ptest) {
  # create the test RDD with the first 'ptest' percentage of the input elements
  data_test_rdd <- lapplyPartition(rdd, function(part) {
    part_test <- part[1:(length(part)*ptest)]
    part_test
  })
  # create the train RDD with the remaining input elements
  data_train_rdd <- lapplyPartition(rdd, function(part) {
    part_train <- part[((length(part)*ptest)+1):length(part)]
    part_train
  })
  # return a list with the references for the test and train RDDs
  list(data_test_rdd, data_train_rdd)
}Now we have our dataset split into train and test parts, and almost ready to go into the model training. But not quite yet. We still need to transform it into R’s matrix format, which is necessary for the numeric calculations we need when training the model. As before, lapplyPartition takes care of it:
get_matrix_rdd <- function(rdd) {
  # create an RDD whose elements are R matrix chunks
  matrix_rdd <- lapplyPartition(rdd, function(part) {
    m <- Matrix(data=unlist(part, F, F), ncol=25, byrow=T)
    m <- cBind(1, m)
    m[,ncol(m)] <- m[,ncol(m)]-1
    m
  })
  matrix_rdd
}The next function we need is one to balance the train matrix, so that we get the same number of positive and negative examples. This is necessary, because this dataset has 70% of positive and 30% of negative examples:
balance_matrix_rdd <- function(matrix_rdd) {
  # create an RDD with a balanced number of positive and negative examples
  balanced_matrix_rdd <- lapplyPartition(matrix_rdd, function(part) {
    y <- part[,26]
    index <- sample(which(y==0),length(which(y==1)))
    index <- c(index, which(y==1))
    part <- part[index,]
    part
  })
  balanced_matrix_rdd
}# split the data into train and test sets
dataset <- split_dataset(dataset_rdd, 0.2)
# create the test data RDD
matrix_test_rdd <- get_matrix_rdd(dataset[[1]])
# create the train data RDD
matrix_train_rdd <- balance_matrix_rdd(get_matrix_rdd(dataset[[2]]))
# cache both train and test RDDs into the Spark cluster distributed memory
cache(matrix_test_rdd)
cache(matrix_train_rdd)Before running the training algorithm, we need to instantiate out parameters vector and define both the hypothesis and cost functions. Those will all be local objects in R:
# initialize parameters vector theta
theta <- as.matrix(rep(0,25))
# hypothesis function
hypot <- function(z) {
  1/(1+exp(-z))
}
# gradient of cost function
gCost <- function(t,X,y) {
  1/nrow(X)*(t(X)%*%(hypot(X%*%t)-y))
}train <- function(theta, rdd) {
  # runs one step of a batch gradient computation in a subset of the input data defined by the Spark cluster
  # compute partial results
  gradient_rdd <- lapplyPartition(rdd, function(part) {
    X <- part[,1:25]
    y <- part[,26]
    p_gradient <- gCost(theta,X,y)
    list(list(1, p_gradient))
  })
  agg_gradient_rdd <- reduceByKey(gradient_rdd, '+', 1L)
  # aggregated output of one iteration
  collect(agg_gradient_rdd)[[1]][[2]]
}A side note about the indexing [[1]][[2]] in the collect function above: its output is a list with one element (because we emitted only a single key); that element is also a list, whose first element is the key, and the second is the value we want to collect. Thus the notation [[1]][[2]].
Finally we get to the main loop of the Gradient Descent, where we call the train function to operate on the train data RDD until our convergence criteria is satisfied:
# cost function optimization through batch gradient descent (training)
# alpha = learning rate
# steps = iterations of gradient descent
# tol = convergence criteria
# convergence is measured by comparing L2 norm of current gradient and previous one
alpha <- 0.05
tol <- 1e-4
step <- 1
while(T) {
  cat("step: ",step,"\n")
  p_gradient <- train(theta, matrix_train_rdd)
  theta <- theta-alpha*p_gradient
  gradient <- train(theta, matrix_train_rdd)
  if(abs(norm(gradient,type="F")-norm(p_gradient,type="F"))<=tol) break
  step <- step+1
}# hypothesis testing
# counts the predictions from the test set classified as 'good' and ' bad' credit and
# compares with the actual values
data_test <- collect(matrix_test_rdd)
data_test <- lapply(data_test, function(x) as.matrix(x))
data_test <- do.call(rbind, data_test)
X_test <- data_test[,1:25]
y_test <- data_test[,26]
y_pred <- as.matrix(hypot(X_test%*%theta))
result <- xor(as.vector(round(y_pred)),as.vector(y_test))
corrects = length(result[result==F])
wrongs = length(result[result==T])
cat("\ncorrects: ",corrects,"\n")
cat("wrongs: ",wrongs,"\n")
cat("accuracy: ",corrects/length(y_pred),"\n")
cat("\nCross Tabulation:\n")
print(table(y_test,round(y_pred), dnn=c('y_test','y_pred')))
©2014 - Alexandre Vilcek
 
 
No comments:
Post a Comment