1、walk 用于递归遍历文件夹,获取所有文件。
2、os.path 文件、文件夹路径等操作。
首先,申明下版本
zookeeper-3.4.7.tar.gz
kafka_2.10-0.8.2.2.tgz 这个一定要用0.8.2x的版本 经过多次实验验证 切记切记
kafka
version pykafka? rdkafka?
0.8.1 no no
0.8.2 yes yes
0.9.0 planned planned
zookeeper的配置就不多说了 建议大家安装一个zkui 包名zkui-2.0-snapshot-jar-with-dependencies.jar 配置文件可以找我要啊 网上应该也有
kafka的配置我这里就提及下重点 关于host.name这个参数
如果我们想远程消费这个kafka 一定要把这个定义成能访问的ip 比如我想在内网消费这个kafka 就要用内网ip绑定
host.name=192.168.0.10
启动zookeeper 和 kafka
flume沿用这个配置
#定义agent的名字为statge_nginx
stage_nginx.sources = s1
stage_nginx.channels = m1
stage_nginx.sinks = sink
#定义source的一些设置
stage_nginx.sources.s1.type = exec
stage_nginx.sources.s1.channels = m1
stage_nginx.sources.s1.command = tail -f /logs/nginx/log/www/info.access.log
#定义sink
stage_nginx.sinks.sink.type = org.apache.flume.sink.kafka.kafkasink
stage_nginx.sinks.sink.topic = t_nginx
stage_nginx.sinks.sink.brokerlist = 172.31.9.125:9091
stage_nginx.sinks.sink.requiredacks = 0
stage_nginx.sinks.sink.batchsize = 20
stage_nginx.sinks.sink.channel = m1
#定义channel
stage_nginx.channels.m1.type = memory
stage_nginx.channels.m1.capacity = 100
我们消费下nginx的日志
下面是在内网远程消费
直接消费kakfa
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import kafkaclient
import logging
client = kafkaclient(hosts="192.168.1.10:9092")
topic = client.topics['t_nginx']
consumer = topic.get_simple_consumer(
consumer_group="simplegroup",
# auto_offset_reset=offsettype.earliest,
reset_offset_on_start=true
)
for message in consumer:
if message is not none:
print message.offset, message.value
从zookeeper消费
#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import kafkaclient
import logging
client = kafkaclient(hosts="192.168.1.10:2181,192.168.1.10:2182,192.168.1.10:2183")
topic = client.topics['t_nginx']
balanced_consumer= topic.get_balanced_consumer(
consumer_group='group1',
auto_commit_enable=true,
# reset_offset_on_start=true,
zookeeper_connect='192.168.1.10:2181,192.168.1.10:2182,192.168.1.10:2183'
)
for message in balanced_consumer:
if message is not none:
print message.offset, message.value
其他的操作大家自己改啊 比如分析啊 入库啊 什么的
Hadoop|
Apache Pig|
Apache Kafka|
Apache Storm|
Impala|
Zookeeper|
SAS|
TensorFlow|
人工智能基础|
Apache Kylin|
Openstack|
Flink|
MapReduce|
大数据|
云计算|
用户登录
还没有账号?立即注册
用户注册
投稿取消
文章分类: |
|
还能输入300字
上传中....