日志

Spark 大规模稀疏矩阵乘法

spark 可以通过 BlockMatrix 进行矩阵相乘,但其在大规模稀疏矩阵场景有非常严重的性能问题,本文通过基于 RDD 和 DataFrame 两种方式实现基于 spark 的大规模稀疏矩阵乘法运算。

一、矩阵乘法运算

1

二、通过 BlockMatrix 进行矩阵相乘

3

from pyspark.mllib.linalg.distributed import *
from pyspark.sql import SparkSession

ss = SparkSession.builder.appName("test") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

sc = ss.sparkContext
sc.setLogLevel("WARN")

M_rdd = sc.parallelize([(0,0, 1), (0,1, 2), (0,2, 3), (1,0, 4), (1,1, 5), (1,2, 6)])
N_rdd = sc.parallelize([(0,0, 7), (0,1, 8), (1,0, 9), (1,1, 10), (2,0, 11), (2,1, 12)])
M = CoordinateMatrix(M_rdd).toBlockMatrix()
N = CoordinateMatrix(N_rdd).toBlockMatrix()
M.multiply(N).toCoordinateMatrix().entries.collect()

######## 输出 #############
# [MatrixEntry(0, 0, 58.0),
#  MatrixEntry(1, 0, 139.0),
#  MatrixEntry(0, 1, 64.0),
#  MatrixEntry(1, 1, 154.0)]

三、BlockMatrix 的性能问题

在 spark 的官方文档中关于 multiply 这个方法的描述如下

Left multiplies this BlockMatrix by other, another BlockMatrix. The colsPerBlock of this matrix must equal the rowsPerBlock of other. If other contains any SparseMatrix blocks, they will have to be converted to DenseMatrix blocks. The output BlockMatrix will only consist of DenseMatrix blocks. This may cause some performance issues until support for multiplying two sparse matrices is added.

也就是说,BlockMatrix 在进行矩阵乘法时会先把稀疏矩阵转换成稠密矩阵!!对于一个 10000 x 10000 的稀疏矩阵,实际存储的可能只有几万个非零元素,而转换成稠密矩阵后,你需要对所有 10000 x 10000 = 1亿 个元素提供存储空间!!而实际场景面对的稀疏矩阵的维度远远大于 10000,所以 BlockMatrix 无法适用于大规模稀疏矩阵运算。

四、矩阵乘法公式

CodeCogsEqn
图片 1
从矩阵乘法公式可知,矩阵相乘主要有以下环节:
1.左矩阵的列号(j)与右矩阵的行号(j)相同的元素进行两两相乘得到 MN_ik。
2.对所有具有相同下标(ik)的 MN_ik 进行相加,即得到 P_ik。

五、基于 RDD 实现矩阵乘法

阅读全文

日志

K-Means聚类算法(实践篇)– 基于Spark Mlib的图像压缩案例

Spark Mlib 机器学习库集成了许多常用的机器学习算法,本文以K-Means算法为例结合图像压缩案例,简单介绍K-Means的应用。关于K-Means算法理论可以参考 → K-Means聚类算法(理论篇)

案例介绍

图像压缩

1)一张图由一系列像素组成,每个像素的颜色都由R、G、B值构成(不考虑Alpha),即R、G、B构成了颜色的三个基本特征,例如一个白色的像素点可以表示为(255,255,255)。

2)一张800×600的图片有480000个颜色数据,通过K-Means算法将这些颜色数据归类到K种颜色中,通过训练模型计算原始颜色对应的颜色分类,替换后生成新的图片。

Spark Mlib K-Means应用(Java + Python)

阅读全文

日志

Spark集群资源动态分配

Spark默认采取预分配方式给各个application分配资源,每个application会独占所有分配到的资源直到整个生命周期的结束.对于长周期任务,在workload低峰阶段空闲的资源将一直被抢占而得不到有效利用,无疑是相当浪费.Spark1.2开始引入动态资源分配(Dynamic Resource Allocation)机制,支持资源弹性分配.

动态资源分配

Spark的资源动态分配机制主要基于application的当前任务(task)负载,以executor为粒度(以Spark1.2为例)动态向集群申请或释放资源,这意味着空闲的资源将得到有效的回收,供其他application利用.

Spark1.2仅支持Yarn模式,从Spark1.6开始,支持standalone、Yarn、Mesos.

安装与配置

1. Spark配置

阅读全文

日志

大数据的技术生态圈

本文来源知乎一个题为“如何用形象的比喻描述大数据的技术生态?”的精彩回答

大数据本身是个很宽泛的概念,Hadoop生态圈(或者泛生态圈)基本上都是为了处理超过单机尺度的数据处理而诞生的。你可以把它比作一个厨房所以需要的各种工具。锅碗瓢盆,各有各的用处,互相之间又有重合。你可以用汤锅直接当碗吃饭喝汤,你可以用小刀或者刨子去皮。但是每个工具有自己的特性,虽然奇怪的组合也能工作,但是未必是最佳选择。

大数据,首先你要能存的下大数据

传统的文件系统是单机的,不能横跨不同的机器。HDFS(Hadoop Distributed FileSystem)的设计本质上是为了大量的数据能横跨成百上千台机器,但是你看到的是一个文件系统而不是很多文件系统。比如你说我要获取/hdfs/tmp/file1的数据,你引用的是一个文件路径,但是实际的数据存放在很多不同的机器上。你作为用户,不需要知道这些,就好比在单机上你不关心文件分散在什么磁道什么扇区一样。HDFS为你管理这些数据。

存的下数据之后,你就开始考虑怎么处理数据

虽然HDFS可以为你整体管理不同机器上的数据,但是这些数据太大了。一台机器读取成T上P的数据(很大的数据哦,比如整个东京热有史以来所有高清电影的大小甚至更大),一台机器慢慢跑也许需要好几天甚至好几周。对于很多公司来说,单机处理是不可忍受的,比如微博要更新24小时热博,它必须在24小时之内跑完这些处理。那么我如果要用很多台机器处理,我就面临了如何分配工作,如果一台机器挂了如何重新启动相应的任务,机器之间如何互相通信交换数据以完成复杂的计算等等。这就是MapReduce / Tez / Spark的功能。MapReduce是第一代计算引擎,Tez和Spark是第二代。MapReduce的设计,采用了很简化的计算模型,只有Map和Reduce两个计算过程(中间用Shuffle串联),用这个模型,已经可以处理大数据领域很大一部分问题了。

那什么是Map什么是Reduce?

阅读全文