Details
-
Improvement
-
Status: Closed
-
Minor
-
Resolution: Won't Fix
-
1.1.1
-
None
-
None
Description
The current KafkaTridentSpoutOpque, does not have any metrics. We can use the metrics() call of the KafkaConsumer https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#metrics(), to get various metrics and map that to the storm metrics.
Eg:
We can add a generic KafkaClientMetric which would implement IMetric and has a getValueAndReset(), where the consumer metric calls are made.
requiredMetric can be initialized to any metrics like records-lag-max.
@Override public Object getValueAndReset() { for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : ((Map<MetricName, ? extends Metric>) kafkaConsumer.metrics()).entrySet()) { // Sample structure of Metric // MetricName [name=records-lag-max, group=consumer-fetch-manager-metrics, description=The maximum lag in terms of number of records for any partition in this window, tags={client-id=consumer-1}] metric.name()=MetricName [name=records-lag-max, group=consumer-fetch-manager-metrics, description=The maximum lag in terms of number of records for any partition in this window, tags={client-id=consumer-1}] metric.value()=-Infinity Metric metric = metricKeyVal.getValue(); if(metric.metricName().name().equals(requiredMetric)) { return metric.value(); } } return null; }