签到成功

知道了

CNDBA社区CNDBA社区

Storm 概念 与 架构

2019-03-10 17:35 2270 0 转载 Storm
作者: dave

1 Storm 简介

   Apache Storm是一个分布式实时大数据处理系统。Storm设计用于在容错和水平可扩展方法中处理大量数据。它是一个流数据框架,具有最高的摄取率。虽然Storm是无状态的,它通过Apache ZooKeeper管理分布式环境和集群状态。它很简单,您可以并行地对实时数据执行各种操作。

   Apache Storm继续成为实时数据分析的领导者。Storm易于设置和操作,并且它保证每个消息将通过拓扑至少处理一次。

1.1 Apache Storm vs Hadoop

   基本上Hadoop和Storm框架用于分析大数据。两者互补,在某些方面有所不同。Apache Storm执行除持久性之外的所有操作,而Hadoop在所有方面都很好,但滞后于实时计算。

http://www.cndba.cn/cndba/dave/article/3344

下表比较了Storm和Hadoop的属性。

Storm Hadoop
实时流处理 批量处理
无状态 有状态
主/从架构与基于ZooKeeper的协调。主节点称为nimbus,从属节点是主管。 具有/不具有基于ZooKeeper的协调的主-从结构。主节点是作业跟踪器,从节点是任务跟踪器。
Storm流过程在集群上每秒可以访问数万条消息。 Hadoop分布式文件系统(HDFS)使用MapReduce框架来处理大量的数据,需要几分钟或几小时。
Storm拓扑运行直到用户关闭或意外的不可恢复故障。 MapReduce作业按顺序执行并最终完成。
两者都是分布式和容错的
如果nimbus / supervisor死机,重新启动使它从它停止的地方继续,因此没有什么受到影响。 如果JobTracker死机,所有正在运行的作业都会丢失。

1.2 使用Apache Storm的例子

  Apache Storm对于实时大数据流处理非常有名。因此,大多数公司都将Storm用作其系统的一个组成部分。一些值得注意的例子如下:

  • 1.Twitter - Twitter正在使用Apache Storm作为其“发布商分析产品”。 “发布商分析产品”处理Twitter平台中的每个tweets和点击。 Apache Storm与Twitter基础架构深度集成。
  • 2.NaviSite - NaviSite正在使用Storm进行事件日志监控/审计系统。系统中生成的每个日志都将通过Storm。Storm将根据配置的正则表达式集检查消息,如果存在匹配,那么该特定消息将保存到数据库。
  • 3.Wego - Wego是位于新加坡的旅行元搜索引擎。旅行相关数据来自世界各地的许多来源,时间不同。Storm帮助Wego搜索实时数据,解决并发问题,并为最终用户找到最佳匹配。

1.3 Apache Storm优势

下面是Apache Storm提供的好处列表:

  1. Storm是开源的,强大的,用户友好的。它可以用于小公司和大公司。
  2. Storm是容错的,灵活的,可靠的,并且支持任何编程语言。
  3. 允许实时流处理。
  4. Storm是令人难以置信的快,因为它具有巨大的处理数据的力量。
  5. Storm可以通过线性增加资源来保持性能,即使在负载增加的情况下。它是高度可扩展的。
  6. Storm在几秒钟或几分钟内执行数据刷新和端到端传送响应取决于问题。它具有非常低的延迟。
  7. Storm有操作智能。
  8. Storm提供保证的数据处理,即使群集中的任何连接的节点死或消息丢失。

2 Storm核心概念

  Storm是一个开源的实时计算系统,它提供了一系列的基本元素用于进行计算:Topology、Stream、Spout、Bolt等等。
Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的,一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。

http://www.cndba.cn/cndba/dave/article/3344

  在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker,Nimbus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。每一个工作节点上面运行一个叫做Supervisor的进程。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程worker。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程worker组成。(一个supervisor里面有多个workder,一个worker是一个JVM。可以配置worker的数量,对应的是conf/storm.yaml中的supervisor.slot的数量)

  Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程,然后再重启它们,就好像什么都没有发生过,这个设计使得Storm异常的稳定。

http://www.cndba.cn/cndba/dave/article/3344

2.1 Topology

   在Storm中,一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似。但是有一点不同的是:在Hadoop中,MapReduce任务最终会执行完成后结束;而在Storm中,Topology任务一旦提交后永远不会结束,除非你显示去停止任务。计算任务Topology是由不同的Spouts和Bolts,通过数据流(Stream)连接起来的图。http://www.cndba.cn/cndba/dave/article/3344

http://www.cndba.cn/cndba/dave/article/3344

2.2 数据模型Turple

   storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,在我的理解里面一个tuple可以看作一个java对象。总体来看,storm支持所有的基本类型:字符串以及字节数组作为tuple的值类型。你也可以使用你自己定义的类型来作为值类型,只要你实现对应的序列化器(serializer)。
   一个Tuple代表数据流中的一个基本的处理单元,它可以包含多个Field,每个Field表示一个属性。比如举例一个,三个字段(taskID:int; StreamID:String; ValueList: List):
   Tuple是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List。一个没有边界的,源源不断的,连续的Tuple序列就组成了Stream。
   topology里面的每个节点必须定义它要发射的tuple的每个字段。

2.3 worker(进程)

   一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程worker来执行,那么每个工作进程会处理其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker,setBolt 的最后一个参数是你想为bolts的并行量。 http://www.cndba.cn/cndba/dave/article/3344

2.4 Spouts

   消息源spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的,如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。

   消息源可以发射多条消息流stream。使用OutputFieldsDeclarer。declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。代码上是这样的:collector.emit(new Values(str));http://www.cndba.cn/cndba/dave/article/3344

   Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。

2.5 Bolts

   所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。

   Bolts可以简单的做消息流的传递(来一个元组,调用一次execute)。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量,第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

   Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。

   Bolts的主要方法是execute,它以一个tuple作为输入,bolts使用OutputCollector来发射tuple(spout使用SpoutOutputCollector来发射指定的stream),bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

2.6 Reliability

   Storm保证每个tuple会被topology完整的执行。Storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而形成树状结构),并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置,如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功,那么topology会把这个tuple标记为执行失败,并且过一会儿重新发射这个tuple(超时的时间在storm0.9.0.1版本中是可以设置的,默认是30s)。

http://www.cndba.cn/cndba/dave/article/3344

2.7 Tasks

   每一个spout和bolt会被当作很多task在整个集群里执行。每一个executor对应到一个线程,在这个线程上运行多个task,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder类的setSpout和setBolt来设置并行度。SetSpout里面的并行度参数含义:parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster。(执行这个spout安排了N个tasks。每个task是一个线程,他们都在同一个进程中。)setBolt的参数含义也是一样的。

简单总结概述如下:http://www.cndba.cn/cndba/dave/article/3344

Nimbus:主节点,是一个调度中心,负责分发任务
Supervisor:从节点,任务执行的地方
Worker:任务工作进程,一个Supervisor中可以有多个Worker。
Executor:Worker进程在执行任务时,会启动多个Executor线程
Topology:任务的抽象概念。由于storm是流式计算的框架,它的数据流和拓扑图很像,所以它的任务就叫topology。
Spout:从数据源获取数据并进行分发。
Bolt:得到Spout或者上一个Bolt的数据,然后进行处理后交给下一个Bolt处理。
Tuple:在storm中,一条数据可以理解为是一个Tuple。

 一个Topology可以包含多个worker ,一个worker只能对应于一个topology。worker process是一个topology的子集。
 一个worker可以包含多个executor,一个executor只能对应于一个component(spout或者bolt)。
 Task就是具体的处理逻辑,一个executor线程可以执行一个或多个tasks。线程就是资源,task就是要运行的任务。

3 Storm数据流模型

  数据流(Stream)是Storm中对数据进行的抽象,它是时间上无界的tuple元组序列。在Topology中,Spout是Stream的源头。负责为Topology从特定数据源发射Stream;Bolt可以接收任意多个Stream作为输入,然后进行数据的加工处理过程,如果需要,Bolt还可以发射出新的Stream给下级Bolt进行处理。下面是一个Topology内部Spout和Bolt之间的数据流关系:

  Topology中每一个计算组件(Spout和Bolt)都有一个并行执行度,在创建Topology时可以进行指定,Storm会在集群内分配对应并行度个数的线程来同时执行这一组件。
  那么,有一个问题:既然对于一个Spout或Bolt,都会有多个task线程来运行,那么如何在两个组件(Spout和Bolt)之间发送tuple元组呢?Storm提供了若干种数据流分发(Stream Grouping)策略用来解决这一问题。
  在Topology定义时,需要为每个Bolt指定接收什么样的Stream作为其输入(注:Spout并不需要接收Stream,只会发射Stream)。
  目前Storm中提供了以下7种Stream Grouping策略:Shuffle Grouping、Fields Grouping、All Grouping、Global Grouping、Non Grouping、Direct Grouping、Local or shuffle grouping。

3.1 Stream groupings

Storm里面有7种类型的stream grouping:

  1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
  2. Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task。而不同的userid则会被分配到不同的bolts里的task。
  3. All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
  4. Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task,再具体一点就是分配给id值最低的那个task。
  5. Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果。有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  6. Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
  7. Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程worker中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

