您的位置:首页 > 数码常识数码常识
flume对接kafka(flume采集数据到kafka)
2025-05-10人已围观
flume对接kafka(flume采集数据到kafka)
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
flume采集数据到kafka
Fayson的github:
https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
在前面的文章Fayson也介绍了一些关于Flume的文章《
非Kerberos环境下Kafka数据到Flume进Hive表
》、《
如何使用Flume准实时建立Solr的全文索引
》、《
如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS
》和《
如何使用Flume采集Kafka数据写入Kudu
》,本篇文章Fayson主要介绍在非Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。本文的数据流图如下:
内容概述
1.环境准备及开发自定义HBaseSink
2.配置Flume Agent
3.流程测试
4.总结
测试环境
1.CM和CDH版本为5.12.1
2.采用root用户操作
前置条件
1. Flume已安装
2.HBase服务已安装且正常运行
2.环境准备
1.准备向Kafka发送数据的脚本
这里脚本Fayson就不在详细说明了,前面的文章及Github上均有说明:
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell
(可左右滑动)
2.通过hbase shell命令创建HBase测试表
create 'fayson_ods_deal_daily','info'
3.开发HBaseSink示例
在CDH集群中Flume-ng默认添加了HBaseSink依赖包,但HBaseSink依赖包只支持两种序列化模式:
SimpleHbaseEventSerializer:将整个Event的Body部分当做完整的一列写入HBase
RegexHbaseEventSerializer:根据正则表达式将Event Body拆分到不同的列
写正则表达式Fayson不擅长,对于复杂结构数据时正则表达式的复杂度可想而知且不便于维护,所以这里Fayson选择使用自定义的HBaseSink方式来完成Json数据的解析及rowkey的指定。
1.在前面Fayson创建的flume-sink工程上继续开发HBaseSink
2.开发HBaseSink需要添加HBase相关的依赖包
<!-- HBase Sink 依赖包 --><dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-hbase-sink</artifactId> <version>1.6.0-cdh5.12.1</version></dependency><!-- HBase Client 依赖包 --><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0-cdh5.12.1</version></dependency>
(可左右滑动)
3.借助于原生的HBaseSink重新创建了一个FaysonHBaseSink类,该类为指定的sink.type类型,由于代码过长,该类只贴代码片段
public class FaysonHBaseSink extends AbstractSink implements Configurable { private String tableName; private byte[] columnFamily; //增加自定义Rowkey字段,可以用多个列组合,以","分割 ... private static final Logger logger=LoggerFactory.getLogger(com.cloudera.hbase.FaysonHBaseSink.class); private FaysonHBaseEventSerializer serializer; @Override public Status process() throws EventDeliveryException { Status status=Status.READY; Channel channel=getChannel(); Transaction txn=channel.getTransaction(); List<Row> actions=new LinkedList<Row>(); List<Increment> incs=new LinkedList<Increment>(); try { txn.begin(); if (serializer instanceof BatchAware) { ((BatchAware)serializer).onBatchStart(); } long i=0; for (; i < batchSize; i++) { Event event=channel.take(); if (event==null) { if (i==0) { status=Status.BACKOFF; sinkCounter.incrementBatchEmptyCount(); } else { sinkCounter.incrementBatchUnderflowCount(); } break; } else { if(rowKeys !=null && rowKeys.length() > 0) { serializer.initialize(event, columnFamily, rowKeys); } else { serializer.initialize(event, columnFamily); } actions.addAll(serializer.getActions()); incs.addAll(serializer.getIncrements()); } } if (i==batchSize) { sinkCounter.incrementBatchCompleteCount(); } sinkCounter.addToEventDrainAttemptCount(i); putEventsAndCommit(actions, incs, txn); } catch (Throwable e) { try{ txn.rollback(); } catch (Exception e2) { logger.error("Exception in rollback. Rollback might not have been " + "successful." , e2); } logger.error("Failed to commit transaction." + "Transaction rolled back.", e); if(e instanceof Error || e instanceof RuntimeException){ logger.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); } else { logger.error("Failed to commit transaction." + "Transaction rolled back.", e); throw new EventDeliveryException("Failed to commit transaction." + "Transaction rolled back.", e); } } finally { txn.close(); } return status; }}
(可左右滑动)
4.增加FaysonHBaseEventSerializer接口类继承原生的HBaseEventSerializer接口类,增加initialize(Event var1, byte[] var2, String var3)方法用于处理指定rowkey
package com.cloudera.hbase;import org.apache.flume.Event;import org.apache.flume.sink.hbase.HbaseEventSerializer;public interface FaysonHBaseEventSerializer extends HbaseEventSerializer { void initialize(Event var1, byte[] var2, String var3);}
5.新增FaysonHBaseSinkConstants类,用于定义自定义HBaseSink的常量
package com.cloudera.hbase;public class FaysonHBaseSinkConstants { public static final String CONFIG_ROWKEYS="rowkeys";}
(可左右滑动)
6.新增JsonHBaseEventSerializer类处理JSON格式数据的序列化类,继承FaysonHBaseEventSerializer类,由于代码太长该类只贴代码片段
public class JsonHBaseEventSerializer implements FaysonHBaseEventSerializer { private static final Logger logger=LoggerFactory.getLogger(com.cloudera.hbase.JsonHBaseEventSerializer.class); private LinkedHashSet<String> rowkeySet; @Override public void initialize(Event event, byte[] columnFamily, String rowkeys) { this.headers=event.getHeaders(); this.payload=event.getBody(); this.cf=columnFamily; rowkeySet=new LinkedHashSet<>(); logger.info("rowkeys:" + rowkeys); for(String rowkey : rowkeys.split(",")) { rowkeySet.add(rowkey); } } @Override public List<Row> getActions() throws FlumeException { List<Row> actions=Lists.newArrayList(); //将JSON消息转换为Map对象 Map<String, String> resultMap=JsonStr2Map.jsonStr2Map(new String(payload, charset)); try { byte[] rowKey; if(!rowkeySet.isEmpty()) { //如果rowkeySet集合不为空则使用自定义的rowkey StringBuffer rowkeyBuff=new StringBuffer(); for(String rowkey : rowkeySet) { rowkeyBuff.append(resultMap.get(rowkey) + "-"); } rowKey=rowkeyBuff.substring(0, rowkeyBuff.length()-1).getBytes(); //移除Map中不需要存入Column的列 for(String rowkey : rowkeySet) { resultMap.remove(rowkey); } } else { if (rowKeyIndex < 0) { rowKey=getRowKey(); } else { rowKey=resultMap.get(rowKeyIndex + 1).getBytes(Charsets.UTF_8); } } Put put=new Put(rowKey); for(Map.Entry<String, String> entry : resultMap.entrySet()) { put.add(cf, entry.getKey().getBytes(), entry.getValue().getBytes(Charsets.UTF_8)); } if (depositHeaders) { for (Map.Entry<String, String> entry : headers.entrySet()) { put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset)); } } actions.add(put); } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } return actions; }}
(可左右滑动)
7.将开发好的代码使用mvn命令打包
mvn clean package
将打包好的flume-sink-1.0-SNAPSHOT.jar部署到集群所有节点的/opt/cloudera/parcels/CDH/lib/flume-ng/lib目录下
[root@cdh01 shell]# sh bk_cp.sh node.list /root/flume-sink-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/
(可左右滑动)
4.配置Flume Agent
1.登录CM,进flume服务界面,点击“配置”
2.在Agent类别的“配置文件”中输入如下内容:
kafka.sources=source1kafka.channels=channel1kafka.sinks=sink1kafka.sources.source1.type=org.apache.flume.source.kafka.KafkaSourcekafka.sources.source1.kafka.bootstrap.servers=cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092kafka.sources.source1.kafka.topics=kafka_sparkstreaming_kudu_topickafka.sources.source1.kafka.consumer.group.id=flume-consumerkafka.sources.source1.channels=channel1kafka.channels.channel1.type=memorykafka.channels.channel1.capacity=10000kafka.channels.channel1.transactionCapacity=1000kafka.sinks.sink1.channel=channel1kafka.sinks.sink1.type=com.cloudera.hbase.FaysonHBaseSinkkafka.sinks.sink1.table=fayson_ods_deal_dailykafka.sinks.sink1.columnFamily=infokafka.sinks.sink1.rowkeys=id,mobile_phone_numkafka.sinks.sink1.serializer=com.cloudera.hbase.JsonHBaseEventSerializer
(可左右滑动)
3.保存flume配置,并重启Flume服务
5.流程测试
1.进入0283-kafka-shell目录执行命令向Kafka的kafka_sparkstreaming_kudu_topic发送消息
[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt
2.通过Hue查看HBase的fayson_ods_deal_daily表
可以看到数据已写入到HBase的fayson_ods_deal_daily表,查看表总数与发送Kafka数量一致
6.总结
1.HBaseSink支持的序列化方式比少,所以Fayson自己写了JsonHBaseEventSerializer类用于解析JSON数据。
2.需要将自定义开发的Jar包部署到${ FLUME_HOME} /lib目录下
3.使用原生的Sink无法指定HBase的rowkey,这里Fayson在自己的自定义Sink中增加了对rowkey的指定,同样可以指定多个列为rowkey,JSON数据的key作为HBase的Column。
GitHub地址:
https://github.com/fayson/cdhproject/tree/master/flumesink/src/main/java/com/cloudera/hbase
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
上面就是小居数码小编今天给大家介绍的关于(flume采集数据到kafka)的全部内容,希望可以帮助到你,想了解更多关于数码知识的问题,欢迎关注我们,并收藏,转发,分享。
94%的朋友还想知道的:
免费的数据恢复软件哪个好(免费数据恢复软件推荐)
简单的excel完成率公式,好用,值得收藏的数据(excel完成率用什么公式)
数据分析软件推荐(数据分析主流软件有哪些)
数据统计分析快速制作的方法(数据统计分析怎么做)
155971
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
flume采集数据到kafka
Fayson的github:
https://github.com/fayson/cdhproject
提示:代码块部分可以左右滑动查看噢
1.文档编写目的
在前面的文章Fayson也介绍了一些关于Flume的文章《
非Kerberos环境下Kafka数据到Flume进Hive表
》、《
如何使用Flume准实时建立Solr的全文索引
》、《
如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS
》和《
如何使用Flume采集Kafka数据写入Kudu
》,本篇文章Fayson主要介绍在非Kerberos的CDH集群中使用Flume采集Kafka数据写入HBase。本文的数据流图如下:
内容概述
1.环境准备及开发自定义HBaseSink
2.配置Flume Agent
3.流程测试
4.总结
测试环境
1.CM和CDH版本为5.12.1
2.采用root用户操作
前置条件
1. Flume已安装
2.HBase服务已安装且正常运行
2.环境准备
1.准备向Kafka发送数据的脚本
这里脚本Fayson就不在详细说明了,前面的文章及Github上均有说明:
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell
(可左右滑动)
2.通过hbase shell命令创建HBase测试表
create 'fayson_ods_deal_daily','info'
3.开发HBaseSink示例
在CDH集群中Flume-ng默认添加了HBaseSink依赖包,但HBaseSink依赖包只支持两种序列化模式:
SimpleHbaseEventSerializer:将整个Event的Body部分当做完整的一列写入HBase
RegexHbaseEventSerializer:根据正则表达式将Event Body拆分到不同的列
写正则表达式Fayson不擅长,对于复杂结构数据时正则表达式的复杂度可想而知且不便于维护,所以这里Fayson选择使用自定义的HBaseSink方式来完成Json数据的解析及rowkey的指定。
1.在前面Fayson创建的flume-sink工程上继续开发HBaseSink
2.开发HBaseSink需要添加HBase相关的依赖包
<!-- HBase Sink 依赖包 --><dependency> <groupId>org.apache.flume.flume-ng-sinks</groupId> <artifactId>flume-ng-hbase-sink</artifactId> <version>1.6.0-cdh5.12.1</version></dependency><!-- HBase Client 依赖包 --><dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0-cdh5.12.1</version></dependency>
(可左右滑动)
3.借助于原生的HBaseSink重新创建了一个FaysonHBaseSink类,该类为指定的sink.type类型,由于代码过长,该类只贴代码片段
public class FaysonHBaseSink extends AbstractSink implements Configurable { private String tableName; private byte[] columnFamily; //增加自定义Rowkey字段,可以用多个列组合,以","分割 ... private static final Logger logger=LoggerFactory.getLogger(com.cloudera.hbase.FaysonHBaseSink.class); private FaysonHBaseEventSerializer serializer; @Override public Status process() throws EventDeliveryException { Status status=Status.READY; Channel channel=getChannel(); Transaction txn=channel.getTransaction(); List<Row> actions=new LinkedList<Row>(); List<Increment> incs=new LinkedList<Increment>(); try { txn.begin(); if (serializer instanceof BatchAware) { ((BatchAware)serializer).onBatchStart(); } long i=0; for (; i < batchSize; i++) { Event event=channel.take(); if (event==null) { if (i==0) { status=Status.BACKOFF; sinkCounter.incrementBatchEmptyCount(); } else { sinkCounter.incrementBatchUnderflowCount(); } break; } else { if(rowKeys !=null && rowKeys.length() > 0) { serializer.initialize(event, columnFamily, rowKeys); } else { serializer.initialize(event, columnFamily); } actions.addAll(serializer.getActions()); incs.addAll(serializer.getIncrements()); } } if (i==batchSize) { sinkCounter.incrementBatchCompleteCount(); } sinkCounter.addToEventDrainAttemptCount(i); putEventsAndCommit(actions, incs, txn); } catch (Throwable e) { try{ txn.rollback(); } catch (Exception e2) { logger.error("Exception in rollback. Rollback might not have been " + "successful." , e2); } logger.error("Failed to commit transaction." + "Transaction rolled back.", e); if(e instanceof Error || e instanceof RuntimeException){ logger.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); } else { logger.error("Failed to commit transaction." + "Transaction rolled back.", e); throw new EventDeliveryException("Failed to commit transaction." + "Transaction rolled back.", e); } } finally { txn.close(); } return status; }}
(可左右滑动)
4.增加FaysonHBaseEventSerializer接口类继承原生的HBaseEventSerializer接口类,增加initialize(Event var1, byte[] var2, String var3)方法用于处理指定rowkey
package com.cloudera.hbase;import org.apache.flume.Event;import org.apache.flume.sink.hbase.HbaseEventSerializer;public interface FaysonHBaseEventSerializer extends HbaseEventSerializer { void initialize(Event var1, byte[] var2, String var3);}
5.新增FaysonHBaseSinkConstants类,用于定义自定义HBaseSink的常量
package com.cloudera.hbase;public class FaysonHBaseSinkConstants { public static final String CONFIG_ROWKEYS="rowkeys";}
(可左右滑动)
6.新增JsonHBaseEventSerializer类处理JSON格式数据的序列化类,继承FaysonHBaseEventSerializer类,由于代码太长该类只贴代码片段
public class JsonHBaseEventSerializer implements FaysonHBaseEventSerializer { private static final Logger logger=LoggerFactory.getLogger(com.cloudera.hbase.JsonHBaseEventSerializer.class); private LinkedHashSet<String> rowkeySet; @Override public void initialize(Event event, byte[] columnFamily, String rowkeys) { this.headers=event.getHeaders(); this.payload=event.getBody(); this.cf=columnFamily; rowkeySet=new LinkedHashSet<>(); logger.info("rowkeys:" + rowkeys); for(String rowkey : rowkeys.split(",")) { rowkeySet.add(rowkey); } } @Override public List<Row> getActions() throws FlumeException { List<Row> actions=Lists.newArrayList(); //将JSON消息转换为Map对象 Map<String, String> resultMap=JsonStr2Map.jsonStr2Map(new String(payload, charset)); try { byte[] rowKey; if(!rowkeySet.isEmpty()) { //如果rowkeySet集合不为空则使用自定义的rowkey StringBuffer rowkeyBuff=new StringBuffer(); for(String rowkey : rowkeySet) { rowkeyBuff.append(resultMap.get(rowkey) + "-"); } rowKey=rowkeyBuff.substring(0, rowkeyBuff.length()-1).getBytes(); //移除Map中不需要存入Column的列 for(String rowkey : rowkeySet) { resultMap.remove(rowkey); } } else { if (rowKeyIndex < 0) { rowKey=getRowKey(); } else { rowKey=resultMap.get(rowKeyIndex + 1).getBytes(Charsets.UTF_8); } } Put put=new Put(rowKey); for(Map.Entry<String, String> entry : resultMap.entrySet()) { put.add(cf, entry.getKey().getBytes(), entry.getValue().getBytes(Charsets.UTF_8)); } if (depositHeaders) { for (Map.Entry<String, String> entry : headers.entrySet()) { put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset)); } } actions.add(put); } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } return actions; }}
(可左右滑动)
7.将开发好的代码使用mvn命令打包
mvn clean package
将打包好的flume-sink-1.0-SNAPSHOT.jar部署到集群所有节点的/opt/cloudera/parcels/CDH/lib/flume-ng/lib目录下
[root@cdh01 shell]# sh bk_cp.sh node.list /root/flume-sink-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/
(可左右滑动)
4.配置Flume Agent
1.登录CM,进flume服务界面,点击“配置”
2.在Agent类别的“配置文件”中输入如下内容:
kafka.sources=source1kafka.channels=channel1kafka.sinks=sink1kafka.sources.source1.type=org.apache.flume.source.kafka.KafkaSourcekafka.sources.source1.kafka.bootstrap.servers=cdh01.fayson.com:9092,cdh02.fayson.com:9092,cdh03.fayson.com:9092kafka.sources.source1.kafka.topics=kafka_sparkstreaming_kudu_topickafka.sources.source1.kafka.consumer.group.id=flume-consumerkafka.sources.source1.channels=channel1kafka.channels.channel1.type=memorykafka.channels.channel1.capacity=10000kafka.channels.channel1.transactionCapacity=1000kafka.sinks.sink1.channel=channel1kafka.sinks.sink1.type=com.cloudera.hbase.FaysonHBaseSinkkafka.sinks.sink1.table=fayson_ods_deal_dailykafka.sinks.sink1.columnFamily=infokafka.sinks.sink1.rowkeys=id,mobile_phone_numkafka.sinks.sink1.serializer=com.cloudera.hbase.JsonHBaseEventSerializer
(可左右滑动)
3.保存flume配置,并重启Flume服务
5.流程测试
1.进入0283-kafka-shell目录执行命令向Kafka的kafka_sparkstreaming_kudu_topic发送消息
[root@cdh01 0283-kafka-shell]# sh run.sh ods_user_600.txt
2.通过Hue查看HBase的fayson_ods_deal_daily表
可以看到数据已写入到HBase的fayson_ods_deal_daily表,查看表总数与发送Kafka数量一致
6.总结
1.HBaseSink支持的序列化方式比少,所以Fayson自己写了JsonHBaseEventSerializer类用于解析JSON数据。
2.需要将自定义开发的Jar包部署到${ FLUME_HOME} /lib目录下
3.使用原生的Sink无法指定HBase的rowkey,这里Fayson在自己的自定义Sink中增加了对rowkey的指定,同样可以指定多个列为rowkey,JSON数据的key作为HBase的Column。
GitHub地址:
https://github.com/fayson/cdhproject/tree/master/flumesink/src/main/java/com/cloudera/hbase
提示:代码块部分可以左右滑动查看噢
为天地立心,为生民立命,为往圣继绝学,为万世开太平。
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。
上面就是小居数码小编今天给大家介绍的关于(flume采集数据到kafka)的全部内容,希望可以帮助到你,想了解更多关于数码知识的问题,欢迎关注我们,并收藏,转发,分享。
94%的朋友还想知道的:
免费的数据恢复软件哪个好(免费数据恢复软件推荐)
简单的excel完成率公式,好用,值得收藏的数据(excel完成率用什么公式)
数据分析软件推荐(数据分析主流软件有哪些)
数据统计分析快速制作的方法(数据统计分析怎么做)
155971
很赞哦! ()