import org.apache.eagle.stream.dsl.experimental.KafkaInterface._
import org.apache.eagle.stream.dsl.experimental.DruidInterface._
// Declare stream "metricStream_1" with schema (name: string, value: double,
// timestamp: long), sourced from the Kafka topic of the same name; the ZooKeeper
// quorum is resolved from config key "kafka.zk.hosts" via the conf"" interpolator.
// NOTE(review): deserializer is the empty string — presumably "use the DSL default";
// confirm against the KafkaInterface implementation.
define ("metricStream_1") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
kafka(topic="metricStream_1",zk=conf"kafka.zk.hosts",deserializer="")
// Same (name, value, timestamp) schema for "metricStream_2", read from Kafka topic
// "metricStream_2" with no explicit zk/deserializer — relies on DSL defaults.
define ("metricStream_2") as ("name" -> 'string, "value"->'double, "timestamp"->'long) from
kafka(topic="metricStream_2")
// Schema-less stream "logStream_3" read from Kafka topic "logStream_3"; a schema is
// attached by the filter steps below.
define ("logStream_3") from kafka(topic="logStream_3")
// Pass-through transform: the closure emits every input line unchanged via the
// collector, while the `as (...)` clause declares the (name, value, timestamp)
// schema on the stream.
filter ("logStream_3") by {(line,collector) => collector.collect(line)} as ("name" -> 'string, "value"->'double, "timestamp"->'long)
// Parse "logStream_3" into a new stream "logStream_3_parsed" using a regex with a
// named capture group "timestamp" matching a yyyy-mm-dd shaped token.
// NOTE(review): in Joda/Java date patterns "YYYY" is week-year and "DD" is
// day-of-year — if datetime() delegates to those formatters this should likely be
// "yyyy-MM-dd"; confirm datetime()'s pattern semantics before changing.
filter("logStream_3"->"logStream_3_parsed") by """(?<timestamp>\d{4}-\d{2}-\d{2})""".r as ("name" -> 'string, "value"->'double, "timestamp"-> datetime("YYYY-MM-DD"))
// Alert policy: partition metricStream_1 by metricType (parallelism 1) and evaluate
// the embedded streaming-SQL query — an hourly sum(value) per host over a 3600s
// window, restricted to datanode ('dn') RpcQueueTimeNumOps metrics — inserting
// matches into "alertStream".
// NOTE(review): "parallism" is the DSL's own (misspelled) keyword; do not "fix" the
// spelling here without renaming it in the DSL definition as well.
alert partitionBy "metricStream_1.metricType" parallism 1 by {sql"""
from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into alertStream;
"""}
// Aggregation policy: same partitioning and query shape as the alert above, but run
// with parallelism 2 and writing the hourly per-host sums into the derived stream
// "aggregatedMetricStream_1" (consumed by the routing statements further down).
// NOTE(review): the quoting inside the query mixes ' and " for string comparisons
// ('dn' vs "RpcActivityForPort50020...") — confirm the query engine accepts both.
aggregate partitionBy "metricStream_1.metricType" parallism 2 by {sql"""
from metricStream_1[component=='dn' and metricType=="RpcActivityForPort50020.RpcQueueTimeNumOps"].time[3600]
select sum(value) group by host output every 1 hour insert into aggregatedMetricStream_1;
"""}
// Publish "alertStream" to Kafka topic "alert_topic" (ZK quorum from config key
// "kafka.zk.hosts").
'alertStream ~> kafka("alert_topic",zk=conf"kafka.zk.hosts")
// Also deliver alerts by email through a local SMTP relay. The inline XML literal
// is the HTML mail template; "$message" is presumably substituted with the alert
// payload by the mail sink at send time — confirm the substitution mechanism.
// NOTE(review): both `to` and `~>` are used as sink connectors in this script —
// presumably equivalent; verify in the DSL definition.
"alertStream" to mail(
from = "sender@eagle.incubator.apache.org",
to = "receiver@eagle.incubator.apache.org",
smtp = "localhost:25",
template =
<html>
<head>
<title>Alert Notification</title>
</head>
<body>
<h1>Message</h1>
<p>$message</p>
</body>
</html>
)
// Fan out the aggregated stream by component: datanode ('dn') records go to the
// Kafka topic "aggregated_stream_dn", namenode ('nn') records go to the Druid
// datasource "aggregated_stream_nn"; both routes partition by metricType.
'aggregatedMetricStream_1 to kafka("aggregated_stream_dn") where "component == 'dn'" partitionBy "aggregatedMetricStream_1.metricType"
'aggregatedMetricStream_1 ~> druid("aggregated_stream_nn") where "component == 'nn'" partitionBy "aggregatedMetricStream_1.metricType"