Warning: getimagesize(): open_basedir restriction in effect. File(/etc/pki/tls/certs/ca-bundle.crt) is not within the allowed path(s): (/www/wwwroot/blog/:/tmp/) in /www/wwwroot/blog/usr/plugins/AMP/Action.php on line 470

Warning: failed loading cafile stream: `/etc/pki/tls/certs/ca-bundle.crt' in /www/wwwroot/blog/usr/plugins/AMP/Action.php on line 470

Warning: getimagesize(): Failed to enable crypto in /www/wwwroot/blog/usr/plugins/AMP/Action.php on line 470

Warning: getimagesize(https://blog.windawings.com/go/p34FmMs2/): failed to open stream: operation failed in /www/wwwroot/blog/usr/plugins/AMP/Action.php on line 470
Apache Spot

無限風域

Apache Spot


Warning: getimagesize(/www/wwwroot/blog/go/p34FmMs2/): failed to open stream: No such file or directory in /www/wwwroot/blog/usr/plugins/AMP/Action.php on line 521

Warning: getimagesize(/www/wwwroot/blog/go/KV4AKXiq/): failed to open stream: No such file or directory in /www/wwwroot/blog/usr/plugins/AMP/Action.php on line 521

[RUBYTOC]

<center>

<iframe width="560" height="315" src="//www.youtube.com/embed/lqdSicFKsWg" frameborder="0" gesture="media" allow="encrypted-media" allowfullscreen class="youtube"></iframe>

</center>

[引言]^(Preface)


  Apache Spot由Intel和Cloudera向Apache基金会贡献, 其前身为Intel在Cloudera平台上开发的开源专案Open Network Insight(ONI), 采用了Open Data Model(ODM)规格化安全资料。Spot基于高效的流量与数据包分析和机器学习, 提供识别云环境下的潜在安全威胁和未知的网络攻击的能力。考虑其使用规格化的安全数据集, 未来具有开源共享安全数据集提高机器识别能力的潜质。
  目前Spot支持NetFlow, sFlow, DNS和Proxy的网络流量分析, 以HDFS和Hive提供存储, 由Spark提供计算, 基于LDA算法提供无监督机器学习(unsupervised)以及Jupyter提供图形化支持。
  Spot尚处于孵化阶段(Incubate), 由Centrify, Cloudera, Cybraics, Endgame, Intel, Jask, Streamsets, Webroot等厂商投入开发与贡献。

[介绍]^(Introduction)


[优势]^(Advantages)

  Apache Spot基于以上四种网络流量进行分析, 其实是三种, 它把网站流量划分了内外网两种, 另外两个分为代理流量和DNS. 值得一提的是, 官网上表示需要一天后才初具功能, 也就是需要初始的训练集来进行机器学习。
<center></center>

[机制]^(Works)

  Apache Spot通过使用机器学习来识别流量特征, 并标识出各个流量簇的唯一行为。其主要对网络流量采用富文集, 噪声过滤, 白名单和启发式算法的方式进行处理,从而生成可能造成安全威胁的名单。
<center></center>

[并行框架]^(Parallel Ingest Framework)

  目前其采用kafka和flume对流量数据批量加载到HDFS和Hive中, 数据由多种格式存储以便于搜索, 机器学习, 转移到法律执行过程模型或者导入其他系统。

Kafka

<center></center>
  kafka分布式发布订阅消息系统(基于zookeeper, scala和java编写, 原LinkedIn开发), 可通过Hadoop的并行加载机制统一线上和离线消息处理, kafka可把消息种子(feed)分成多个[主题]^(topic), 比如spot-ingest划分出来的flow, dns和proxy三个主题. 每个[消息]^(record)由一个key, 一个value和时间戳构成。
<center></center>

[生产者]^(Producer)

  生产者可[发布]^(push)消息到(append, 顺序写磁盘, 经验证, 顺序写磁盘效率比随机写内存还要高, 这是Kafka高吞吐率的一个很重要的保证)一个或多个主题中的[分区]^(partition)里(即发布到broker集群里该消息主题分区所在的leader中(通过查询zookeeper/brokers/..../state节点找到leader), 分区选择(即消息路由)由key进行hash运算, 若指定了分区则直接路由, 若key和分区都未指定, 则使用round-robin). producer.type默认为sync同步push消息, 为async异步时, 生产者可以以batch形式push消息, 提高broker性能的同时消息失去可靠性。

[消费者]^(Consumer)

  消费者可订阅一个或多个主题, 从broker(一个kafka节点为一个broker, 一个或多个broker组成一个kafka集群)[拉取]^(pull)已发布的消息, 一条消息只有一个消费者处理。可创建[消费者组]^(consumer group)作为逻辑上的订阅者, 组内多个消费者可拓展性能和容错。组内消费者共享Group ID, 与主题一一对应; 主题有多个分区, 一个分区可由组内的一个消费者消费, 分区是消费的最小单位; 一般来说, 分区数可大于等于组内消费者数, 多的消费者永远也无法分配到可供消费的分区, 但多个分区可由同一个消费者消费。当需求多线程消费者实例时, 需要生产者也以批量模式发布消息到broker的随机分区中(若分区不够随机, 其他线程的消费者也无法消费到自己以外的分区); 消费者会提交自己的消费[偏移量]^(offset), 0.8版本偏移量由broker管理(提交到zookeeper节点/consumers/<group.id>/offsets/<topic>/<partitionId>), 0.9版本后由消费者组管理(提交到__consumeroffsets topic摆脱对zookeeper的依赖, offsets.topic.num.partitions设置其分区数, 默认50, 分区位置由Group Id的哈希值对其求余)。

[协调者]^(Coordinator)

  0.8版本的协调者借助zookeeper对消费者组进行管理, 协调者监听zk节点变化, 消费者自己决定分区分配方案并抢占注册; 0.9以后版本则每个消费者组分配一个协调者, 消费者组的第一个消费者启动后向Kafka Server确定其组的协调者, 之后则与协调者进行协调通信。协调者监听主题和消费者组判断是否做rebalance(一组协议, 规定消费者组基于range, round-robin或一个待开发的新分配器分配分区(分配在消费者组 leader完成), 有三种触发条件: 组成员变更; 订阅主题数变更; 订阅主题分区数变更。有五个协议处理协调者问题: Heartbeat, LeaveGroup, JoinGroup: 消费者 → 协调者; SyncGroup: 消费者组 leader → 协调者 → 消费者组 memeber; DescribeGroup:显示组的所有信息, 包括成员信息, 协议名称, 分配方案, 订阅信息等)。一个消费者组对应一个协调者, 分区leader所在broker即为被选定的协调者, 每次rebalance产生新的rebalance generation(递增整数)。消费者组有五种状态: Dead(组内没有任何成员的最终状态, 组的元数据被协调者移除, 请求结果:UNKNOWN_MEMBER_ID); Empty(组内无成员但是位移信息还没有过期, 能响应JoinGroup); PreparingRebalance(组准备开启新的rebalance, 等待成员加入); AwaitingSync(等待消费者组 leader将分配方案传给各成员); Stable(rebalance完成, 可开始进行消费)。

[分区]^(Partition)

  每个主题下存在多个分区, 分区是一个不可变的顺序消费队列, 分区中存在一个唯一表示的偏移量, 该偏移量由唯一消费者控制, 而各个消费者间的偏移量互不影响。分区使得一个主题可分布到broker的多个服务器中, 同时自身可拷贝多份作为备份容错。既然涉及到备份容错, 则分区由leader和0...N个follower存储, leader处理该分区所有读写请求, follower备份数据, 从而使broker通过分区容错(主题配置[复制因子]^(replication facto)为N, 即可允许N - 1个服务器宕机而不丢失已提交的消息)。

[分段]^(Segment)

  分区由多个分段组成, 分段由.index索引文件和.log数据文件组成。.index记录消息序号和物理地址偏移量, 第一个消息序号为1, 偏移量为0, 是该分区全局的第一个消息, .index文件名由其上一个分段的.index文件的最后一个消息的物理偏移量确定, 这样消费者使用偏移量(.index文件名数值+消息序号)即可通过线性复杂度查找, 根据对应分段的.index文件, 再通过.index的索引位置找到对应数据。(消费者通过3712的偏移量, 找到<$i$>.index ($3712 \\le i \\le$ <下一个.index文件名 >), 找到里面序号为<$i$ - 3712>的索引, 获得索引值, 即物理偏移量, 即可根据物理偏移量获取到此消息)

AR ISR OSR

$$\begin{equation}\begin{split}
\text{AR(assigned replicas所有副本)} &= \text{ISR(in-sync replicas副本同步队列, 由分区leader维护的列表)}\\
&  +\text{ OSR(outof-sync replicas超时副本同步队列,}\\
&    \text{由超过replica.lag.time.max.ms和新加入的follower加入)}
\end{split}\end{equation}$$

LEO HW

  取一个分区对应ISR中最小的LEO(log end offset, 分区最后一个消息的偏移量)作为HW(hight water mark高水位, leader和follower都有自己的HW), 消费者能获取到的最新消息即为[高水位]^(HW)所在的消息, 内部broker的读取请求则没有高水位限制。

Controller

  kafka集群中的一个broker将会被选举为Controller负责分区管理和副本状态管理, zookeeper中/brokers/topics/<topic>/partitions/<partition>/state对ISR进行管理, 该节点由Controller和Leader共同维护。 分区改变leader, 则Controller发起LeaderAndIsrRequest通知所有replicas; ISR变化, 则由leader更新该节点的ISR信息。当ISR中副本的LEO不一致时, 如果此时leader挂掉, 选举新的leader时并不是按照LEO的高低进行选举而是按照ISR中的顺序选举。

Acks Min-Insync-Relicas Unclean-Leader-Election-Enable

$$\begin{cases}
request.required.acks = -1\text{时和min.insync.replicas配合使用, 表示生产者需要等待ISR中所有follower的ack才确认消息已被接收;}\\
request.required.acks = 0\text{时, 生产者不等待ack确认; 为1时只等待leader的ack(leader宕机数据丢失);}\\

\\begin{cases}
request.required.acks = -1\\\\
min.insync.replicas = 2\\\\
unclean.leader.election.enable = false\\\\
AR = 3
\\end{cases}\\text{时, 则ISR中任意一个broker宕机, 则消息只能读取, 都宕机, 则该replicas(即该分区)失去可用性。}

\end{cases}$$

Delivery Guarantee

  a) At most once: 消息可能会丢,但绝不会重复传输;
  b) At least once: 消息绝不会丢,但可能会重复传输 (目前kafka采用的生产者发布消息的形式, 引起消息去重问题, 要求消息具备幂等性, 设置GUID(globally unique identifier)标记消息, 在客户端中做去重则要求集中式缓存(redis, memcached...));
  c) Exactly once: 每条消息肯定会被传输一次且仅传输一次。

Flume

<center></center>
  flume可分布式日志收集系统(java1.6+编写, 由Cloudera开发, 2009年捐赠给Apache基金会, Hadoop相关组件)将不同源([扇入]^(fan in))的海量日志进行收集,提供对数据的简单处理, 并写到不同目的地([扇出]^(fan out))。flume由原来的OG版本到现在NG版本, 进行了架构重构,两个版本互不兼容。经过架构重构后, NG版本演变为了一个轻量工具, 适应各种方式收集日志, 支持failover和负载均衡。
<center></center>
<center></center>

[事件]^(Event)

  事件是flume内部数据传输的基本单元, 也是事务的基本单元。event由可选headers(key-value键值对, 不同的source生成不同的header, 允许修改event添加header, 需要拦截器和选择器机制的支持从而对event进行路由)和body(字节数组, 事件的实际内容)组成(只支持如下格式:[{header:{"key": "value"}, body:"context"},])。事件由client产生, 经由client → agent{source(netcat, http...) → channel(memory, jdbc, file...) → sink(hdfs, hbase, null...)} → destination的路线到达目的地, 在送达sink前需要在channel缓存event, 数据达到sink后才会删除channel中的缓存,而在event从源到目的的迁移的抽象成为[流]^(flow)。

[数据源]^(Source)

<center></center>

  source从client接收数据, 并把数据以event的格式临时存放在一个或多个channel中, 直到event被sink成功发送到destination后, channel才将临时数据删除。在Spooling Directory Source中, 被发送过的数据被标记.COMPLETED(后缀名, 可自定义). source支持的类型如下:

类型说明实现类
Avro支持 Avro协议(实际上是Avro RPC), AvroLegacySource(0.9.x OG版本兼容), 内置支持AvroSource, AvroLegacySource
Thrift支持Thrift 协议, ThriftLegacySource(0.9.x OG版本兼容), 内置支持ThriftLegacySource
Exec基于Unix Command在标准输出上生产数据ExecSource
JMS从JMS系统(消息, 主题)中读取数据, ActiveMQ已测
Spooling Directory监控指定目录内数据变更
Netcat监听端口(官网上只写了tcp type), 将流经端口的每一个文本行数据作为event输入NetcatSource
Twitter 1% firehose通过API持续下载Twitter数据(experimental)
Sequence Generator序列生成器数据源, 生产序列数据SequenceGeneratorSource
Syslog读取syslog数据产生Event, 支持UDP和TCP两种协议SyslogTcpSource, SyslogUDPSource
HTTP基于HTTP POST或GET方式的数据源, 支持 JSON和BLOB 表示形式
ScribeSourceScribeSource
StressSource主要用于测试, 由连续的具有相同payload的Event构成, 不用于生产环境StressSource
(custom type as FQCN)自定义Source(custom FQCN)
[管道]^(Channel)

  Channel是一个临时的存储容器, 它把从source接收到的数据以event格式存储起来, 直到event被sink消费后才把临时数据删除。Channel可和多个source和sink连接, 成为source和sink的桥梁, 提供数据收发的一致性保证。常见类型为Memory, JDBC和File. Memory提供高速吞吐但无法保证数据的完整性, File通过持久化数据至磁盘保证数据的完整性。

类型说明实现类
Memory存储在内存中, 高速但非持久化的管道MemoryChannel
File可读写、映射和操作文件的管道, 存储Event在磁盘中FileChannel
JDBC基于JDBC的可持久化的管道, 内置支持DerbyJDBCChannel
Recoverable Memory基于本地文件存储系统提供持久化的管道(据说建议用FileChannel代替)RecoverableMemoryChannel
Spillable Memory存储在内存和磁盘中, 内存队列满则持久化到磁盘上(Experimental)
PseudoTxnMemory测试用Channel, 不用于生产环境PseudoTxnMemoryChannel
(custom type as FQCN)自定义Channel(custom FQCN)
[水槽]^(Sink)

<center></center>

  Sink从channel中读取并删除event, 将event传递到Hbase、HDFS或flow pipeline的下一个agent等。Sink类型如下:

类型说明实现类
HDFS写入所有Event到HDFS, 支持Rolling, Bucketing, HDFS-200 Append等HDFSEventSink
Hbase从Channel读取Event并写入Hbase(从Async的实现类来看似乎可以异步)HBaseSink, AsyncHBaseSink
Logger基于配置好的日志子系统以info日志级别记录events(默认log4j)LoggerSink
Avro对所有Event使用预置的Avro协议发送到配置的RPC端口,当同配置Avro Source时, 形成分层的集合AvroSink
File Roll存储在内存和磁盘中, 内存队列满则持久化到磁盘上(Experimental)RollingFileSink 
IRCevent在IRC回放IRCSink
Null写入/dev/null, 即丢弃所有EventNullSink
Morphline Solr写入Solr搜索服务器(Clustered)
ElasticSearch写入Elastic Search搜索服务器(Clustered)ElasticSearchSink
Thrift写入ThriftThriftSink
Kite Dataset写入到Kite Dataset(Experimental)
(custom type as FQCN)自定义Sink(custom FQCN)
[拦截器]^(Interceptor)

<center></center>
<center>来源于 http://blog.csdn.net/ty_laurel/article/details/54585726</center>

  拦截器用于source与channel之间对event数据进行处理, 主要对event的header进行CUD, 拦截器有如下类型:

类型说明实现类
Host加入Agent主机名或IP地址HostInterceptor$Builder
Timestamp加入当前时间戳(ms)TimestampInterceptor$Builder
Static加入一组静态的[键值对]^(key-value)StaticInterceptor$Builder
Regex Filter对Event Body中做正则提取出需要的匹配项RegexFilteringInterceptor$Builder
UUID加入UUID
MorphlineCloudera开源ETL(ElasticSearch, Logstash, Kibana)框架(目前由Kite主导开发), 富配置转化链, Grok正则解析
(custom type as FQCN)自定义Interceptor(custom FQCN)

  关于Morphline框架在Flume中的作用, 由以下两个图展示:
<center></center>
<center>来源于 https://allthingshadoop.com/2014/05</center>
<center></center>
<center>来源于 http://vinoyang.com/2015/11/20/build-log-sys-with-flume-and-morphline</center>

[管道选择器]^(Channel Selector)
类型说明实现类
Replicating把从Source接收到的所有Event发往所有的Channel(default)ReplicatingChannelSelector
multiplexing根据Event Header中的Key选择管道MultiplexingChannelSelector
(custom type as FQCN)自定义Channel Selector(custom FQCN)
[水槽处理器]^(Sink Processor)

  Sink Processtor可用于激活sinks中特定sink用于[负载均衡]^(load balance)。

类型说明实现类
Default单独的SinkDefaultSinkProcessor
Failover维护Sinks优先级列表, 保证只要有一个Sink即可处理Event(故障转移)(若没有设置优先级则按声明顺序)FailoverSinkProcessor
Load Balance维护Active Sinks索引列表, 选择机制有Round Robin(default)和Random, 也可以自定义继承AbstractSinkSelectorLoadBalancingSinkProcessor
(custom type as FQCN)自定义Sink Processor(custom FQCN)
[事件序列器]^(Event Serializer)

  Event需要对其Body进行序列化, 主要有以下类型: <span>EventSerializer$Builder</span>(Text, Avro Event), EventSerializer(SimpleHbaseEventSerializer, SimpleAsyncHbaseEventSerializer, RegexHbaseEventSerializer), HbaseEventSerializer(HbaseSink的自定义序列化), AsyncHbaseEventSerializer(AsyncHbaseSink的自定义序列化), <span>EventSerializer$Builder</span>(除了HbaseSink和AsyncHbaseSink外其他Sink的自定义序列化)。

HDFS

<center></center>

  Hadoop[分布式文件系统]^(distributed fileSystem)为Hadoop(java, 前yahoo主导)核心子项目, 基于流式数据访问模式处理超大文件需求开发, 适用于一次写入多次读取的应用场景, 不适合低延迟数据访问需求, 小文件存储, 并发写入(不允许多线程同时写入同一文件)和随机修改(仅支持数据[追加]^(append))。主要有Client(调用HDFS API, 从NameNode获取文件元数据与DataNode交互数据读写), NameNode(元数据节点, 管理元数据, 分配数据存储节点), DataNode(数据存储节点, 负责数据存储读写与冗余备份)。此项目主要采用Spark计算和Hive, Impala的查询, MapReduce使用不明显, 暂时不写MapReduce, ResourceManager(Yarn), TaskTracker以及JobTrackter计算任务框架。

<center></center>

Hive

<center></center>

  Hive(java, Facebook)是构建在Hadoop上的数据仓库框架, 使用HQL(类SQL语法(不支持主键外键, 可创建索引), 转化SQL为分布式作业, 支持MapReduce(最稳), Spark或Tez)对数据执行CRUD, 提供流式API, 适用于传统数据仓库业务, 不适用于低延迟的交互访问。Hive几乎是Hadoop上的SQL标准, 适合离线ETL与大数据离线Ad-hoc查询, 以及特大规模数据集合精准结果的查询。对于需要交互式的Ad-hoc查询方案, 通常选择Impala, Presto等。

<center></center>

[组件]^(Compenent)

<center></center>

<center></center>
<center>来源于 http://www.codedata.cn/hacknews/146917894602293594</center>

[数据模型]^(Data Model)

<center></center>
<center>来源于 http://www.codedata.cn/hacknews/146917894602293594</center>

[数据类型]^(Data Type)
类型支持说明
TinyInt1-byte signed integer, -128 ~ 127后缀Y
SmallInt2-byte signed integer, -32,768 ~ 32,767后缀S
Int/Integer(默认)4-byte signed integer, -2147483648 ~ 2147483647
BigInt8-byte signed integer, -9223372036854775808 ~ 9223372036854775807后缀L, 大于BIGINT的值需要使用BD后缀和Decimal(38,0)处理
Float4-byte single precision floating point number
Double8-byte double precision floating point number
Decimal在Hive 0.11.0介绍 (HIVE-2693), 在Hive 0.13.0改进 (HIVE-3976)
TimeStampHive 0.8.0及其以上支持支持传统的 UNIX 时间戳和可选的纳秒精度
DdateHive 0.12.0及其以上支持
IntervalHive 1.2.0及其以上支持
String可用单双引号表示, 以C风格转义
VarChar1 ~ 65535, 超过varchar(length)中length的长度的字串将被截断, 尾部空格影响比较结果
Char1~255, 固定长度, 字串比指定长度短的部分由空格代替, 尾部空格不影响比较结果
Booleantrue or false
BinaryHive 0.8.0及其以上支持
Array由相同类型元素组成, 下标从0开始
Map[键值对]^(key-value)
Struct可包含不同类型元素, dot(.)符号访问语法
UnionHive 0.7.0及其以上支持, 可综合以上数据类型据说还没有完全支持,官方建议只用于查看

  复杂类型的使用方法:

create table employees (
    name string,
    salary float,
    subordinates array<string>,
    deductions map<string, float>,
    address struct<street:string, city:string, state:string, zip:int>
) partitioned by (country string, state string);
[优化]^(Optimization)

  Hive中最重要的部分是Group By和Join, MapReduce就是Group By或者Join的过程。

Impala

<center></center>
  Impala(c++)在Cloudera受Google dremel启发下开发的实时SQL查询引擎(分布式大规模并行处理(MPP)数据库引擎), 功能上与shark(依赖Hive)和Drill(apache)类似。它脱离了Hive中MR批处理的缓慢, 采用与商用并行关系数据库类似的由planner, coordinator, exec engine组成的分布式查询引擎降低延迟。
<center></center>
Impala特性:

Impala Daemon

  Impala Daemon是一个在集群的每个DN上运行的守护进程(impalad), 主要接收client, hue, jdbc/odbc请求, 执行query并返回给协调节点, 负责与statestore通信汇报节点状态。在2.9+版本中, 可以分配集群中coordinator和exec engine角色给不同主机从而提高高并发负载的可伸缩性。impalad包含了planner, coordinator, exec engine, 一般impalad与DN在同一节点(data local)。当某个分发执行的impalad失败时, 整个计划任务都返回失败, 不过再次提交一次查询也没有多少消耗。

  Impalad的查询执行分为frontend(java, 以JNI嵌入impalad, 生成查询计划)和backend(c++, 执行查询)。Frontend先生成单机查询计划(与关系型数据库执行计划相同, 查询优化方法类似), 后生成分布式查询计划(减少数据移动, 数据与计算尽量放一起)。

<center></center>

  上图为三张表join后做聚合再排序取topN的例子。Impala查询优化器支持代价模型(以表和分区为基数, 每列distinct值个数做统计估算执行计划代价, 从而生成较优执行计划)。可以看到上图左侧则为frontend生成的单击查询计划, 进而转换为6个segment(彩色无边框圆角矩形背景, 每个segment可由单个主机独立执行, 为计划子树)的分布式查询计划。Impala支持表广播(把join中一个表广播到相关节点, 如图t3)和哈希重分布(根据join字段哈希值重新分布两张表数据, 如图t1, t2)两种分布式join方式。分布式查询计划中的聚合函数先对本地数据进行分组聚合(如图Pre-Agg)降低数据量和数据重分布, 再把上一步的结果在汇总聚合(如图MergeAgg)计算出最终结果, topN计划过程同理。BackEnd从frontend接收plan segment执行, 执行性能优化方面, 有向量执行(getNext处理一批数据, 多个操作符做pipeline), LLVM IR, IO本地化以及parquet列式存储。

Impala Statestore

  Impala Statestore负责收集各个impalad进程的资源信息, 各节点健康情况以及同步节点信息。只需要在一个主机上运行这样的一个进程即可(statestored), 如果某个impalad因为硬件故障, 网络错误, 软件问题或其他原因脱机, statestore会通知其他impalad避免向不可访问节点发出查询请求。statestore在impala集群中并不是一个关键进程, 如果statestore未运行或无法访问, impalad照常继续运行并分配工作, 相对的集群会缺少健壮性。

Impala Catalog

  Impala Catalog(Impala 1.2+)把Impala表的元数据分法到各个impalad中, 实现DDL。当impalad节点插入或查询时, impalad把自己的操作结果通知statestore, 之后statestore通知catalogd更新元数据信息。所以一般把catalogd和statestored放在同一个主机上运行, 且该主机不再运行impalad提供查询服务, 避免集群管理出现问题。

Impala文件格式
类型格式压缩CreateInsert
ParquetStructuredSnappy<br/>GZIP
TextUnstructuredLZO✔<br/>如果建表时没有指定存储类型,默认采用未压缩的 text,字段由 ASCII 编码的 0x01 字符串分割✔<br/>如果使用了 LZO 压缩,则只能通过 Hive 建表和插入数据
AvroStructuredSnappy<br/>GZIP<br/>Deflate<br/>BZIP2✔<br/>Impala 1.4.0+支持,之前的版本只能通过 Hive 来建表。✖<br/>只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据
RCFileStructuredSnappy<br/>GZIP<br/>Deflate<br/>BZIP2✖<br/>只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据
SequenceFileStructuredSnappy<br/>GZIP<br/>Deflate<br/>BZIP2✖<br/>只能通过 LOAD DATA 的方式将已经转换好格式的数据加载进去,或者使用 Hive 来插入数据
Impala压缩编码
类型说明
Snappy推荐编码, 压缩率和压缩速度有很好的平衡性, snappy压缩速度很快, 但比gzip压缩率低, Impala不支持snappy压缩的text file
GZIP压缩率高, Impala不支持gzip格式的text file
DeflateImpala不支持deflate压缩的text file
BZIP2Impala不支持bzip2压缩的text file
LZO只用于text file, Impala可查询lzo压缩的text格式数据表但不支持insert, 只能通过Hive完成insert

Spark

[机器学习]^(Machine Learning)

Scala

[操作分析]^(Operational Analytics)

[开放数据模型]^(Open Data Model)

[用例]^(UseCase)

[分析]^(Analysis)


[模块]^(Module)

Spot-Ingest

<center></center>

  Spot-Ingest基于分布式架构, 使用spot-collector守护进程针对不同采集源产生数据时, 通过kafka, spark streaming, hive和hdfs提供近99.99999%的数据完整性。

Spot-Collector

  Spot-Collector基于文件系统在后台监视文件系统的新文件。当网络工具产生新文件或较早产生的数据留在监视路径上时, collectors使用解析工具(比如nfdump和tshark)转化为可读格式, 并把其原始格式存储到Hadoop用于取证, 以avro-parquet格式存储在Hive中以便做SQL查询。$\\gt$ 1MB的文件提供其文件名和hdfs路径给kafka, $\\lt$ 1MB的文件提供其data event给kafka并在之后由spark streaming做进一步处理(在其github上描述是proxy的pipeline才用spark streaming处理, 不知道一般proxy数据是否是$\\lt$ 1MB的)。

Spot-Ingest Kafka

<center></center>
  从图上看是生成flow, dns和proxy三个主题, 各主题分区数由spot-worker的数量决定, spot-collectors作为提供者传输数据到kafka存储, spot-worker作为消费者消费kafka中三个主题的数据。

Spot-Worker

  Spot-Worker作为后台守护进程订阅指定的kafka主题和分区, 并在特定的Hive表中读取解析存储数据, 该数据在将来由ML算法消费。当前有两种worker, 通过定义的解析器多线程处理数据的python worker和使用spark-streaming context(micro batching)执行spark应用处理来自kafka数据的spark-streaming worker.

Spot-ML

<center></center>
  Spot-ML包含执行可疑连接的例程, 分析采集自网络中的netflow, dns或proxy日志。通过分析一系列网络事件, 生成一个最不可能和最可疑的事件列表, 该过程依赖于spot-ingest来加载netflow, dns和proxy记录。它使用[主题建模]^(topic model)来发现正常和异常行为, 把IP关联的日志集合作为文档, 并使用Latent Dirichlet Allocation(LDA)来发现这些文档集合中隐藏的语义结构。Spot-ML为每个IP地址的网络行为提供[概率模型]^(probabilistic model), 即赋予每个网络日志条目被该模型一个估算的概率(或得分), 得分较低的事件被标记为可疑以便进一步分析。
  LDA基于三层的贝叶斯模型, 是一个用于离散(discrete)数据的生成概率模型(generative probabilistic model), 例如文本全集。在这个模型中文档的每个单词(word)都是由一组基础主题混合生成的。LDA在网络流量中, 通过聚合和离散将网络日志条目转换为单词。这样, 文档对应IP地址, 日志条目的单词(与一个IP地址相关)和主题对应公共网络活动的概要文档。

Spot-OA

Spot-Setup

[环境]^(Environment)

<center></center>

[演示]^(Demo)

[参考]^(Reference)


  [01]  kafka 基础知识梳理, Go_小易, 2017-08-14 17:57
  [02]  kafka 学习笔记:知识点整理, cyfonly, 2016-10-12 22:13
  [03]  kafka 数据可靠性深度解读, 朱小厮, 2017-05-02 19:19
  [04]  Kafka 消费组 (consumer group), heidsoft, 2017-10-20 09:53
  [05]  Flume NG 基本架构及原理, dantezhao, 2016-09-14 21:52
  [06]  Flume 架构以及应用介绍, 安静的技术控, 2016-05-31 12:35
  [07]  日志系统之 Flume 采集加 morphline 解析, yanghua, 2015-11-20
  [08]  flume 拦截器及问题解决, ty_laurel, 2017-01-17 18:50
  [09]  【漫画解读】HDFS 存储原理, 雪飘飘, 2016-02-22 13:50
  [10]  深刻理解 HDFS 工作机制, Pickle, 2017-01-11 08:59
  [11]  Apache Hive Design, Administrator, 2015-11-08
  [12]  杨卓荦:Hive 原理及查询优化, anand, 2016-07-07
  [13]  Impala(多图手机用户慎入,理论 + 实践), SET, 2016-09-21 23:33
  [14]  怎么理解 impala(impala 工作原理是什么), 邱明成, 2017-02-12 09:28
  [15]  Apache Spot, 2016-03-29

To be Continued...

当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »