Details

    • Sub-task
    • Status: Closed
    • Major
    • Resolution: Not A Problem
    • v0.3.0
    • None
    • None
    • None

    Description

      Features

      • High Level Stream-Oriented
      • Declarative Streaming
      • Metadata Driven
      • Native Scala internal DSL
      • Support Scala Programming or Script/Configuration in *.egl
      • Support static policy definition / dynamic policy loader
      • IDE-friendly features like SQL prefix and XML as email template.
      • Name Reference

      Syntax

      // Topology Definition API by extends or script
      import org.apache.eagle.stream.dsl.experimental.KafkaInterface._
      import org.apache.eagle.stream.dsl.experimental.DruidInterface._
      
      // #!/bin/bash
      // exec scala "$0" "$@"
      // !#
      // # start
      define ("metricStream_1") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
        kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
      
      define ("metricStream_2") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
        kafka(topic="metricStream_2")
      
      define ("logStream_3") from kafka(topic="logStream_3")
      
      // filter by function
      filter ("logStream_3") by {(line,collector) => collector.collect(line)} as ("name" -> 'string, "value"->'double, "timestamp"->'long)
      // "logStream_3" as ("name" -> 'string, "value"->'double, "timestamp"->'long)
      
      // filter by pattern and rename stream
      filter("logStream_3"->"logStream_3_parsed") by """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string, "value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))
      
      alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
        from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
        select sum(value) group by host output every 1 hour insert into alertStream;
      """}
      
      aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
        from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
        select sum(value) group by host output every 1 hour insert into aggregatedMetricStream_1;
      """}
      
      'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
      "alertStream" to mail(
        from = "sender@eagle.incubator.apache.org",
        to = "receiver@eagle.incubator.apache.org",
        smtp = "localhost:25",
        template =
          <html>
            <head>
            <title>Alert Notification</title>
            </head>
            <body>
              <h1>Message</h1>
              <p>$message</p>
            </body>
          </html>
      )
      
      // split stream by logic
      'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
      'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn")  where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"
      // # end
      

      Attachments

        Activity

          People

            qingwzhao Qingwen Zhao
            haoch Hao Chen
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: