  1. Flink
  2. FLINK-4167

TaskMetricGroup does not close IOMetricGroup

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.1.0
    • Component/s: Runtime / Metrics
    • Labels: None

    Description

      The TaskMetricGroup does not close the ioMetrics metric group. As a result, metrics registered under ioMetrics are not deregistered after a job terminates.

          Activity

            githubbot ASF GitHub Bot added a comment -

            GitHub user tillrohrmann opened a pull request:

            https://github.com/apache/flink/pull/2210

            FLINK-4167 [metrics] Close IOMetricGroup in TaskMetricGroup

            This closes the `ioMetrics` metric group contained in `TaskMetricGroup`. Because this group was never closed, its metrics weren't deregistered after the end of a job.

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/tillrohrmann/flink fixIOMetricsGroup

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/flink/pull/2210.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #2210


            commit c0f028f1e8605c332973afec2b5c1144a2815318
            Author: Till Rohrmann <trohrmann@apache.org>
            Date: 2016-07-07T09:37:39Z

            FLINK-4167 [metrics] Close IOMetricGroup in TaskMetricGroup


            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on the issue:

            https://github.com/apache/flink/pull/2210

            Calling close() isn't necessary since no metric should ever be registered directly on the IOMetricGroup. This group is supposed to only contain pre-defined metrics that can be accessed from multiple places. These metrics are registered on the TaskMetricGroup, and thus cleaned up.

            A proper fix would be to override the addMetric() method to throw an exception when that is done, or to forward the registration to the TaskMetricGroup.

            Addendum for clarification: TaskMetrics aren't technically cleaned up at the end of the job, but when no task for a given job runs on a TM (TaskManager) anymore.

            Given the current usage of the IOMetricGroup I sincerely doubt that this caused an actual issue, and thus would like you to provide more information on the issue you faced.
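The first option mentioned in this comment (rejecting direct registration) can be sketched as follows. This is a minimal illustration and not Flink code: `BaseGroup` and `PredefinedOnlyGroup` are hypothetical stand-ins, and the only assumption carried over from the comment is that every registration passes through a single `addMetric()` method that a subclass can override.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base group: every registration passes through addMetric().
class BaseGroup {
    final Map<String, Object> metrics = new HashMap<>();

    protected void addMetric(String name, Object metric) {
        metrics.put(name, metric);
    }

    // Registration entry point used by callers; delegates to addMetric().
    final <T> T gauge(String name, T gauge) {
        addMetric(name, gauge);
        return gauge;
    }
}

// Hypothetical sub-group that only exposes pre-defined metrics and
// rejects any attempt to register a metric on it directly.
class PredefinedOnlyGroup extends BaseGroup {
    @Override
    protected void addMetric(String name, Object metric) {
        throw new UnsupportedOperationException(
            "Metric '" + name + "' must be registered on the parent group.");
    }
}
```

With such a guard, a component that registers a gauge on the sub-group fails fast at registration time instead of silently leaving a metric behind.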

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on the issue:

            https://github.com/apache/flink/pull/2210

            The problem was that the `currentLowWatermark` gauges weren't deregistered after the job terminated. This metric is registered by the `StreamInputProcessor` directly on the `IOMetricGroup` but never deregistered, because the `ioMetrics` group is never closed by the `TaskMetricGroup`.

            I guess that the problem is then that the `StreamInputProcessor` shouldn't register the `currentLowWatermark` metric. Instead, the IOMetricGroup should preregister the corresponding metric?
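The failure mode described here can be reproduced with a small, self-contained model. The classes below (`Registry`, `Group`, `TaskGroup`, `LeakDemo`) are hypothetical stand-ins rather than Flink's classes, and the metric name `numBytesIn` is chosen only for illustration. The sketch assumes nothing more than "a group deregisters the metrics it registered when it is closed".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical registry: only tracks which metric names are registered.
class Registry {
    final Set<String> registered = new HashSet<>();
}

// Hypothetical group: remembers what it registered and removes it on close().
class Group {
    private final Registry registry;
    private final List<String> own = new ArrayList<>();

    Group(Registry registry) { this.registry = registry; }

    void gauge(String name) {
        own.add(name);
        registry.registered.add(name);
    }

    void close() {
        registry.registered.removeAll(own);
        own.clear();
    }
}

// Task group owning an I/O sub-group. The flag switches between the reported
// behaviour (sub-group never closed) and closing the sub-group as well.
class TaskGroup extends Group {
    final Group ioMetrics;
    private final boolean closeIoMetrics;

    TaskGroup(Registry registry, boolean closeIoMetrics) {
        super(registry);
        this.ioMetrics = new Group(registry);
        this.closeIoMetrics = closeIoMetrics;
    }

    @Override
    void close() {
        super.close();
        if (closeIoMetrics) {
            ioMetrics.close();
        }
    }
}

class LeakDemo {
    // Returns the metric names still registered after the task group is closed.
    static Set<String> leftovers(boolean closeIoMetrics) {
        Registry registry = new Registry();
        TaskGroup task = new TaskGroup(registry, closeIoMetrics);
        task.gauge("numBytesIn");                    // registered on the task group
        task.ioMetrics.gauge("currentLowWatermark"); // registered on the sub-group
        task.close();
        return registry.registered;
    }

    public static void main(String[] args) {
        System.out.println(leftovers(false));
        System.out.println(leftovers(true));
    }
}
```

In this model the gauge registered on the sub-group survives `close()` unless the sub-group is closed too, which mirrors the leftover `currentLowWatermark` gauges.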

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on the issue:

            https://github.com/apache/flink/pull/2210

            The culprit is `StreamInputProcessor.java:220`

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on the issue:

            https://github.com/apache/flink/pull/2210

            We could do that but it wouldn't be pretty. You would need a weird setValue() method on that Gauge that the StreamInputProcessor can use.

            Forwarding the register calls to the parent is the best solution I can come up with at the moment.
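To show why pre-registering the gauge is awkward, here is a rough sketch of what that variant might look like. All names (`ReadOnlyGauge`, `SettableLongGauge`, `PreRegisteringIoGroup`) are hypothetical; `ReadOnlyGauge` only mirrors the general shape of a gauge with a single `getValue()` method and is not Flink's interface.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in for a read-only gauge: reporters only ever call getValue().
interface ReadOnlyGauge<T> {
    T getValue();
}

// Hypothetical pre-registered gauge. The group creates it up front, so the
// component that knows the value needs an extra setValue() to push it in.
class SettableLongGauge implements ReadOnlyGauge<Long> {
    private final AtomicLong value = new AtomicLong(Long.MIN_VALUE);

    void setValue(long newValue) {
        value.set(newValue);
    }

    @Override
    public Long getValue() {
        return value.get();
    }
}

// Hypothetical I/O group that pre-defines the watermark gauge and hands it out.
class PreRegisteringIoGroup {
    private final SettableLongGauge currentLowWatermark = new SettableLongGauge();

    SettableLongGauge getCurrentLowWatermarkGauge() {
        return currentLowWatermark;
    }
}
```

The gauge stops being a plain read-only view: the producer has to push every update through `setValue()`, which is the extra method objected to above.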

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on the issue:

            https://github.com/apache/flink/pull/2210

            Why do we give the `StreamInputProcessor` a reference to the `IOMetricGroup` in the first place? Why not give it the corresponding `TaskMetricGroup`?

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on the issue:

            https://github.com/apache/flink/pull/2210

            It gets an IOMG (IOMetricGroup) because in the past it would forward this group to its deserializers, for input byte counting etc. I didn't change it because I forgot about my own convention on how to use the IOMG.

            The IOMG only exists for the purpose of keeping details out of the TaskMG (the getters mostly), and to prevent users of the IOMG from accessing other sub-groups of the TaskMG (separation of concerns). There is de facto no difference between a metric registered on a TaskMG or an IOMG.

            Thus, the underlying issue imo is that the IOMG extends AbstractMetricGroup, giving it more functionality than it should have. Instead it should extend a class as outlined below.

            ```
            import org.apache.flink.metrics.Counter;
            import org.apache.flink.metrics.Gauge;
            import org.apache.flink.metrics.Histogram;
            import org.apache.flink.metrics.MetricGroup;

            // Outline: a sub-group that owns no metrics and forwards every call to its parent.
            // Left abstract, so MetricGroup methods not listed here (e.g. close()) stay unimplemented.
            public abstract class SubMetricGroup<P extends MetricGroup> implements MetricGroup {
                private final P parent;

                public SubMetricGroup(P parent) { this.parent = parent; }

                @Override
                public Counter counter(int name) { return parent.counter(name); }

                @Override
                public Counter counter(String name) { return parent.counter(name); }

                @Override
                public <C extends Counter> C counter(int name, C counter) { return parent.counter(name, counter); }

                @Override
                public <C extends Counter> C counter(String name, C counter) { return parent.counter(name, counter); }

                @Override
                public <T, G extends Gauge<T>> G gauge(int name, G gauge) { return parent.gauge(name, gauge); }

                @Override
                public <T, G extends Gauge<T>> G gauge(String name, G gauge) { return parent.gauge(name, gauge); }

                @Override
                public <H extends Histogram> H histogram(String name, H histogram) { return parent.histogram(name, histogram); }

                @Override
                public <H extends Histogram> H histogram(int name, H histogram) { return parent.histogram(name, histogram); }

                @Override
                public MetricGroup addGroup(int name) { return parent.addGroup(name); }

                @Override
                public MetricGroup addGroup(String name) { return parent.addGroup(name); }
            }
            ```
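The effect of the forwarding outline above can be demonstrated without Flink on the classpath. The following sketch uses a cut-down, hypothetical interface (`MiniGroup`, gauges only, with `java.util.function.Supplier` standing in for a gauge); `OwningGroup` and `ForwardingGroup` are likewise illustrative names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Cut-down, hypothetical metric group interface (gauges only).
interface MiniGroup {
    <T> Supplier<T> gauge(String name, Supplier<T> gauge);
}

// Parent that actually owns the metrics and drops them on close().
class OwningGroup implements MiniGroup {
    final Map<String, Supplier<?>> metrics = new LinkedHashMap<>();

    @Override
    public <T> Supplier<T> gauge(String name, Supplier<T> gauge) {
        metrics.put(name, gauge);
        return gauge;
    }

    void close() {
        metrics.clear();
    }
}

// Proxy in the spirit of the outline: holds no metrics, only forwards.
class ForwardingGroup<P extends MiniGroup> implements MiniGroup {
    private final P parent;

    ForwardingGroup(P parent) {
        this.parent = parent;
    }

    @Override
    public <T> Supplier<T> gauge(String name, Supplier<T> gauge) {
        return parent.gauge(name, gauge);
    }
}
```

Because the proxy never stores anything itself, a gauge registered through it is owned by the parent and disappears when the parent is closed; the proxy needs no cleanup of its own.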

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on the issue:

            https://github.com/apache/flink/pull/2210

            Ah ok, I see the underlying problem now. I will adapt my PR wrt your feedback.

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on the issue:

            https://github.com/apache/flink/pull/2210

            I've addressed your comments @zentol and introduced a `ProxyMetricGroup` which simply forwards all calls as you've suggested.

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2210#discussion_r70597751

            --- Diff: flink-core/src/main/java/org/apache/flink/metrics/groups/ProxyMetricGroup.java ---
            @@ -0,0 +1,100 @@
            +/*
            + * Licensed to the Apache Software Foundation (ASF) under one
            + * or more contributor license agreements. See the NOTICE file
            + * distributed with this work for additional information
            + * regarding copyright ownership. The ASF licenses this file
            + * to you under the Apache License, Version 2.0 (the
            + * "License"); you may not use this file except in compliance
            + * with the License. You may obtain a copy of the License at
            + *
            + * http://www.apache.org/licenses/LICENSE-2.0
            + *
            + * Unless required by applicable law or agreed to in writing, software
            + * distributed under the License is distributed on an "AS IS" BASIS,
            + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            + * See the License for the specific language governing permissions and
            + * limitations under the License.
            + */
            +
            +package org.apache.flink.metrics.groups;
            +
            +import org.apache.flink.annotation.Internal;
            +import org.apache.flink.metrics.Counter;
            +import org.apache.flink.metrics.Gauge;
            +import org.apache.flink.metrics.Histogram;
            +import org.apache.flink.metrics.MetricGroup;
            +import org.apache.flink.util.Preconditions;
            +
            +/**
            + * Metric group which forwards all registration calls to its parent metric group.
            + *
            + * @param <P> Type of the parent metric group
            + */
            +@Internal
            +public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
            + private final P parentMetricGroup;
            +
            + public ProxyMetricGroup(P parentMetricGroup) {
            + this.parentMetricGroup = Preconditions.checkNotNull(parentMetricGroup);
            + }
            +
            + @Override
            + public final void close() {
            + parentMetricGroup.close();
            --- End diff --

            It is safer to not do anything here. `close()` should only be called by the Task on the `TaskMetricGroup`; forwarding the call would open us to the possibility of components closing the TaskMG as well.

            There's also the looming StackOverflow when someone puts `ioMetrics.close()` into `TaskMetricGroup#close()`.

            Now that I think about it, I believe `close()` (and by extension, `isClosed()`) has no business being in the MetricGroup interface in the first place, as users actually don't need to call it.
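The recursion hazard mentioned here is easy to reproduce in isolation. The sketch below uses hypothetical stand-ins (`ClosableGroup`, `ParentGroup`, `ForwardingCloseProxy`, `NoOpCloseProxy`, `CloseDemo`) and assumes a parent whose `close()` also closes its sub-group, which is the situation the comment warns about.

```java
// Hypothetical minimal group interface with only the lifecycle method.
interface ClosableGroup {
    void close();
}

// Parent whose close() also closes its sub-group, if one is set.
class ParentGroup implements ClosableGroup {
    ClosableGroup child;

    @Override
    public void close() {
        if (child != null) {
            child.close();
        }
    }
}

// Variant from the reviewed diff: the proxy forwards close() to its parent.
class ForwardingCloseProxy implements ClosableGroup {
    private final ClosableGroup parent;

    ForwardingCloseProxy(ClosableGroup parent) {
        this.parent = parent;
    }

    @Override
    public void close() {
        parent.close();
    }
}

// Variant suggested in the review: close() deliberately does nothing.
class NoOpCloseProxy implements ClosableGroup {
    @Override
    public void close() {
        // intentionally empty: only the owner of the parent may close it
    }
}

class CloseDemo {
    // Returns true if closing the parent ends in unbounded mutual recursion.
    static boolean overflows(boolean forwarding) {
        ParentGroup parent = new ParentGroup();
        parent.child = forwarding ? new ForwardingCloseProxy(parent) : new NoOpCloseProxy();
        try {
            parent.close();
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }
}
```

With the forwarding variant, parent and proxy call each other until the stack is exhausted; with the no-op variant, closing the parent returns normally.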

            githubbot ASF GitHub Bot added a comment -

            Github user zentol commented on the issue:

            https://github.com/apache/flink/pull/2210

            Only had a small comment, otherwise +1.

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on a diff in the pull request:

            https://github.com/apache/flink/pull/2210#discussion_r70614548


            Hmm you're right, it is a bit tricky. You don't want to close the complete parent metric group. But I guess you would want to unregister the metrics registered by the `IOMetricGroup` if you call `IOMetricGroup#close`. However, this is not really possible at the moment. I will remove the `parent.close` call then.

            I agree that `close` and `isClosed` should only be used internally and not be exposed to the user. This could be a follow-up task.

            githubbot ASF GitHub Bot added a comment -

            Github user tillrohrmann commented on the issue:

            https://github.com/apache/flink/pull/2210

            Merging...

            trohrmann Till Rohrmann added a comment -

            Fixed via d17fe4f636b56f2200444c59a0dead9010dfaa5d

            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/flink/pull/2210


            People

              Assignee: Till Rohrmann
              Reporter: Till Rohrmann
              Votes: 0
              Watchers: 2
