日志

Spark 大规模稀疏矩阵乘法

spark 可以通过 BlockMatrix 进行矩阵相乘,但其在大规模稀疏矩阵场景有非常严重的性能问题,本文通过基于 RDD 和 DataFrame 两种方式实现基于 spark 的大规模稀疏矩阵乘法运算。

一、矩阵乘法运算

1

二、通过 BlockMatrix 进行矩阵相乘

3

from pyspark.mllib.linalg.distributed import *
from pyspark.sql import SparkSession

ss = SparkSession.builder.appName("test") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

sc = ss.sparkContext
sc.setLogLevel("WARN")

M_rdd = sc.parallelize([(0,0, 1), (0,1, 2), (0,2, 3), (1,0, 4), (1,1, 5), (1,2, 6)])
N_rdd = sc.parallelize([(0,0, 7), (0,1, 8), (1,0, 9), (1,1, 10), (2,0, 11), (2,1, 12)])
M = CoordinateMatrix(M_rdd).toBlockMatrix()
N = CoordinateMatrix(N_rdd).toBlockMatrix()
M.multiply(N).toCoordinateMatrix().entries.collect()

######## 输出 #############
# [MatrixEntry(0, 0, 58.0),
#  MatrixEntry(1, 0, 139.0),
#  MatrixEntry(0, 1, 64.0),
#  MatrixEntry(1, 1, 154.0)]

三、BlockMatrix 的性能问题

在 spark 的官方文档中关于 multiply 这个方法的描述如下

Left multiplies this BlockMatrix by other, another BlockMatrix. The colsPerBlock of this matrix must equal the rowsPerBlock of other. If other contains any SparseMatrix blocks, they will have to be converted to DenseMatrix blocks. The output BlockMatrix will only consist of DenseMatrix blocks. This may cause some performance issues until support for multiplying two sparse matrices is added.

也就是说,BlockMatrix 在进行矩阵乘法时会先把稀疏矩阵转换成稠密矩阵!!对于一个 10000 x 10000 的稀疏矩阵,实际存储的可能只有几万个非零元素,而转换成稠密矩阵后,你需要对所有 10000 x 10000 = 1亿 个元素提供存储空间!!而实际场景面对的稀疏矩阵的维度远远大于 10000,所以 BlockMatrix 无法适用于大规模稀疏矩阵运算。

四、矩阵乘法公式

CodeCogsEqn
图片 1
从矩阵乘法公式可知,矩阵相乘主要有以下环节:
1.左矩阵的列号(j)与右矩阵的行号(j)相同的元素进行两两相乘得到 MN_ik。
2.对所有具有相同下标(ik)的 MN_ik 进行相加,即得到 P_ik。

五、基于 RDD 实现矩阵乘法

阅读全文

日志

Hadoop NameNode 高可用架构

NameNode是HDFS(hadoop分布式文件系统)的核心组件,在hadoop 1.x中NameNode存在SPOF(单点故障)问题,NameNode存储了HDFS的元数据信息,一旦NameNode宕机那么整个HDFS就无法访问,依赖HDFS的服务也会被波及(HBase、Hive…)同样无法访问,整个集群陷入瘫痪。NameNode的单点故障问题也使得Hadoop在1.x时代一直都只能用作离线存储和离线计算,无法满足对高可用要求很高的应用场景。Hadoop2.x针对NameNode的SPOF问题提出了高可用架构方案(HA),目前已经能在生产环境下应用。本文主要介绍该高可用架构的主备切换机制。

一、NameNode高可用架构

Hadoop NameNode高可用架构

Hadoop NameNode高可用架构

二、组件概述

Active NameNode 与 Standby NameNode

在NameNode的HA方案中有两个不同状态的NameNode,分别为活跃态(Active)和后备态(Standby),其中只有Active NameNode能对外提供服务,Standby NameNode会根据Active NameNode的状态变化,在必要时可切换成Active.

