Commit 1dd58689 authored by titicaca's avatar titicaca

add spark-iforest implementation

.idea
target
*.iml
spark-warehouse
.git
# Spark-iForest
Isolation Forest (iForest) is an effective model that focuses on anomaly isolation.
iForest models data with an ensemble of trees: each iTree isolates anomalies closer to the root of the tree than normal points.
The iForest model computes an anomaly score that measures how abnormal each data instance is; the higher the score, the more abnormal the instance.
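Concretely, following [1], the anomaly score of an instance `x` over `n` training samples is `s(x, n) = 2^(-E(h(x)) / c(n))`, where `E(h(x))` is the average path length of `x` over the iTrees and `c(n)` is a normalization term: the average path length of an unsuccessful search in a binary search tree built on `n` points.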
More details about iForest can be found in the following papers:
<a href="https://dl.acm.org/citation.cfm?id=1511387">Isolation Forest</a> [1]
and <a href="https://dl.acm.org/citation.cfm?id=2133363">Isolation-Based Anomaly Detection</a> [2].
We design and implement a distributed iForest on Spark, which is trained via model-wise parallelism and predicts on a new Dataset via data-wise parallelism.
It is implemented in the following steps (see the sketch after this list):
1. Sampling data from a Dataset. Data instances are sampled and grouped for each iTree.
As indicated in the paper, the number of samples for constructing each tree is usually not very large (256 by default).
Thus we can construct a sampled paired RDD, where each row key is a tree index and each row value is the group of sampled data instances for that tree.
1. Training and constructing each iTree in parallel via a map operation, and collecting all iTrees into an iForest model.
1. Predicting on a new Dataset in parallel via a map operation with the collected iForest model.
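
The three steps can be sketched as follows. This is a minimal illustration of the parallelism pattern, not the actual implementation: `buildTree` and `scoreInstance` are hypothetical stand-ins for the real training and scoring routines, and the toy sampling is only indicative. `IFNode` is the tree-node class defined later in this commit.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD

// Hypothetical stand-ins for the real per-tree training and scoring logic.
def buildTree(samples: Seq[Vector]): IFNode = ???
def scoreInstance(forest: Array[IFNode], point: Vector): Double = ???

def trainAndScore(data: RDD[Vector], numTrees: Int): RDD[Double] = {
  // Step 1: build a paired RDD keyed by tree index, holding each tree's sample group.
  val sampled: RDD[(Int, Vector)] = data.flatMap { point =>
    // Toy sampling: assign each instance to a random subset of the trees.
    val rng = new scala.util.Random(point.hashCode)
    (0 until numTrees).filter(_ => rng.nextDouble() < 0.1).map(treeIdx => (treeIdx, point))
  }

  // Step 2: train each iTree in parallel via a map, then collect the
  // (small) forest back to the driver -- model-wise parallelism.
  val forest: Array[IFNode] = sampled
    .groupByKey()
    .map { case (_, samples) => buildTree(samples.toSeq) }
    .collect()

  // Step 3: broadcast the forest and score every instance in parallel
  // via a map -- data-wise parallelism.
  val bcForest: Broadcast[Array[IFNode]] = data.sparkContext.broadcast(forest)
  data.map(point => scoreInstance(bcForest.value, point))
}
```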
## Usage
Spark-iForest is designed and implemented to be easy to use. Its usage is similar to the iForest implementation in scikit-learn [3].
*Parameters:*
- *numTrees:* The number of trees in the iForest model (>0).
- *maxSamples:* The number of samples to draw from the data to train each tree (>0).
If maxSamples <= 1.0, the algorithm draws maxSamples * totalSamples samples.
If maxSamples > 1.0, the algorithm draws maxSamples samples.
The total memory required is about maxSamples * numTrees * 4 + maxSamples * 8 bytes; with the defaults maxSamples = 256 and numTrees = 100, that is roughly 100 KB.
- *maxFeatures:* The number of features to draw from the data to train each tree (>0).
If maxFeatures <= 1.0, the algorithm draws maxFeatures * totalFeatures features.
If maxFeatures > 1.0, the algorithm draws maxFeatures features.
- *maxDepth:* The height limit used in constructing a tree (>0).
The default value is about log2(numSamples).
- *contamination:* The proportion of outliers in the data set; the value should be in (0, 1).
It is only used in the prediction phase to convert anomaly scores into predicted labels.
To enhance performance, the anomaly score threshold is calculated with approxQuantile.
Note that this is an approximate quantile computation; if you want an exact answer,
you can rank the "$anomalyScoreCol" column yourself to select your anomalies (see the snippet after this list).
- *bootstrap:* If true, individual trees are fit on random subsets of the training data sampled with replacement.
If false, sampling without replacement is performed.
- *seed:* The seed used by the random number generator.
- *featuresCol:* Features column name, default "features".
- *anomalyScoreCol:* Anomaly score column name, default "anomalyScore".
- *predictionCol:* Prediction column name, default "prediction".
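
As an exact alternative to the approxQuantile-based threshold, you can rank instances by anomaly score and take the top contamination fraction yourself. A minimal sketch, assuming a fitted `model` and `dataset` as in the example below:

```scala
import org.apache.spark.sql.functions.desc

// Score the data; transform() adds the anomaly score column.
val scored = model.transform(dataset)

// Take exactly the top `contamination` fraction (here 0.1) as anomalies.
val k = math.ceil(scored.count() * 0.1).toInt
val anomalies = scored.orderBy(desc("anomalyScore")).limit(k)
```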
## Examples
The following code is an example of detecting anomalous data points on the
Wisconsin Breast Cancer (Breastw) Dataset [4].
*Scala API*
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.iforest.IForest
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession
  .builder()
  .master("local") // test in local mode
  .appName("iforest example")
  .getOrCreate()

val startTime = System.currentTimeMillis()

// Wisconsin Breast Cancer Dataset
val dataset = spark.read.option("inferSchema", "true")
  .csv("data/anomaly-detection/breastw.csv")

// Index label values: 2 -> 0, 4 -> 1
val indexer = new StringIndexer()
  .setInputCol("_c10")
  .setOutputCol("label")

val assembler = new VectorAssembler()
  .setInputCols(Array("_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8", "_c9"))
  .setOutputCol("features")

val iForest = new IForest()
  .setNumTrees(100)
  .setMaxSamples(256)
  .setContamination(0.35)
  .setBootstrap(false)
  .setMaxDepth(100)
  .setSeed(123456L)

val pipeline = new Pipeline().setStages(Array(indexer, assembler, iForest))
val model = pipeline.fit(dataset)
val predictions = model.transform(dataset)

// BinaryClassificationMetrics expects (score, label) pairs.
val binaryMetrics = new BinaryClassificationMetrics(
  predictions.select("prediction", "label").rdd.map {
    case Row(prediction: Double, label: Double) => (prediction, label)
  }
)

val endTime = System.currentTimeMillis()
println(s"Training and predicting time: ${(endTime - startTime) / 1000} seconds.")
println(s"The model's auc: ${binaryMetrics.areaUnderROC()}")
```
## Benchmark
TO BE ADDED
## Requirements
Spark-iForest is built on Spark 2.1.1 or later.
## Build From Source
`mvn clean package`
## License
Spark-iForest is available under the Apache License 2.0.
## Acknowledgement
Spark-iForest was designed and implemented together with my former intern Fang, Jie at Transwarp (transwarp.io).
Thanks for his great contribution. In addition, thanks to the Discover Team for its support.
## Contact and Feedback
If you encounter any bugs, feel free to submit an issue or pull request. You can also email
<a href="mailto:fangzhou.yang@hotmail.com">Yang, Fangzhou (fangzhou.yang@hotmail.com)</a>.
## References
[1] F. T. Liu, K. M. Ting, and Z.-H. Zhou. Isolation Forest. In: International Conference on Data Mining (ICDM), 2008.
[2] F. T. Liu, K. M. Ting, and Z.-H. Zhou. Isolation-Based Anomaly Detection. ACM Transactions on Knowledge Discovery from Data, 6(1), 2012.
[3] F. Pedregosa et al. Scikit-learn: Machine Learning in Python. JMLR 12, pp. 2825-2830, 2011.
[4] A. Asuncion and D. Newman. UCI Machine Learning Repository, 2007.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.spark.ml</groupId>
    <artifactId>spark-iforest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <slf4j.version>1.7.16</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
        <skipTests>false</skipTests>
        <maven.version>3.3.9</maven.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--scope>provided</scope-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--scope>provided</scope-->
        </dependency>
        <!-- Spark MLlib test utilities, shipped as a test jar -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.binary.version}</artifactId>
            <version>3.0.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
package org.apache.spark.examples.ml

// scalastyle:off println
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.iforest.IForest
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.sql.{Row, SparkSession}

/**
 * An example demonstrating IForest.
 * Run with
 * {{{
 * ./spark-submit ...
 * }}}
 */
object IForestExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local") // test in local mode
      .appName("iforest example")
      .getOrCreate()

    val startTime = System.currentTimeMillis()

    // Dataset from https://archive.ics.uci.edu/ml/datasets/Breast+Cancer+Wisconsin+(Original)
    val dataset = spark.read.option("inferSchema", "true")
      .csv("data/anomaly-detection/breastw.csv")

    // Index label values: 2 -> 0, 4 -> 1
    val indexer = new StringIndexer()
      .setInputCol("_c10")
      .setOutputCol("label")

    val assembler = new VectorAssembler()
      .setInputCols(Array("_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8", "_c9"))
      .setOutputCol("features")

    val iForest = new IForest()
      .setNumTrees(100)
      .setMaxSamples(256)
      .setContamination(0.35)
      .setBootstrap(false)
      .setMaxDepth(100)
      .setSeed(123456L)

    val pipeline = new Pipeline().setStages(Array(indexer, assembler, iForest))
    val model = pipeline.fit(dataset)
    val predictions = model.transform(dataset)

    // BinaryClassificationMetrics expects (score, label) pairs.
    val binaryMetrics = new BinaryClassificationMetrics(
      predictions.select("prediction", "label").rdd.map {
        case Row(prediction: Double, label: Double) => (prediction, label)
      }
    )

    val endTime = System.currentTimeMillis()
    println(s"Training and predicting time: ${(endTime - startTime) / 1000} seconds.")
    println(s"The model's auc: ${binaryMetrics.areaUnderROC()}")

    spark.stop()
  }
}
// scalastyle:on println
package org.apache.spark.ml.iforest

sealed abstract class IFNode extends Serializable

/**
 * Data structure for an isolation forest internal node.
 *
 * @param leftChild left child node
 * @param rightChild right child node
 * @param featureIndex index of the feature on which this node splits
 * @param featureValue split value of the chosen feature
 */
class IFInternalNode(
    val leftChild: IFNode,
    val rightChild: IFNode,
    val featureIndex: Int,
    val featureValue: Double) extends IFNode

/**
 * Data structure for an isolation forest leaf node.
 *
 * @param numInstance number of training instances that fell into this leaf
 */
class IFLeafNode(
    val numInstance: Long) extends IFNode
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
// scalastyle:off
import java.io.File
import org.apache.spark.internal.Logging
import org.apache.spark.util.AccumulatorContext
import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
/**
* Base abstract class for all unit tests in Spark for handling common functionality.
*/
abstract class SparkFunSuite
  extends FunSuite
  with BeforeAndAfterAll
  with Logging {
// scalastyle:on

  protected override def afterAll(): Unit = {
    try {
      // Avoid leaking map entries in tests that use accumulators without SparkContext
      AccumulatorContext.clear()
    } finally {
      super.afterAll()
    }
  }

  // helper function
  protected final def getTestResourceFile(file: String): File = {
    new File(getClass.getClassLoader.getResource(file).getFile)
  }

  protected final def getTestResourcePath(file: String): String = {
    getTestResourceFile(file).getCanonicalPath
  }

  /**
   * Log the suite name and the test name before and after each test.
   *
   * Subclasses should never override this method. If they wish to run
   * custom code before and after each test, they should mix in the
   * {{org.scalatest.BeforeAndAfter}} trait instead.
   */
  final protected override def withFixture(test: NoArgTest): Outcome = {
    val testName = test.text
    val suiteName = this.getClass.getName
    val shortSuiteName = suiteName.replaceAll("org.apache.spark", "o.a.s")
    try {
      logInfo(s"\n\n===== TEST OUTPUT FOR $shortSuiteName: '$testName' =====\n")
      test()
    } finally {
      logInfo(s"\n\n===== FINISHED $shortSuiteName: '$testName' =====\n")
    }
  }
}
package org.apache.spark.ml.iforest

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

class IForestSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {

  @transient var dataset: Dataset[_] = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    dataset = IForestSuite.generateIForestData(spark, 10, 2)
  }
test("default parameters") {
val iforest = new IForest()
assert(iforest.getNumTrees === 100)
assert(iforest.getMaxSamples === 1.0)
assert(iforest.getMaxFeatures === 1.0)
assert(iforest.getContamination === 0.1)
assert(!iforest.getBootstrap)
assert(iforest.getFeaturesCol === "features")
assert(iforest.getPredictionCol === "prediction")
assert(iforest.getLabelCol === "label")
assert(iforest.getAnomalyScoreCol === "anomalyScore")
}
test("set parameters") {
val iforest = new IForest()
.setNumTrees(10)
.setMaxSamples(10)
.setMaxFeatures(10)
.setMaxDepth(10)
.setContamination(0.01)
.setBootstrap(true)
.setSeed(123L)
.setFeaturesCol("test_features")
.setPredictionCol("test_prediction")
.setLabelCol("test_label")
.setAnomalyScoreCol("test_anomalyScore")
assert(iforest.getNumTrees === 10)
assert(iforest.getMaxSamples === 10)
assert(iforest.getMaxFeatures === 10)
assert(iforest.getMaxDepth === 10)
assert(iforest.getContamination === 0.01)
assert(iforest.getBootstrap)
assert(iforest.getSeed === 123L)
assert(iforest.getFeaturesCol === "test_features")
assert(iforest.getPredictionCol === "test_prediction")
assert(iforest.getLabelCol === "test_label")
assert(iforest.getAnomalyScoreCol === "test_anomalyScore")
}
test("split data") {
// test with bootsrap
val iforest1 = new IForest()
.setNumTrees(2)
.setMaxSamples(1.0)
.setMaxFeatures(1.0)
.setBootstrap(false)
val rdd1 = iforest1.splitData(dataset)
val arr = rdd1.map(elem => elem._2).collect
assert(arr.length === 2 && arr(0) === arr(1))
// test without bootstrap
val iforest2 = new IForest()
.setNumTrees(2)
.setMaxSamples(1.0)
.setMaxFeatures(1.0)
.setBootstrap(true)
val rdd2 = iforest2.splitData(dataset)
val arr2 = rdd1.map(elem => elem._2).collect
assert(arr.length === 2 && arr(0) === arr(1))
}
test("sample features") {
val data = IForestSuite.generateIVectorArray(4, 3)
val iforest = new IForest()
val sampleResult = iforest.sampleFeatures(data, 4)
assert(sampleResult.length === 4 && sampleResult(0).length === 3 &&
sampleResult(1).length === 3 && sampleResult(2).length === 3)
val sampleResult2 = iforest.sampleFeatures(data, 2)
assert(sampleResult2.length === 4 && sampleResult2(0).length === 2 &&
sampleResult2(1).length === 2 && sampleResult2(2).length === 2)
}
test("fit, transform and summary") {
val predictionColName = "test_prediction"
val anomalyScoreName = "test_anomalyScore"
val iforest = new IForest()
.setNumTrees(10)
.setPredictionCol(predictionColName)
.setAnomalyScoreCol(anomalyScoreName)
.setContamination(0.2)
val model = iforest.fit(dataset)
assert(model.trees.length === 10)
val summary = model.summary
val anomalies = summary.anomalies.collect
assert(anomalies.length === 10)
assert(summary.numAnomalies === 2)
val transformed = model.transform(dataset)
val expectedColumns = Array("features", predictionColName, anomalyScoreName)
expectedColumns.foreach { column =>
assert(transformed.columns.contains(column))
}
}
test("copy estimator and model") {
val iforest1 = new IForest()
val iforest2 = iforest1.copy(ParamMap.empty)
iforest1.params.foreach { p =>
if (iforest1.isDefined(p)) {
(iforest1.getOrDefault(p), iforest2.getOrDefault(p)) match {
case (Array(values), Array(newValues)) =>
assert(values === newValues, s"Values do not match on param ${p.name}.")
case (value, newValue) =>
assert(value === newValue, s"Values do not match on param ${p.name}.")
}
} else {
assert(!iforest2.isDefined(p), s"Param ${p.name} shouldn't be defined.")
}
}
val model1 = iforest1.fit(dataset)
val model2 = model1.copy(ParamMap.empty)
model1.params.foreach { p =>
if (model1.isDefined(p)) {
(model1.getOrDefault(p), model2.getOrDefault(p)) match {
case (Array(values), Array(newValues)) =>
assert(values === newValues, s"Values do not match on param ${p.name}.")
case (value, newValue) =>
assert(value === newValue, s"Values do not match on param ${p.name}.")
}
} else {
assert(!model2.isDefined(p), s"Param ${p.name} shouldn't be defined.")
}
}
assert(model1.summary.featuresCol === model2.summary.featuresCol)
assert(model1.summary.predictionCol === model2.summary.predictionCol)
assert(model1.summary.anomalyScoreCol === model2.summary.anomalyScoreCol)
}
test("read/write") {
def checkTreeNodes(node: IFNode, node2: IFNode): Unit = {
(node, node2) match {
case (node: IFInternalNode, node2: IFInternalNode) =>
assert(node.featureValue === node2.featureValue)
assert(node.featureIndex === node2.featureIndex)
checkTreeNodes(node.leftChild, node2.leftChild)
checkTreeNodes(node.rightChild, node2.rightChild)
case (node: IFLeafNode, node2: IFLeafNode) =>
assert(node.numInstance === node2.numInstance)
case _ =>
throw new AssertionError("Found mismatched nodes")
}
}
def checkModelData(model: IForestModel, model2: IForestModel): Unit = {
val trees = model.trees
val trees2 = model2.trees
assert(trees.length === trees2.length)
try {
trees.zip(trees2).foreach { case (node, node2) =>
checkTreeNodes(node, node2)
}
} catch {
case ex: Exception => throw new AssertionError(
"checkModelData failed since the two trees were not identical.\n"
)
}
}
//TODO figure out why it doesn't work
// val iforest = new IForest()
// testEstimatorAndModelReadWrite(
// iforest,
// dataset,
// IForestSuite.allParamSettings,
// IForestSuite.allParamSettings,
// checkModelData
// )
}
test("boundary case") {
intercept[IllegalArgumentException] {
new IForest().setMaxSamples(-1)
}
intercept[IllegalArgumentException] {
new IForest().setMaxFeatures(-1)
}
intercept[IllegalArgumentException] {
new IForest().setMaxDepth(-1)
}
intercept[IllegalArgumentException] {
new IForest().setContamination(-1)
}
intercept[IllegalArgumentException] {
val iforest = new IForest()
.setMaxSamples(20)
iforest.fit(dataset)
}
}
}
object IForestSuite {

  case class TestRow(features: Vector)

  def generateIForestData(spark: SparkSession, rows: Int, dim: Int): DataFrame = {
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to rows).map(i => Vectors.dense(Array.fill(dim)(i.toDouble)))
      .map(v => TestRow(v))
    spark.createDataFrame(rdd)
  }

  case class dataSchema(features: Vector, label: Double)

  def generateDataWithLabel(spark: SparkSession): DataFrame = {