3.2 消息的可靠性保证 — ack机制

   一条数据在Spout中形成一个Tuple,然后交给一个个Bolt执行,那我们怎么保证这个Tuple被完整的执行了呢?这里的完整执行说的是这个Tuple必须在后面的每一个Bolt都成功处理,假设在一个Bolt中发生异常导致失败,这就不能算完整处理。http://www.cndba.cn/cndba/dave/article/3344

   为了保证消息处理过程中的可靠性,storm使用了ack机制。storm会专门启动若干acker线程,来追踪tuple的处理过程。acker线程数量可以设置。

   每一个Tuple在Spout中生成的时候,都会分配到一个64位的messageId。通过对messageId进行哈希我们可以执行要对哪个acker线程发送消息来通知它监听这个Tuple。

   acker线程收到消息后,会将发出消息的Spout和那个messageId绑定起来。然后开始跟踪该tuple的处理流程。如果这个tuple全部都处理完,那么acker线程就会调用发起这个tuple的那个spout实例的ack()方法。如果超过一定时间这个tuple还没处理完,那么acker线程就会调用对应spout的fail()方法,通知spout消息处理失败。spout组件就可以重新发送这个tuple。

  从上面的介绍我们知道了,tuple数据的流向会形成一个拓扑图,也可以理解成是一个tuple树。这个拓扑图的节点可能会有很多个,如果要把这些节点全部保存起来,处理大量的数据时势必会造成内存溢出。

  对于这个难题,storm使用了一种非常巧妙的方法,使用20个字节就可以追踪一个tuple是否被完整的执行。这也是storm的一个突破性的技术。

ack机制的具体原理
我们都知道,自己异或自己,结果肯定为零( a ^ a = 0)。ack中就利用这个特性

  1. acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下。注意,这里的tuple的id不是spout-tuple的id,和我们上面理解的messageId不是一个概念,要区分一下,是每个新生产的tuple的id,这个tupleId是随机生成的64位比特值
  2. 之后把得到的值更新为ack-val的新值。那么假设每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。

  举个例子,比如发射了某个tuple,就 ack-val ^ tupleId,然后ack了某个tuple,就再ack-val ^ tupleId,这样,ack-val 最终又变成了0,说明tuple已经全部处理成功了。

3.3 Storm的事务拓扑

  1. 事务拓扑(transactional topology)是storm0.7引入的特性,在0.8版本以后的版本中已经被封装为Trident,提供了更加便利和直观的接口。因为篇幅所限,在此对事务拓扑做一个简单的介绍。
  2. 事务拓扑的目的是为了满足对消息处理有着极其严格要求的场景,例如实时计算某个用户的成交笔数,要求结果完全精确,不能多也不能少。Storm的事务拓扑是完全基于它底层的spout/bolt/acker原语实现的。通过一层巧妙的封装得出一个优雅的实现。
  3. 事务拓扑简单来说就是将消息分为一个个的批(batch),同一批内的消息以及批与批之间的消息可以并行处理,另一方面,用户可以设置某些bolt为committer,storm可以保证committer的finishBatch()操作是按严格不降序的顺序执行的。用户可以利用这个特性通过简单的编程技巧实现消息处理的精确。

4 Storm集群架构

  Storm集群采用主从架构方式,主节点是Nimbus,从节点是Supervisor,有关调度相关的信息存储到ZooKeeper集群中,架构如下图所示:

Nimbus
  Storm集群的Master节点,负责分发用户代码,指派给具体的Supervisor节点上的Worker节点,去运行Topology对应的组件(Spout/Bolt)的Task。

Supervisor
  Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和终止。通过Storm的配置文件中的supervisor.slots.ports配置项,可以指定在一个Supervisor上最大允许多少个Slot,每个Slot通过端口号来唯一标识,一个端口号对应一个Worker进程(如果该Worker进程被启动)。

Worker
  运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。

Executor(执行者)
  执行器只是工作进程产生的单个线程。执行器运行一个或多个任务,但仅用于特定的spout或bolt。

Task
  worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。

ZooKeeper
  用来协调Nimbus和Supervisor,如果Supervisor因故障出现问题而无法运行Topology,Nimbus会第一时间感知到,并重新分配Topology到其它可用的Supervisor上运行

5 Storm工作流程

  一个工作的Storm集群应该有一个Nimbus和一个或多个supervisors。另一个重要的节点是Apache ZooKeeper,它将用于nimbus和supervisors之间的协调。