ZKFC

ZKFC即ZKFailoverController,是基于Zookeeper的故障转移控制器,它负责控制NameNode的主备切换,ZKFC会监测NameNode的健康状态,当发现Active NameNode出现异常时会通过Zookeeper进行一次新的选举,完成Active和Standby状态的切换

HealthMonitor

阅读全文

日志

Hadoop之YARN/MRv2

YARN又称为Mapreduce version 2(MRv2)是hadoop2.x的新架构,它将旧Hadoop Mapreduce框架中的JobTracker的资源管理和作业生命周期管理拆分成两个组件即ResourceManager(RM)和ApplicationMaster(AM)

一、为何需要MRv2?

mr1vsmr2

MRv1与MRv2对比

MRv1资源管理问题

  1. Hadoop1.0引入了“slot”的概念,每个slot代表了各个节点上的一份资源(CPU、内存等),MRv1把Map和Reduce的资源单独区分,即Map slot、Reduce slot,两个阶段的slot不能共享,这意味着资源的利用率大大降低
  2. 非MR应用不能分享资源,所以只能运行MR计算框架的应用
  3. 每个集群只有一个JobTracker,限制了集群的扩展,集群规模限制在4000个节点左右

MRv2资源管理方案

  1. 舍弃“slot”的概念,每个节点以“资源”(CPU、内存等)为单位分配给有需要的应用
  2. 支持运行MR应用和非MR应用
  3. JobTracker的大量功能被迁移到ApplicationMaster(AM),集群内可以存在多个AM(每个应用程序都拥有一个独立的AM),集群可以扩展到上万个节点

二、YARN架构

YARN_Architecture

YARN架构图(via Apache Hadoop)

资源管理器(ResourceManager,RM)

ResourceManager运行在主节点(Master)上,负责全局资源调度(分配/回收),处理各个应用的资源请求,ResourceManager由调度器(Scheduler)和应用管理器(ApplicationsManager, AsM)组成

  • 调度器(Scheduler)
    调度器根据资源调度策略(例如Capacity Scheduler、Fair Scheduler),将包含适当资源(CPU、内存等)的资源容器(Container)分配给相应的节点,应用程序的各个任务均在容器内执行,且只能使用容器分配到的资源.调度器只负责资源调度,不关心应用的执行状态.
    阅读全文
日志

Hadoop之HDFS(NameNode、DataNode、SecondaryNameNode)

HDFS(hadoop分布式文件系统)是Hadoop的核心组成部分,HDFS采用master/slave架构,一个HDFS集群由一个NameNode(不考虑HA/Federation)和多个DataNode组成

hdfs架构

HDFS架构图(via Apache Hadoop)

一、NameNode

  • NameNode是HDFS的中心,也称作Master
  • NameNode只保存HDFS的元数据,负责管理HDFS的命名空间(namespace)和控制文件的访问操作
  • NameNode不保存任何实际的数据或数据集,真正的数据由DataNode负责存储
  • NameNode拥有HDFS内所有文件的数据块(blocks)列表及其位置,因而NameNode能通过这些数据块信息来重构对应的文件
  • NameNode是HDFS的核心,一旦NameNode挂了,整个集群将无法访问
  • NameNode具有单点故障问题(Hadoop2之后可以通过High Available方案解决)
  • NameNode需要配置相对较多的内存(相比DataNode而言),因为NameNode会把HDFS的命名空间和文件数据块映射(Blockmap)保存在内存中,这也意味着集群的横向扩展受到NameNode的限制,因为集群增长到一定的规模后NameNode需要的内存也会更大,另外由于所有的元数据操作都需要通过NameNode进行,这意味着集群的性能受到NameNode的限制(Hadoop2之后可以通过Federation方案解决)
  • NameNode有两个核心的数据结构,FsImage和EditsLog,FsImage是HDFS命名空间、文件数据块映射、文件属性等信息的镜像,EditsLog相当于一个日志文件,它记录了对HDFS元数据进行修改的所有事务操作,当NameNode启动时会首先合并FsImage和EditsLog,得到HDFS的最新状态然后写入FsImage镜像文件中,并使用一个新的EditsLog文件进行记录

