BigQuery

What is a query:

A query is how you retrieve information from a database.

For example, suppose a customer has a database containing their customer information, such as the history of what each customer has bought, and they want to know which customers spent more than $1,000 last year. They could write a query that returns a table with only those customers’ information. Database queries are usually written in Structured Query Language (SQL).
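As a rough illustration of the scenario above, the sketch below runs such a query against an in-memory SQLite database using Python's standard library. The table name `purchases`, its columns, and the sample rows are assumptions made up for this example; a real customer database would have its own schema.

```python
import sqlite3


def big_spenders(rows, year, threshold):
    """Return (customer, total) for customers who spent more than `threshold` in `year`."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical schema: one row per purchase.
    conn.execute("CREATE TABLE purchases (customer TEXT, year INTEGER, amount REAL)")
    conn.executemany("INSERT INTO purchases VALUES (?, ?, ?)", rows)
    query = """
        SELECT customer, SUM(amount) AS total
        FROM purchases
        WHERE year = ?
        GROUP BY customer
        HAVING SUM(amount) > ?
        ORDER BY customer
    """
    result = conn.execute(query, (year, threshold)).fetchall()
    conn.close()
    return result


# Made-up sample data: (customer, year, amount)
sample = [
    ("alice", 2023, 700.0),
    ("alice", 2023, 450.0),
    ("bob", 2023, 300.0),
    ("carol", 2022, 5000.0),
]
print(big_spenders(sample, 2023, 1000))
```

The `GROUP BY` and `HAVING` clauses do the work here: purchases are summed per customer, and only the customers whose total exceeds the threshold are returned.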

GCP BigQuery defined:

Storing and querying massive datasets can be time-consuming and expensive without the right hardware and infrastructure. Google BigQuery is an enterprise data warehouse that solves this problem by enabling super-fast SQL queries using the processing power of Google’s infrastructure.

How BigQuery works:

BigQuery is Google’s fully managed, petabyte-scale, low-cost analytics data warehouse. BigQuery is serverless: there is no infrastructure to manage and you don’t need a database administrator, so you can focus on analyzing data to find meaningful insights, use familiar SQL, and take advantage of a pay-as-you-go model. BigQuery is a powerful big data analytics platform used by all types of organizations, from startups to Fortune 500 companies.

Common use cases for BigQuery:

Now that you understand what BigQuery is and how it works, check out this video that discusses some common use cases for BigQuery, including game telemetry, adtech, eRetail, and operational analytics.


Storage and Database Overview:

With the rise of the digital age, the amount of data has grown exponentially, but where exactly are you supposed to keep it all? This video breaks down customer needs for data storage and databases, and how GCP can help them transfer their data to the cloud, leverage that data for advanced analytics, and even archive data for safekeeping.

Google Data Transfers

Data Transfers defined:

Google offers a suite of solutions that enable customers to move data to GCP fast, regardless of that data’s location or type, network size and utilization, or its planned use.

Data Transfer Options:

Depending on the customer’s needs, there are several options for moving data.

1. Online transfer: Customers can use their network to move data to Cloud Storage.

2. Cloud Storage Transfer Service: Customers can transfer their data from one cloud to another.

3. Transfer Appliance: Customers can choose between 100 TB and 480 TB rackable appliances to securely ship data to Cloud Storage (GCS).

4. BigQuery Data Transfer Service: Schedule and automate data transfers from your SaaS applications to BigQuery.


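Compute Engine

Google Compute Engine is an infrastructure service that lets you run Linux virtual machines on Google's servers to get more powerful data-processing capacity. Google stated at its I/O conference that Compute Engine is more cost-effective than competing products, delivering 50% more computing power per dollar. Behind the service is a large fleet of Linux virtual machines, and the processors used for the service total 771,886 cores.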

Compute Engine defined:

Google Compute Engine is a type of infrastructure as a service (IaaS). Compute Engine delivers scalable, high-performance virtual machines running in Google’s innovative data centers and worldwide fiber network. Compute Engine provides practically unlimited computing power using virtual machines (VMs) in the cloud.
Customers can run any computing workload on Compute Engine, such as web-server hosting, application hosting, and application backends.

Examples include mobile gaming servers, automated bidding for online advertising space, and batch-data processing, such as Hadoop and logs processing. Other examples are media rendering, transcoding, financial-market simulation, and genomics, such as DNA sequencing.

Business value propositions:

1. Industry-leading performance

2. Low cost

Technical value propositions:

1. Fast and efficient networking


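Collaborative filtering is one of the most popular algorithms in ad recommendation, and Spark MLlib provides a complete implementation. The core of the ALS algorithm in Spark MLlib is to factorize the sparse rating matrix into the product of a user feature matrix and a product feature matrix, using alternating least squares to compute the user and product feature vectors step by step so that the sum of squared errors is minimized. The resulting user and product feature vectors are then used to predict a given user's rating of a given product.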
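To make the alternating step concrete, here is a minimal sketch in plain Python. It is not Spark MLlib's implementation: it assumes rank 1 (a single latent factor per user and per product) so that each least-squares solve has a simple closed form, and the function name `als_rank1`, the regularization value, and the toy ratings are all hypothetical.

```python
def als_rank1(ratings, n_users, n_items, iterations=20, reg=0.01):
    """Rank-1 alternating least squares on observed entries only.

    ratings: dict mapping (user, item) -> rating for the observed cells.
    Returns (u, v) such that u[user] * v[item] approximates the rating.
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iterations):
        # Fix the item factors and solve each user's factor in closed form.
        for i in range(n_users):
            num = sum(r * v[b] for (a, b), r in ratings.items() if a == i)
            den = reg + sum(v[b] ** 2 for (a, b) in ratings if a == i)
            u[i] = num / den
        # Fix the user factors and solve each item's factor in closed form.
        for j in range(n_items):
            num = sum(r * u[a] for (a, b), r in ratings.items() if b == j)
            den = reg + sum(u[a] ** 2 for (a, b) in ratings if b == j)
            v[j] = num / den
    return u, v


# Toy data generated from user factors (1, 2, 3) and item factors (2, 1, 0.5);
# the cell (user 0, item 2) is deliberately left unobserved.
observed = {
    (0, 0): 2.0, (0, 1): 1.0,
    (1, 0): 4.0, (1, 1): 2.0, (1, 2): 1.0,
    (2, 0): 6.0, (2, 1): 3.0, (2, 2): 1.5,
}
u, v = als_rank1(observed, n_users=3, n_items=3)
print(round(u[0] * v[2], 2))  # estimate for the unobserved cell
```

With a higher rank, each closed-form division becomes a small regularized linear system per user or per product, but the alternation between the two sides stays the same.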

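ALS supports recommending a list of products for a given user and recommending potential users for a given product. In real engineering work, we may also need item-to-item relatedness, that is, the similarity between users or between products, for example when expanding an audience or a list of apps during ad recommendation. To do this, we use Spark ALS to generate feature vectors for the items and then use locality-sensitive hashing (LSH) to compute the similarity between vectors.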

In engineering practice, the data that the ALS algorithm needs looks like this:

Our engineering data, however, may look like this:

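We need to map the data to numeric IDs, and the numbering covers the full dataset. The advantage of LSH is that it can retrieve the top-K similar vectors in linear time. As with search engines and NLP algorithms, in most cases it is not feasible to compare all pairs in the full dataset, because the complexity is O(n²), which is prohibitive for massive data. The basic idea of locality-sensitive hashing resembles a spatial-domain transformation. LSH rests on one assumption: if two vectors are similar in the original data space, they will also be highly similar after being transformed by the hash functions; conversely, if they are dissimilar to begin with, they should remain dissimilar after the transformation.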
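The hashing assumption can be sketched with random-hyperplane signatures, a common LSH family for cosine similarity. This is an illustrative sketch only, not the algorithm used inside the `com.soundcloud.lsh` library; the function names, the number of bits, and the sample vectors are assumptions for the example.

```python
import random


def make_hyperplanes(num_bits, dim, seed=0):
    """Draw `num_bits` random hyperplanes (normal vectors) in `dim` dimensions."""
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(dim)] for _ in range(num_bits)]


def signature(vec, hyperplanes):
    """One bit per hyperplane: which side of the plane the vector falls on."""
    return tuple(
        1 if sum(h * x for h, x in zip(plane, vec)) >= 0 else 0
        for plane in hyperplanes
    )


def candidate_pairs(vectors, hyperplanes):
    """Group vectors by signature; only vectors sharing a bucket are compared."""
    buckets = {}
    for key, vec in vectors.items():
        buckets.setdefault(signature(vec, hyperplanes), []).append(key)
    pairs = set()
    for keys in buckets.values():
        for i in range(len(keys)):
            for j in range(i + 1, len(keys)):
                pairs.add(tuple(sorted((keys[i], keys[j]))))
    return pairs


planes = make_hyperplanes(num_bits=8, dim=3)
vectors = {1: [1.0, 1.0, 1.0], 2: [2.0, 2.0, 2.0], 5: [6.0, 3.0, 2.0]}
print(candidate_pairs(vectors, planes))
```

Vectors pointing in the same direction always land in the same bucket, and the more two vectors diverge in angle, the more signature bits tend to differ. Hashing every vector once is linear in the number of vectors, which is where the saving over the O(n²) all-pairs comparison comes from.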

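The complete code implementation follows and can be run directly (with a bonus helper for building paths that span month boundaries):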

import java.text.SimpleDateFormat
import java.util.Calendar

import com.soundcloud.lsh.Lsh
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.DataFrame

import scala.collection.mutable.ArrayBuffer

object CosineLSHJoinSpark {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CosineLSHJoinSpark")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val df = sqlContext.read.format("orc").load(getLastNDaysPath(15))
    df.show()

    val (ratings, productMap) = datatransform_formkpixel(df)
    ratings.take(20).foreach(println)

    // Build the recommendation model using ALS
    val rank = 10
    val numIterations = 10
    val model = ALS.train(ratings, rank, numIterations, 0.02)

    // productFeatures is an RDD of (productId, featureVector).
    // Use the product ID itself as the row index (not the position in a
    // collected array), so that productMap(entry.i) below resolves correctly.
    val idxrows = model.productFeatures.map { case (productId, features) =>
      IndexedRow(productId.toLong, Vectors.dense(features))
    }
    val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
    /*
    // Small sample input for trying out the LSH join:
    val rows = Seq(
      IndexedRow(1, Vectors.dense(1.0, 1.0, 1.0)),
      IndexedRow(2, Vectors.dense(2.0, 2.0, 2.0)),
      IndexedRow(5, Vectors.dense(6.0, 3.0, 2.0))
    )
    val matrix = new IndexedRowMatrix(sc.parallelize(rows))
    */
    val lsh = new Lsh(
      minCosineSimilarity = 0.1,
      dimensions = 20,
      numNeighbours = 10,
      numPermutations = 2
    )
    val similarityMatrix = lsh.join(idxmat)

    // For each row index, list its neighbours in descending order of similarity
    val orderTable = similarityMatrix.entries.groupBy(tup => tup.i).flatMap(tup => {
      tup._2.toList.sortWith((a, b) => a.value > b.value)
    })

    val results = orderTable.map { entry =>
      "%s %s %.6f".format(productMap(entry.i), productMap(entry.j), entry.value)
    }

    results.take(20).foreach(println)
    results.saveAsTextFile("this is save path ")

    // With the commented-out sample rows above, this would print:
    // item:2 item:5 cosine:0.91
    // item:1 item:5 cosine:0.91
    // item:1 item:2 cosine:1.00
  }

  // Input columns: 0 = ifa (String), 1 = bundle list (Seq[String]).
  // Returns the ALS ratings and a map from product index back to bundle name.
  def datatransform(df: DataFrame) = {
    val r = df.rdd
    val affData = r.flatMap(row => {
      val result: ArrayBuffer[(String, String)] = new ArrayBuffer[(String, String)]()
      val ifa: String = if (row(0) != null) row.getString(0) else null
      val bundle: Array[String] = if (row(1) != null) row.getSeq[String](1).toArray[String] else null
      if (bundle != null && bundle.length > 1) {
        bundle.foreach(b => {
          result.append((ifa, b.trim))
        })
      }
      result
    }).filter(x => x._1 != null && x._2 != null)
    val stringData = affData.map(x => (x._1, x._2, 1))

    // Get distinct names and products and create index maps from them
    val ifaname = stringData.map(_._1).distinct.sortBy(x => x).zipWithIndex.collectAsMap
    val products = stringData.map(_._2).distinct.sortBy(x => x).zipWithIndex.collectAsMap

    val data_rating = stringData.map(r => Rating(ifaname(r._1).toInt, products(r._2).toInt, r._3))
    val reproducts = products.map(line => (line._2, line._1))
    (data_rating, reproducts)
  }

  // Input columns: 8 = ifa (String), 16 = comma-separated content IDs.
  // Note the swap below: content IDs take the "user" slot and ifa takes the
  // "product" slot, so the product features (and the similarities) are per ifa.
  def datatransform_formkpixel(df: DataFrame) = {
    val r = df.rdd
    val affData = r.flatMap(row => {
      val result: ArrayBuffer[(String, String)] = new ArrayBuffer[(String, String)]()
      val ifa: String = if (row(8) != null) row.getString(8) else null
      val content = if (row(16) != null) row.getString(16) else null
      val content_ids: Array[String] = if (content != null) content.split(",") else null
      if (content_ids != null && content_ids.length > 1) {
        content_ids.foreach(b => {
          result.append((ifa, b.trim))
        })
      }
      result
    }).filter(x => x != null && x._1 != null && x._2 != null)
    val stringData = affData.map(x => (x._2, x._1, 1)) // (content id, ifa, 1)

    // Get distinct names and products and create index maps from them
    val ifaname = stringData.map(_._1).distinct.sortBy(x => x).zipWithIndex.collectAsMap
    val products = stringData.map(_._2).distinct.sortBy(x => x).zipWithIndex.collectAsMap

    val data_rating = stringData.map(r => Rating(ifaname(r._1).toInt, products(r._2).toInt, r._3))
    val reproducts = products.map(line => (line._2, line._1))
    (data_rating, reproducts)
  }

  // Builds a glob path covering the previous `days` days (today excluded).
  def getLastNDaysPath(days: Int): String = {
    val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
    val cal: Calendar = Calendar.getInstance()
    val dateArr = new ArrayBuffer[String]()
    for (_ <- 1 to days) {
      // Calendar rolls over month and year boundaries automatically
      cal.add(Calendar.DATE, -1)
      dateArr.append(dateFormat.format(cal.getTime))
    }
    val path = "this is path prefix/{" + dateArr.mkString(",") + "}/*/*"
    println(path)
    path
  }
}
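For readers who want to check the month-spanning path helper without a JVM, here is a rough Python equivalent of `getLastNDaysPath`. The `prefix` and `today` parameters are additions for this sketch (the Scala listing hard-codes a placeholder prefix and always uses the current date), so treat the signature as hypothetical.

```python
from datetime import date, timedelta


def last_n_days_path(days, prefix, today=None):
    """Build a glob path covering the previous `days` days (today excluded)."""
    today = today or date.today()
    # timedelta arithmetic handles month, year, and leap-day boundaries.
    stamps = [(today - timedelta(days=i)).strftime("%Y%m%d") for i in range(1, days + 1)]
    return prefix + "/{" + ",".join(stamps) + "}/*/*"


# Example crossing a month boundary in a leap year
print(last_n_days_path(3, "prefix", today=date(2024, 3, 2)))
# prints: prefix/{20240301,20240229,20240228}/*/*
```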

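The previous post covered the implementation of the GBDT algorithm; next we need to visualize the model. Note that when saving a model trained with the Spark version, you need to call model.nativeBooster.saveModel to keep the model file portable. Distributed model files cannot be used for online prediction and are also inconvenient for visualization. For details, see my analysis of how the GBDT algorithm works; model visualization here means displaying the decision trees.

In production we need to display feature names, not just feature indices, so the source code adds a featureconf option that renders the decision trees more readably.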

import getopt
import os
import sys

import xgboost as xgb

USAGE = 'plotTree.py -m <modelfile> -f <featureconf>'
FEATURE_MAP = "./featuremap.txt"


def main(argv):
    modelfile = ""
    featureconf = ""

    try:
        opts, args = getopt.getopt(argv, "hm:f:", ["modelfile=", "featureconf="])
    except getopt.GetoptError:
        print(USAGE)
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print(USAGE)
            sys.exit()
        elif opt in ("-m", "--modelfile"):
            modelfile = arg
        elif opt in ("-f", "--featureconf"):
            featureconf = arg

    if not os.path.exists(modelfile):
        print(USAGE)
        sys.exit(1)

    # Build an XGBoost feature map (lines of "<index> <name> q") from the
    # tab-separated featureconf, whose columns 1 and 2 hold index and name.
    fmap = {}
    fmap_path = ""
    if os.path.exists(featureconf):
        fmap_path = FEATURE_MAP
        with open(fmap_path, "wb") as filemap:
            filemap.write(("0 null q" + '\n').encode())
            with open(featureconf, "rb") as conf:
                for line in conf:
                    content = line.decode().strip('\n').split('\t')
                    if len(content) >= 3:
                        fmap[content[1]] = content[2]
                        filemap.write((content[1] + " " + content[2] + " q\n").encode())

    xgb_model = xgb.Booster()
    xgb_model.load_model(modelfile)

    # Print feature importance, most important first, using names where known
    score = xgb_model.get_fscore()
    importance = {}
    for key, value in score.items():
        index = key[1:]  # keys look like "f12"; drop the leading "f"
        if index in fmap:
            importance[fmap[index]] = int(value)
        else:
            importance[key] = int(value)
    for item in sorted(importance.items(), key=lambda kv: kv[1], reverse=True):
        print(item)

    # Render every tree in the model to <modelfile>_<i>.png
    trees = xgb_model.get_dump()
    for i in range(len(trees)):
        g = xgb.to_graphviz(xgb_model, fmap_path, i)
        with open(modelfile + "_" + str(i) + '.png', mode='wb') as f:
            f.write(g.pipe('png'))
    print("success!")


if __name__ == "__main__":
    main(sys.argv[1:])

The actual result looks like this:


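A Spark shuffle is an operation triggered by certain operators in a Spark job. More precisely, a shuffle is triggered whenever a wide dependency appears in the RDD dependency chain. A shuffle usually involves copying data between executors and hosts, which is why it is both expensive and complex.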

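Take the simplest example, the Spark operator reduceByKey. It produces a new RDD in which the values that share a key in the parent RDD are combined into a single value by a user-specified function. The complication is that records with the same key may live in several partitions of the parent RDD, so we have to read the records for that key from all partitions, bring them together, and then compute the result. This is a typical shuffle.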
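The data movement described above can be imitated in a few lines of plain Python. This is a single-process sketch that only mimics the logic of reduceByKey; it does not use Spark, and the helper names and the CRC-based partitioner are assumptions chosen to keep the example deterministic.

```python
import zlib


def partition_of(key, num_partitions):
    """Stable stand-in for a hash partitioner."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def reduce_by_key(parent_partitions, func, num_partitions):
    """Simulate reduceByKey over a list of parent partitions of (key, value) records."""
    # Shuffle write: each parent partition combines its own records locally,
    # then splits the result into one bucket per child partition.
    shuffle_blocks = []
    for records in parent_partitions:
        combined = {}
        for key, value in records:
            combined[key] = func(combined[key], value) if key in combined else value
        buckets = [[] for _ in range(num_partitions)]
        for key, value in combined.items():
            buckets[partition_of(key, num_partitions)].append((key, value))
        shuffle_blocks.append(buckets)

    # Shuffle read: child partition p fetches bucket p from every parent
    # and merges values that share a key.
    children = []
    for p in range(num_partitions):
        merged = {}
        for buckets in shuffle_blocks:
            for key, value in buckets[p]:
                merged[key] = func(merged[key], value) if key in merged else value
        children.append(merged)
    return children


parents = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("c", 5)],
    [("b", 6)],
]
print(reduce_by_key(parents, lambda x, y: x + y, num_partitions=2))
```

Key "a" appears in two parent partitions, so its partial sums have to cross partition boundaries before the final value can be computed. In a cluster, that crossing is the network and disk traffic that makes a shuffle expensive.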

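Operators that cause a shuffle fall roughly into three categories:

Repartition operations: repartition, coalesce (only when called with shuffle = true), and so on.

ByKey operations: groupByKey, reduceByKey, and so on.

Join operations: cogroup, join.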

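A shuffle usually involves disk I/O, data serialization and deserialization, and network I/O. These operations are relatively time-consuming and often become the bottleneck of a distributed computing job, so Spark has put considerable effort into optimizing its shuffle.

In the map stage, besides the map business logic, there is the shuffle write process, which involves time-consuming operations such as serialization and disk I/O. In the reduce stage, besides the reduce business logic, there is the preceding shuffle read process, which involves time-consuming operations such as network I/O and deserialization. The shuffle as a whole is therefore extremely expensive. Spark has made many optimizations to its shuffle implementation, which has improved steadily from release to release.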

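In the map stage (shuffle write), records are sorted by partition ID and by key, and the data for all partitions is written to a single file. Records in that file are laid out partition by partition in order of partition ID, and within each partition they are stored sorted by key. While a map task runs, it writes each partition's data sequentially and records each partition's size and offset in an index file. As a result, each map task has only two file descriptors open at a time, one for the data and one for the index, which greatly eases Hash Shuffle's problem of too many file descriptors: even if an executor has K cores, it opens at most K*2 file descriptors at once.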
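The data-file-plus-index layout can be sketched as follows. This is a simplified in-memory model, not Spark's actual file format: it assumes integer keys, a `key % num_partitions` partitioner, and tab-separated text records, all of which are choices made for the example.

```python
def write_map_output(records, num_partitions):
    """Return (data, offsets): one byte string holding all partitions back to back,
    plus an index where partition p occupies data[offsets[p]:offsets[p + 1]]."""
    # Sort by partition ID first, then by key within each partition.
    ordered = sorted(records, key=lambda kv: (kv[0] % num_partitions, kv[0]))
    data = bytearray()
    sizes = [0] * num_partitions
    for key, value in ordered:
        line = f"{key}\t{value}\n".encode("utf-8")
        data.extend(line)
        sizes[key % num_partitions] += len(line)
    # The index holds cumulative offsets, one more entry than there are partitions.
    offsets = [0]
    for size in sizes:
        offsets.append(offsets[-1] + size)
    return bytes(data), offsets


def read_partition(data, offsets, pid):
    """What a reduce task would do: use the index to read only its own slice."""
    chunk = data[offsets[pid]:offsets[pid + 1]].decode("utf-8")
    return [(int(k), v) for k, v in (line.split("\t") for line in chunk.splitlines())]


records = [(5, "e"), (2, "b"), (4, "d"), (1, "a"), (3, "c")]
data, offsets = write_map_output(records, num_partitions=2)
print(offsets)
print(read_partition(data, offsets, 0))
print(read_partition(data, offsets, 1))
```

However many reduce partitions there are, the map task writes exactly one data blob and one index, which is the property that keeps the number of open file descriptors small.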

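In the reduce stage (shuffle read), a reduce task no longer uses a HashMap when combining the data it fetches. Instead it uses ExternalAppendOnlyMap, a data structure that spills to disk when memory runs short during combining. This goes a long way toward ensuring robustness and avoiding OOM errors on large data.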
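The spill-then-merge idea can be sketched like this. It is a toy model and not Spark's ExternalAppendOnlyMap: spills are kept as sorted in-memory lists standing in for files on disk, the memory limit is expressed as a maximum number of keys, and the class and method names are hypothetical.

```python
import heapq


class SpillableCombiner:
    """Combine values per key, spilling the in-memory map when it grows too large."""

    def __init__(self, merge, max_keys):
        self.merge = merge
        self.max_keys = max_keys
        self.current = {}
        self.spills = []  # each spill is a key-sorted list standing in for a disk file

    def insert(self, key, value):
        if key in self.current:
            self.current[key] = self.merge(self.current[key], value)
            return
        if len(self.current) >= self.max_keys:
            self._spill()
        self.current[key] = value

    def _spill(self):
        self.spills.append(sorted(self.current.items(), key=lambda kv: kv[0]))
        self.current = {}

    def results(self):
        """Merge all sorted runs, combining entries that share a key."""
        runs = self.spills + [sorted(self.current.items(), key=lambda kv: kv[0])]
        out = []
        for key, value in heapq.merge(*runs, key=lambda kv: kv[0]):
            if out and out[-1][0] == key:
                out[-1] = (key, self.merge(out[-1][1], value))
            else:
                out.append((key, value))
        return out


combiner = SpillableCombiner(lambda a, b: a + b, max_keys=2)
for k, v in [("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)]:
    combiner.insert(k, v)
print(len(combiner.spills), combiner.results())
```

Because every run is sorted, the final merge only needs to hold one entry per run in memory at a time, so the combine step keeps working even when the full set of keys would not fit in memory.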

Overall, Sort Shuffle resolves all the drawbacks of Hash Shuffle, but because its shuffle process has to sort records, it gives up some performance.


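GBDT is one of the most commonly used machine learning algorithms, popular for its excellent automatic feature-combination ability and its efficient computation. Here is a brief introduction to how the GBDT algorithm works.

1. Types of decision trees

Decision trees fall into two broad categories: classification trees and regression trees.

Classification trees are used for categorical labels, such as sunny/cloudy/foggy/rainy weather, a user's gender, or whether a web page is spam.

Regression trees are used to predict real values, such as tomorrow's temperature, a user's age, or the relevance of a web page.

The difference between the two:

  • The results of a classification tree cannot be added or subtracted: sunny + sunny has no real meaning.
  • The result of a regression tree is a predicted number, which can be added and subtracted, for example 20 years + 3 years = 23 years.
  • The decision trees in GBDT are regression trees whose predictions are numbers. GBDT is commonly used for click-through-rate prediction, for example the probability that a user clicks on a piece of content.

2. The GBDT concept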

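GBDT stands for Gradient Boosting Decision Tree. To understand GBDT, you first have to understand the B (Boosting). Boosting is a family of algorithms that turn weak learners into a strong learner, and it belongs to ensemble learning. Boosting is based on this idea: for a complex task, a judgment obtained by suitably combining the judgments of several experts is better than the judgment of any single expert. Put simply, it is the principle behind the saying "three cobblers with their wits combined equal Zhuge Liang". A learner based on the gradient boosting algorithm is called a GBM (Gradient Boosting Machine). In theory, a GBM can use many different learning algorithms as its base learner. GBDT is one particular case of GBM.

Worked example: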

Consider a simple example to demonstrate how the GBDT algorithm works.
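Since the excerpt stops before the example itself, the following is an independent minimal sketch of the boosting loop, not the author's example. It assumes squared-error loss (so the negative gradient is simply the residual), one numeric feature with at least two distinct values, and depth-1 regression trees (stumps) as base learners; the data and all function names are made up for illustration.

```python
def fit_stump(xs, residuals):
    """Best single-split regression stump on a 1-D feature (least squares).
    Assumes xs contains at least two distinct values."""
    best = None
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        err = sum((r - left_mean) ** 2 for r in left) + sum((r - right_mean) ** 2 for r in right)
        if best is None or err < best[0]:
            best = (err, threshold, left_mean, right_mean)
    return best[1], best[2], best[3]


def predict_stump(stump, x):
    threshold, left_value, right_value = stump
    return left_value if x <= threshold else right_value


def fit_gbdt(xs, ys, n_trees=50, learning_rate=0.1):
    """Start from the mean, then repeatedly fit a stump to the current residuals."""
    base = sum(ys) / len(ys)
    preds = [base] * len(ys)
    stumps = []
    for _ in range(n_trees):
        # For squared loss, the negative gradient is the residual y - prediction.
        residuals = [y - p for y, p in zip(ys, preds)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        preds = [p + learning_rate * predict_stump(stump, x) for p, x in zip(preds, xs)]
    return base, learning_rate, stumps


def predict_gbdt(model, x):
    base, learning_rate, stumps = model
    return base + learning_rate * sum(predict_stump(s, x) for s in stumps)


# Made-up data: predict age from a single numeric feature
xs = [1, 2, 3, 4]
ys = [14.0, 16.0, 24.0, 26.0]
model = fit_gbdt(xs, ys)
print([round(predict_gbdt(model, x), 1) for x in xs])
```

Each tree predicts a number that is added to the running total, which is why the base learners have to be regression trees even when the final task is something like click probability.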


XGBoost provides a Maven package that can be used once it is configured. The core code is as follows and can be run directly:


Some features require recompiling xgboost4j-spark, for example AWS S3 support (reading and writing files directly at S3 paths).


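Commonly used machine learning libraries and frameworks

As of its latest release at the time of writing (2019-12), Scikit-learn (sklearn) does not support GPUs or deep learning algorithms, and it does not support distributed training across nodes, but it does support parallel training on multiple CPUs within a single node.

sparkit-learn, spark-sklearn, and sparkml are all different. sparkml is further split into an RDD-based API and a DataFrame-based API. The RDD-based API will no longer be maintained (it is slated for deprecation in Spark 3.0), and the DataFrame-based API is easier to use.

spark-sklearn is a package that integrates Spark with sklearn, with an API kept as close to sklearn's as possible. For distributed training, spark-sklearn supports cross-node distribution only for simple tasks such as hyperparameter search and cross-validation.
sparkit-learn can distribute the training of the underlying algorithms, which is a deeper integration with sklearn.
The sparkml API is unrelated to sklearn, but it supports distributed training across nodes; it does not currently support GPUs directly.
In general, sklearn suits prototyping or single-machine training, while sparkml is worth considering in production when the dataset is large and stored in HDFS.

TensorFlow also has some built-in traditional machine learning models, such as GBDT.

Although TF can do traditional machine learning, it is better suited to deep learning.
	TF does not have feature-engineering APIs as rich as sklearn's.
Sometimes TF is chosen so that a single framework covers both traditional machine learning and deep learning.
sklearn and TF can be combined: sklearn for feature engineering, TF for training the model.
If you need high-cardinality categorical features, using TF for embeddings may be the better choice.
	For example, in CTR prediction for computational advertising, ranking models for recommender systems, and personalized search ranking models, using TF for embeddings together with a deep neural network is the recommended approach.
	Of course, a pipeline that uses TF for embeddings and then another library for the traditional machine learning model also works.
If you do not need embedding encoding, prefer libraries such as sklearn, XGBoost, LightGBM, CatBoost, and sparkml.

GPU acceleration is already used in deep learning and is being integrated into traditional machine learning models, including sklearn, sparkml, and TensorFlow.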


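Engineering considerations before model deployment

Model post-processing:

After a model is trained and before it goes live, a so-called model post-processing stage may be needed. In deep learning this generally has to be considered:
	Depending on the runtime environment the model will be deployed to and on other requirements, decide whether to compress the model (for example with Deep Compression), trading a small loss in model performance for a smaller model.
	Consider whether the model needs compile-time optimization for the target runtime,
		for example with SageMaker (SM) Neo or NVIDIA TensorRT.

How to integrate into the current business system:

Serve the model separately behind a REST API, or integrate the inference code into the existing codebase?
How is a model produced at training time identified at prediction time?
	Do training and prediction use different languages or frameworks?
	Is a common model format such as ONNX needed?

Concurrency and QPS requirements at prediction time:

Single machine or multiple machines?
GPU or CPU?
How can GPU compute and GPU memory be used more effectively?

Runtime environment for model inference:

Does the model run in a dedicated virtual machine or in a container?

Model version control: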