现在让我们仔细看看Apache Storm的工作流程 –

  1. 最初,nimbus将等待“Storm拓扑”提交给它。
  2. 一旦提交拓扑,它将处理拓扑并收集要执行的所有任务和任务将被执行的顺序。
  3. 然后,nimbus将任务均匀分配给所有可用的supervisors。
  4. 在特定的时间间隔,所有supervisor将向nimbus发送心跳以通知它们仍然运行着。
  5. 当supervisor终止并且不向心跳发送心跳时,则nimbus将任务分配给另一个supervisor。
  6. 当nimbus本身终止时,supervisor将在没有任何问题的情况下对已经分配的任务进行工作。
  7. 一旦所有的任务都完成后,supervisor将等待新的任务进去。
  8. 同时,终止nimbus将由服务监控工具自动重新启动。
  9. 重新启动的网络将从停止的地方继续。同样,终止supervisor也可以自动重新启动。由于网络管理程序和supervisor都可以自动重新启动,并且两者将像以前一样继续,因此Storm保证至少处理所有任务一次。
  10. 一旦处理了所有拓扑,则网络管理器等待新的拓扑到达,并且类似地,管理器等待新的任务。

默认情况下,Storm集群中有两种模式:

  1. 本地模式 -此模式用于开发,测试和调试,因为它是查看所有拓扑组件协同工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的Storm配置环境中运行。在本地模式下,storm拓扑在本地机器上在单个JVM中运行。
  2. 生产模式 -在这种模式下,我们将拓扑提交到工作Storm集群,该集群由许多进程组成,通常运行在不同的机器上。如在storm的工作流中所讨论的,工作集群将无限地运行,直到它被关闭。

6 Storm分布式消息系统

  Apache Storm处理实时数据,并且输入通常来自消息排队系统。外部分布式消息系统将提供实时计算所需的输入。Spout将从消息系统读取数据,并将其转换为元组并输入到Apache Storm中。有趣的是,Apache Storm在内部使用其自己的分布式消息传递系统,用于其nimbus和主管之间的通信。

6.1 什么是分布式消息系统?

  分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息系统之间异步排队。分布式消息传递系统提供可靠性,可扩展性和持久性的好处。

  大多数消息模式遵循发布 - 订阅模型(简称发布 - 订阅),其中消息的发送者称为发布者,而想要接收消息的那些被称为订阅者。

  一旦消息已经被发送者发布,订阅者可以在过滤选项的帮助下接收所选择的消息。通常我们有两种类型的过滤,一种是基于主题的过滤,另一种是基于内容的过滤。

  需要注意的是,pub-sub模型只能通过消息进行通信。它是一个非常松散耦合的架构;甚至发件人不知道他们的订阅者是谁。许多消息模式使消息代理能够交换发布消息以便由许多订户及时访问。一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。

下表描述了一些流行的高吞吐量消息传递系统:

分布式消息系统 描述
Apache Kafka Kafka是在LinkedIn公司开发的,后来它成为Apache的一个子项目。 Apache Kafka基于brokerenabled的,持久的,分布式的发布订阅模型。 Kafka是快速,可扩展和高效的。
RabbitMQ RabbitMQ是一个开源的分布式鲁棒消息应用程序。它易于使用并在所有平台上运行。
JMS(Java Message Service) J MS是一个开源API,支持创建,读取和从一个应用程序向另一个应用程序发送消息。它提供有保证的消息传递并遵循发布 - 订阅模型。
ActiveMQ ActiveMQ消息系统是JMS的开源API。
ZeroMQ ZeroMQ是无代理的对等体消息处理。它提供推拉,路由器 - 经销商消息模式。
Kestrel Kestrel是一个快速,可靠,简单的分布式消息队列。

6.2 Thrift协议

  Thrift在Facebook上构建,用于跨语言服务开发和远程过程调用(RPC)。后来,它成为一个开源的Apache项目。Apache Thrift是一种接口定义语言,允许以容易的方式在定义的数据类型之上定义新的数据类型和服务实现。

  Apache Thrift也是一个支持嵌入式系统,移动应用程序,Web应用程序和许多其他编程语言的通信框架。与Apache Thrift相关的一些关键功能是它的模块化,灵活性和高性能。此外,它可以在分布式应用程序中执行流式处理,消息传递和RPC。

  Storm广泛使用Thrift协议进行内部通信和数据定义。Storm拓扑只是Thrift Structs。在Apache Storm中运行拓扑的Storm Nimbus是一个Thrift服务。

用户评论
* 以下用户言论只代表其个人观点,不代表CNDBA社区的观点或立场
dave

dave

关注

人的一生应该是这样度过的:当他回首往事的时候,他不会因为虚度年华而悔恨,也不会因为碌碌无为而羞耻;这样,在临死的时候,他就能够说:“我的整个生命和全部精力,都已经献给世界上最壮丽的事业....."

  • 2262
    原创
  • 3
    翻译
  • 578
    转载
  • 192
    评论
  • 访问:8067499次
  • 积分:4349
  • 等级:核心会员
  • 排名:第1名
精华文章
    最新问题
    查看更多+
    热门文章
      热门用户
      推荐用户
        Copyright © 2016 All Rights Reserved. Powered by CNDBA · 皖ICP备2022006297号-1·

        QQ交流群

        注册联系QQ