SecondaryNameNode

“SecondaryNamenode”这个名字具有误导性,它不能和DataNode交互,更不能替代NameNode,相反它是用来弥补NameNode的一些缺点,由于NameNode启动时会合并FsImage和EditsLog,但随着集群的运行时间变长,EditsLog会变得非常庞大,这意味着下一次启动需要花很长的时间来进行合并操作.

SecondaryNameNode负责解决以上的问题
阅读全文

日志

kafka集群部署

Kafka是一个分布式的,可分区的,支持冗余备份的日志服务,提供消息系统功能,它相当于一条插口式的高速数据总线,主要用于处理活跃的流式数据,能有效降低系统组网的复杂度,和编程复杂度。本文主要介绍如何部署Kafka集群。

Kafka集群依赖Zookeeper集群协调管理,所以部署Kafka集群之前我们需要先搭建好Zookeeper Cluster

集群环境

假设有以下集群节点
hadoop1(192.168.10.1)
hadoop2(192.168.10.2)
hadoop3(192.168.10.3)

Zookeeper集群搭建

1)下载Zookeeper

Zookeeper Download

2)安装

将安装包解压到集群各个节点的指定目录下($ZK_HOME)

3)配置

在$ZK_HOME/conf/创建zoo.cfg配置文件

#基本时间单位,控制心跳和超时的基准
tickTime=2000

#允许 follower (相对于 leader 而言的“客户端”)连接并同步到 leader 的初始化连接时间.
#它以 tickTime 的倍数来表示。当超过设置倍数的 tickTime 时间,则连接失败.
initLimit=10

#leader 与 follower 之间发送消息,请求和应答时间长度.
#如果 follower 在设置的时间内不能与leader 进行通信,那么此 follower 将被丢弃.
syncLimit=5

#自动清理事务日志和快照的时间间隔(小时)
autopurge.purgeInterval=24

#需要保留的快照文件数
autopurge.snapRetainCount=30

#存储数据的位置
dataDir=/hadoop/zookeeper

#监听客户端连接端口
clientPort=2181

#指定集群的机器配置,配置格式为“server.id=host:port:port”,其中id必须唯一.
#并且需要在各台机器的dataDir下创建一个myid的文件,在myid文件中输入对应机器的id值.
#另外,第一个端口( 2888 )是follower机器连接到leader机器的端口,第二个端口(3888)是用来进行leader选举的端口.
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
server.3=hadoop3:2888:3888

4)运行

在各个节点上启动zookeeper

$ZK_HOME/bin/zkServer.sh start

观察zookeeper运行状态

$ZK_HOME/bin/zkServer.sh status

Kafka集群搭建

阅读全文

日志

Elasticsearch数据备份和恢复

无论从容灾还是容错角度来看,数据的安全性都十分重要,Elasticsearch(以下简称ES)提供了Snapshot和Restore模块,用于对单个索引甚至整个集群进行备份和恢复。

环境

Elasticsearch 2.1.1

基本流程

ES的Snapshot/Restore流程可以概括为以下几个步骤

1)创建用于备份的远程仓库

2)往仓库创建快照

3)检查快照状态

4)恢复已备份快照

具体操作

阅读全文

日志

Spark集群资源动态分配

Spark默认采取预分配方式给各个application分配资源,每个application会独占所有分配到的资源直到整个生命周期的结束.对于长周期任务,在workload低峰阶段空闲的资源将一直被抢占而得不到有效利用,无疑是相当浪费.Spark1.2开始引入动态资源分配(Dynamic Resource Allocation)机制,支持资源弹性分配.

动态资源分配

