Example: Simulating Log Generation with Python + Flume + Kafka + Spark


Generating the simulated data

  1. Write generate_log.py:

    #coding=UTF-8
    import random
    import time

    url_paths = [
        "class/112.html",
        "class/128.html",
        "class/145.html",
        "class/130.html",
        "class/146.html",
        "class/131.html",
        "learn/821",
        "course/list"
    ]

    ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 87, 98, 168]

    http_referers = [
        "https://www.baidu.com/s?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "https://cn.bing.com/search?q={query}",
        "https://www.so.com/s?q={query}"
    ]

    search_keyword = [
        "spark sql实战",
        "hadoop 基础",
        "storm实战",
        "spark streaming实战"
    ]

    status_code = ["200", "404", "500"]

    def sample_status_code():
        return random.sample(status_code, 1)[0]

    def sample_referer():
        # roughly 80% of requests carry no referer
        if random.uniform(0, 1) > 0.2:
            return "-"
        refer_str = random.sample(http_referers, 1)
        query_str = random.sample(search_keyword, 1)
        return refer_str[0].format(query=query_str[0])

    def sample_url():
        return random.sample(url_paths, 1)[0]

    def sample_ip():
        # build a fake IP from four random slices
        slice = random.sample(ip_slices, 4)
        return ".".join([str(item) for item in slice])

    def generate_log(count=10):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        # "w+" truncates the file, so each batch overwrites the previous one
        f = open("C:/Users/DaiRenLong/Desktop/streaming_access.log", "w+")
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}".format(
                url=sample_url(),
                ip=sample_ip(),
                refer=sample_referer(),
                status_code=sample_status_code(),
                local_time=time_str)
            print(query_log)
            f.write(query_log + "\n")
            count = count - 1
        f.close()

    if __name__ == '__main__':
        # generate a batch of log entries once per minute
        while True:
            generate_log()
            time.sleep(60)
  2. Wire the log file into Flume ==> Kafka (a sketch of such a configuration follows below).
     Flume configuration file: https://blog.csdn.net/drl_blogs/article/details/95192574#execkafkaconf_1
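     The linked configuration is not reproduced in the post. As a rough sketch of what streaming_project.conf could look like — the agent name matches the --name exec-memory-logger used in the next step, but the tailed file path, the broker address hadoop01:9092 and the Flume-1.6-style Kafka sink properties are assumptions to adapt to your environment:

    # streaming_project.conf -- minimal sketch (paths/hosts/versions are assumptions)
    exec-memory-logger.sources  = exec-source
    exec-memory-logger.channels = memory-channel
    exec-memory-logger.sinks    = kafka-sink

    # tail the log file produced by generate_log.py
    exec-memory-logger.sources.exec-source.type     = exec
    exec-memory-logger.sources.exec-source.command  = tail -F /home/hadoop/data/streaming_access.log
    exec-memory-logger.sources.exec-source.channels = memory-channel

    exec-memory-logger.channels.memory-channel.type = memory

    # Flume 1.6-style Kafka sink; newer Flume uses kafka.bootstrap.servers / kafka.topic instead
    exec-memory-logger.sinks.kafka-sink.type         = org.apache.flume.sink.kafka.KafkaSink
    exec-memory-logger.sinks.kafka-sink.brokerList   = hadoop01:9092
    exec-memory-logger.sinks.kafka-sink.topic        = kafka_streaming_topic
    exec-memory-logger.sinks.kafka-sink.requiredAcks = 1
    exec-memory-logger.sinks.kafka-sink.channel      = memory-channel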
  3. Run Flume:

    flume-ng agent \
    --name exec-memory-logger \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/streaming_project.conf \
    -Dflume.root.logger=INFO,console &
          
  4. Start a Kafka console consumer (if the topic does not exist yet, see the note after the command):

    kafka-console-consumer.sh \
     --zookeeper hadoop01:2181 \
     --topic kafka_streaming_topic
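
     If kafka_streaming_topic has not been created yet, the consumer has nothing to read. On a ZooKeeper-based Kafka deployment (which the --zookeeper flag above implies), the topic can be created first; the partition and replication counts below are just illustrative:

    kafka-topics.sh --create \
     --zookeeper hadoop01:2181 \
     --replication-factor 1 \
     --partitions 1 \
     --topic kafka_streaming_topic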

            
          
  5. Run the Python script to start producing logs:

    python generate_log.py

            
          
  6. Check that the Kafka consumer is receiving messages; an illustrative record is shown below.
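
     Each record follows the format string in generate_log.py: tab-separated IP, timestamp, request line, status code and referer. A randomly generated line looks roughly like:

    132.46.72.98	2019-07-09 20:30:00	"GET /class/130.html HTTP/1.1"	200	-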

  7. Write the Spark Streaming code that pulls the data out of Kafka (a build-dependency sketch follows the code):

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object kafka_Receiver_streaming {

      Logger.getLogger("org").setLevel(Level.WARN)

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
          .setAppName("kafka_Receiver_streaming")
          .setMaster("local[*]")
          .set("spark.port.maxRetries", "100")

        val ssc = new StreamingContext(sparkConf, Seconds(60))

        // receiver-based connector: (ssc, ZooKeeper quorum, consumer group, topic -> receiver threads)
        val messages = KafkaUtils.createStream(ssc, "hadoop01:2181", "test", Map("kafka_streaming_topic" -> 1))

        // each record is a (key, value) pair; the log line is the value
        messages.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
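
     KafkaUtils.createStream is the old receiver-based API, so the project needs the 0-8 Kafka connector on the classpath. A minimal build.sbt sketch — the Spark and Scala versions are assumptions, match them to your cluster; the same artifacts are also available as Maven coordinates:

    // build.sbt (sketch): versions are assumptions
    scalaVersion := "2.11.12"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"                % "2.2.0",
      "org.apache.spark" %% "spark-streaming"           % "2.2.0",
      // receiver-based connector used by KafkaUtils.createStream
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
    )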
              
            
          
  8. Run the code and check the results: each 60-second batch prints the number of records received from Kafka. A spark-submit sketch follows.
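
     Because the code hard-codes setMaster("local[*]"), it can be run directly from the IDE. If you package it and launch it with spark-submit instead, something along these lines should work; the jar path is a placeholder and the connector version is an assumption:

    # sketch only: jar path and connector version are placeholders
    spark-submit \
     --class kafka_Receiver_streaming \
     --master local[*] \
     --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
     /path/to/your-streaming-app.jar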
