spark 可以通过 BlockMatrix 进行矩阵相乘,但其在大规模稀疏矩阵场景有非常严重的性能问题,本文通过基于 RDD 和 DataFrame 两种方式实现基于 spark 的大规模稀疏矩阵乘法运算。
一、矩阵乘法运算
二、通过 BlockMatrix 进行矩阵相乘
from pyspark.mllib.linalg.distributed import * from pyspark.sql import SparkSession ss = SparkSession.builder.appName("test") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .getOrCreate() sc = ss.sparkContext sc.setLogLevel("WARN") M_rdd = sc.parallelize([(0,0, 1), (0,1, 2), (0,2, 3), (1,0, 4), (1,1, 5), (1,2, 6)]) N_rdd = sc.parallelize([(0,0, 7), (0,1, 8), (1,0, 9), (1,1, 10), (2,0, 11), (2,1, 12)]) M = CoordinateMatrix(M_rdd).toBlockMatrix() N = CoordinateMatrix(N_rdd).toBlockMatrix() M.multiply(N).toCoordinateMatrix().entries.collect() ######## 输出 ############# # [MatrixEntry(0, 0, 58.0), # MatrixEntry(1, 0, 139.0), # MatrixEntry(0, 1, 64.0), # MatrixEntry(1, 1, 154.0)]
三、BlockMatrix 的性能问题
在 spark 的官方文档中关于 multiply 这个方法的描述如下
Left multiplies this BlockMatrix by other, another BlockMatrix. The colsPerBlock of this matrix must equal the rowsPerBlock of other. If other contains any SparseMatrix blocks, they will have to be converted to DenseMatrix blocks. The output BlockMatrix will only consist of DenseMatrix blocks. This may cause some performance issues until support for multiplying two sparse matrices is added.
也就是说,BlockMatrix 在进行矩阵乘法时会先把稀疏矩阵转换成稠密矩阵!!对于一个 10000 x 10000 的稀疏矩阵,实际存储的可能只有几万个非零元素,而转换成稠密矩阵后,你需要对所有 10000 x 10000 = 1亿 个元素提供存储空间!!而实际场景面对的稀疏矩阵的维度远远大于 10000,所以 BlockMatrix 无法适用于大规模稀疏矩阵运算。
四、矩阵乘法公式
从矩阵乘法公式可知,矩阵相乘主要有以下环节:
1.左矩阵的列号(j)与右矩阵的行号(j)相同的元素进行两两相乘得到 MN_ik。
2.对所有具有相同下标(ik)的 MN_ik 进行相加,即得到 P_ik。
五、基于 RDD 实现矩阵乘法
思路
- 先通过 map 函数构建以左矩阵的列号(j)和右矩阵的行号(j)作为 key 的数据结构。即 (j, (i,L_val)) 和 (j, (k,R_val))。
- 对上一步骤进行 join 运算得到 (j, [(i,L_val), (k,R_val)])。
- 对上一步骤的值通过 map 函数构建以左矩阵的行号(i)和右矩阵的列号(k)作为 key , 且对应元素的积作为 value 的数据结构。即 ((i,k), L_val*R_val)。
- 对上一步骤进行 reduceByKey, 对 (i,k) 相同的两两结果进行相加即得到第i行第k列的结果, 即 P_ik。
实现
from pyspark.mllib.linalg.distributed import * from pyspark.sql import SparkSession ss = SparkSession.builder.appName("test") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .getOrCreate() sc = ss.sparkContext sc.setLogLevel("WARN") M_rdd = sc.parallelize([(0, 0, 1), (0, 1, 2), (0, 2, 3), (1, 0, 4), (1, 1, 5), (1, 2, 6)]) N_rdd = sc.parallelize([(0, 0, 7), (0, 1, 8), (1, 0, 9), (1, 1, 10), (2, 0, 11), (2, 1, 12)]) M = CoordinateMatrix(M_rdd) N = CoordinateMatrix(N_rdd) M = M.entries.map(lambda entry: (entry.j, (entry.i, entry.value))) N = N.entries.map(lambda entry: (entry.i, (entry.j, entry.value))) matrix_entries = M.join(N).values().map( lambda x: ((x[0][0], x[1][0]), x[0][1] * x[1][1]) ).reduceByKey( lambda x, y: x + y ).map( lambda x: MatrixEntry(x[0][0], x[0][1], x[1]) ) matrix = CoordinateMatrix(matrix_entries) matrix.entries.collect() ######## 输出 ############# # [MatrixEntry(0, 0, 58.0), # MatrixEntry(1, 0, 139.0), # MatrixEntry(0, 1, 64.0), # MatrixEntry(1, 1, 154.0)]
六、基于 DataFrame 实现矩阵乘法
思路
- 基于 DataFrame 方式并非将每一列号作为一个单独字段,而是将矩阵里面的每一个元素看成一个三维数据,即(行号,列号,值),将一个矩阵转化为一个包含三个字段的 table,对于一个稀疏矩阵,只需要记录非零元素,即 100 个非零元素就对应 DataFrame 的 100 行。
- 计算逻辑与 RDD 方式非常相似,主要有三个流程 join => groupby => agg。
- 基于 DataFrame 方式能大幅度提升运算速度,因为 DataFrame 相比 RDD 有更佳的优化支持。
实现
from pyspark.mllib.linalg.distributed import * from pyspark.sql import SparkSession, functions as F ss = SparkSession.builder.appName("test") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .getOrCreate() sc = ss.sparkContext sc.setLogLevel("WARN") M_rdd = sc.parallelize([(0, 0, 1), (0, 1, 2), (0, 2, 3), (1, 0, 4), (1, 1, 5), (1, 2, 6)]) N_rdd = sc.parallelize([(0, 0, 7), (0, 1, 8), (1, 0, 9), (1, 1, 10), (2, 0, 11), (2, 1, 12)]) M = ss.createDataFrame(M_rdd, ['l_row', 'l_column', 'l_val']) N = ss.createDataFrame(N_rdd, ['r_row', 'r_column', 'r_val']) M.join( N, M['l_column']==N['r_row'] ).groupBy( 'l_row', 'r_column' ).agg( F.sum(M['l_val']*N['r_val']) ).toDF('row', 'column', 'val').show() ######## 输出 ############# # +---+------+---+ # |row|column|val| # +---+------+---+ # | 1| 1|154| # | 0| 1| 64| # | 1| 0|139| # | 0| 0| 58| # +---+------+---+
七、性能对比
1. DataFrame 方式性能上最好;
2. BlockMatrix 方式虽然比 RDD 方式速度更快,但当矩阵维度增大后非常容易出现 OOM 问题;