Spark的资源动态分配机制主要基于application的当前任务(task)负载,以executor为粒度(以Spark1.2为例)动态向集群申请或释放资源,这意味着空闲的资源将得到有效的回收,供其他application利用.

Spark1.2仅支持Yarn模式,从Spark1.6开始,支持standalone、Yarn、Mesos.

安装与配置

1. Spark配置

阅读全文

日志

通过Sqoop向Hive导入ORC表

Sqoop在很长一段时间都只支持导入为textfile、avrofile、sequencefile等格式,如果需要将数据导入为ORC、parquet等格式的Hive Table往往需要分两个步骤完成(先导出临时表,再通过Hive转换)。而从Sqoop 1.4.4开始,Sqoop集成了HCatalog,我们可以轻易地实现多格式支持。

HCatalog配置

Sqoop需要依赖HCatalog的lib,所以需要配置环境变量$HCAT_HOME,一般从hive目录下即可找到hcatalog的相关路径


导入命令

sqoop import 
--connect jdbc:mysql://127.0.0.1:3306/test 
--username your_user_name --password your_passwd 
--table table_name --driver com.mysql.jdbc.Driver 
--create-hcatalog-table 
--hcatalog-table table_name 
--hcatalog-partition-keys month,day 
--hcatalog-partition-values 12,09 
--hcatalog-storage-stanza 'stored as orc tblproperties ("orc.compress"="SNAPPY")'

参数说明

阅读全文

日志

E.L.K搭建实时日志分析平台

环境

Linux:Ubuntu 14.04.2 LTS

Java:1.8.0

Elasticsearch安装

Elasticsearch(以下简称ES)是一个基于Lucence的搜索服务器,具有高效的实时分析能力,搭建ES需要安装Java环境,设置好JAVA_HOME参数即可.

1)下载ES(本例为2.2.0)

点击下载ES

2)解压后在根目录执行以下命令启动ES

bin/elasticsearch

3)检查是否安装成功

curl -X GET http://localhost:9200

 返回以下信息则搭建成功

{
  "name" : "Prodigy",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "2.2.0",
    "build_hash" : "8ff36d139e16f8720f2947ef62c8167a888992fe",
    "build_timestamp" : "2016-01-27T13:32:39Z",
    "build_snapshot" : false,
    "lucene_version" : "5.4.1"
  },
  "tagline" : "You Know, for Search"
}

Kibana安装

Kibana是一个数据可视化平台,它可以让你通过惊艳、强大的制图进行数据交互.

1)下载Kibana(本例为4.4.0)

点击下载(Kibana4.4.0-Linux-64Bit)

其他版本下载(注意Kibana版本是否兼容你所安装的ES版本)

https://www.elastic.co/downloads/kibana

2)配置Kibana

阅读全文

日志

Thrift操作HBase一段时间后服务挂掉的问题

最近在python项目中通过thrift API操作HBase,发现thrift server在执行操作后运行一段时间就会crash,一番折腾发现这是HBase-thrift的天炸BUG

抛出异常

[thrift-worker-11] thrift.ThriftServerRunner$HBaseHandler: Can't get the location
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:309)  at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:153)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:61)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
    ...

异常原因描述

当空闲的连接从thrift server被清除的时候,该连接之下的table实例依然缓存在线程的local cache中,当再次操作table的时候自然会出现 Can’t get the location 的异常,因为连接早已关闭.


异常影响

受影响版本:

0.98.13, 1.1.1, 1.0.1.1, 1.1.0.1

已修复版本:

2.0.0, 0.98.14, 1.0.2, 1.2.0, 1.1.2, 1.3.0

Thrift影响:

该Bug出现在Thrift1中,Thrift2没有这个问题


解决方法

替换已修复版本的jar依赖,注意版本兼容问题(例如:v1.1.1的修复参考请替换为v1.1.2

转载请注明出处:

© http://hejunhao.me

第 1 页,共 2 页12