Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
- 1.1.0
- None
Description
The TaskMetricGroup does not close the ioMetrics metric group. As a result, metrics registered under ioMetrics are not deregistered after a job terminates.
Activity
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2210
Calling close() isn't necessary since no metric should ever be registered directly on the IOMetricGroup. This group is supposed to only contain pre-defined metrics that can be accessed from multiple places. These metrics are registered on the TaskMetricGroup, and thus cleaned up.
A proper fix would be to override the addMetric() method to throw an exception when that is done, or to forward registration to the TaskMetricGroup.
Addendum for clarification: TaskMetrics aren't technically cleaned up at the end of the job, but when no task for a given job runs on a TM anymore.
Given the current usage of the IOMetricGroup, I sincerely doubt that this caused an actual issue, and would thus like you to provide more information on the issue you faced.
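The first option mentioned above (rejecting direct registration) could look roughly like the following. This is a minimal sketch with hypothetical types: `BaseGroup`, `GuardedGroup`, and the `addMetric(String, Object)` signature are assumptions made for illustration and are not the actual Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base group that stores metrics by name; not the Flink class.
class BaseGroup {
    final Map<String, Object> metrics = new HashMap<>();

    protected void addMetric(String name, Object metric) {
        metrics.put(name, metric);
    }
}

// Hypothetical group that only exposes pre-defined metrics and rejects new ones.
class GuardedGroup extends BaseGroup {
    @Override
    protected void addMetric(String name, Object metric) {
        throw new UnsupportedOperationException(
            "Register '" + name + "' on the parent task group instead");
    }
}

class GuardedGroupDemo {
    // Returns true if registering directly on the guarded group was rejected.
    static boolean registrationRejected() {
        try {
            new GuardedGroup().addMetric("currentLowWatermark", 0L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(registrationRejected());
    }
}
```

With this variant a stray registration fails at the call site instead of silently leaking, at the cost of breaking any existing caller that registers on the group.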
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2210
The problem was that the `currentLowWatermark` gauges weren't deregistered after the job terminated. This metric is registered by the `StreamInputProcessor` directly on the `IOMetricGroup` but never deregistered, because the `ioMetrics` group is never closed by the `TaskMetricGroup`.
I guess that the problem is then that the `StreamInputProcessor` shouldn't register the `currentLowWatermark` metric. Instead, the IOMetricGroup should preregister the corresponding metric?
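The leak described in this comment can be reproduced with a toy model. The sketch below is not Flink code: `ToyRegistry`, `ToyGroup`, and the metric name `someTaskMetric` are hypothetical stand-ins, and the only assumption carried over from the discussion is that closing a group deregisters just the metrics registered directly on it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a metric registry; not the Flink API.
class ToyRegistry {
    final Set<String> registered = new HashSet<>();
}

// Hypothetical group that deregisters only the metrics registered directly on it.
class ToyGroup {
    private final ToyRegistry registry;
    private final String scope;
    private final List<String> ownMetrics = new ArrayList<>();

    ToyGroup(ToyRegistry registry, String scope) {
        this.registry = registry;
        this.scope = scope;
    }

    void gauge(String name, Supplier<Long> gauge) {
        String id = scope + "." + name;
        ownMetrics.add(id);
        registry.registered.add(id);
    }

    void close() {
        registry.registered.removeAll(ownMetrics);
        ownMetrics.clear();
    }
}

class LeakDemo {
    // Returns the metrics still registered after the task group was closed.
    static Set<String> run() {
        ToyRegistry registry = new ToyRegistry();
        ToyGroup task = new ToyGroup(registry, "task");
        // The io group belongs to the task, but task.close() never closes it.
        ToyGroup io = new ToyGroup(registry, "task.io");

        task.gauge("someTaskMetric", () -> 0L);
        io.gauge("currentLowWatermark", () -> Long.MIN_VALUE);

        task.close();
        return registry.registered;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After `task.close()`, only `task.io.currentLowWatermark` is left in the toy registry, which mirrors the gauge that stayed registered after the job terminated.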
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2210
The culprit is `StreamInputProcessor.java:220`
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2210
We could do that, but it wouldn't be pretty. You would need a weird setValue() method on that Gauge that the StreamInputProcessor can use.
Forwarding the register calls to the parent is the best solution I can come up with at the moment.
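To make the pre-registration alternative concrete, here is a rough sketch of what such a settable gauge might look like. `SettableGauge`, `PreRegisteredIoGroup`, and the getter name are hypothetical and exist only for this illustration; the real Flink `Gauge` interface is not used here.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical gauge whose value is pushed in by its producer rather than computed on read.
class SettableGauge implements Supplier<Long> {
    private final AtomicLong value = new AtomicLong(Long.MIN_VALUE);

    void setValue(long newValue) {
        value.set(newValue);
    }

    @Override
    public Long get() {
        return value.get();
    }
}

// Hypothetical IO group that creates the gauge up front and hands it out via a getter.
class PreRegisteredIoGroup {
    private final SettableGauge currentLowWatermark = new SettableGauge();

    SettableGauge getCurrentLowWatermarkGauge() {
        return currentLowWatermark;
    }
}

class SettableGaugeDemo {
    static long run() {
        PreRegisteredIoGroup io = new PreRegisteredIoGroup();
        // The input processor would have to call this every time its watermark advances.
        io.getCurrentLowWatermarkGauge().setValue(42L);
        return io.getCurrentLowWatermarkGauge().get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The awkward part is visible in `run()`: the producer must remember to push every change, whereas a gauge registered by the producer itself can simply read the current state lazily.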
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2210
Why do we give the `StreamInputProcessor` a reference to the `IOMetricGroup` in the first place? Why not give it the corresponding `TaskMetricGroup`?
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2210
It gets an IOMG because in the past it would forward this group to its deserializers, for input byte counting etc. I didn't change it because I forgot about my own convention on how to use the IOMG.
The IOMG only exists for the purpose of keeping details out of the TaskMG (mostly the getters), and to prevent users of the IOMG from accessing other sub-groups of the TaskMG (separation of concerns). There is de facto no difference between a metric registered on a TaskMG and one registered on an IOMG.
Thus, the underlying issue imo is that the IOMG extends AbstractMetricGroup, which gives it more functionality than it should have. Instead it should extend a class as outlined below.
```
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;

// Forwards every registration call to the parent group, so the sub-group never owns a metric.
public abstract class SubMetricGroup<P extends MetricGroup> implements MetricGroup {
    private P parent;

    public SubMetricGroup(P parent) { this.parent = parent; }

    @Override
    public Counter counter(int name) { return parent.counter(name); }
    @Override
    public Counter counter(String name) { return parent.counter(name); }
    @Override
    public <C extends Counter> C counter(int name, C counter) { return parent.counter(name, counter); }
    @Override
    public <C extends Counter> C counter(String name, C counter) { return parent.counter(name, counter); }
    @Override
    public <T, G extends Gauge<T>> G gauge(int name, G gauge) { return parent.gauge(name, gauge); }
    @Override
    public <T, G extends Gauge<T>> G gauge(String name, G gauge) { return parent.gauge(name, gauge); }
    @Override
    public <H extends Histogram> H histogram(String name, H histogram) { return parent.histogram(name, histogram); }
    @Override
    public <H extends Histogram> H histogram(int name, H histogram) { return parent.histogram(name, histogram); }
    @Override
    public MetricGroup addGroup(int name) { return parent.addGroup(name); }
    @Override
    public MetricGroup addGroup(String name) { return parent.addGroup(name); }
}
```
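The effect of the forwarding approach outlined above can be checked with a small self-contained model. All types here (`SimpleGroup`, `OwningGroup`, `ForwardingGroup`) are hypothetical simplifications, not the Flink interfaces; the sketch only assumes that the parent removes every metric it owns when it is closed.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical minimal group interface; the real MetricGroup has many more methods.
interface SimpleGroup {
    void register(String name);
}

// Hypothetical parent that owns every metric and removes them all on close.
class OwningGroup implements SimpleGroup {
    final Set<String> metrics = new HashSet<>();

    @Override
    public void register(String name) {
        metrics.add(name);
    }

    void close() {
        metrics.clear();
    }
}

// Forwards registration to the parent, so the proxy never owns a metric itself.
class ForwardingGroup implements SimpleGroup {
    private final SimpleGroup parent;

    ForwardingGroup(SimpleGroup parent) {
        this.parent = parent;
    }

    @Override
    public void register(String name) {
        parent.register(name);
    }
}

class ForwardingDemo {
    // Returns {metrics on the parent before close, metrics on the parent after close}.
    static int[] run() {
        OwningGroup task = new OwningGroup();
        SimpleGroup io = new ForwardingGroup(task);
        io.register("currentLowWatermark");
        int before = task.metrics.size();
        task.close();
        return new int[] {before, task.metrics.size()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " -> " + counts[1]);
    }
}
```

Because the metric lands on the parent, closing the parent is enough to clean it up, and the sub-group needs no lifecycle of its own.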
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2210
Ah ok, I see the underlying problem now. I will adapt my PR wrt your feedback.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2210
I've addressed your comments @zentol and introduced a `ProxyMetricGroup` which simply forwards all calls as you've suggested.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2210#discussion_r70597751
--- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Metric group which forwards all registration calls to its parent metric group.
+ *
+ * @param <P> Type of the parent metric group
+ */
+@Internal
+public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
+ private final P parentMetricGroup;
+
+ public ProxyMetricGroup(P parentMetricGroup)
+
+ @Override
+ public final void close() {
+ parentMetricGroup.close();
--- End diff --
It is safer to not do anything here. `close()` should only be called by the Task on the `TaskMetricGroup`; forwarding the call would open us up to the possibility of other components closing the TaskMG as well.
There's also a looming StackOverflowError if someone puts `ioMetrics.close()` into `TaskMetricGroup#close()`.
Now that I think about it, I believe `close()` (and by extension, `isClosed()`) has no business being in the MetricGroup interface in the first place, as users don't actually need to call it.
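The recursion risk raised in this review comment can be shown with two toy classes. `ParentGroup`, `ProxyChild`, and the `forwardClose` switch are hypothetical and exist only to contrast the two behaviours; they are not the classes from the pull request.

```java
// Hypothetical parent that closes its sub-group as part of its own close().
class ParentGroup {
    ProxyChild io;

    void close() {
        io.close();
    }
}

// Hypothetical proxy whose close() either forwards to the parent or does nothing.
class ProxyChild {
    private final ParentGroup parent;
    private final boolean forwardClose;

    ProxyChild(ParentGroup parent, boolean forwardClose) {
        this.parent = parent;
        this.forwardClose = forwardClose;
    }

    void close() {
        if (forwardClose) {
            // Calls straight back into the parent, which calls io.close() again.
            parent.close();
        }
        // Otherwise intentionally a no-op: the parent owns all metrics.
    }
}

class CloseRecursionDemo {
    // Returns true if closing the parent overflowed the stack.
    static boolean overflows(boolean forwardClose) {
        ParentGroup parent = new ParentGroup();
        parent.io = new ProxyChild(parent, forwardClose);
        try {
            parent.close();
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("forwarding close overflows: " + overflows(true));
        System.out.println("no-op close overflows: " + overflows(false));
    }
}
```

With a forwarding `close()` the two methods call each other without end, while the no-op variant terminates immediately.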
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2210
Only had a small comment, otherwise +1.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2210#discussion_r70614548
--- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java ---
@@ -0,0 +1,100 @@
+ @Override
+ public final void close() {
+ parentMetricGroup.close();
--- End diff --
Hmm, you're right, it is a bit tricky. You don't want to close the complete parent metric group. But I guess you would want to unregister the metrics registered by the `IOMetricGroup` if you call `IOMetricGroup#close`. However, this is not really possible at the moment. I will remove the `parent.close` call then.
I agree that `close` and `isClosed` should only be used internally and not be exposed to the user. This could be a follow-up task.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2210
Merging...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/2210
FLINK-4167[metrics] Close IOMetricGroup in TaskMetricGroup
This closes the `ioMetrics` metric group contained in `TaskMetricGroup`. Because this group was not closed, its metrics weren't deregistered after the end of a job.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink fixIOMetricsGroup
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2210.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2210
commit c0f028f1e8605c332973afec2b5c1144a2815318
Author: Till Rohrmann <trohrmann@apache.org>
Date: 2016-07-07T09:37:39Z
FLINK-4167[metrics] Close IOMetricGroup in TaskMetricGroup